Posted to reviews@spark.apache.org by "juliuszsompolski (via GitHub)" <gi...@apache.org> on 2023/11/23 20:04:18 UTC

[PR] [SPARK-46075] Improvements to SparkConnectSessionManager [spark]

juliuszsompolski opened a new pull request, #43985:
URL: https://github.com/apache/spark/pull/43985

   ### What changes were proposed in this pull request?
   
   This is factored out of https://github.com/apache/spark/pull/43913 and continues https://github.com/apache/spark/pull/43546, in which SparkConnectSessionManager was introduced.
   
   We want to stop using a Guava cache as the session cache and replace it with custom logic that gives us more control. This refactors the session manager and adds more tests.
   
   The new mechanism mirrors the one in SparkConnectExecutionManager.
   
   ### Why are the changes needed?
   
   With a Guava cache, only a single "inactivity timeout" can be specified for the whole cache; it cannot, for example, be overridden per session. The actual invalidation also does not happen in a dedicated thread inside Guava: the cleanup work is done lazily, piggybacked onto other operations on the cache, which makes it opaque when a session will actually be removed.
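To make the contrast concrete, here is a minimal sketch of the approach described above: the manager owns a plain map, each session may carry its own inactivity timeout, and expired sessions are removed by a dedicated maintenance thread that is started lazily on first access. All names (`SketchSession`, `SketchSessionManager`, `runMaintenance`) are hypothetical simplifications, not the actual SparkConnectSessionManager API.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical stand-in for SessionHolder: only the fields the expiry logic needs.
final class SketchSession(val id: String, val customTimeoutMs: Option[Long]) {
  @volatile var lastAccessMs: Long = System.currentTimeMillis()
}

// Hypothetical manager that owns its own map and its own maintenance thread.
final class SketchSessionManager(defaultTimeoutMs: Long, intervalMs: Long) {
  private val lock = new Object
  private val sessions = mutable.HashMap.empty[String, SketchSession]
  private var executor: Option[ScheduledExecutorService] = None

  // The custom timeout is only taken into account when the session is first created.
  def getOrCreate(id: String, customTimeoutMs: Option[Long] = None): SketchSession =
    lock.synchronized {
      startMaintenanceIfNeeded()
      val s = sessions.getOrElseUpdate(id, new SketchSession(id, customTimeoutMs))
      s.lastAccessMs = System.currentTimeMillis()
      s
    }

  def activeIds: Set[String] = lock.synchronized(sessions.keySet.toSet)

  // Expiry is evaluated here, on a schedule we control, with a per-session timeout.
  def runMaintenance(nowMs: Long): Unit = lock.synchronized {
    val expired = sessions.values
      .filter(s => s.lastAccessMs + s.customTimeoutMs.getOrElse(defaultTimeoutMs) <= nowMs)
      .map(_.id)
      .toList
    expired.foreach(id => sessions.remove(id))
  }

  private def startMaintenanceIfNeeded(): Unit = lock.synchronized {
    if (executor.isEmpty) {
      val ex = Executors.newSingleThreadScheduledExecutor()
      val task: Runnable = () =>
        try runMaintenance(System.currentTimeMillis())
        catch { case NonFatal(_) => () } // a real implementation would log the failure
      ex.scheduleAtFixedRate(task, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
      executor = Some(ex)
    }
  }

  def shutdown(): Unit = {
    // Stop the thread outside the lock so a running maintenance pass cannot deadlock with us.
    val ex = lock.synchronized { val e = executor; executor = None; e }
    ex.foreach { e => e.shutdown(); e.awaitTermination(1, TimeUnit.MINUTES) }
    lock.synchronized(sessions.clear())
  }
}
```

Because removal only happens in `runMaintenance`, it is easy to tell when a session goes away, and tests can call it directly with a chosen `nowMs`.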
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   SparkConnectSessionManagerSuite added.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   GitHub Copilot assisted with some boilerplate auto-completion.
   
   Generated-by: GitHub Copilot


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1414342280


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala:
##########
@@ -95,17 +95,17 @@ private[connect] class ExecuteHolder(
   private val runner: ExecuteThreadRunner = new ExecuteThreadRunner(this)
 
   /** System.currentTimeMillis when this ExecuteHolder was created. */
-  val creationTime = System.currentTimeMillis()
+  val creationTimeMs = System.currentTimeMillis()

Review Comment:
   Thanks for the naming fixes.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.
+      closedSessionsCache.put(s.key, s.getSessionHolderInfo)
     }
   }
 
-  def shutdown(): Unit = {
+  private[connect] def shutdown(): Unit = sessionsLock.synchronized {
+    scheduledExecutor.foreach { executor =>
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.MINUTES)
+    }
+    scheduledExecutor = None
+    // note: this does not cleanly shut down the sessions, but the server is shutting down.
+    sessionStore.clear()
+    closedSessionsCache.invalidateAll()
+  }
+
+  def listActiveSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    sessionStore.values.map(_.getSessionHolderInfo).toSeq
+  }
+
+  def listClosedSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    closedSessionsCache.asMap.asScala.values.toSeq
+  }
+
+  /**
+   * Schedules periodic maintenance checks if it is not already scheduled.
+   *
+   * The checks are looking to remove sessions that expired.
+   */
+  private def schedulePeriodicChecks(): Unit = sessionsLock.synchronized {
+    scheduledExecutor match {
+      case Some(_) => // Already running.
+      case None =>
+        val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL).toLong
+        // Default for sessions that don't have expiration time set.
+        val defaultInactiveTimeout =
+          SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT).toLong
+        logInfo(s"Starting thread for cleanup of expired sessions every $interval ms")
+        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
+        scheduledExecutor.get.scheduleAtFixedRate(
+          () => {
+            try periodicMaintenance(defaultInactiveTimeout)
+            catch {
+              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
+            }
+          },
+          interval,
+          interval,
+          TimeUnit.MILLISECONDS)
+    }
+  }
+
+  // Visible for testing.
+  private[connect] def periodicMaintenance(
+      defaultInactiveTimeout: Long,
+      ignoreExpirationTime: Boolean = false): Unit = {
+    logInfo("Started periodic run of SparkConnectSessionManager maintenance.")
+    // Find any sessions that expired and should be removed.
+    val toRemove = new mutable.ArrayBuffer[SessionHolder]()
     sessionsLock.synchronized {
-      sessionStore.invalidateAll()
-      closedSessionsCache.invalidateAll()
+      val nowMs = System.currentTimeMillis()
+
+      sessionStore.values.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo
+        if (info.customExpirationTime.isDefined && !ignoreExpirationTime) {
+          // Custom expirationTime set.
+          if (info.customExpirationTime.get <= nowMs) {
+            toRemove += sessionHolder
+          }
+        } else {
+          // Default inactive timeout from last access.
+          if (info.lastAccessTime + defaultInactiveTimeout <= nowMs) {

Review Comment:
   Do we need to check `ignoreExpirationTime`? 
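One way to make the `ignoreExpirationTime` handling explicit is to fold the whole decision into a single match, so the flag is consulted in exactly one place and the fallback to the default inactivity timeout is visible. This is a hedged sketch: `ExpiryInfo` and `isExpired` are hypothetical names, and the quoted hunk is cut off, so it does not show whether the PR checks the flag again further down.

```scala
// Hypothetical simplified view of SessionHolderInfo, holding only what the check needs.
final case class ExpiryInfo(lastAccessTimeMs: Long, customExpirationTimeMs: Option[Long])

object ExpiryCheck {
  /**
   * Returns true if the session should be removed at `nowMs`.
   * When `ignoreCustomExpiration` is set, a custom expiration time is disregarded and
   * only the default inactivity timeout (measured from the last access) applies.
   */
  def isExpired(
      info: ExpiryInfo,
      nowMs: Long,
      defaultInactiveTimeoutMs: Long,
      ignoreCustomExpiration: Boolean = false): Boolean =
    info.customExpirationTimeMs match {
      case Some(expiresAtMs) if !ignoreCustomExpiration => expiresAtMs <= nowMs
      case _ => info.lastAccessTimeMs + defaultInactiveTimeoutMs <= nowMs
    }
}
```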
   
   



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -47,9 +48,19 @@ case class SessionKey(userId: String, sessionId: String)
 case class SessionHolder(userId: String, sessionId: String, session: SparkSession)
     extends Logging {
 
-  @volatile private var lastRpcAccessTime: Option[Long] = None
+  // Time when the session was started.
+  private val startTimeMs: Long = System.currentTimeMillis()
 
-  @volatile private var isClosing: Boolean = false
+  // Time when the session was last accessed (retrieved from SparkConnectSessionManager)
+  @volatile private var lastAccessTimeMs: Long = System.currentTimeMillis()
+
+  // Time when the session was closed.
+  // Set only by close(), and only once.
+  @volatile private var closedTimeMs: Option[Long] = None
+
+  // Custom timeout after a session expires due to inactivity.
+  // Used by SparkConnectSessionManager instead of default timeout if set.

Review Comment:
   Please indicate here whether `-1` means that the session never expires.
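If `-1` is meant as "never expire", the convention can be documented next to the field and handled in one place by turning the sentinel into an `Option`. The sketch below assumes exactly that convention; `CustomTimeout`, `NeverExpire`, and `effectiveTimeoutMs` are hypothetical, and the quoted hunk does not say which sentinel, if any, the PR uses.

```scala
object CustomTimeout {
  /** Hypothetical sentinel: a custom inactivity timeout of -1 means "never expire". */
  val NeverExpire: Long = -1L

  /**
   * Resolves the effective inactivity timeout for one session.
   *
   * @param customTimeoutMs None if unset (use the default), Some(-1) to never expire,
   *                        otherwise Some(timeout in ms).
   * @return None if the session never expires due to inactivity, else the timeout in ms.
   */
  def effectiveTimeoutMs(customTimeoutMs: Option[Long], defaultTimeoutMs: Long): Option[Long] =
    customTimeoutMs match {
      case Some(NeverExpire) => None
      case Some(t) if t >= 0 => Some(t)
      case Some(t) => throw new IllegalArgumentException(s"Invalid custom timeout: $t ms")
      case None => Some(defaultTimeoutMs)
    }

  def isExpired(
      lastAccessMs: Long,
      nowMs: Long,
      customTimeoutMs: Option[Long],
      defaultTimeoutMs: Long): Boolean =
    effectiveTimeoutMs(customTimeoutMs, defaultTimeoutMs).exists(t => lastAccessMs + t <= nowMs)
}
```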



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +94,156 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    schedulePeriodicChecks() // Starts the maintenance thread if it hasn't started yet.
+
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  }
+
+  // Removes session from sessionStore and returns it.
+  private def removeSessionHolder(key: SessionKey): Option[SessionHolder] = {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
+        // Put into closedSessionsCache, so that it cannot get accidentally recreated
+        // by getOrCreateIsolatedSession.
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
+      }
     }
+    sessionHolder
+  }
+
+  // Shut downs the session after removing.
+  private def shutdownSessionHolder(sessionHolder: SessionHolder): Unit = {
+    sessionHolder.close()
+    // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.
+    closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
+    val sessionHolder = removeSessionHolder(key)
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach(shutdownSessionHolder(_))
   }
 
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
-        // First put into closedSessionsCache, so that it cannot get accidentally recreated by
-        // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
-      }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+  private[connect] def shutdown(): Unit = sessionsLock.synchronized {
+    scheduledExecutor.foreach { executor =>
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.MINUTES)
     }
+    scheduledExecutor = None
+    // note: this does not cleanly shut down the sessions, but the server is shutting down.
+    sessionStore.clear()
+    closedSessionsCache.invalidateAll()
+  }
+
+  def listActiveSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    sessionStore.values.map(_.getSessionHolderInfo).toSeq
   }
 
-  def shutdown(): Unit = {
+  def listClosedSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    closedSessionsCache.asMap.asScala.values.toSeq
+  }
+
+  /**
+   * Schedules periodic maintenance checks if it is not already scheduled.
+   *
+   * The checks are looking to remove sessions that expired.
+   */
+  private def schedulePeriodicChecks(): Unit = sessionsLock.synchronized {
+    scheduledExecutor match {
+      case Some(_) => // Already running.
+      case None =>
+        val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL)
+        logInfo(s"Starting thread for cleanup of expired sessions every $interval ms")
+        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
+        scheduledExecutor.get.scheduleAtFixedRate(
+          () => {
+            try {
+              val defaultInactiveTimeoutMs =
+                SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT)
+              periodicMaintenance(defaultInactiveTimeoutMs)
+            } catch {
+              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
+            }
+          },
+          interval,
+          interval,
+          TimeUnit.MILLISECONDS)
+    }
+  }
+
+  // Visible for testing.
+  private[connect] def periodicMaintenance(
+      defaultInactiveTimeoutMs: Long,
+      ignoreCustomTimeout: Boolean = false): Unit = {

Review Comment:
   Could you add a description for `ignoreCustomTimeout` here?
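A possible shape for that description, shown on a simplified version of the method: the scaladoc spells out what `ignoreCustomTimeout` does, and the body follows the collect-then-close structure visible in the earlier hunk (`toRemove` is filled under `sessionsLock`, and sessions are closed afterwards). `MSession`, `MaintenanceSketch`, and the explicit `nowMs` parameter are hypothetical, and the stated purpose of the flag (forcing expiry from tests) is an assumption.

```scala
import scala.collection.mutable

// Hypothetical minimal session; close() stands in for SessionHolder.close().
final class MSession(val key: String, val lastAccessMs: Long, val customTimeoutMs: Option[Long]) {
  @volatile var closed: Boolean = false
  def close(): Unit = { closed = true }
}

final class MaintenanceSketch {
  private val lock = new Object
  val store: mutable.LinkedHashMap[String, MSession] = mutable.LinkedHashMap.empty[String, MSession]

  /**
   * Removes and closes sessions that have been inactive for too long.
   *
   * @param defaultInactiveTimeoutMs inactivity timeout for sessions without a custom timeout.
   * @param ignoreCustomTimeout if true, per-session custom timeouts are ignored and every
   *                            session is checked against defaultInactiveTimeoutMs
   *                            (e.g. so that a test can force expiry without waiting).
   * @param nowMs current time; a parameter only to keep the sketch deterministic.
   * @return keys of the sessions that were closed.
   */
  def periodicMaintenance(
      defaultInactiveTimeoutMs: Long,
      ignoreCustomTimeout: Boolean = false,
      nowMs: Long = System.currentTimeMillis()): Seq[String] = {
    // Phase 1, under the lock: pick the expired sessions and unregister them.
    val toRemove: List[MSession] = lock.synchronized {
      val expired = store.values.filter { s =>
        val timeoutMs =
          if (ignoreCustomTimeout) defaultInactiveTimeoutMs
          else s.customTimeoutMs.getOrElse(defaultInactiveTimeoutMs)
        s.lastAccessMs + timeoutMs <= nowMs
      }.toList
      expired.foreach(s => store.remove(s.key))
      expired
    }
    // Phase 2, outside the lock: a slow close() does not block other session lookups.
    toRemove.foreach(_.close())
    toRemove.map(_.key)
  }
}
```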





Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1410594430


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.

Review Comment:
   We deliberately put it into the cache twice:
   - the first time, under the lock, to prevent the session from being recreated, even though the session is not fully closed at that point;
   - the second time, after it has finished shutting down, to update the entry with the final state of the session (closed time, etc.).
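The two-step update can be sketched as follows: the first `put` happens under the lock together with the removal, so a concurrent `getOrCreate` already sees the key as closed, and the second `put`, after `close()` returns, overwrites the entry with the final snapshot. `CSession`, `Info`, and `CloseSketch` are hypothetical stand-ins for `SessionHolder`, `SessionHolderInfo`, and the session manager; a `ConcurrentHashMap` plays the role of the thread-safe `closedSessionsCache`.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// Hypothetical snapshot type standing in for SessionHolderInfo.
final case class Info(key: String, closedTimeMs: Option[Long])

// Hypothetical stand-in for SessionHolder.
final class CSession(val key: String) {
  @volatile private var closedTimeMs: Option[Long] = None
  def info: Info = Info(key, closedTimeMs)
  def close(nowMs: Long): Unit = {
    if (closedTimeMs.isDefined) throw new IllegalStateException(s"Session $key is already closed.")
    closedTimeMs = Some(nowMs)
  }
}

final class CloseSketch {
  private val lock = new Object
  val active = mutable.HashMap.empty[String, CSession]
  // Thread-safe, like the Guava-backed closedSessionsCache.
  val closed = new ConcurrentHashMap[String, Info]()

  def getOrCreate(key: String): Option[CSession] = lock.synchronized {
    // A key recorded as closed must not be silently recreated.
    if (closed.containsKey(key)) None
    else Some(active.getOrElseUpdate(key, new CSession(key)))
  }

  def closeSession(key: String, nowMs: Long): Unit = {
    val removed = lock.synchronized {
      val s = active.remove(key)
      // First put, atomic with the removal: blocks recreation, but closedTimeMs is still None.
      s.foreach(h => closed.put(h.key, h.info))
      s
    }
    // The shutdown work itself runs outside the lock.
    removed.foreach { h =>
      h.close(nowMs)
      // Second put: replace the provisional entry with the final state.
      closed.put(h.key, h.info)
    }
  }
}
```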





Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1409807117


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -205,14 +220,26 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
    * Called only by SparkConnectSessionManager.
    */
   private[connect] def close(): Unit = {
+    // It is called only by SparkConnectSessionManager.closeSession, only after it's removed from
+    // the sessionStore there guarantees that it is called only once.
+    if (closedTime.isDefined) {
+      throw new IllegalStateException(s"Session $key is already closed.")
+    }
+
     logInfo(s"Closing session with userId: $userId and sessionId: $sessionId")
 
-    // After isClosing=true, SessionHolder.addExecuteHolder() will not allow new executions for
-    // this session. Because both SessionHolder.addExecuteHolder() and
+    // After closedTime is defined, SessionHolder.addExecuteHolder() will not allow new executions
+    // to be added for this session. Because both SessionHolder.addExecuteHolder() and
     // SparkConnectExecutionManager.removeAllExecutionsForSession() are executed under
     // executionsLock, this guarantees that removeAllExecutionsForSession triggered below will
     // remove all executions and no new executions will be added in the meanwhile.
-    isClosing = true
+    closedTime = Some(System.currentTimeMillis())
+
+    if (eventManager.status == SessionStatus.Pending) {
+      // Testing-only: Some sessions created by SessionHolder.forTesting are not fully initialized

Review Comment:
   Some unit tests create a SessionHolder via SessionHolder.forTesting with a null SparkSession, in which case initializeSession fails with an NPE. I found it cleaner to skip closing a session that was never initialized here than to accept a null SparkSession in other places.
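The trade-off can be sketched like this: `close()` still marks the session as closed, but skips the teardown that needs a live `SparkSession` when the session never left the pending state. `TestableSession`, `Status`, and `cleanedUp` are hypothetical; `cleanedUp` only stands in for the real cleanup, and `Status` for `SessionStatus`.

```scala
import java.util.Objects

// Hypothetical session lifecycle states (stand-in for SessionStatus).
sealed trait Status
object Status {
  case object Pending extends Status
  case object Started extends Status
}

// Hypothetical stand-in for SessionHolder; sparkSession may be null in unit tests.
final class TestableSession(val key: String, sparkSession: AnyRef) {
  @volatile private var status: Status = Status.Pending
  @volatile private var closedTimeMs: Option[Long] = None
  @volatile var cleanedUp: Boolean = false

  def initialize(): Unit = {
    // Stands in for initialization that dereferences the session; throws an NPE when it is null.
    Objects.requireNonNull(sparkSession, "sparkSession")
    status = Status.Started
  }

  def isClosed: Boolean = closedTimeMs.isDefined

  def close(): Unit = {
    if (isClosed) throw new IllegalStateException(s"Session $key is already closed.")
    closedTimeMs = Some(System.currentTimeMillis())
    // A session that was never initialized (e.g. created for tests with a null session)
    // has nothing to tear down, so only initialized sessions run the cleanup.
    if (status != Status.Pending) {
      cleanedUp = true // placeholder for cleanup that needs a live session
    }
  }
}
```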





Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1409865055


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -205,14 +220,26 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
    * Called only by SparkConnectSessionManager.
    */
   private[connect] def close(): Unit = {
+    // It is called only by SparkConnectSessionManager.closeSession, only after it's removed from
+    // the sessionStore there guarantees that it is called only once.
+    if (closedTime.isDefined) {
+      throw new IllegalStateException(s"Session $key is already closed.")
+    }
+
     logInfo(s"Closing session with userId: $userId and sessionId: $sessionId")
 
-    // After isClosing=true, SessionHolder.addExecuteHolder() will not allow new executions for
-    // this session. Because both SessionHolder.addExecuteHolder() and
+    // After closedTime is defined, SessionHolder.addExecuteHolder() will not allow new executions
+    // to be added for this session. Because both SessionHolder.addExecuteHolder() and
     // SparkConnectExecutionManager.removeAllExecutionsForSession() are executed under
     // executionsLock, this guarantees that removeAllExecutionsForSession triggered below will
     // remove all executions and no new executions will be added in the meanwhile.
-    isClosing = true
+    closedTime = Some(System.currentTimeMillis())

Review Comment:
   I'll think about this locking with a fresh head tomorrow. I agree there is something too complicated here if it requires extra explanation like that :-).





Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1411326360


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -205,14 +220,26 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
    * Called only by SparkConnectSessionManager.
    */
   private[connect] def close(): Unit = {
+    // It is called only by SparkConnectSessionManager.closeSession, only after it's removed from
+    // the sessionStore there guarantees that it is called only once.
+    if (closedTime.isDefined) {
+      throw new IllegalStateException(s"Session $key is already closed.")
+    }
+
     logInfo(s"Closing session with userId: $userId and sessionId: $sessionId")
 
-    // After isClosing=true, SessionHolder.addExecuteHolder() will not allow new executions for
-    // this session. Because both SessionHolder.addExecuteHolder() and
+    // After closedTime is defined, SessionHolder.addExecuteHolder() will not allow new executions
+    // to be added for this session. Because both SessionHolder.addExecuteHolder() and
     // SparkConnectExecutionManager.removeAllExecutionsForSession() are executed under
     // executionsLock, this guarantees that removeAllExecutionsForSession triggered below will
     // remove all executions and no new executions will be added in the meanwhile.
-    isClosing = true
+    closedTime = Some(System.currentTimeMillis())

Review Comment:
   @rangadi I put a lot of thought into this locking, and I think it's actually the way I want it.
   This comment was not in the best place; it should be next to the call to `removeAllExecutionsForSession`.
   
   The crux here is that adding executions to the session and removing them from it are both guarded by `executionsLock`.
   After closedTime is set, adding executions is no longer allowed.
   So what I want to explain here is that when I remove all executions after this point, no execution can still be in the middle of being added, because removing executions needs to take the executionsLock, which any in-progress addExecuteHolder must release first. Setting closedTime itself does not need to be protected by the lock.
   
   I moved the comments around and reworded them to hopefully make this clearer...
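The argument can be condensed into a small sketch. `addExecution` checks `closedTimeMs` and inserts under `executionsLock`, while `close()` writes `closedTimeMs` without the lock and only then takes the lock to remove everything. An add that read `closedTimeMs` as unset holds the lock until its insert is done, so the subsequent removal, which needs the same lock, sees that insert; an add that runs after the removal sees the session as closed. `ExecSketch` and its methods are hypothetical simplifications of `SessionHolder` and `SparkConnectExecutionManager`.

```scala
import scala.collection.mutable

// Hypothetical, heavily simplified stand-in for a session's execution bookkeeping.
final class ExecSketch {
  private val executionsLock = new Object
  private val executions = mutable.Set.empty[String] // hypothetical operation ids
  @volatile private var closedTimeMs: Option[Long] = None

  def addExecution(operationId: String): Unit = executionsLock.synchronized {
    // The closed check and the insert are one atomic step under executionsLock.
    if (closedTimeMs.isDefined) throw new IllegalStateException("Session is closed.")
    executions += operationId
  }

  def close(): Unit = {
    // A plain volatile write; readers only care whether it is set, so no lock is needed.
    closedTimeMs = Some(System.currentTimeMillis())
    // Taking executionsLock here waits for any add that still saw the session as open.
    removeAllExecutions()
  }

  def removeAllExecutions(): List[String] = executionsLock.synchronized {
    val all = executions.toList
    executions.clear()
    all
  }

  def size: Int = executionsLock.synchronized(executions.size)
}
```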





Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1409864334


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.

Review Comment:
   Generally nothing really bad would happen, as this would be an unexpected exception, and the session has already been removed from the manager.
   If this is called by periodic maintenance, the exception is caught, and the next periodic maintenance run will proceed as usual.
   If this is called from ReleaseSession, an error will be thrown to the user, which I think is correct if something unexpected happened. We might not fully clean something up, but this is an unexpected situation in general.
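Catching the exception in the periodic task matters for more than logging: per the `ScheduledExecutorService.scheduleAtFixedRate` contract, if any execution of the task throws, subsequent executions are suppressed. The sketch below wraps the task in a `NonFatal` catch, as the hunks above do, so one failed maintenance run does not stop the next one. `ResilientPeriodicTask` and its `schedule` signature are hypothetical.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.util.control.NonFatal

object ResilientPeriodicTask {
  /**
   * Runs `task` every `intervalMs` ms on a dedicated thread. Non-fatal exceptions are passed
   * to `onError` instead of escaping, because an exception escaping a scheduleAtFixedRate
   * task suppresses all of its subsequent executions.
   */
  def schedule(intervalMs: Long, onError: Throwable => Unit)(
      task: () => Unit): ScheduledExecutorService = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val wrapped: Runnable = () =>
      try task()
      catch { case NonFatal(e) => onError(e) }
    executor.scheduleAtFixedRate(wrapped, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
    executor
  }
}
```

Without the `try`/`catch`, the first failing run would silently end the schedule, and expired sessions would never be cleaned up again.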





Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on PR #43985:
URL: https://github.com/apache/spark/pull/43985#issuecomment-1838172192

   gentle ping @hvanhovell @grundprinzip 



Re: [PR] [SPARK-46075] Improvements to SparkConnectSessionManager [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1403717794


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -205,14 +220,26 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
    * Called only by SparkConnectSessionManager.
    */
   private[connect] def close(): Unit = {
+    // It is called only by SparkConnectSessionManager.closeSession, only after it's removed from
+    // the sessionStore there guarantees that it is called only once.
+    if (closedTime.isDefined) {
+      throw new IllegalStateException(s"Session $key is already closed.")
+    }
+
     logInfo(s"Closing session with userId: $userId and sessionId: $sessionId")
 
-    // After isClosing=true, SessionHolder.addExecuteHolder() will not allow new executions for
-    // this session. Because both SessionHolder.addExecuteHolder() and
+    // After closedTime is defined, SessionHolder.addExecuteHolder() will not allow new executions
+    // to be added for this session. Because both SessionHolder.addExecuteHolder() and
     // SparkConnectExecutionManager.removeAllExecutionsForSession() are executed under
     // executionsLock, this guarantees that removeAllExecutionsForSession triggered below will
     // remove all executions and no new executions will be added in the meanwhile.
-    isClosing = true
+    closedTime = Some(System.currentTimeMillis())
+
+    if (eventManager.status == SessionStatus.Pending) {
+      // Testing-only: Some sessions created by SessionHolder.forTesting are not fully initialized
+      // and can't be closed.
+      return
+    }

Review Comment:
   This is weird. I was hitting issues from SparkConnectSessionManager.invalidateAllSessions because some of these SessionHolder.forTesting sessions were getting closed while not fully initialized (e.g. with a null SparkSession, and then hitting an NPE when trying to access artifactManager).
   
   But this should also have happened before, when invalidateAllSessions() called sessionStore.invalidateAll(), which should have triggered RemoveSessionListener.
   The fact that tests were not failing suggests that either invalidateAll() did not trigger RemoveSessionListener, or that it swallowed errors from it. The latter seems plausible: removal in Guava often happens lazily, piggybacked onto other cache operations, so it would be prudent for Guava to swallow all errors there. Now that we do the removal ourselves, the error surfaced.
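   A minimal sketch of the suspected difference, using hypothetical classes (`HalfInitHolder`, `SwallowingCache`, `EagerRemoval`). It does not use Guava itself and only assumes the swallowing behaviour described above. A cache that runs its removal listener internally and eats `NonFatal` errors hides the NPE, while eager removal by the manager surfaces it.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical holder built like the forTesting sessions described above:
// a field needed by close() may be left null.
final class HalfInitHolder(artifactDir: String) {
  def close(): Unit = { artifactDir.length; () } // NPE when artifactDir is null
}

// Hypothetical cache modelling the suspected behaviour: the removal listener runs
// inside the cache operation and any NonFatal exception it throws is swallowed.
final class SwallowingCache[K, V](onRemoval: (K, V) => Unit) {
  private val entries = mutable.HashMap[K, V]()
  val swallowed = mutable.ArrayBuffer[Throwable]()

  def put(k: K, v: V): Unit = { entries.put(k, v); () }

  def invalidateAll(): Unit = {
    val removed = entries.toList
    entries.clear()
    removed.foreach { case (k, v) =>
      try onRemoval(k, v)
      catch { case NonFatal(e) => swallowed += e; () }
    }
  }
}

// Eager removal as in the new manager: the caller invokes close() directly,
// so the exception reaches the caller instead of disappearing.
object EagerRemoval {
  def removeAll(store: mutable.HashMap[String, HalfInitHolder]): Unit = {
    val removed = store.values.toList
    store.clear()
    removed.foreach(_.close())
  }
}
```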
    



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -205,14 +220,26 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
    * Called only by SparkConnectSessionManager.
    */
   private[connect] def close(): Unit = {
+    // It is called only by SparkConnectSessionManager.closeSession, only after it's removed from
+    // the sessionStore there guarantees that it is called only once.
+    if (closedTime.isDefined) {
+      throw new IllegalStateException(s"Session $key is already closed.")
+    }

Review Comment:
   I was wondering whether this needs extra synchronization, but concluded that it does not.
   The comment below already explains why no extra synchronization against addExecuteHolder is needed.
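   A minimal sketch of why the unsynchronized guard is enough. It assumes, as the code comment in the diff states, that `close()` is only reached through `closeSession`. In that case `remove()` under `sessionsLock` hands the holder to exactly one caller. `GuardedHolder` and `OnceManager` are hypothetical simplifications.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical holder with the same "already closed" guard as in the diff.
final class GuardedHolder(val key: String) {
  @volatile private var closedTimeMs: Option[Long] = None
  val closeCalls = new AtomicInteger(0)

  def close(): Unit = {
    // Not synchronized: the manager guarantees a single caller (see closeSession).
    if (closedTimeMs.isDefined) {
      throw new IllegalStateException(s"Session $key is already closed.")
    }
    closedTimeMs = Some(System.currentTimeMillis())
    closeCalls.incrementAndGet()
    ()
  }
}

// Hypothetical manager: remove() under sessionsLock hands the holder to one caller only.
final class OnceManager {
  private val sessionsLock = new Object
  // Guarded by sessionsLock.
  private val sessionStore = mutable.HashMap[String, GuardedHolder]()

  def add(h: GuardedHolder): Unit = sessionsLock.synchronized { sessionStore.put(h.key, h); () }

  def closeSession(key: String): Unit = {
    val removed = sessionsLock.synchronized { sessionStore.remove(key) }
    // Any concurrent caller got None above, so close() runs at most once.
    removed.foreach(_.close())
  }
}
```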




Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1405191450


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -47,9 +47,19 @@ case class SessionKey(userId: String, sessionId: String)
 case class SessionHolder(userId: String, sessionId: String, session: SparkSession)
     extends Logging {
 
-  @volatile private var lastRpcAccessTime: Option[Long] = None
+  // Time when the session was started.
+  private val startTime: Long = System.currentTimeMillis()

Review Comment:
   The convention is to use a unit suffix (`startTimeMs` for milliseconds here) when the type is `Long`. `Time` is OK if this were a type like `Instant`.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.

Review Comment:
   Any concerns if `close()` above throws? 
   Btw, when do we clean up `closedSessionsCache`?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -186,7 +196,13 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   def classloader: ClassLoader = artifactManager.classloader
 
   private[connect] def updateAccessTime(): Unit = {
-    lastRpcAccessTime = Some(System.currentTimeMillis())
+    lastAccessTime = System.currentTimeMillis()
+    logInfo(s"Session $key accessed, time $lastAccessTime.")

Review Comment:
   Temporary log?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -205,14 +220,26 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
    * Called only by SparkConnectSessionManager.
    */
   private[connect] def close(): Unit = {
+    // It is called only by SparkConnectSessionManager.closeSession, only after it's removed from
+    // the sessionStore there guarantees that it is called only once.
+    if (closedTime.isDefined) {
+      throw new IllegalStateException(s"Session $key is already closed.")
+    }
+
     logInfo(s"Closing session with userId: $userId and sessionId: $sessionId")
 
-    // After isClosing=true, SessionHolder.addExecuteHolder() will not allow new executions for
-    // this session. Because both SessionHolder.addExecuteHolder() and
+    // After closedTime is defined, SessionHolder.addExecuteHolder() will not allow new executions
+    // to be added for this session. Because both SessionHolder.addExecuteHolder() and
     // SparkConnectExecutionManager.removeAllExecutionsForSession() are executed under
     // executionsLock, this guarantees that removeAllExecutionsForSession triggered below will
     // remove all executions and no new executions will be added in the meanwhile.
-    isClosing = true
+    closedTime = Some(System.currentTimeMillis())
+
+    if (eventManager.status == SessionStatus.Pending) {
+      // Testing-only: Some sessions created by SessionHolder.forTesting are not fully initialized

Review Comment:
   Can we initialize `SessionHolder.forTesting` to avoid this?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -205,14 +220,26 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
    * Called only by SparkConnectSessionManager.
    */
   private[connect] def close(): Unit = {
+    // It is called only by SparkConnectSessionManager.closeSession, only after it's removed from
+    // the sessionStore there guarantees that it is called only once.
+    if (closedTime.isDefined) {
+      throw new IllegalStateException(s"Session $key is already closed.")
+    }
+
     logInfo(s"Closing session with userId: $userId and sessionId: $sessionId")
 
-    // After isClosing=true, SessionHolder.addExecuteHolder() will not allow new executions for
-    // this session. Because both SessionHolder.addExecuteHolder() and
+    // After closedTime is defined, SessionHolder.addExecuteHolder() will not allow new executions
+    // to be added for this session. Because both SessionHolder.addExecuteHolder() and
     // SparkConnectExecutionManager.removeAllExecutionsForSession() are executed under
     // executionsLock, this guarantees that removeAllExecutionsForSession triggered below will
     // remove all executions and no new executions will be added in the meanwhile.
-    isClosing = true
+    closedTime = Some(System.currentTimeMillis())

Review Comment:
   Is it possible to guard `closedTime` with `executionsLock` rather than having to explain why this is correct?
   (Maybe this field is used in many places.)
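   One way the suggestion could look, as a hypothetical single-class sketch. `LockGuardedHolder` is not a Spark class, and in the PR the executions are tracked by SparkConnectExecutionManager. Every read and write of `closedTimeMs` happens under `executionsLock`, so the ordering argument in the code comment is no longer needed.

```scala
import scala.collection.mutable

// Hypothetical, single-class simplification: in the PR the executions live in
// SparkConnectExecutionManager, not in the session holder.
final class LockGuardedHolder {
  private val executionsLock = new Object
  // Both fields below are guarded by executionsLock.
  private var closedTimeMs: Option[Long] = None
  private val executions = mutable.LinkedHashSet[String]()

  /** Returns false (and adds nothing) once the session is closed. */
  def addExecution(operationId: String): Boolean = executionsLock.synchronized {
    if (closedTimeMs.isDefined) false
    else { executions += operationId; true }
  }

  /** Marks the session closed and drains its executions in one critical section. */
  def close(): Seq[String] = executionsLock.synchronized {
    if (closedTimeMs.isDefined) {
      throw new IllegalStateException("Session is already closed.")
    }
    closedTimeMs = Some(System.currentTimeMillis())
    val drained = executions.toList
    executions.clear()
    drained
  }

  def isClosed: Boolean = executionsLock.synchronized { closedTimeMs.isDefined }
}
```

   The trade-off is that the closed-state bookkeeping now runs while holding a lock that execution threads also contend for, in exchange for a check-then-set that is atomic by construction.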



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -35,37 +38,31 @@ class SparkConnectSessionManager extends Logging {
 
   private val sessionsLock = new Object
 
-  private val sessionStore =
-    CacheBuilder

Review Comment:
   ❤️



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.
+      closedSessionsCache.put(s.key, s.getSessionHolderInfo)
     }
   }
 
-  def shutdown(): Unit = {
+  private[connect] def shutdown(): Unit = sessionsLock.synchronized {
+    scheduledExecutor.foreach { executor =>
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.MINUTES)
+    }
+    scheduledExecutor = None
+    // note: this does not cleanly shut down the sessions, but the server is shutting down.
+    sessionStore.clear()
+    closedSessionsCache.invalidateAll()
+  }
+
+  def listActiveSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    sessionStore.values.map(_.getSessionHolderInfo).toSeq
+  }
+
+  def listClosedSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    closedSessionsCache.asMap.asScala.values.toSeq
+  }
+
+  /**
+   * Schedules periodic maintenance checks if it is not already scheduled.
+   *
+   * The checks are looking to remove sessions that expired.
+   */
+  private def schedulePeriodicChecks(): Unit = sessionsLock.synchronized {
+    scheduledExecutor match {
+      case Some(_) => // Already running.
+      case None =>
+        val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL).toLong
+        // Default for sessions that don't have expiration time set.
+        val defaultInactiveTimeout =
+          SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT).toLong
+        logInfo(s"Starting thread for cleanup of expired sessions every $interval ms")
+        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
+        scheduledExecutor.get.scheduleAtFixedRate(
+          () => {
+            try periodicMaintenance(defaultInactiveTimeout)
+            catch {
+              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
+            }
+          },
+          interval,
+          interval,
+          TimeUnit.MILLISECONDS)
+    }
+  }
+
+  // Visible for testing.
+  private[connect] def periodicMaintenance(
+      defaultInactiveTimeout: Long,
+      ignoreExpirationTime: Boolean = false): Unit = {
+    logInfo("Started periodic run of SparkConnectSessionManager maintenance.")
+    // Find any sessions that expired and should be removed.
+    val toRemove = new mutable.ArrayBuffer[SessionHolder]()
     sessionsLock.synchronized {
-      sessionStore.invalidateAll()
-      closedSessionsCache.invalidateAll()
+      val nowMs = System.currentTimeMillis()
+
+      sessionStore.values.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo
+        if (info.customExpirationTime.isDefined && !ignoreExpirationTime) {

Review Comment:
   Why is this an absolute expiration time rather than a custom inactivity timeout?
   Who sets it in this PR?
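   A minimal sketch of the two policies being contrasted, with hypothetical names. `customExpirationTimeMs` plays the role of the diff's `customExpirationTime`, an absolute deadline. `customInactiveTimeoutMs` is an assumed per-session inactivity timeout that does not exist in the PR. Under the first policy a busy session still expires at its deadline, but under the second it does not.

```scala
// Hypothetical model of the timing fields relevant to expiry.
final case class SessionTimes(
    lastAccessTimeMs: Long,
    customExpirationTimeMs: Option[Long] = None, // absolute deadline, as in the diff
    customInactiveTimeoutMs: Option[Long] = None) // hypothetical per-session idle timeout

object Expiry {
  // Mirrors the diff: an absolute deadline, if set, wins; otherwise the
  // default inactivity timeout is measured from the last access.
  def expiredByDeadline(s: SessionTimes, nowMs: Long, defaultInactiveTimeoutMs: Long): Boolean =
    s.customExpirationTimeMs match {
      case Some(deadline) => deadline <= nowMs
      case None => s.lastAccessTimeMs + defaultInactiveTimeoutMs <= nowMs
    }

  // Alternative: a per-session inactivity timeout, which keeps sliding
  // forward as long as the session is being used.
  def expiredByInactivity(s: SessionTimes, nowMs: Long, defaultInactiveTimeoutMs: Long): Boolean =
    s.lastAccessTimeMs + s.customInactiveTimeoutMs.getOrElse(defaultInactiveTimeoutMs) <= nowMs
}
```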



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -35,37 +38,31 @@ class SparkConnectSessionManager extends Logging {
 
   private val sessionsLock = new Object
 
-  private val sessionStore =
-    CacheBuilder
-      .newBuilder()
-      .ticker(Ticker.systemTicker())
-      .expireAfterAccess(
-        SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT),
-        TimeUnit.MILLISECONDS)
-      .removalListener(new RemoveSessionListener)
-      .build[SessionKey, SessionHolder]()
+  private val sessionStore = mutable.HashMap[SessionKey, SessionHolder]()

Review Comment:
   Which lock is this accessed under? Please add a comment here.
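   A minimal sketch of what the requested comment and the matching access pattern could look like. `DocumentedStore` is hypothetical, `getOrElseUpdate` stands in for the explicit match in `getSession`, and the access-time update is omitted. Since `mutable.HashMap` is not thread-safe, every access must go through `sessionsLock`.

```scala
import scala.collection.mutable

// Hypothetical store showing the "guarded by" comment next to the field.
final class DocumentedStore[K, V] {
  private val sessionsLock = new Object

  // Guarded by sessionsLock: mutable.HashMap is not thread-safe, so every
  // read and write below happens inside sessionsLock.synchronized.
  private val sessionStore = mutable.HashMap[K, V]()

  // create() is only evaluated when the key is missing (by-name argument).
  def getOrCreate(key: K, create: () => V): V = sessionsLock.synchronized {
    sessionStore.getOrElseUpdate(key, create())
  }

  def remove(key: K): Option[V] = sessionsLock.synchronized {
    sessionStore.remove(key)
  }

  def size: Int = sessionsLock.synchronized { sessionStore.size }
}
```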



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.
+      closedSessionsCache.put(s.key, s.getSessionHolderInfo)
     }
   }
 
-  def shutdown(): Unit = {
+  private[connect] def shutdown(): Unit = sessionsLock.synchronized {
+    scheduledExecutor.foreach { executor =>
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.MINUTES)
+    }
+    scheduledExecutor = None
+    // note: this does not cleanly shut down the sessions, but the server is shutting down.
+    sessionStore.clear()
+    closedSessionsCache.invalidateAll()
+  }
+
+  def listActiveSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    sessionStore.values.map(_.getSessionHolderInfo).toSeq
+  }
+
+  def listClosedSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    closedSessionsCache.asMap.asScala.values.toSeq
+  }
+
+  /**
+   * Schedules periodic maintenance checks if it is not already scheduled.
+   *
+   * The checks are looking to remove sessions that expired.
+   */
+  private def schedulePeriodicChecks(): Unit = sessionsLock.synchronized {
+    scheduledExecutor match {
+      case Some(_) => // Already running.
+      case None =>
+        val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL).toLong
+        // Default for sessions that don't have expiration time set.
+        val defaultInactiveTimeout =
+          SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT).toLong
+        logInfo(s"Starting thread for cleanup of expired sessions every $interval ms")
+        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
+        scheduledExecutor.get.scheduleAtFixedRate(
+          () => {
+            try periodicMaintenance(defaultInactiveTimeout)
+            catch {
+              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
+            }
+          },
+          interval,
+          interval,
+          TimeUnit.MILLISECONDS)
+    }
+  }
+
+  // Visible for testing.
+  private[connect] def periodicMaintenance(
+      defaultInactiveTimeout: Long,
+      ignoreExpirationTime: Boolean = false): Unit = {
+    logInfo("Started periodic run of SparkConnectSessionManager maintenance.")
+    // Find any sessions that expired and should be removed.
+    val toRemove = new mutable.ArrayBuffer[SessionHolder]()
     sessionsLock.synchronized {
-      sessionStore.invalidateAll()
-      closedSessionsCache.invalidateAll()
+      val nowMs = System.currentTimeMillis()
+
+      sessionStore.values.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo
+        if (info.customExpirationTime.isDefined && !ignoreExpirationTime) {
+          // Custom expirationTime set.
+          if (info.customExpirationTime.get <= nowMs) {
+            toRemove += sessionHolder
+          }
+        } else {
+          // Default inactive timeout from last access.
+          if (info.lastAccessTime + defaultInactiveTimeout <= nowMs) {
+            toRemove += sessionHolder
+          }
+        }
+      }
+    }
+    if (!toRemove.isEmpty) {
+      // .. and remove them.
+      toRemove.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo

Review Comment:
   Should we re-check whether this session had its `lastAccessTime` updated in the meantime?
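   A minimal sketch of the race and one possible fix. It assumes access times are only updated under `sessionsLock`, as `getSession` does in the diff. Candidates are collected in one critical section, and each is re-checked in a second one before removal. `TimedHolder` and `RecheckingManager` are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical holder: only the last access time matters for this sketch.
final class TimedHolder(val key: String, var lastAccessTimeMs: Long)

// Hypothetical two-phase cleanup that re-checks expiry before removing.
final class RecheckingManager(defaultInactiveTimeoutMs: Long) {
  private val sessionsLock = new Object
  // Guarded by sessionsLock, and so is every holder's lastAccessTimeMs.
  private val sessionStore = mutable.HashMap[String, TimedHolder]()

  def add(h: TimedHolder): Unit = sessionsLock.synchronized { sessionStore.put(h.key, h); () }

  // Stand-in for the access-time update done when a session is looked up.
  def touch(key: String, nowMs: Long): Unit = sessionsLock.synchronized {
    sessionStore.get(key).foreach(h => h.lastAccessTimeMs = nowMs)
  }

  private def expired(h: TimedHolder, nowMs: Long): Boolean =
    h.lastAccessTimeMs + defaultInactiveTimeoutMs <= nowMs

  // Phase 1: collect candidates under the lock.
  def candidates(nowMs: Long): Seq[TimedHolder] = sessionsLock.synchronized {
    sessionStore.values.filter(h => expired(h, nowMs)).toList
  }

  // Phase 2: re-check each candidate under the lock before removing it, so a
  // session accessed since phase 1 survives. Returns the removed keys; the real
  // manager would then call close() on those holders outside the lock.
  def removeExpired(cands: Seq[TimedHolder], nowMs: Long): Seq[String] =
    sessionsLock.synchronized {
      val stillExpired =
        cands.filter(h => sessionStore.get(h.key).contains(h) && expired(h, nowMs))
      stillExpired.foreach(h => sessionStore.remove(h.key))
      stillExpired.map(_.key)
    }
}
```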



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.
+      closedSessionsCache.put(s.key, s.getSessionHolderInfo)
     }
   }
 
-  def shutdown(): Unit = {
+  private[connect] def shutdown(): Unit = sessionsLock.synchronized {
+    scheduledExecutor.foreach { executor =>
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.MINUTES)
+    }
+    scheduledExecutor = None
+    // note: this does not cleanly shut down the sessions, but the server is shutting down.
+    sessionStore.clear()
+    closedSessionsCache.invalidateAll()
+  }
+
+  def listActiveSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    sessionStore.values.map(_.getSessionHolderInfo).toSeq
+  }
+
+  def listClosedSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    closedSessionsCache.asMap.asScala.values.toSeq
+  }
+
+  /**
+   * Schedules periodic maintenance checks if it is not already scheduled.
+   *
+   * The checks are looking to remove sessions that expired.
+   */
+  private def schedulePeriodicChecks(): Unit = sessionsLock.synchronized {
+    scheduledExecutor match {
+      case Some(_) => // Already running.
+      case None =>
+        val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL).toLong
+        // Default for sessions that don't have expiration time set.
+        val defaultInactiveTimeout =
+          SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT).toLong
+        logInfo(s"Starting thread for cleanup of expired sessions every $interval ms")
+        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
+        scheduledExecutor.get.scheduleAtFixedRate(
+          () => {
+            try periodicMaintenance(defaultInactiveTimeout)
+            catch {
+              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
+            }
+          },
+          interval,
+          interval,
+          TimeUnit.MILLISECONDS)
+    }
+  }
+
+  // Visible for testing.
+  private[connect] def periodicMaintenance(
+      defaultInactiveTimeout: Long,
+      ignoreExpirationTime: Boolean = false): Unit = {
+    logInfo("Started periodic run of SparkConnectSessionManager maintenance.")
+    // Find any sessions that expired and should be removed.
+    val toRemove = new mutable.ArrayBuffer[SessionHolder]()
     sessionsLock.synchronized {
-      sessionStore.invalidateAll()
-      closedSessionsCache.invalidateAll()
+      val nowMs = System.currentTimeMillis()
+
+      sessionStore.values.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo
+        if (info.customExpirationTime.isDefined && !ignoreExpirationTime) {
+          // Custom expirationTime set.
+          if (info.customExpirationTime.get <= nowMs) {
+            toRemove += sessionHolder
+          }
+        } else {
+          // Default inactive timeout from last access.
+          if (info.lastAccessTime + defaultInactiveTimeout <= nowMs) {
+            toRemove += sessionHolder
+          }
+        }
+      }
+    }
+    if (!toRemove.isEmpty) {

Review Comment:
   Not required: `toRemove.foreach` is already a no-op when `toRemove` is empty.




Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1415224401


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.
+      closedSessionsCache.put(s.key, s.getSessionHolderInfo)
     }
   }
 
-  def shutdown(): Unit = {
+  private[connect] def shutdown(): Unit = sessionsLock.synchronized {
+    scheduledExecutor.foreach { executor =>
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.MINUTES)
+    }
+    scheduledExecutor = None
+    // note: this does not cleanly shut down the sessions, but the server is shutting down.
+    sessionStore.clear()
+    closedSessionsCache.invalidateAll()
+  }
+
+  def listActiveSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    sessionStore.values.map(_.getSessionHolderInfo).toSeq
+  }
+
+  def listClosedSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    closedSessionsCache.asMap.asScala.values.toSeq
+  }
+
+  /**
+   * Schedules periodic maintenance checks if it is not already scheduled.
+   *
+   * The checks are looking to remove sessions that expired.
+   */
+  private def schedulePeriodicChecks(): Unit = sessionsLock.synchronized {
+    scheduledExecutor match {
+      case Some(_) => // Already running.
+      case None =>
+        val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL).toLong
+        // Default for sessions that don't have expiration time set.
+        val defaultInactiveTimeout =
+          SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT).toLong
+        logInfo(s"Starting thread for cleanup of expired sessions every $interval ms")
+        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
+        scheduledExecutor.get.scheduleAtFixedRate(
+          () => {
+            try periodicMaintenance(defaultInactiveTimeout)
+            catch {
+              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
+            }
+          },
+          interval,
+          interval,
+          TimeUnit.MILLISECONDS)
+    }
+  }
+
+  // Visible for testing.
+  private[connect] def periodicMaintenance(
+      defaultInactiveTimeout: Long,
+      ignoreExpirationTime: Boolean = false): Unit = {
+    logInfo("Started periodic run of SparkConnectSessionManager maintenance.")
+    // Find any sessions that expired and should be removed.
+    val toRemove = new mutable.ArrayBuffer[SessionHolder]()
     sessionsLock.synchronized {
-      sessionStore.invalidateAll()
-      closedSessionsCache.invalidateAll()
+      val nowMs = System.currentTimeMillis()
+
+      sessionStore.values.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo
+        if (info.customExpirationTime.isDefined && !ignoreExpirationTime) {
+          // Custom expirationTime set.
+          if (info.customExpirationTime.get <= nowMs) {
+            toRemove += sessionHolder
+          }
+        } else {
+          // Default inactive timeout from last access.
+          if (info.lastAccessTime + defaultInactiveTimeout <= nowMs) {

Review Comment:
   There is no `ignoreExpirationTime`; there is only `ignoreCustomTimeout`, which is checked above in `if (info.customInactiveTimeoutMs.isDefined && !ignoreCustomTimeout) {`.
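   The check described in this comment can be sketched as a pure function. This is a simplified sketch, not the merged code: `SessionInfoSketch` is a hypothetical stand-in for `SessionHolderInfo`, and only the names `customInactiveTimeoutMs` and `ignoreCustomTimeout` come from the comment. A per-session inactivity timeout, when set and not ignored, overrides the server default. Both timeouts are measured from the last access.

```scala
object ExpiryCheckSketch {
  // Hypothetical, simplified stand-in for SessionHolderInfo.
  final case class SessionInfoSketch(
      lastAccessTimeMs: Long,
      customInactiveTimeoutMs: Option[Long])

  /** True if the session has been inactive for at least its effective timeout. */
  def isExpired(
      info: SessionInfoSketch,
      defaultInactiveTimeoutMs: Long,
      ignoreCustomTimeout: Boolean,
      nowMs: Long): Boolean = {
    // A custom per-session timeout wins unless the caller asks to ignore it (e.g. in tests).
    val effectiveTimeoutMs = info.customInactiveTimeoutMs match {
      case Some(custom) if !ignoreCustomTimeout => custom
      case _ => defaultInactiveTimeoutMs
    }
    info.lastAccessTimeMs + effectiveTimeoutMs <= nowMs
  }
}
```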





Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1409797975


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -186,7 +196,13 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   def classloader: ClassLoader = artifactManager.classloader
 
   private[connect] def updateAccessTime(): Unit = {
-    lastRpcAccessTime = Some(System.currentTimeMillis())
+    lastAccessTime = System.currentTimeMillis()
+    logInfo(s"Session $key accessed, time $lastAccessTime.")

Review Comment:
   I think it can stay as a logInfo: it happens once per RPC, and we already have other logInfo logging at this volume (e.g. when an execution reattaches). It may be useful in default logs for debugging.





Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1409826101


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.
+      closedSessionsCache.put(s.key, s.getSessionHolderInfo)
     }
   }
 
-  def shutdown(): Unit = {
+  private[connect] def shutdown(): Unit = sessionsLock.synchronized {
+    scheduledExecutor.foreach { executor =>
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.MINUTES)
+    }
+    scheduledExecutor = None
+    // note: this does not cleanly shut down the sessions, but the server is shutting down.
+    sessionStore.clear()
+    closedSessionsCache.invalidateAll()
+  }
+
+  def listActiveSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    sessionStore.values.map(_.getSessionHolderInfo).toSeq
+  }
+
+  def listClosedSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    closedSessionsCache.asMap.asScala.values.toSeq
+  }
+
+  /**
+   * Schedules periodic maintenance checks if it is not already scheduled.
+   *
+   * The checks are looking to remove sessions that expired.
+   */
+  private def schedulePeriodicChecks(): Unit = sessionsLock.synchronized {
+    scheduledExecutor match {
+      case Some(_) => // Already running.
+      case None =>
+        val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL).toLong
+        // Default for sessions that don't have expiration time set.
+        val defaultInactiveTimeout =
+          SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT).toLong
+        logInfo(s"Starting thread for cleanup of expired sessions every $interval ms")
+        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
+        scheduledExecutor.get.scheduleAtFixedRate(
+          () => {
+            try periodicMaintenance(defaultInactiveTimeout)
+            catch {
+              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
+            }
+          },
+          interval,
+          interval,
+          TimeUnit.MILLISECONDS)
+    }
+  }
+
+  // Visible for testing.
+  private[connect] def periodicMaintenance(
+      defaultInactiveTimeout: Long,
+      ignoreExpirationTime: Boolean = false): Unit = {
+    logInfo("Started periodic run of SparkConnectSessionManager maintenance.")
+    // Find any sessions that expired and should be removed.
+    val toRemove = new mutable.ArrayBuffer[SessionHolder]()
     sessionsLock.synchronized {
-      sessionStore.invalidateAll()
-      closedSessionsCache.invalidateAll()
+      val nowMs = System.currentTimeMillis()
+
+      sessionStore.values.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo
+        if (info.customExpirationTime.isDefined && !ignoreExpirationTime) {

Review Comment:
   > Why is this expiry time rather than custom inactivity time?
   
   Good question. In the previous PR https://github.com/apache/spark/pull/43913, where I was adding an ExtendSession RPC, I found passing an expiry time a less ambiguous interface than extending by X. This way the client and server could both know the exact deadline.
   I kept it that way here.
   What you're saying is that we could instead set a custom inactivity timeout, which would only need to be set once, i.e. make it override the server-side default config.
   Yes, that would be more consistent with the other config, which is also an inactivity timeout, and it would allow setting it once instead of having to keep resetting the expiry time. I'll change it.
   
   > Who sets this in PR?
   
   Nobody, for now: as we discussed offline, we are not introducing the ExtendSession RPC at this point. But I am leaving it here for use by extensions. If someone wanted to implement custom session expiry rules, they could still do so, e.g. using a custom Interceptor.
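   The trade-off discussed above can be illustrated with two predicates. The names in this sketch are hypothetical, not from the PR. An absolute expiry deadline removes the session at a fixed wall-clock time regardless of activity, so a client would have to keep pushing it forward. An inactivity timeout is set once and measured from the most recent access, so a session in regular use stays alive.

```scala
object ExpiryModelsSketch {
  // Absolute deadline: expires at a fixed time, however recently the session was used.
  def deadlineExpired(expirationTimeMs: Long, nowMs: Long): Boolean =
    expirationTimeMs <= nowMs

  // Inactivity timeout: expires only after `timeoutMs` with no access.
  def inactivityExpired(lastAccessTimeMs: Long, timeoutMs: Long, nowMs: Long): Boolean =
    lastAccessTimeMs + timeoutMs <= nowMs
}
```

   For example, take a session last accessed at t = 9000 ms. With a deadline of t = 10000 it is already expired at t = 10500 unless the client extended the deadline. With a 5000 ms inactivity timeout it survives until t = 14000, and longer if it is accessed again.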





Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #43985:
URL: https://github.com/apache/spark/pull/43985#issuecomment-1852508167

   Merged to master.




Re: [PR] [SPARK-46075] Improvements to SparkConnectSessionManager [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on PR #43985:
URL: https://github.com/apache/spark/pull/43985#issuecomment-1824895410

   @hvanhovell @grundprinzip @HyukjinKwon @rangadi 




Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1410591048


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -186,7 +196,13 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   def classloader: ClassLoader = artifactManager.classloader
 
   private[connect] def updateAccessTime(): Unit = {
-    lastRpcAccessTime = Some(System.currentTimeMillis())
+    lastAccessTime = System.currentTimeMillis()
+    logInfo(s"Session $key accessed, time $lastAccessTime.")

Review Comment:
   When adding logging to Spark Connect execution, I took my cue from what the Thriftserver logs. There, we added logInfo logs as long as their volume was at most linear in the number of RPC calls. They proved quite useful in debugging many times.





Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1410598467


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.
+      closedSessionsCache.put(s.key, s.getSessionHolderInfo)
     }
   }
 
-  def shutdown(): Unit = {
+  private[connect] def shutdown(): Unit = sessionsLock.synchronized {
+    scheduledExecutor.foreach { executor =>
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.MINUTES)
+    }
+    scheduledExecutor = None
+    // note: this does not cleanly shut down the sessions, but the server is shutting down.
+    sessionStore.clear()
+    closedSessionsCache.invalidateAll()
+  }
+
+  def listActiveSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    sessionStore.values.map(_.getSessionHolderInfo).toSeq
+  }
+
+  def listClosedSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    closedSessionsCache.asMap.asScala.values.toSeq
+  }
+
+  /**
+   * Schedules periodic maintenance checks if it is not already scheduled.
+   *
+   * The checks are looking to remove sessions that expired.
+   */
+  private def schedulePeriodicChecks(): Unit = sessionsLock.synchronized {
+    scheduledExecutor match {
+      case Some(_) => // Already running.
+      case None =>
+        val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL).toLong
+        // Default for sessions that don't have expiration time set.
+        val defaultInactiveTimeout =
+          SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT).toLong
+        logInfo(s"Starting thread for cleanup of expired sessions every $interval ms")
+        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
+        scheduledExecutor.get.scheduleAtFixedRate(
+          () => {
+            try periodicMaintenance(defaultInactiveTimeout)
+            catch {
+              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
+            }
+          },
+          interval,
+          interval,
+          TimeUnit.MILLISECONDS)
+    }
+  }
+
+  // Visible for testing.
+  private[connect] def periodicMaintenance(
+      defaultInactiveTimeout: Long,
+      ignoreExpirationTime: Boolean = false): Unit = {
+    logInfo("Started periodic run of SparkConnectSessionManager maintenance.")
+    // Find any sessions that expired and should be removed.
+    val toRemove = new mutable.ArrayBuffer[SessionHolder]()
     sessionsLock.synchronized {
-      sessionStore.invalidateAll()
-      closedSessionsCache.invalidateAll()
+      val nowMs = System.currentTimeMillis()
+
+      sessionStore.values.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo
+        if (info.customExpirationTime.isDefined && !ignoreExpirationTime) {
+          // Custom expirationTime set.
+          if (info.customExpirationTime.get <= nowMs) {
+            toRemove += sessionHolder
+          }
+        } else {
+          // Default inactive timeout from last access.
+          if (info.lastAccessTime + defaultInactiveTimeout <= nowMs) {
+            toRemove += sessionHolder
+          }
+        }
+      }
+    }
+    if (!toRemove.isEmpty) {
+      // .. and remove them.
+      toRemove.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo

Review Comment:
   > What is the resolution here?
   
   The updated code now checks the last access time once more before removing a session, giving it one last chance to register last-moment activity.
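   The resolution described above can be sketched as a two-phase sweep. This is a simplified illustration under stated assumptions. `SketchSession` and `SketchSessionStore` are hypothetical stand-ins for `SessionHolder` and `SparkConnectSessionManager`, and only the default inactivity timeout is modeled. The sweep works in three steps:

   - Candidates are collected under the lock.
   - Each candidate is re-checked under the lock before removal.
   - `close()` is called outside the lock, as `closeSession` does in the diff.

```scala
import scala.collection.mutable

// Hypothetical, simplified session; not Spark's SessionHolder.
final class SketchSession(val id: String, initialAccessMs: Long) {
  @volatile var lastAccessTimeMs: Long = initialAccessMs
  @volatile var closed: Boolean = false
  def close(): Unit = closed = true
}

final class SketchSessionStore(timeoutMs: Long) {
  private val lock = new Object
  private val sessions = mutable.Map.empty[String, SketchSession]

  def add(s: SketchSession): Unit = lock.synchronized { sessions(s.id) = s }
  def touch(id: String, nowMs: Long): Unit = lock.synchronized {
    sessions.get(id).foreach(s => s.lastAccessTimeMs = nowMs)
  }
  def contains(id: String): Boolean = lock.synchronized { sessions.contains(id) }

  private def expired(s: SketchSession, nowMs: Long): Boolean =
    s.lastAccessTimeMs + timeoutMs <= nowMs

  // Phase 1: snapshot the expired candidates while holding the lock.
  def findCandidates(nowMs: Long): Seq[SketchSession] = lock.synchronized {
    sessions.values.filter(expired(_, nowMs)).toList
  }

  // Phase 2: re-check each candidate under the lock, so activity since phase 1
  // gives it a last chance; close the removed sessions outside the lock.
  def removeIfStillExpired(candidates: Seq[SketchSession], nowMs: Long): Seq[SketchSession] = {
    val removed = lock.synchronized {
      candidates.filter { s =>
        val stillExpired = expired(s, nowMs)
        if (stillExpired) sessions.remove(s.id)
        stillExpired
      }
    }
    removed.foreach(_.close())
    removed
  }
}
```

   If a session is touched between the two phases, the second check spares it. Only sessions that are still idle get removed and closed.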





Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1409826101


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.
+      closedSessionsCache.put(s.key, s.getSessionHolderInfo)
     }
   }
 
-  def shutdown(): Unit = {
+  private[connect] def shutdown(): Unit = sessionsLock.synchronized {
+    scheduledExecutor.foreach { executor =>
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.MINUTES)
+    }
+    scheduledExecutor = None
+    // note: this does not cleanly shut down the sessions, but the server is shutting down.
+    sessionStore.clear()
+    closedSessionsCache.invalidateAll()
+  }
+
+  def listActiveSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    sessionStore.values.map(_.getSessionHolderInfo).toSeq
+  }
+
+  def listClosedSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    closedSessionsCache.asMap.asScala.values.toSeq
+  }
+
+  /**
+   * Schedules periodic maintenance checks if it is not already scheduled.
+   *
+   * The checks are looking to remove sessions that expired.
+   */
+  private def schedulePeriodicChecks(): Unit = sessionsLock.synchronized {
+    scheduledExecutor match {
+      case Some(_) => // Already running.
+      case None =>
+        val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL).toLong
+        // Default for sessions that don't have expiration time set.
+        val defaultInactiveTimeout =
+          SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT).toLong
+        logInfo(s"Starting thread for cleanup of expired sessions every $interval ms")
+        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
+        scheduledExecutor.get.scheduleAtFixedRate(
+          () => {
+            try periodicMaintenance(defaultInactiveTimeout)
+            catch {
+              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
+            }
+          },
+          interval,
+          interval,
+          TimeUnit.MILLISECONDS)
+    }
+  }
+
+  // Visible for testing.
+  private[connect] def periodicMaintenance(
+      defaultInactiveTimeout: Long,
+      ignoreExpirationTime: Boolean = false): Unit = {
+    logInfo("Started periodic run of SparkConnectSessionManager maintenance.")
+    // Find any sessions that expired and should be removed.
+    val toRemove = new mutable.ArrayBuffer[SessionHolder]()
     sessionsLock.synchronized {
-      sessionStore.invalidateAll()
-      closedSessionsCache.invalidateAll()
+      val nowMs = System.currentTimeMillis()
+
+      sessionStore.values.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo
+        if (info.customExpirationTime.isDefined && !ignoreExpirationTime) {

Review Comment:
   > Why is this expiry time rather than custom inactivity time?
   
   Good question. In the previous PR https://github.com/apache/spark/pull/43985 where I was adding an ExtendSession RPC, I found it a less ambiguous interface to pass expiry time rather by extending by X. This way the client and server could know what the deadline is precisely.
   Here I kept it this way.
   What you're saying is that we could also set custom inactivity time, and then could do it only once. I.e. make this override the server side default config.
   Yeah, I guess this could be more consistent with the other config being an inactivity time, and also allow it to be set only once instead of having to keep resetting the expiry time. I'll change it.
   
   > Who sets this in PR?
   
   At this point, nobody, since we agreed offline not to introduce the ExtendSession RPC yet. But I am leaving it here for use by extensions: anyone who wants to implement custom session expiry rules could still do so, e.g. using a custom Interceptor.
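A minimal sketch of the trade-off discussed above, under simplified assumptions. The `SessionInfo` record and both helpers are hypothetical, not Spark APIs. With an absolute deadline, using the session does not move its expiry. With a custom inactivity timeout, the override is set once, replaces the server default, and every access pushes expiry out.

```scala
// Hypothetical, simplified session record (not Spark's SessionHolderInfo).
final case class SessionInfo(
    lastAccessTimeMs: Long,
    customExpirationTimeMs: Option[Long] = None, // absolute deadline
    customInactiveTimeoutMs: Option[Long] = None) // relative to last access

// Deadline style: a fixed point in time; using the session does not move it,
// so a client would have to keep resetting it to stay alive.
def expiredByDeadline(info: SessionInfo, defaultInactiveTimeoutMs: Long, nowMs: Long): Boolean =
  info.customExpirationTimeMs match {
    case Some(deadlineMs) => deadlineMs <= nowMs
    case None => info.lastAccessTimeMs + defaultInactiveTimeoutMs <= nowMs
  }

// Inactivity style: the override replaces the server default, and each access
// (which updates lastAccessTimeMs) pushes the expiry further out.
def expiredByInactivity(info: SessionInfo, defaultInactiveTimeoutMs: Long, nowMs: Long): Boolean = {
  val timeoutMs = info.customInactiveTimeoutMs.getOrElse(defaultInactiveTimeoutMs)
  info.lastAccessTimeMs + timeoutMs <= nowMs
}
```

For a session last used at 900 ms, a deadline of 1000 ms has already passed at 1500 ms. An inactivity timeout of 1000 ms keeps the session alive until 1900 ms.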





Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1409890157


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.

Review Comment:
   Should we remove closedSessionsCache.put() below?
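For context, here is a minimal sketch of the two-step close that this hunk implements. It uses hypothetical simplified types, and plain maps guarded by one lock stand in for `sessionStore` and the Guava `closedSessionsCache`. The first put happens under the lock so the key cannot be recreated in the meantime. The second put overwrites the entry after `close()` has recorded the closed time.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for SessionHolderInfo and SessionHolder.
final case class HolderInfo(key: String, closedTimeMs: Option[Long])

final class Holder(val key: String) {
  @volatile private var closedTimeMs: Option[Long] = None
  def info: HolderInfo = HolderInfo(key, closedTimeMs)
  // Stands in for the potentially slow SessionHolder.close().
  def close(nowMs: Long): Unit = { closedTimeMs = Some(nowMs) }
}

final class Manager {
  private val lock = new Object
  private val active = mutable.HashMap.empty[String, Holder]
  private val closed = mutable.HashMap.empty[String, HolderInfo]

  def getOrCreate(key: String): Option[Holder] = lock.synchronized {
    if (closed.contains(key)) None // a closed key must not be recreated
    else Some(active.getOrElseUpdate(key, new Holder(key)))
  }

  def closeSession(key: String, nowMs: Long): Unit = {
    val removed = lock.synchronized {
      val h = active.remove(key)
      // Step 1, under the lock: record the key as closed so getOrCreate refuses it.
      h.foreach(s => closed.put(s.key, s.info))
      h
    }
    removed.foreach { s =>
      s.close(nowMs) // cleanup runs outside the lock
      // Step 2: overwrite the entry, which now carries the closed time.
      lock.synchronized { closed.put(s.key, s.info) }
    }
  }

  def closedInfo(key: String): Option[HolderInfo] = lock.synchronized { closed.get(key) }
}
```

Without step 2, the recorded info would still show `closedTimeMs = None`.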



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -186,7 +196,13 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   def classloader: ClassLoader = artifactManager.classloader
 
   private[connect] def updateAccessTime(): Unit = {
-    lastRpcAccessTime = Some(System.currentTimeMillis())
+    lastAccessTime = System.currentTimeMillis()
+    logInfo(s"Session $key accessed, time $lastAccessTime.")

Review Comment:
   That is a lot of logging. The other log messages would indicate this anyway (though it would be better to reduce those too).
   But I am OK with it if you prefer it.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.
+      closedSessionsCache.put(s.key, s.getSessionHolderInfo)
     }
   }
 
-  def shutdown(): Unit = {
+  private[connect] def shutdown(): Unit = sessionsLock.synchronized {
+    scheduledExecutor.foreach { executor =>
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.MINUTES)
+    }
+    scheduledExecutor = None
+    // note: this does not cleanly shut down the sessions, but the server is shutting down.
+    sessionStore.clear()
+    closedSessionsCache.invalidateAll()
+  }
+
+  def listActiveSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    sessionStore.values.map(_.getSessionHolderInfo).toSeq
+  }
+
+  def listClosedSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    closedSessionsCache.asMap.asScala.values.toSeq
+  }
+
+  /**
+   * Schedules periodic maintenance checks if it is not already scheduled.
+   *
+   * The checks are looking to remove sessions that expired.
+   */
+  private def schedulePeriodicChecks(): Unit = sessionsLock.synchronized {
+    scheduledExecutor match {
+      case Some(_) => // Already running.
+      case None =>
+        val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL).toLong
+        // Default for sessions that don't have expiration time set.
+        val defaultInactiveTimeout =
+          SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT).toLong
+        logInfo(s"Starting thread for cleanup of expired sessions every $interval ms")
+        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
+        scheduledExecutor.get.scheduleAtFixedRate(
+          () => {
+            try periodicMaintenance(defaultInactiveTimeout)
+            catch {
+              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
+            }
+          },
+          interval,
+          interval,
+          TimeUnit.MILLISECONDS)
+    }
+  }
+
+  // Visible for testing.
+  private[connect] def periodicMaintenance(
+      defaultInactiveTimeout: Long,
+      ignoreExpirationTime: Boolean = false): Unit = {
+    logInfo("Started periodic run of SparkConnectSessionManager maintenance.")
+    // Find any sessions that expired and should be removed.
+    val toRemove = new mutable.ArrayBuffer[SessionHolder]()
     sessionsLock.synchronized {
-      sessionStore.invalidateAll()
-      closedSessionsCache.invalidateAll()
+      val nowMs = System.currentTimeMillis()
+
+      sessionStore.values.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo
+        if (info.customExpirationTime.isDefined && !ignoreExpirationTime) {
+          // Custom expirationTime set.
+          if (info.customExpirationTime.get <= nowMs) {
+            toRemove += sessionHolder
+          }
+        } else {
+          // Default inactive timeout from last access.
+          if (info.lastAccessTime + defaultInactiveTimeout <= nowMs) {
+            toRemove += sessionHolder
+          }
+        }
+      }
+    }
+    if (!toRemove.isEmpty) {
+      // .. and remove them.
+      toRemove.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo

Review Comment:
   What is the resolution here?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +92,134 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
-    }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
-  }
-
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
         // First put into closedSessionsCache, so that it cannot get accidentally recreated by
         // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
       }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+    }
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach { s =>
+      s.close()
+      // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.
+      closedSessionsCache.put(s.key, s.getSessionHolderInfo)
     }
   }
 
-  def shutdown(): Unit = {
+  private[connect] def shutdown(): Unit = sessionsLock.synchronized {
+    scheduledExecutor.foreach { executor =>
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.MINUTES)
+    }
+    scheduledExecutor = None
+    // note: this does not cleanly shut down the sessions, but the server is shutting down.
+    sessionStore.clear()
+    closedSessionsCache.invalidateAll()
+  }
+
+  def listActiveSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    sessionStore.values.map(_.getSessionHolderInfo).toSeq
+  }
+
+  def listClosedSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    closedSessionsCache.asMap.asScala.values.toSeq
+  }
+
+  /**
+   * Schedules periodic maintenance checks if it is not already scheduled.
+   *
+   * The checks are looking to remove sessions that expired.
+   */
+  private def schedulePeriodicChecks(): Unit = sessionsLock.synchronized {
+    scheduledExecutor match {
+      case Some(_) => // Already running.
+      case None =>
+        val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL).toLong
+        // Default for sessions that don't have expiration time set.
+        val defaultInactiveTimeout =
+          SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT).toLong
+        logInfo(s"Starting thread for cleanup of expired sessions every $interval ms")
+        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
+        scheduledExecutor.get.scheduleAtFixedRate(
+          () => {
+            try periodicMaintenance(defaultInactiveTimeout)
+            catch {
+              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
+            }
+          },
+          interval,
+          interval,
+          TimeUnit.MILLISECONDS)
+    }
+  }
+
+  // Visible for testing.
+  private[connect] def periodicMaintenance(
+      defaultInactiveTimeout: Long,
+      ignoreExpirationTime: Boolean = false): Unit = {
+    logInfo("Started periodic run of SparkConnectSessionManager maintenance.")
+    // Find any sessions that expired and should be removed.
+    val toRemove = new mutable.ArrayBuffer[SessionHolder]()
     sessionsLock.synchronized {
-      sessionStore.invalidateAll()
-      closedSessionsCache.invalidateAll()
+      val nowMs = System.currentTimeMillis()
+
+      sessionStore.values.foreach { sessionHolder =>
+        val info = sessionHolder.getSessionHolderInfo
+        if (info.customExpirationTime.isDefined && !ignoreExpirationTime) {

Review Comment:
   Sounds great. It is better to be consistent and use an inactivity timeout for both.





Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #43985:
URL: https://github.com/apache/spark/pull/43985#discussion_r1416371511


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala:
##########
@@ -95,47 +94,159 @@ class SparkConnectSessionManager extends Logging {
     Option(getSession(key, None))
   }
 
-  private def getSession(
-      key: SessionKey,
-      default: Option[Callable[SessionHolder]]): SessionHolder = {
-    val session = default match {
-      case Some(callable) => sessionStore.get(key, callable)
-      case None => sessionStore.getIfPresent(key)
+  private def getSession(key: SessionKey, default: Option[() => SessionHolder]): SessionHolder = {
+    schedulePeriodicChecks() // Starts the maintenance thread if it hasn't started yet.
+
+    sessionsLock.synchronized {
+      // try to get existing session from store
+      val sessionOpt = sessionStore.get(key)
+      // create using default if missing
+      val session = sessionOpt match {
+        case Some(s) => s
+        case None =>
+          default match {
+            case Some(callable) =>
+              val session = callable()
+              sessionStore.put(key, session)
+              session
+            case None =>
+              null
+          }
+      }
+      // record access time before returning
+      session match {
+        case null =>
+          null
+        case s: SessionHolder =>
+          s.updateAccessTime()
+          s
+      }
     }
-    // record access time before returning
-    session match {
-      case null =>
-        null
-      case s: SessionHolder =>
-        s.updateAccessTime()
-        s
+  }
+
+  // Removes session from sessionStore and returns it.
+  private def removeSessionHolder(key: SessionKey): Option[SessionHolder] = {
+    var sessionHolder: Option[SessionHolder] = None
+    sessionsLock.synchronized {
+      sessionHolder = sessionStore.remove(key)
+      sessionHolder.foreach { s =>
+        // Put into closedSessionsCache, so that it cannot get accidentally recreated
+        // by getOrCreateIsolatedSession.
+        closedSessionsCache.put(s.key, s.getSessionHolderInfo)
+      }
     }
+    sessionHolder
+  }
+
+  // Shut downs the session after removing.
+  private def shutdownSessionHolder(sessionHolder: SessionHolder): Unit = {
+    sessionHolder.close()
+    // Update in closedSessionsCache: above it wasn't updated with closedTime etc. yet.
+    closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
   }
 
   def closeSession(key: SessionKey): Unit = {
-    // Invalidate will trigger RemoveSessionListener
-    sessionStore.invalidate(key)
+    val sessionHolder = removeSessionHolder(key)
+    // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
+    // getOrCreateIsolatedSession.
+    sessionHolder.foreach(shutdownSessionHolder(_))
   }
 
-  private class RemoveSessionListener extends RemovalListener[SessionKey, SessionHolder] {
-    override def onRemoval(notification: RemovalNotification[SessionKey, SessionHolder]): Unit = {
-      val sessionHolder = notification.getValue
-      sessionsLock.synchronized {
-        // First put into closedSessionsCache, so that it cannot get accidentally recreated by
-        // getOrCreateIsolatedSession.
-        closedSessionsCache.put(sessionHolder.key, sessionHolder.getSessionHolderInfo)
-      }
-      // Rest of the cleanup outside sessionLock - the session cannot be accessed anymore by
-      // getOrCreateIsolatedSession.
-      sessionHolder.close()
+  private[connect] def shutdown(): Unit = sessionsLock.synchronized {
+    scheduledExecutor.foreach { executor =>
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.MINUTES)
     }
+    scheduledExecutor = None
+    // note: this does not cleanly shut down the sessions, but the server is shutting down.
+    sessionStore.clear()
+    closedSessionsCache.invalidateAll()
+  }
+
+  def listActiveSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    sessionStore.values.map(_.getSessionHolderInfo).toSeq
   }
 
-  def shutdown(): Unit = {
+  def listClosedSessions: Seq[SessionHolderInfo] = sessionsLock.synchronized {
+    closedSessionsCache.asMap.asScala.values.toSeq
+  }
+
+  /**
+   * Schedules periodic maintenance checks if it is not already scheduled.
+   *
+   * The checks are looking to remove sessions that expired.
+   */
+  private def schedulePeriodicChecks(): Unit = sessionsLock.synchronized {
+    scheduledExecutor match {
+      case Some(_) => // Already running.
+      case None =>
+        val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL)
+        logInfo(s"Starting thread for cleanup of expired sessions every $interval ms")
+        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
+        scheduledExecutor.get.scheduleAtFixedRate(
+          () => {
+            try {
+              val defaultInactiveTimeoutMs =
+                SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT)
+              periodicMaintenance(defaultInactiveTimeoutMs)
+            } catch {
+              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
+            }
+          },
+          interval,
+          interval,
+          TimeUnit.MILLISECONDS)
+    }
+  }
+
+  // Visible for testing.
+  // For testing only: ignoreCustomTimeout lets the check ignore the custom timeout set by

Review Comment:
   nit: The testing use case wants to ignore both timeouts and expire immediately. It may be clearer to pass `expireNowForTesting`. 
   [Optional]
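Here is a minimal sketch of the reviewer's suggestion. It is not the code in this PR, and the `Session` record and helper names are hypothetical. A single flag short-circuits both the default and the custom timeout, so a test can expire every session without also passing a zero default timeout.

```scala
// Hypothetical, simplified session record; field names are illustrative.
final case class Session(id: String, lastAccessTimeMs: Long, customInactiveTimeoutMs: Option[Long])

def shouldExpire(
    s: Session,
    defaultInactiveTimeoutMs: Long,
    nowMs: Long,
    expireNowForTesting: Boolean): Boolean =
  expireNowForTesting || {
    // Normal path: a custom timeout, if set, overrides the server default.
    val timeoutMs = s.customInactiveTimeoutMs.getOrElse(defaultInactiveTimeoutMs)
    s.lastAccessTimeMs + timeoutMs <= nowMs
  }

// Returns the ids that a maintenance pass would remove.
def findExpired(
    sessions: Seq[Session],
    defaultInactiveTimeoutMs: Long,
    nowMs: Long,
    expireNowForTesting: Boolean = false): Seq[String] =
  sessions.filter(shouldExpire(_, defaultInactiveTimeoutMs, nowMs, expireNowForTesting)).map(_.id)
```

Production callers never pass the flag, so the default `false` keeps the normal timeout logic unchanged.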





Re: [PR] [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #43985: [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager
URL: https://github.com/apache/spark/pull/43985

