Posted to reviews@spark.apache.org by "rangadi (via GitHub)" <gi...@apache.org> on 2023/11/25 18:50:28 UTC

Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

rangadi commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1405191450


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -47,9 +47,19 @@ case class SessionKey(userId: String, sessionId: String)
 case class SessionHolder(userId: String, sessionId: String, session: SparkSession)
     extends Logging {
 
-  @volatile private var lastRpcAccessTime: Option[Long] = None
+  // Time when the session was started.
+  private val startTime: Long = System.currentTimeMillis()

Review Comment:
   The convention is to use a unit suffix (`startTimeMs` for milliseconds here) when the type is a plain `Long`. A bare `Time` name would be fine if this were a type like `Instant`.
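
   For instance, a minimal sketch of that convention applied here:

   ```scala
   // Time when the session was started, in epoch milliseconds.
   private val startTimeMs: Long = System.currentTimeMillis()
   ```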



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.

Review Comment:
   Any concerns if `close()` above throws? 
   Btw, when do we clean up `closedSessionsCache`?
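
   On the first question, a minimal sketch of one way to keep `closedSessionsCache` updated even if `close()` throws (whether the failure should then be swallowed or propagated depends on the intended semantics):

   ```scala
   sessionHolder.foreach { s =>
     try {
       s.close()
     } finally {
       // Record whatever state the holder reached, even if close() failed partway.
       closedSessionsCache.put(s.key, s.getSessionHolderInfo)
     }
   }
   ```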



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -186,7 +196,13 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   def classloader: ClassLoader = artifactManager.classloader
 
   private[connect] def updateAccessTime(): Unit = {
-    lastRpcAccessTime = Some(System.currentTimeMillis())
+    lastAccessTime = System.currentTimeMillis()
+    logInfo(s"Session $key accessed, time $lastAccessTime.")

Review Comment:
   Temporary log?
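
   If it is meant to stay, `logDebug` would keep it out of production logs; a sketch:

   ```scala
   logDebug(s"Session $key accessed, time $lastAccessTime.")
   ```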



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -205,14 +220,26 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
    * Called only by SparkConnectSessionManager.
    */
   private[connect] def close(): Unit = {
+    // It is called only by SparkConnectSessionManager.closeSession, only after it's removed from
+    // the sessionStore there guarantees that it is called only once.
+    if (closedTime.isDefined) {
+      throw new IllegalStateException(s"Session $key is already closed.")
+    }
+
     logInfo(s"Closing session with userId: $userId and sessionId: $sessionId")
 
-    // After isClosing=true, SessionHolder.addExecuteHolder() will not allow new executions for
-    // this session. Because both SessionHolder.addExecuteHolder() and
+    // After closedTime is defined, SessionHolder.addExecuteHolder() will not allow new executions
+    // to be added for this session. Because both SessionHolder.addExecuteHolder() and
     // SparkConnectExecutionManager.removeAllExecutionsForSession() are executed under
     // executionsLock, this guarantees that removeAllExecutionsForSession triggered below will
     // remove all executions and no new executions will be added in the meanwhile.
-    isClosing = true
+    closedTime = Some(System.currentTimeMillis())
+
+    if (eventManager.status == SessionStatus.Pending) {
+      // Testing-only: Some sessions created by SessionHolder.forTesting are not fully initialized

Review Comment:
   Can we initialize `SessionHolder.forTesting` to avoid this?
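
   A rough sketch of the idea, assuming `forTesting` constructs the holder directly and that `eventManager.postStarted()` is what moves the status out of `Pending` (both are assumptions about code not shown in this diff):

   ```scala
   private[connect] def forTesting(session: SparkSession): SessionHolder = {
     import java.util.UUID
     val holder = SessionHolder("testUser", UUID.randomUUID().toString, session)
     // Assumption: postStarted() transitions the session out of SessionStatus.Pending,
     // so close() would no longer need a testing-only branch.
     holder.eventManager.postStarted()
     holder
   }
   ```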



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -205,14 +220,26 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
    * Called only by SparkConnectSessionManager.
    */
   private[connect] def close(): Unit = {
+    // It is called only by SparkConnectSessionManager.closeSession, only after it's removed from
+    // the sessionStore there guarantees that it is called only once.
+    if (closedTime.isDefined) {
+      throw new IllegalStateException(s"Session $key is already closed.")
+    }
+
     logInfo(s"Closing session with userId: $userId and sessionId: $sessionId")
 
-    // After isClosing=true, SessionHolder.addExecuteHolder() will not allow new executions for
-    // this session. Because both SessionHolder.addExecuteHolder() and
+    // After closedTime is defined, SessionHolder.addExecuteHolder() will not allow new executions
+    // to be added for this session. Because both SessionHolder.addExecuteHolder() and
     // SparkConnectExecutionManager.removeAllExecutionsForSession() are executed under
     // executionsLock, this guarantees that removeAllExecutionsForSession triggered below will
     // remove all executions and no new executions will be added in the meanwhile.
-    isClosing = true
+    closedTime = Some(System.currentTimeMillis())

Review Comment:
   Is it possible to guard `closedTime` with `executionsLock`, rather than having to explain why this is correct?
   (Maybe this field is used in too many places for that.)
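
   A minimal sketch of that suggestion, assuming `executionsLock` is reachable from `SessionHolder` (lock ordering against `sessionsLock` would need checking):

   ```scala
   // Guarded by executionsLock, the same lock as addExecuteHolder() and
   // removeAllExecutionsForSession(), so the correctness argument holds by construction.
   executionsLock.synchronized {
     if (closedTime.isDefined) {
       throw new IllegalStateException(s"Session $key is already closed.")
     }
     closedTime = Some(System.currentTimeMillis())
   }
   ```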



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -35,37 +38,31 @@ class SparkConnectSessionManager extends Logging {
 
   private val sessionsLock = new Object
 
-  private val sessionStore =
-    CacheBuilder

Review Comment:
   ❤️



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.
+      closedSessionsCache.put(s.key, s.getSessionHolderInfo)
     }
   }
 
-  def shutdown(): Unit = {
+  private[connect] def shutdown(): Unit = sessionsLock.synchronized {
+    scheduledExecutor.foreach { executor =>
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.MINUTES)
+    }
+    scheduledExecutor = None
+    // note: this does not cleanly shut down the sessions, but the server is shutting down.
+    sessionStore.clear()
+    closedSessionsCache.invalidateAll()
+  }
+
+  def listActiveSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    sessionStore.values.map(_.getSessionHolderInfo).toSeq
+  }
+
+  def listClosedSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    closedSessionsCache.asMap.asScala.values.toSeq
+  }
+
+  /**
+   * Schedules periodic maintenance checks if it is not already scheduled.
+   *
+   * The checks are looking to remove sessions that expired.
+   */
+  private def schedulePeriodicChecks(): Unit = sessionsLock.synchronized {
+    scheduledExecutor match {
+      case Some(_) => // Already running.
+      case None =>
+        val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL).toLong
+        // Default for sessions that don't have expiration time set.
+        val defaultInactiveTimeout =
+          SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT).toLong
+        logInfo(s"Starting thread for cleanup of expired sessions every $interval ms")
+        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
+        scheduledExecutor.get.scheduleAtFixedRate(
+          () => {
+            try periodicMaintenance(defaultInactiveTimeout)
+            catch {
+              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
+            }
+          },
+          interval,
+          interval,
+          TimeUnit.MILLISECONDS)
+    }
+  }
+
+  // Visible for testing.
+  private[connect] def periodicMaintenance(
+      defaultInactiveTimeout: Long,
+      ignoreExpirationTime: Boolean = false): Unit = {
+    logInfo("Started periodic run of SparkConnectSessionManager maintenance.")
+    // Find any sessions that expired and should be removed.
+    val toRemove = new mutable.ArrayBuffer[SessionHolder]()
     sessionsLock.synchronized {
-      sessionStore.invalidateAll()
-      closedSessionsCache.invalidateAll()
+      val nowMs = System.currentTimeMillis()
+
+      sessionStore.values.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo
+        if (info.customExpirationTime.isDefined && !ignoreExpirationTime) {

Review Comment:
   Why is this an absolute expiry time rather than a custom inactivity timeout?
   Who sets this in the PR?
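
   For comparison, a sketch of the inactivity-based variant; the field `customInactiveTimeoutMs` is hypothetical, not something this PR defines:

   ```scala
   // Hypothetical: each session carries an optional inactivity timeout rather than
   // an absolute expiration timestamp.
   val timeoutMs = info.customInactiveTimeoutMs.getOrElse(defaultInactiveTimeout)
   if (info.lastAccessTime + timeoutMs <= nowMs) {
     toRemove += sessionHolder
   }
   ```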



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -35,37 +38,31 @@ class SparkConnectSessionManager extends Logging {
 
   private val sessionsLock = new Object
 
-  private val sessionStore =
-    CacheBuilder
-      .newBuilder()
-      .ticker(Ticker.systemTicker())
-      .expireAfterAccess(
-        SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT),
-        TimeUnit.MILLISECONDS)
-      .removalListener(new RemoveSessionListener)
-      .build[SessionKey, SessionHolder]()
+  private val sessionStore = mutable.HashMap[SessionKey, SessionHolder]()

Review Comment:
   Which lock is this accessed under? Add a comment here.
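
   E.g., something like:

   ```scala
   // Guarded by sessionsLock.
   private val sessionStore = mutable.HashMap[SessionKey, SessionHolder]()
   ```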



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.
+      closedSessionsCache.put(s.key, s.getSessionHolderInfo)
     }
   }
 
-  def shutdown(): Unit = {
+  private[connect] def shutdown(): Unit = sessionsLock.synchronized {
+    scheduledExecutor.foreach { executor =>
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.MINUTES)
+    }
+    scheduledExecutor = None
+    // note: this does not cleanly shut down the sessions, but the server is shutting down.
+    sessionStore.clear()
+    closedSessionsCache.invalidateAll()
+  }
+
+  def listActiveSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    sessionStore.values.map(_.getSessionHolderInfo).toSeq
+  }
+
+  def listClosedSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    closedSessionsCache.asMap.asScala.values.toSeq
+  }
+
+  /**
+   * Schedules periodic maintenance checks if it is not already scheduled.
+   *
+   * The checks are looking to remove sessions that expired.
+   */
+  private def schedulePeriodicChecks(): Unit = sessionsLock.synchronized {
+    scheduledExecutor match {
+      case Some(_) => // Already running.
+      case None =>
+        val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL).toLong
+        // Default for sessions that don't have expiration time set.
+        val defaultInactiveTimeout =
+          SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT).toLong
+        logInfo(s"Starting thread for cleanup of expired sessions every $interval ms")
+        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
+        scheduledExecutor.get.scheduleAtFixedRate(
+          () => {
+            try periodicMaintenance(defaultInactiveTimeout)
+            catch {
+              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
+            }
+          },
+          interval,
+          interval,
+          TimeUnit.MILLISECONDS)
+    }
+  }
+
+  // Visible for testing.
+  private[connect] def periodicMaintenance(
+      defaultInactiveTimeout: Long,
+      ignoreExpirationTime: Boolean = false): Unit = {
+    logInfo("Started periodic run of SparkConnectSessionManager maintenance.")
+    // Find any sessions that expired and should be removed.
+    val toRemove = new mutable.ArrayBuffer[SessionHolder]()
     sessionsLock.synchronized {
-      sessionStore.invalidateAll()
-      closedSessionsCache.invalidateAll()
+      val nowMs = System.currentTimeMillis()
+
+      sessionStore.values.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo
+        if (info.customExpirationTime.isDefined && !ignoreExpirationTime) {
+          // Custom expirationTime set.
+          if (info.customExpirationTime.get <= nowMs) {
+            toRemove += sessionHolder
+          }
+        } else {
+          // Default inactive timeout from last access.
+          if (info.lastAccessTime + defaultInactiveTimeout <= nowMs) {
+            toRemove += sessionHolder
+          }
+        }
+      }
+    }
+    if (!toRemove.isEmpty) {
+      // .. and remove them.
+      toRemove.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo

Review Comment:
   Should we re-check whether this session had its `lastAccessTime` updated in the meantime? It could have been accessed between the scan above and the removal here.
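
   A sketch of such a re-check for the inactivity branch, done under `sessionsLock` just before removal (the custom-expiration branch would need the analogous check):

   ```scala
   toRemove.foreach { sessionHolder =>
     // Re-read the access time under the lock: skip removal if the session was
     // touched again after the expiry scan above.
     val stillExpired = sessionsLock.synchronized {
       sessionHolder.getSessionHolderInfo.lastAccessTime + defaultInactiveTimeout <=
         System.currentTimeMillis()
     }
     if (stillExpired) {
       closeSession(sessionHolder.key)
     }
   }
   ```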



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.
+      closedSessionsCache.put(s.key, s.getSessionHolderInfo)
     }
   }
 
-  def shutdown(): Unit = {
+  private[connect] def shutdown(): Unit = sessionsLock.synchronized {
+    scheduledExecutor.foreach { executor =>
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.MINUTES)
+    }
+    scheduledExecutor = None
+    // note: this does not cleanly shut down the sessions, but the server is shutting down.
+    sessionStore.clear()
+    closedSessionsCache.invalidateAll()
+  }
+
+  def listActiveSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    sessionStore.values.map(_.getSessionHolderInfo).toSeq
+  }
+
+  def listClosedSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    closedSessionsCache.asMap.asScala.values.toSeq
+  }
+
+  /**
+   * Schedules periodic maintenance checks if it is not already scheduled.
+   *
+   * The checks are looking to remove sessions that expired.
+   */
+  private def schedulePeriodicChecks(): Unit = sessionsLock.synchronized {
+    scheduledExecutor match {
+      case Some(_) => // Already running.
+      case None =>
+        val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL).toLong
+        // Default for sessions that don't have expiration time set.
+        val defaultInactiveTimeout =
+          SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT).toLong
+        logInfo(s"Starting thread for cleanup of expired sessions every $interval ms")
+        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
+        scheduledExecutor.get.scheduleAtFixedRate(
+          () => {
+            try periodicMaintenance(defaultInactiveTimeout)
+            catch {
+              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
+            }
+          },
+          interval,
+          interval,
+          TimeUnit.MILLISECONDS)
+    }
+  }
+
+  // Visible for testing.
+  private[connect] def periodicMaintenance(
+      defaultInactiveTimeout: Long,
+      ignoreExpirationTime: Boolean = false): Unit = {
+    logInfo("Started periodic run of SparkConnectSessionManager maintenance.")
+    // Find any sessions that expired and should be removed.
+    val toRemove = new mutable.ArrayBuffer[SessionHolder]()
     sessionsLock.synchronized {
-      sessionStore.invalidateAll()
-      closedSessionsCache.invalidateAll()
+      val nowMs = System.currentTimeMillis()
+
+      sessionStore.values.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo
+        if (info.customExpirationTime.isDefined && !ignoreExpirationTime) {
+          // Custom expirationTime set.
+          if (info.customExpirationTime.get <= nowMs) {
+            toRemove += sessionHolder
+          }
+        } else {
+          // Default inactive timeout from last access.
+          if (info.lastAccessTime + defaultInactiveTimeout <= nowMs) {
+            toRemove += sessionHolder
+          }
+        }
+      }
+    }
+    if (!toRemove.isEmpty) {

Review Comment:
   Not required; `foreach` on an empty buffer is already a no-op. (If a guard were kept, `toRemove.nonEmpty` would be the idiomatic form.)
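
   I.e., the guard can be dropped outright; a sketch (the removal body is truncated in this diff, so the call shown here is illustrative):

   ```scala
   // foreach on an empty ArrayBuffer is a no-op, so no emptiness guard is needed.
   toRemove.foreach { sessionHolder =>
     closeSession(sessionHolder.key)
   }
   ```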




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org