Posted to reviews@spark.apache.org by "rangadi (via GitHub)" <gi...@apache.org> on 2023/04/25 06:12:06 UTC

[GitHub] [spark] rangadi opened a new pull request, #40937: [SPARK-42940] Improve session management for streaming queries

rangadi opened a new pull request, #40937:
URL: https://github.com/apache/spark/pull/40937

   ### What changes were proposed in this pull request?
   This fixes a couple of important issues related to session management for streaming queries.
   
   1. The session mapping should be maintained at the Connect server as long as the streaming query is active, even if there are no accesses from the client side. Currently the session mapping is dropped after 1 hour of inactivity. 
   2. When a streaming query is stopped, the Spark session drops its reference to the streaming query object. That implies it cannot be accessed by a remote Spark Connect client. It is a common usage pattern for users to access a streaming query after it is stopped (e.g. to check its metrics, or the exception if it failed). 
      - This is not a problem in legacy mode since the user code in the REPL keeps the reference. This is no longer the case in Spark-Connect. 
   
   *Solution*: This PR adds `SparkConnectStreamingQueryCache`, which does the following:
     * Each new streaming query is registered with this cache.
     * It runs a periodic task that checks the status of these queries and polls the session mapping in the Connect server so that the session stays alive.
     * When a query is stopped, it is cached for one more hour so that it can be accessed from the remote client. 
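   To make the three bullets above concrete, here is a minimal, self-contained Scala sketch of such a cache. It is not the code in this PR: `QueryHandle`, `CachedQuery`, `StreamingQueryCacheSketch` and `periodicCheck` are hypothetical names, time is passed in as a plain `Long` in milliseconds, and the scheduling itself (a background thread invoking the check periodically) is left out.

   ```scala
   import scala.collection.mutable

   // Hypothetical stand-in for org.apache.spark.sql.streaming.StreamingQuery.
   trait QueryHandle {
     def id: String
     def isActive: Boolean
   }

   // One cache entry; expiresAtMs stays None while the query is running.
   final case class CachedQuery(
       userId: String,
       sessionId: String,
       query: QueryHandle,
       expiresAtMs: Option[Long])

   class StreamingQueryCacheSketch(
       keepAlive: (String, String) => Unit, // (userId, sessionId) => Unit
       stoppedRetentionMs: Long = 60L * 60 * 1000) { // 1 hour

     private val cache = mutable.Map.empty[String, CachedQuery] // keyed by query id

     def register(userId: String, sessionId: String, query: QueryHandle): Unit =
       synchronized {
         cache.put(query.id, CachedQuery(userId, sessionId, query, expiresAtMs = None))
       }

     def find(queryId: String): Option[QueryHandle] =
       synchronized { cache.get(queryId).map(_.query) }

     // Body of the periodic task; a real server would run it on a timer.
     def periodicCheck(nowMs: Long): Unit = synchronized {
       for ((id, entry) <- cache.toList) { // iterate over a snapshot
         entry.expiresAtMs match {
           case Some(expiry) if nowMs >= expiry =>
             cache.remove(id) // stopped for long enough: drop the references
           case Some(_) =>
             keepAlive(entry.userId, entry.sessionId) // stopped, still retained
           case None =>
             if (!entry.query.isActive) { // first check after the query stopped
               cache.update(id, entry.copy(expiresAtMs = Some(nowMs + stoppedRetentionMs)))
             }
             keepAlive(entry.userId, entry.sessionId)
         }
       }
     }
   }
   ```

   Active and recently stopped queries both keep their session alive; only an entry whose retention window has passed is dropped, and that check no longer calls `keepAlive`.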
   
   ### Why are the changes needed?
     - Explained in the above description.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   - Unit tests
   - Manual testing
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on PR #40937:
URL: https://github.com/apache/spark/pull/40937#issuecomment-1527975880

   @amaliujia, could you also take a quick look? I would like to get this merged today. 




[GitHub] [spark] HyukjinKwon closed pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries
URL: https://github.com/apache/spark/pull/40937




[GitHub] [spark] LuciferYang commented on pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40937:
URL: https://github.com/apache/spark/pull/40937#issuecomment-1521805579

   > Solution: This PR adds SparkConnectStreamingQueryCache that does not the following:
   
   does not the following?
   




[GitHub] [spark] WweiL commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1177124740


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2092,6 +2097,11 @@ class SparkConnectPlanner(val session: SparkSession) {
       case path => writer.start(path)
     }
 
+    // Register the new query so that the session and query references are cached.
+    SparkConnectService.streamingSessionManager.registerNewStreamingQuery(

Review Comment:
   If the query throws before this line (i.e. during query creation, which I believe is common: for example, if a wrong config is set, an IllegalArgumentException would be thrown, see this [example](https://github.com/apache/spark/blob/b26844ce879ac0097d6e1a95da18a5c3ef3c9284/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L227-L268)), what would happen? 
   My guess is that the user would see the error, since it's handled by the existing error-handling framework in SparkConnectService. 
   
   But `StreamingQueryManager` unregisters the query in that case:
   
   https://github.com/apache/spark/blob/b26844ce879ac0097d6e1a95da18a5c3ef3c9284/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L411
   
   When the customer tries to access the query, the `orElse` below would be triggered:
   ```scala
   val query = SparkConnectService.streamingSessionManager
         .findCachedQuery(id, session) // Common case: query is cached in connect session manager.
         .orElse { // Else try to find it in active streams. Mostly will not be found here.
           Option(session.streams.get(id))
         }
   ```
   Would that still return a "query ID not found" exception?
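   A minimal sketch of the lookup order quoted above, with plain `Map`s standing in for the Connect-side cache and for `session.streams` (both stand-ins, and the name `lookupQuery`, are hypothetical). A query that was never registered in the cache and has already been removed from the active streams falls through both lookups and ends in a not-found error.

   ```scala
   // cached:        hypothetical stand-in for the Connect-side query cache.
   // activeStreams: hypothetical stand-in for session.streams (active queries only).
   def lookupQuery[Q](
       id: String,
       cached: Map[String, Q],
       activeStreams: Map[String, Q]): Either[String, Q] =
     cached
       .get(id) // common case: the query is cached
       .orElse(activeStreams.get(id)) // fallback: still registered as active
       .toRight(s"Streaming query $id is not found")
   ```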








[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1180650813


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *
+ * Note that these semantics are evolving and might change before being finalized in Connect.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {

Review Comment:
   Updated the PR with two changes:
     - Include `runId` in the cache. As a result, each run of a query is maintained independently (the unit test is updated).
     - After a query is stopped, it is cached for 1 hour, but each access from the client extends this timeout, i.e. 1 hour is an inactivity timeout. 
   
   @pengzhon-db PTAL. @HeartSaVioR will be out on Monday too, due to a public holiday in SK. We can get this merged and address any remaining comments as a follow-up.
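   The two changes above can be sketched as follows, assuming a cache keyed by `(queryId, runId)` and an expiry that each client access pushes back. `QueryKey`, `RunCacheSketch` and its methods are hypothetical names; here expired entries are only dropped lazily on access, whereas the cache described in this PR also drops them from its periodic task.

   ```scala
   import scala.collection.mutable

   // Each run of a logical query gets its own entry.
   final case class QueryKey(queryId: String, runId: String)

   class RunCacheSketch(inactivityTimeoutMs: Long) {
     // None = run is still active; Some(t) = stopped, expires at time t.
     private val expiresAt = mutable.Map.empty[QueryKey, Option[Long]]

     def register(key: QueryKey): Unit = synchronized { expiresAt(key) = None }

     def markStopped(key: QueryKey, nowMs: Long): Unit = synchronized {
       if (expiresAt.contains(key)) expiresAt(key) = Some(nowMs + inactivityTimeoutMs)
     }

     // Returns true if the run is still accessible; an access extends the timeout.
     def access(key: QueryKey, nowMs: Long): Boolean = synchronized {
       expiresAt.get(key) match {
         case Some(Some(expiry)) if nowMs >= expiry =>
           expiresAt.remove(key)
           false
         case Some(Some(_)) =>
           expiresAt(key) = Some(nowMs + inactivityTimeoutMs)
           true
         case Some(None) => true // still running
         case None       => false // never registered, or already dropped
       }
     }
   }
   ```

   With a timeout of 100 ms, a run stopped at time 0 would expire at 100; accessing it at 90 and again at 180 moves the deadline to 280, while a second run of the same query is tracked independently.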





[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178553564


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,200 @@
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *
+ * Note that these semantics are evolving and might change before being finalized in Connect.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {

Review Comment:
   I will update the PR to cache all the instances of a query (i.e. the key is `(query_id, run_id)`). This will handle various scenarios better.
   
   One reason for not doing this in the first pass was to limit the number of cached queries when a query is in a crash loop. We can handle that separately (not in this PR for now). 
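   One possible way to handle the crash-loop concern later (a hypothetical follow-up, not something this PR does) is to cap how many runs are retained per query id and evict the oldest. `BoundedRunsSketch` and `maxRunsPerQuery` are illustrative names only.

   ```scala
   import scala.collection.mutable

   // Hypothetical: keep at most `maxRunsPerQuery` run ids per query id, so a
   // query that keeps crashing and restarting cannot grow the cache unboundedly.
   class BoundedRunsSketch(maxRunsPerQuery: Int) {
     require(maxRunsPerQuery > 0, "must retain at least one run")

     private val runs = mutable.Map.empty[String, mutable.Queue[String]]

     def register(queryId: String, runId: String): Unit = synchronized {
       val q = runs.getOrElseUpdate(queryId, mutable.Queue.empty[String])
       q.enqueue(runId)
       while (q.size > maxRunsPerQuery) q.dequeue() // evict the oldest run
     }

     def cachedRuns(queryId: String): List[String] = synchronized {
       runs.get(queryId).map(_.toList).getOrElse(Nil)
     }
   }
   ```

   This sketch ignores run state; a real policy would need to avoid evicting a run that is still active.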





[GitHub] [spark] pengzhon-db commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "pengzhon-db (via GitHub)" <gi...@apache.org>.
pengzhon-db commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1176905596


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,203 @@
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *   - Note that these semantics are evolving and might change before being finalized in Connect.
+ *   - Future improvements:
+ *     - Provide an API for users to access a session even after they lose the session id.
+ *       - Once a user is properly authenticated, the API could return list of sessions that are
+ *         still alive in the connect server for that user.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.

Review Comment:
   How did we decide on 1 hour? Will this be configurable?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,203 @@
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *   - Note that these semantics are evolving and might change before being finalized in Connect.
+ *   - Future improvements:
+ *     - Provide an API for users to access a session even after they lose the session id.
+ *       - Once a user is properly authenticated, the API could return list of sessions that are
+ *         still alive in the connect server for that user.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {
+    queryCacheLock.synchronized {
+      val value = QueryCacheValue(
+        userId = sessionHolder.userId,
+        sessionId = sessionHolder.sessionId,
+        session = sessionHolder.session,
+        query = query,
+        expiresAtMs = None)
+
+      queryCache.put(QueryCacheKey(query.id.toString), value) match {
+        case Some(existing) => // Query is being replaced. Can happen when a query is restarted.
+          log.info(
+            s"Replacing existing query in the cache. Query Id: ${query.id}." +
+              s"Existing value $existing, new value $value.")
+        case None =>
+          log.info(s"Adding new query to the cache. Query Id ${query.id}, value $value.")
+      }
+
+      schedulePeriodicChecks() // Starts the scheduler thread if it hasn't started.
+    }
+  }
+
+  /**
+   * Returns [[StreamingQuery]] if it is cached and session matches the cached query. It ensures
+   * the session associated with it matches the session passed into the call.
+   */
+  def findCachedQuery(queryId: String, session: SparkSession): Option[StreamingQuery] = {
+    queryCacheLock.synchronized {
+      cacheQueryValue(queryId).flatMap { v =>
+        if (v.session == session) Some(v.query)
+        else None // This should be rare. Likely the query is restarted on a different session.

Review Comment:
   Just trying to understand how this can happen. If it's restarted on a different session, a new value will replace the existing one with the same queryId, so it can still be found in the cache, right? Or are you referring to a possible race condition here?
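   A small sketch of the session check being discussed, assuming the cache is keyed only by query id (as in this revision) and stores the session that registered the latest run. `Entry` and `findCached` are hypothetical names, and sessions and queries are plain strings here. After a restart from another session replaces the entry, the query is still in the cache, but a lookup from the original session takes the `None` branch.

   ```scala
   // The session that registered the run, and the query handle itself.
   final case class Entry[S, Q](session: S, query: Q)

   // Return the cached query only if it belongs to the caller's session.
   def findCached[S, Q](cache: Map[String, Entry[S, Q]], queryId: String, session: S): Option[Q] =
     cache.get(queryId).flatMap { e =>
       if (e.session == session) Some(e.query) else None
     }
   ```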



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -268,6 +268,12 @@ object SparkConnectService {
   private val userSessionMapping =
     cacheBuilder(CACHE_SIZE, CACHE_TIMEOUT_SECONDS).build[SessionCacheKey, SessionHolder]()
 
+  private[connect] val streamingSessionManager =
+    new SparkConnectStreamingQueryCache(sessionKeepAliveFn = { case (userId, sessionId) =>
+      userSessionMapping.getIfPresent((userId, sessionId))

Review Comment:
   Just for my understanding: so the session will be kept alive for another hour just by reading its value from the cache?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -268,6 +268,12 @@ object SparkConnectService {
   private val userSessionMapping =
     cacheBuilder(CACHE_SIZE, CACHE_TIMEOUT_SECONDS).build[SessionCacheKey, SessionHolder]()
 
+  private[connect] val streamingSessionManager =
+    new SparkConnectStreamingQueryCache(sessionKeepAliveFn = { case (userId, sessionId) =>
+      userSessionMapping.getIfPresent((userId, sessionId))
+    // getIfPresent() prevents accidental loading.

Review Comment:
   What does this comment mean?
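   For context on the two questions above: in an expire-after-access cache (Guava's `CacheBuilder.expireAfterAccess`; that `userSessionMapping` uses such a policy is an assumption here, suggested by the "1 hour of inactivity" behavior in the PR description), any read of a present entry resets its expiry, so calling `getIfPresent` periodically keeps the session mapping alive. `getIfPresent` also never creates an entry, unlike a read that takes a loader such as Guava's `get(key, loader)`; that is presumably what "prevents accidental loading" refers to. The class below is a hand-rolled illustration of those semantics, not Guava, and `AccessExpiringCache` is a hypothetical name.

   ```scala
   import scala.collection.mutable

   // Minimal expire-after-access cache, only to illustrate the semantics.
   // Each entry stores its value and the time it was last read or written.
   class AccessExpiringCache[K, V](ttlMs: Long, now: () => Long) {
     private val entries = mutable.Map.empty[K, (V, Long)]

     def put(key: K, v: V): Unit = synchronized { entries(key) = (v, now()) }

     // Read without loading: refreshes a live entry, never creates one.
     def getIfPresent(key: K): Option[V] = synchronized {
       entries.get(key) match {
         case Some((v, last)) if now() - last < ttlMs =>
           entries(key) = (v, now()) // the read counts as an access
           Some(v)
         case Some(_) =>
           entries.remove(key) // already expired
           None
         case None => None // absent: nothing is created
       }
     }

     // Read with a loader: creates the entry when it is missing.
     def get(key: K, load: () => V): V = synchronized {
       getIfPresent(key).getOrElse {
         val v = load() // only this path creates an entry
         entries(key) = (v, now())
         v
       }
     }
   }
   ```

   With a 1-minute polling period and a 1-hour timeout, each poll lands well inside the window, so the mapping does not expire while the query is cached.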





[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1177096641


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,203 @@
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *   - Note that these semantics are evolving and might change before being finalized in Connect.
+ *   - Future improvements:
+ *     - Provide an API for a users to access session even after they lose session id.
+ *       - Once a user is properly authenticated, the API could return list of sessions that are
+ *         still alive in the connect server for that user.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.

Review Comment:
   I just chose a reasonably long value. It also matches how long the Connect server keeps session mappings.
   We will see if this needs to be configurable; this is good enough for now. It will be revisited when we look at session management across Connect.
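To make the retention policy concrete, the sketch below stamps a stopped query with an expiry time `stoppedQueryCachePeriod` after a periodic check first sees it stopped, and drops it once a later check observes that the time has passed. This is a simplified illustration, not the PR's code. `SimpleClock`, `ManualTestClock`, `CachedQuery`, and `StoppedQueryExpiry` are hypothetical names, loosely modelled on Spark's `Clock` and the `expiresAtMs` field in the diff. The injectable clock is what lets a test exercise the 1-hour default without waiting an hour.

```scala
import scala.collection.mutable
import scala.concurrent.duration._

// Hypothetical clock abstraction, standing in for org.apache.spark.util.Clock.
trait SimpleClock { def getTimeMillis(): Long }

// Manually advanced clock so tests do not have to wait in real time.
final class ManualTestClock(private var nowMs: Long = 0L) extends SimpleClock {
  def getTimeMillis(): Long = nowMs
  def advance(d: FiniteDuration): Unit = nowMs += d.toMillis
}

// Cached entry; expiresAtMs stays None until a check sees the query stopped.
final case class CachedQuery(queryId: String, isActive: Boolean, expiresAtMs: Option[Long])

final class StoppedQueryExpiry(clock: SimpleClock, stoppedQueryCachePeriod: FiniteDuration = 1.hour) {
  val cache = mutable.Map.empty[String, CachedQuery]

  def markStopped(queryId: String): Unit =
    cache.get(queryId).foreach(e => cache(queryId) = e.copy(isActive = false))

  // One pass of the periodic check: stamp newly stopped queries, drop expired ones.
  def runCheck(): Unit = {
    val nowMs = clock.getTimeMillis()
    for ((id, entry) <- cache.toList) { // Iterate over a snapshot while mutating the map.
      entry.expiresAtMs match {
        case None if !entry.isActive =>
          cache(id) = entry.copy(expiresAtMs = Some(nowMs + stoppedQueryCachePeriod.toMillis))
        case Some(deadline) if nowMs >= deadline =>
          cache.remove(id) // Retention window has elapsed.
        case _ => // Still active, or stopped but within the retention window.
      }
    }
  }
}
```
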



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178548438


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *
+ * Note that these semantics are evolving and might change before being finalized in Connect.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {

Review Comment:
   We discussed this offline, but summarizing here for other reviewers / future reference:
   
   There is an inconsistency in how the winner is selected. In the cache mechanism, the later one always wins the race and overwrites the old one. In concurrent query execution, we never know which one wins.
   
   For example, say there are three users: userA, userB, and userC. They somehow (wrongly) refer to the same checkpoint and run the query at the same time. (Three queries with different run IDs but the same unique query ID would exist at the same time.) While there will eventually be only one winner, each user should be able to track the status of their own query, even if it is determined to be a loser.
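A minimal sketch of the difference, assuming the cache is keyed by query ID only (as `QueryCacheKey(query.id.toString)` in the diff suggests): with one entry per query ID, the last registration replaces earlier runs, whereas keying by (query ID, run ID) would let each user look up their own run. `QueryRun`, `LastWriterWinsCache`, and `PerRunCache` are hypothetical names; the per-run variant is not what the PR implements.

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical stand-in for a running query: same logical id, distinct run ids.
final case class QueryRun(id: UUID, runId: UUID, owner: String)

// Keyed by query id only: every registration overwrites the previous run.
final class LastWriterWinsCache {
  private val byQueryId = mutable.Map.empty[UUID, QueryRun]
  def register(run: QueryRun): Option[QueryRun] = byQueryId.put(run.id, run) // Returns the replaced run.
  def find(id: UUID): Option[QueryRun] = byQueryId.get(id)
}

// Keyed by (query id, run id): each concurrent run remains individually visible.
final class PerRunCache {
  private val byRun = mutable.Map.empty[(UUID, UUID), QueryRun]
  def register(run: QueryRun): Unit = byRun.update((run.id, run.runId), run)
  def find(id: UUID, runId: UUID): Option[QueryRun] = byRun.get((id, runId))
}
```
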



[GitHub] [spark] WweiL commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1177124740


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2092,6 +2097,11 @@ class SparkConnectPlanner(val session: SparkSession) {
       case path => writer.start(path)
     }
 
+    // Register the new query so that the session and query references are cached.
+    SparkConnectService.streamingSessionManager.registerNewStreamingQuery(

Review Comment:
   What would happen if the query throws before this line, i.e. during query creation? I believe that is common: for example, if a wrong config is set, an IllegalArgumentException would be thrown ([example](https://github.com/apache/spark/blob/b26844ce879ac0097d6e1a95da18a5c3ef3c9284/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L227-L268)).
   My guess is that the user would see the error, since it is handled by the existing error-handling framework. But when they try to access the query, the `orElse` below would be triggered:
   ```
   val query = SparkConnectService.streamingSessionManager
         .findCachedQuery(id, session) // Common case: query is cached in connect session manager.
         .orElse { // Else try to find it in active streams. Mostly will not be found here.
           Option(session.streams.get(id))
         }
   ```
   Would that still return a query-ID-not-found exception?
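The ordering in question can be modelled in a few lines. In this hypothetical sketch (`StartThenRegister` is not a Spark class; the two maps stand in for the connect-server cache and `session.streams`), registration happens only after `start()` returns, so a failed start leaves nothing cached and never hands a query ID back to the client to look up.

```scala
import scala.collection.mutable
import scala.util.Try

final class StartThenRegister {
  val cachedQueries = mutable.Map.empty[String, String] // queryId -> description
  val activeStreams = mutable.Map.empty[String, String] // stands in for session.streams

  // Mirrors the ordering in the diff: register only after start() has returned.
  def startQuery(id: String, start: () => Unit): Try[String] = Try {
    start() // May throw, e.g. IllegalArgumentException for bad source options.
    cachedQueries(id) = s"query $id"
    id // Only reached (and returned to the client) if start() succeeded.
  }

  // Lookup chain from the quoted snippet: cache first, then active streams.
  def find(id: String): Option[String] =
    cachedQueries.get(id).orElse(activeStreams.get(id))
}
```
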



[GitHub] [spark] rangadi commented on pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on PR #40937:
URL: https://github.com/apache/spark/pull/40937#issuecomment-1521207815

   cc: @WweiL, @pengzhon-db, @LuciferYang, @grundprinzip, @amaliujia, @juliuszsompolski, @HyukjinKwon 
   
   (Fairly broad CC since it adds caching at the service level and is relevant for future lifecycle management of queries.)


[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1176871003


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -268,6 +268,12 @@ object SparkConnectService {
   private val userSessionMapping =
     cacheBuilder(CACHE_SIZE, CACHE_TIMEOUT_SECONDS).build[SessionCacheKey, SessionHolder]()
 
+  private[connect] val streamingSessionManager =
+    new SparkConnectStreamingQueryCache(sessionKeepAliveFn = { case (userId, sessionId) =>
+      userSessionMapping.getIfPresent(userId -> sessionId)

Review Comment:
   Yep. Fixed.



[GitHub] [spark] WweiL commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1177145822


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2092,6 +2097,11 @@ class SparkConnectPlanner(val session: SparkSession) {
       case path => writer.start(path)
     }
 
+    // Register the new query so that the session and query references are cached.
+    SparkConnectService.streamingSessionManager.registerNewStreamingQuery(

Review Comment:
   Ah, I see. Then the query won't be returned, so it's fine.



[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1176846545


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *   - Note that these semantics are evolving and might change before being finalized in Connect.
+ *   - Future improvements:
+ *     - Provide an API for a users to access session even after they lose session id.
+ *       - Once a user is properly authenticated, the API could return list of sessions that are
+ *         still alive in the connect server for that user.

Review Comment:
   Added the full semantics to the Scaladoc. Let's move the discussion about them here.
   
   @juliuszsompolski the [points you raised](https://github.com/apache/spark/pull/40937#pullrequestreview-1399458578) are important. PTAL at the Scaladoc above. Short answer: the session will live as long as the query is active, same as in current Spark.
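A rough sketch of the keep-alive mechanism described in the Scaladoc, assuming a single-threaded `ScheduledExecutorService` that periodically invokes `sessionKeepAliveFn` for every session that still owns an active query. `KeepAliveLoop` and its methods are hypothetical; only the `(userId, sessionId) => Unit` shape and the 1-minute polling default come from the diff. The test calls `runCheck()` directly rather than waiting on the scheduler.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.collection.mutable
import scala.concurrent.duration._
import scala.util.control.NonFatal

final class KeepAliveLoop(
    sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit
    sessionPollingPeriod: FiniteDuration = 1.minute) {

  // (userId, sessionId) pairs that own an active query; guarded by `this`.
  private val activeSessions = mutable.Set.empty[(String, String)]
  private var scheduler: Option[ScheduledExecutorService] = None

  def register(userId: String, sessionId: String): Unit = synchronized {
    activeSessions += ((userId, sessionId))
  }

  def unregister(userId: String, sessionId: String): Unit = synchronized {
    activeSessions -= ((userId, sessionId))
  }

  // One polling pass: touch every session that still has an active query.
  def runCheck(): Unit = {
    val snapshot = synchronized { activeSessions.toList } // Call out without holding the lock.
    snapshot.foreach { case (userId, sessionId) =>
      // An uncaught exception would suppress all later runs of a fixed-rate task.
      try sessionKeepAliveFn(userId, sessionId)
      catch { case NonFatal(_) => () } // A real implementation would log this.
    }
  }

  // Starts the background thread on first use.
  def start(): Unit = synchronized {
    if (scheduler.isEmpty) {
      val exec = Executors.newSingleThreadScheduledExecutor()
      val periodMs = sessionPollingPeriod.toMillis
      exec.scheduleAtFixedRate(() => runCheck(), periodMs, periodMs, TimeUnit.MILLISECONDS)
      scheduler = Some(exec)
    }
  }

  def shutdown(): Unit = synchronized {
    scheduler.foreach(_.shutdownNow())
    scheduler = None
  }
}
```
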



[GitHub] [spark] juliuszsompolski commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178099999


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *   - Note that these semantics are evolving and might change before being finalized in Connect.
+ *   - Future improvements:
+ *     - Provide an API for a users to access session even after they lose session id.
+ *       - Once a user is properly authenticated, the API could return list of sessions that are
+ *         still alive in the connect server for that user.

Review Comment:
   Yes. I recently started designing more general session management. We need to track all actively running queries as well, to be able to, e.g., interrupt/cancel them, to re-attach to them if the client loses its gRPC stream connection, and to avoid removing a session that still has running queries (which can currently happen if a client is just waiting on an ExecutePlan that runs for more than 1 hour; the session gets evicted after 1 hour of idleness...).
   As a result, this will likely be folded into the more generic session manager.
   
   I'd remove the "Future improvements" comment from here for now, as the direction may in the end depend on the overall proposed design. That should come soon.



[GitHub] [spark] pengzhon-db commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "pengzhon-db (via GitHub)" <gi...@apache.org>.
pengzhon-db commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1180735277


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *
+ * Note that these semantics are evolving and might change before being finalized in Connect.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {

Review Comment:
   LGTM



[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178402235


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *   - Note that these semantics are evolving and might change before being finalized in Connect.
+ *   - Future improvements:
+ *     - Provide an API for a users to access session even after they lose session id.
+ *       - Once a user is properly authenticated, the API could return list of sessions that are
+ *         still alive in the connect server for that user.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {
+    queryCacheLock.synchronized {
+      val value = QueryCacheValue(
+        userId = sessionHolder.userId,
+        sessionId = sessionHolder.sessionId,
+        session = sessionHolder.session,
+        query = query,
+        expiresAtMs = None)
+
+      queryCache.put(QueryCacheKey(query.id.toString), value) match {
+        case Some(existing) => // Query is being replaced. Can happen when a query is restarted.
+          log.info(
+            s"Replacing existing query in the cache. Query Id: ${query.id}. " +
+              s"Existing value $existing, new value $value.")
+        case None =>
+          log.info(s"Adding new query to the cache. Query Id ${query.id}, value $value.")
+      }
+
+      schedulePeriodicChecks() // Starts the scheduler thread if it hasn't started.

Review Comment:
   These are reentrant locks: Scala's `synchronized` uses the same JVM monitors as Java, so a thread that already holds the lock can acquire it again.
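A minimal sketch of why the nested locking is safe. `Registry`, `register()` and `ensureStarted()` are hypothetical stand-ins, not the PR's code. A public method and a helper both lock the same object, and the helper is called while the lock is already held. JVM monitors are reentrant, so the second acquisition succeeds instead of deadlocking.

```scala
// Hypothetical class (not from the PR): a public method and a helper both lock the same object.
class Registry {
  private val lock = new Object
  private var started = false
  private var registered = 0

  def register(): Unit = lock.synchronized {
    registered += 1
    ensureStarted() // Re-acquires `lock` while this thread already holds it.
  }

  // Safe to call from inside register(): the thread already owns the monitor, so it re-enters.
  private def ensureStarted(): Unit = lock.synchronized {
    if (!started) started = true
  }

  def snapshot: (Int, Boolean) = lock.synchronized { (registered, started) }
}

val registry = new Registry
registry.register()
registry.register() // Returns normally: no self-deadlock on the nested acquisition.
```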



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1177092773


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -268,6 +268,12 @@ object SparkConnectService {
   private val userSessionMapping =
     cacheBuilder(CACHE_SIZE, CACHE_TIMEOUT_SECONDS).build[SessionCacheKey, SessionHolder]()
 
+  private[connect] val streamingSessionManager =
+    new SparkConnectStreamingQueryCache(sessionKeepAliveFn = { case (userId, sessionId) =>
+      userSessionMapping.getIfPresent((userId, sessionId))

Review Comment:
   Yes. Cache eviction is based on last access. 
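Because eviction is based on last access, a periodic read is enough to keep a mapping alive. The sketch below uses a hand-rolled map and a manual clock to model access-based expiry, and is not Guava's `CacheBuilder.expireAfterAccess`. `AccessExpiringCache`, the 1-hour timeout and the 1-minute polling interval are assumptions for illustration.

```scala
import scala.collection.mutable

// Hypothetical, simplified cache that evicts entries idle for at least `timeoutMs`.
class AccessExpiringCache[K, V](timeoutMs: Long, now: () => Long) {
  private val entries = mutable.Map.empty[K, (V, Long)] // key -> (value, last access time)

  def put(k: K, v: V): Unit = entries(k) = (v, now())

  def getIfPresent(k: K): Option[V] = {
    evictExpired()
    entries.get(k).map { case (v, _) =>
      entries(k) = (v, now()) // a read counts as an access and resets the idle timer
      v
    }
  }

  private def evictExpired(): Unit = {
    val t = now()
    entries.filterInPlace { case (_, (_, lastAccess)) => t - lastAccess < timeoutMs }
  }
}

var clockMs = 0L
val oneHourMs = 60L * 60 * 1000
val sessions = new AccessExpiringCache[String, String](oneHourMs, () => clockMs)
sessions.put("user1/session1", "holder-1")
sessions.put("user2/session2", "holder-2")

// Touch only session1 once per simulated minute for two hours (like a keep-alive poller).
for (_ <- 1 to 120) {
  clockMs += 60L * 1000
  sessions.getIfPresent("user1/session1")
}
```

After two simulated hours, only the mapping that was polled is still present.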



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -268,6 +268,12 @@ object SparkConnectService {
   private val userSessionMapping =
     cacheBuilder(CACHE_SIZE, CACHE_TIMEOUT_SECONDS).build[SessionCacheKey, SessionHolder]()
 
+  private[connect] val streamingSessionManager =
+    new SparkConnectStreamingQueryCache(sessionKeepAliveFn = { case (userId, sessionId) =>
+      userSessionMapping.getIfPresent((userId, sessionId))
+    // getIfPresent() prevents accidental loading.

Review Comment:
   `userSessionMapping` is a loading cache, i.e. if we call `get()` and the entry does not exist, it creates a new entry by creating a new Spark session for that ID. We don't want that if (for any reason) the mapping for the session we want to keep alive has already been removed. `getIfPresent()` avoids creating an entry.
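The difference can be illustrated with a hypothetical `LoadingCache` class (not Guava's API). Its loader stands in for creating a new Spark session. `get()` creates a missing entry, while `getIfPresent()` only reads. A keep-alive call therefore cannot resurrect a mapping that was already removed.

```scala
import scala.collection.mutable

// Hypothetical loading cache (not Guava's API): `get` runs the loader on a miss, `getIfPresent` never does.
class LoadingCache[K, V](loader: K => V) {
  private val entries = mutable.Map.empty[K, V]
  var loads = 0 // How many times the loader ran, e.g. how many new sessions were created.

  def get(k: K): V = entries.getOrElseUpdate(k, { loads += 1; loader(k) })
  def getIfPresent(k: K): Option[V] = entries.get(k)
  def invalidate(k: K): Unit = entries.remove(k)
}

val mapping = new LoadingCache[(String, String), String]({ case (user, session) =>
  s"new session for $user/$session"
})
mapping.get(("alice", "s1"))        // First access loads (creates) the entry.
mapping.invalidate(("alice", "s1")) // Mapping removed, e.g. after it expired.

// A keep-alive style read must not bring the mapping back.
val touched = mapping.getIfPresent(("alice", "s1"))
```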



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *   - Note that these semantics are evolving and might change before being finalized in Connect.
+ *   - Future improvements:
+ *     - Provide an API for users to access a session even after they lose the session id.
+ *       - Once a user is properly authenticated, the API could return list of sessions that are
+ *         still alive in the connect server for that user.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {
+    queryCacheLock.synchronized {
+      val value = QueryCacheValue(
+        userId = sessionHolder.userId,
+        sessionId = sessionHolder.sessionId,
+        session = sessionHolder.session,
+        query = query,
+        expiresAtMs = None)
+
+      queryCache.put(QueryCacheKey(query.id.toString), value) match {
+        case Some(existing) => // Query is being replaced. Can happen when a query is restarted.
+          log.info(
+            s"Replacing existing query in the cache. Query Id: ${query.id}. " +
+              s"Existing value $existing, new value $value.")
+        case None =>
+          log.info(s"Adding new query to the cache. Query Id ${query.id}, value $value.")
+      }
+
+      schedulePeriodicChecks() // Starts the scheduler thread if it hasn't started.
+    }
+  }
+
+  /**
+   * Returns [[StreamingQuery]] if it is cached and session matches the cached query. It ensures
+   * the session associated with it matches the session passed into the call.
+   */
+  def findCachedQuery(queryId: String, session: SparkSession): Option[StreamingQuery] = {
+    queryCacheLock.synchronized {
+      cacheQueryValue(queryId).flatMap { v =>
+        if (v.session == session) Some(v.query)
+        else None // This should be rare. Likely the query is restarted on a different session.

Review Comment:
   It can be found, but only under the new session. The new query belongs to the new session, so it should not be accessible from the old session.
   For example, with two separate clients (say, two terminals):
     - Client 1 (session 1) does: `query_1 = start_query_X`, then `query_1.stop()`.
     - Client 2 (session 2) does: `query_2 = start_query_X`.
     - Client 1 does: `query_1.status()`
        - This would fail with a _"query not found"_ error, since `query_1` has already been replaced by `query_2` above.
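The two-client scenario above can be modeled with a cache keyed only by query id, where a lookup succeeds only for the session that owns the cached run. `Session`, `CachedQuery`, `register` and this `findCachedQuery` are simplified hypothetical stand-ins, not the connect server's types.

```scala
import scala.collection.mutable

// Hypothetical stand-ins: a session is identified by name; a cached entry remembers its owning session.
final case class Session(name: String)
final case class CachedQuery(runId: Int, session: Session)

val queryCache = mutable.Map.empty[String, CachedQuery] // keyed by query id only

// Registering a run under an existing id replaces the older run.
def register(queryId: String, runId: Int, session: Session): Unit = {
  queryCache.put(queryId, CachedQuery(runId, session))
  ()
}

// Same idea as findCachedQuery: a hit is returned only to the session that owns the cached run.
def findCachedQuery(queryId: String, session: Session): Option[CachedQuery] =
  queryCache.get(queryId).filter(_.session == session)

val session1 = Session("client-1")
val session2 = Session("client-2")

register("query-X", 1, session1) // Client 1 starts (and later stops) query X.
register("query-X", 2, session2) // Client 2 restarts query X: same query id, new run.

val seenByClient1 = findCachedQuery("query-X", session1) // None: "query not found"
val seenByClient2 = findCachedQuery("query-X", session2) // Some(run 2)
```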
     



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,203 @@
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *   - Note that these semantics are evolving and might change before being finalized in Connect.
+ *   - Future improvements:
+ *     - Provide an API for users to access a session even after they lose the session id.
+ *       - Once a user is properly authenticated, the API could return list of sessions that are
+ *         still alive in the connect server for that user.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.

Review Comment:
   I just chose a reasonably long value; it is also how long the Connect server keeps session mappings.
   We will see whether this needs to be configurable; this is good enough for now. It will be revisited when we look at session management across Connect.
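The periodic task from the class doc can be sketched as a single pass over the cache. Active queries keep their session alive. A newly stopped query gets an expiry of `now + stoppedQueryCachePeriod`. An expired one is dropped. Passing the period and the current time in as parameters is what lets a test use seconds instead of 1 hour.

This is a simplified model with hypothetical names (`Entry`, `periodicCheck`), not the PR's implementation. It also assumes that the session is still kept alive during the grace period after a query stops.

```scala
import scala.collection.mutable
import scala.concurrent.duration._

// Hypothetical, simplified cache entry: whether the query is active, plus an optional expiry time.
final case class Entry(userId: String, sessionId: String, isActive: Boolean, expiresAtMs: Option[Long])

def periodicCheck(
    cache: mutable.Map[String, Entry],
    nowMs: Long,
    stoppedQueryCachePeriod: Duration,
    keepAlive: (String, String) => Unit): Unit = {
  for ((id, e) <- cache.toList) { // iterate over a snapshot so `cache` can be mutated
    e.expiresAtMs match {
      case Some(ts) if nowMs >= ts =>
        cache.remove(id) // stopped long enough ago: drop it
      case Some(_) =>
        keepAlive(e.userId, e.sessionId) // stopped, but still within the grace period
      case None if e.isActive =>
        keepAlive(e.userId, e.sessionId) // running: keep the session mapping alive
      case None =>
        keepAlive(e.userId, e.sessionId) // just stopped: start the grace period
        cache(id) = e.copy(expiresAtMs = Some(nowMs + stoppedQueryCachePeriod.toMillis))
    }
  }
}

val cache = mutable.Map(
  "q1" -> Entry("u", "s1", isActive = true, expiresAtMs = None),
  "q2" -> Entry("u", "s2", isActive = false, expiresAtMs = None))
val touched = mutable.ListBuffer.empty[String]
val keepAlive: (String, String) => Unit = (_, sessionId) => touched += sessionId
val period = 10.seconds // a short period, as a test might pass instead of 1.hour

periodicCheck(cache, 0L, period, keepAlive)     // q2 has just stopped: expiry set to 10000 ms
periodicCheck(cache, 10000L, period, keepAlive) // q2 has expired: dropped
```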



[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178043662


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,203 @@
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *   - Note that these semantics are evolving and might change before being finalized in Connect.
+ *   - Future improvements:
+ *     - Provide an API for users to access a session even after they lose the session id.
+ *       - Once a user is properly authenticated, the API could return list of sessions that are
+ *         still alive in the connect server for that user.

Review Comment:
   I met with Julek about these semantics and the larger life cycle. We agreed that they make sense and will allow improvements to overall session management.
   E.g. we likely need to track batch queries similarly to streaming queries, and we will likely fold this into a new session manager. In addition, such improvements will not affect the client API or protocol.



[GitHub] [spark] HyukjinKwon commented on pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #40937:
URL: https://github.com/apache/spark/pull/40937#issuecomment-1528550625

   Merged to master.


[GitHub] [spark] WweiL commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1177124740


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2092,6 +2097,11 @@ class SparkConnectPlanner(val session: SparkSession) {
       case path => writer.start(path)
     }
 
+    // Register the new query so that the session and query references are cached.
+    SparkConnectService.streamingSessionManager.registerNewStreamingQuery(

Review Comment:
   If the query throws before this line, i.e. during query creation (which I believe is common: for example, if a wrong config is set, an IllegalArgumentException is thrown, see this [example](https://github.com/apache/spark/blob/b26844ce879ac0097d6e1a95da18a5c3ef3c9284/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L227-L268)), what would happen?
   My guess is that the user would see the error, since it is handled by the existing error-handling framework in SparkConnectService.
   But `StreamingQueryManager` unregisters the query in that case: https://github.com/apache/spark/blob/b26844ce879ac0097d6e1a95da18a5c3ef3c9284/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L411
   
   When the customer tries to access the query, the `orElse` below would be triggered:
   ```
   val query = SparkConnectService.streamingSessionManager
         .findCachedQuery(id, session) // Common case: query is cached in connect session manager.
         .orElse { // Else try to find it in active streams. Mostly will not be found here.
           Option(session.streams.get(id))
   ```
   And that'd still return a query ID not found exception?
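The question can be modeled with a small sketch. It assumes the query fails inside `start()`, so it is never registered with the connect cache, and that the manager has already unregistered it. Both lookups then come back empty. `Option(null)` turns the Java-style `null` from a missing `get()` into `None`. `startQuery`, `activeStreams`, `connectCache` and `lookup` are hypothetical stand-ins, not Spark APIs.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical stand-ins for the connect-side query cache and the session's active streams.
val connectCache = mutable.Map.empty[String, String]
val activeStreams = new java.util.HashMap[String, String]() // Java-style: get() returns null if absent

// Simulates a query whose source rejects an option during start(), before it could be registered.
def startQuery(id: String, options: Map[String, String]): String = {
  activeStreams.put(id, "starting")
  try {
    require(!options.get("startingOffsets").contains("bogus"), "invalid startingOffsets")
    id
  } catch {
    case e: IllegalArgumentException =>
      activeStreams.remove(id) // the manager unregisters a query that failed to start
      throw e
  }
}

def lookup(id: String): Option[String] =
  connectCache.get(id)                     // common case: cached by the connect server
    .orElse(Option(activeStreams.get(id))) // fallback: Option(null) becomes None

val attempt = Try {
  val id = startQuery("q-1", Map("startingOffsets" -> "bogus"))
  connectCache.put(id, "registered") // not reached: startQuery threw first
}
```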



[GitHub] [spark] WweiL commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1177124740


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2092,6 +2097,11 @@ class SparkConnectPlanner(val session: SparkSession) {
       case path => writer.start(path)
     }
 
+    // Register the new query so that the session and query references are cached.
+    SparkConnectService.streamingSessionManager.registerNewStreamingQuery(

Review Comment:
   If the query throws before this line, i.e. during query creation (which is actually common: for example, if a wrong config is set, an IllegalArgumentException is thrown, see this [example](https://github.com/apache/spark/blob/b26844ce879ac0097d6e1a95da18a5c3ef3c9284/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L227-L268)), what would happen?
   My guess is that the `orElse` would be triggered:
   ```
   val query = SparkConnectService.streamingSessionManager
         .findCachedQuery(id, session) // Common case: query is cached in connect session manager.
         .orElse { // Else try to find it in active streams. Mostly will not be found here.
           Option(session.streams.get(id))
   ```
   And that'd still return a query ID not found exception?



[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178398552


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,203 @@
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *   - Note that these semantics are evolving and might change before being finalized in Connect.
+ *   - Future improvements:
+ *     - Provide an API for users to access a session even after they lose the session id.
+ *       - Once a user is properly authenticated, the API could return list of sessions that are
+ *         still alive in the connect server for that user.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {
+    queryCacheLock.synchronized {
+      val value = QueryCacheValue(
+        userId = sessionHolder.userId,
+        sessionId = sessionHolder.sessionId,
+        session = sessionHolder.session,
+        query = query,
+        expiresAtMs = None)
+
+      queryCache.put(QueryCacheKey(query.id.toString), value) match {
+        case Some(existing) => // Query is being replaced. Can happen when a query is restarted.
+          log.info(
+            s"Replacing existing query in the cache. Query Id: ${query.id}. " +
+              s"Existing value $existing, new value $value.")
+        case None =>
+          log.info(s"Adding new query to the cache. Query Id ${query.id}, value $value.")
+      }
+
+      schedulePeriodicChecks() // Starts the scheduler thread if it hasn't started.

Review Comment:
   We could do that, but it is simpler to just keep the lock. Anyone reading this method can see that it is locked, without having to look at the call site.
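The pattern under discussion is a helper that takes the lock itself and starts a single background thread only once. It stays correct whether or not the caller already holds the lock. `LazyScheduler` is hypothetical and uses `Executors.newSingleThreadScheduledExecutor` and `scheduleAtFixedRate` from `java.util.concurrent`. The PR's actual scheduling code may differ.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

// Hypothetical class: one background thread, started lazily on first registration.
class LazyScheduler(periodMs: Long, task: () => Unit) {
  private val lock = new Object
  private var scheduler: Option[ScheduledExecutorService] = None // guarded by `lock`

  def register(): Unit = lock.synchronized {
    // ... update the shared cache here ...
    ensureStarted() // takes `lock` again; fine, because JVM monitors are reentrant
  }

  // Locks on its own, so it stays correct even if a future caller does not hold `lock`.
  private def ensureStarted(): Unit = lock.synchronized {
    if (scheduler.isEmpty) {
      val s = Executors.newSingleThreadScheduledExecutor()
      s.scheduleAtFixedRate(() => task(), periodMs, periodMs, TimeUnit.MILLISECONDS)
      scheduler = Some(s)
    }
  }

  def isStarted: Boolean = lock.synchronized(scheduler.isDefined)

  def shutdown(): Unit = lock.synchronized {
    scheduler.foreach(_.shutdownNow())
    scheduler = None
  }
}

val sched = new LazyScheduler(1000L, () => ())
val startedBefore = sched.isStarted
sched.register()
sched.register() // finds the scheduler already running and does not start another thread
val startedAfter = sched.isStarted
sched.shutdown()
```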
   



[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178403254


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -268,6 +268,12 @@ object SparkConnectService {
   private val userSessionMapping =
     cacheBuilder(CACHE_SIZE, CACHE_TIMEOUT_SECONDS).build[SessionCacheKey, SessionHolder]()
 
+  private[connect] val streamingSessionManager =
+    new SparkConnectStreamingQueryCache(sessionKeepAliveFn = { case (userId, sessionId) =>
+      userSessionMapping.getIfPresent((userId, sessionId))
+    // getIfPresent() prevents accidental loading.

Review Comment:
   Done.



[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178534860


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,200 @@
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread with a periodic task that does the following:
+ *   - Checks the status of the queries and drops those that expired (1 hour after being stopped).
+ *   - Keeps the associated session active by invoking the supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps support the following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at the connect server alive as long as a streaming
+ *     query is active, even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark: queries continue to run if the
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and remain
+ *     accessible from the client. This gives the client time to fetch the query status.
+ *     - If the query is restarted during this time (i.e. it has a new run id), the reference to
+ *       the previous run is dropped. As a result, only the most recent run of a logical query is
+ *       cached. This policy can be revisited to cache multiple runs of a query.
+ *
+ * Note that these semantics are evolving and might change before being finalized in Connect.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {

Review Comment:
   Good point. We intentionally track only the most recent run (this is also mentioned in the semantics). We could track all the runs, but that might increase the number of queries tracked (especially if a query is in a crash loop or a trigger.once() loop). 
   Nothing technically prevents us from tracking at the runId level.
   
   So in the above scenario, whichever run is seen last survives. The others will not be tracked.
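The "most recent run wins" policy can be illustrated with a map keyed only by the logical query id. This is a minimal sketch, not the PR's code: `QueryRun` and `LatestRunCache` are hypothetical names. Registering a restarted run replaces the earlier entry, and `mutable.Map.put` returns the replaced value. That is the same shape as the `match` on `Some(existing)` in the diff.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a streaming query run: the logical query id stays the
// same across restarts, while every restart gets a fresh run id.
final case class QueryRun(queryId: String, runId: String)

// Hypothetical cache keyed only by the logical query id, so at most one run per
// query is tracked and a restart replaces the previously cached run.
final class LatestRunCache {
  private val lock = new Object
  private val byQueryId = mutable.Map.empty[String, QueryRun]

  // Returns the run that was replaced, if any (mutable.Map.put returns the old value).
  def register(run: QueryRun): Option[QueryRun] = lock.synchronized {
    byQueryId.put(run.queryId, run)
  }

  def get(queryId: String): Option[QueryRun] = lock.synchronized(byQueryId.get(queryId))

  def size: Int = lock.synchronized(byQueryId.size)
}

val cache = new LatestRunCache
cache.register(QueryRun("q1", "run-1"))
// The same logical query restarts: run-1 is dropped and only run-2 stays cached.
val replaced = cache.register(QueryRun("q1", "run-2"))
```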



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178534860


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread with a periodic task that does the following:
+ *   - Checks the status of the queries and drops those that expired (1 hour after being stopped).
+ *   - Keeps the associated session active by invoking the supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps support the following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at the connect server alive as long as a streaming
+ *     query is active, even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark: queries continue to run if the
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and remain
+ *     accessible from the client. This gives the client time to fetch the query status.
+ *     - If the query is restarted during this time (i.e. it has a new run id), the reference to
+ *       the previous run is dropped. As a result, only the most recent run of a logical query is
+ *       cached. This policy can be revisited to cache multiple runs of a query.
+ *
+ * Note that these semantics are evolving and might change before being finalized in Connect.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {

Review Comment:
   Good point. We intentionally track only the most recent run (this is also mentioned in the semantics). We could track all the runs, but that might increase the number of queries tracked (especially if a query is in a crash loop or a trigger.once() loop). 
   Nothing technically prevents us from tracking at the runId level.
   
   So in the above scenario, the run that is seen last survives. The others will not be tracked.
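For contrast, tracking at the runId level only requires adding the run id to the key. The sketch below uses hypothetical names and is not a proposal from the PR. It shows why the growth concern matters: every restart of a crash-looping query adds another entry, and that entry stays until something evicts it.

```scala
import scala.collection.mutable

// Hypothetical composite key: one entry per run instead of one per logical query.
final case class RunKey(queryId: String, runId: String)

final class PerRunCache {
  private val lock = new Object
  private val runs = mutable.Map.empty[RunKey, String] // value: hypothetical run status

  def register(queryId: String, runId: String, status: String): Unit = lock.synchronized {
    runs(RunKey(queryId, runId)) = status
  }

  // All tracked run ids of one logical query, sorted for stable output.
  def runsOf(queryId: String): Seq[String] = lock.synchronized {
    runs.keys.filter(_.queryId == queryId).map(_.runId).toSeq.sorted
  }

  def size: Int = lock.synchronized(runs.size)
}

// A query in a crash loop restarts repeatedly, and every restart adds a new entry.
val perRun = new PerRunCache
(1 to 5).foreach(i => perRun.register("q1", s"run-$i", "FAILED"))
```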





[GitHub] [spark] pengzhon-db commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "pengzhon-db (via GitHub)" <gi...@apache.org>.
pengzhon-db commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1180734989


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread with a periodic task that does the following:
+ *   - Checks the status of the queries and drops those that expired (1 hour after being stopped).
+ *   - Keeps the associated session active by invoking the supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps support the following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at the connect server alive as long as a streaming
+ *     query is active, even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark: queries continue to run if the
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and remain
+ *     accessible from the client. This gives the client time to fetch the query status. If the
+ *     client keeps accessing the query, it stays in the cache until 1 hour of inactivity.
+ *
+ * Note that these semantics are evolving and might change before being finalized in Connect.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryInactivityTimeout: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {
+    queryCacheLock.synchronized {
+      val value = QueryCacheValue(
+        userId = sessionHolder.userId,
+        sessionId = sessionHolder.sessionId,
+        session = sessionHolder.session,
+        query = query,
+        expiresAtMs = None)
+
+      queryCache.put(QueryCacheKey(query.id.toString, query.runId.toString), value) match {
+        case Some(existing) => // Query is being replaced. Not really expected.
+          log.warn(
+            s"Replacing existing query in the cache (unexpected). Query Id: ${query.id}. " +
+              s"Existing value $existing, new value $value.")
+        case None =>
+          log.info(s"Adding new query to the cache. Query Id ${query.id}, value $value.")
+      }
+
+      schedulePeriodicChecks() // Starts the scheduler thread if it hasn't started.
+    }
+  }
+
+  /**
+   * Returns the [[StreamingQuery]] if it is cached and the session associated with it matches
+   * the session passed into the call. If the query is inactive (i.e. it has a cache expiry time
+   * set), this access extends its expiry time, so a query stays in the cache as long as a client
+   * keeps accessing it.
+   */
+  def getCachedQuery(
+      queryId: String,
+      runId: String,
+      session: SparkSession): Option[StreamingQuery] = {
+    val key = QueryCacheKey(queryId, runId)
+    queryCacheLock.synchronized {
+      queryCache.get(key).flatMap { v =>
+        if (v.session == session) {
+          v.expiresAtMs.foreach { _ =>
+            // Extend the expiry time as the client is accessing it.
+            val expiresAtMs = clock.getTimeMillis() + stoppedQueryInactivityTimeout.toMillis
+            queryCache.put(key, v.copy(expiresAtMs = Some(expiresAtMs)))
+          }
+          Some(v.query)
+        } else {
+          None // Should be rare: maybe the client is trying to access it from a different session.
+        }
+      }
+    }
+  }
+
+  // Visible for testing
+  private[service] def cacheQueryValue(queryId: String, runId: String): Option[QueryCacheValue] =

Review Comment:
   nit: should this function be named `getCacheQueryValue`?
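The sliding expiry in `getCachedQuery` above can be sketched with an injected fake clock. In that method, each access to a stopped query pushes its expiry back by `stoppedQueryInactivityTimeout`. All names here are hypothetical, and the real class keeps more state per entry.

```scala
import scala.collection.mutable

// Hypothetical clock abstraction so expiry can be exercised without sleeping; it
// plays the role that Spark's `Clock` plays in the PR.
trait SimpleClock { def nowMs: Long }

final class FakeClock extends SimpleClock {
  private var now = 0L
  def nowMs: Long = now
  def advance(ms: Long): Unit = now += ms
}

// Hypothetical entry: `expiresAtMs` is None while the query is active and set once it stops.
final case class Entry(name: String, expiresAtMs: Option[Long])

final class SlidingExpiryCache(clock: SimpleClock, inactivityTimeoutMs: Long) {
  private val lock = new Object
  private val entries = mutable.Map.empty[String, Entry]

  def put(id: String, entry: Entry): Unit = lock.synchronized { entries(id) = entry }

  // A stopped entry expires after the inactivity timeout unless it is accessed again.
  def markStopped(id: String): Unit = lock.synchronized {
    entries.get(id).foreach { e =>
      entries(id) = e.copy(expiresAtMs = Some(clock.nowMs + inactivityTimeoutMs))
    }
  }

  // Access pushes back the expiry of a stopped entry; active entries are returned unchanged.
  def get(id: String): Option[Entry] = lock.synchronized {
    entries.get(id).map { e =>
      val refreshed =
        if (e.expiresAtMs.isDefined) e.copy(expiresAtMs = Some(clock.nowMs + inactivityTimeoutMs))
        else e
      entries(id) = refreshed
      refreshed
    }
  }

  // Drops stopped entries whose expiry time has passed.
  def evictExpired(): Unit = lock.synchronized {
    val expired = entries.collect { case (id, Entry(_, Some(t))) if t <= clock.nowMs => id }.toList
    entries --= expired
  }

  def contains(id: String): Boolean = lock.synchronized(entries.contains(id))
}

val hourMs = 60L * 60 * 1000
val clock = new FakeClock
val cache = new SlidingExpiryCache(clock, inactivityTimeoutMs = hourMs)
cache.put("q1", Entry("q1", expiresAtMs = None))
cache.markStopped("q1")         // expires at t = 60 min
clock.advance(50 * 60 * 1000L)
cache.get("q1")                 // accessed at t = 50 min: expiry moves to t = 110 min
```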





[GitHub] [spark] WweiL commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1177124740


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2092,6 +2097,11 @@ class SparkConnectPlanner(val session: SparkSession) {
       case path => writer.start(path)
     }
 
+    // Register the new query so that the session and query references are cached.
+    SparkConnectService.streamingSessionManager.registerNewStreamingQuery(

Review Comment:
   If the query throws before this line, what happens? I would think the `orElse` would be triggered:
   ```
   val query = SparkConnectService.streamingSessionManager
         .findCachedQuery(id, session) // Common case: query is cached in connect session manager.
         .orElse { // Else try to find it in active streams. Mostly will not be found here.
           Option(session.streams.get(id))
         }
   ```
   Would that still result in a "query ID not found" exception?
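The lookup in the quoted snippet is a plain `Option.orElse` chain. The sketch below shows how it behaves for a query that was never registered in the cache and is no longer active. It uses hypothetical stand-ins for the connect-side cache and for `session.streams`, whose `get` returns null when no active query has that id. Both lookups come back empty, so the caller falls through to whatever not-found error follows. The exception used here is an assumption, not the server's actual error.

```scala
import java.util.UUID

// Hypothetical stand-ins: `cached` plays the role of the connect-side query cache and
// `activeStreams` the role of `session.streams`, whose `get` returns null when no
// active query has the given id.
final case class Query(id: UUID)

final class QueryLookup(cached: Map[UUID, Query], activeStreams: java.util.Map[UUID, Query]) {
  // Same shape as the snippet above: cache first, then active streams.
  def find(id: UUID): Option[Query] =
    cached.get(id).orElse(Option(activeStreams.get(id))) // Option(null) is None

  // Hypothetical error; the real server's error type and message may differ.
  def findOrFail(id: UUID): Query =
    find(id).getOrElse(throw new NoSuchElementException(s"Streaming query $id is not found"))
}

// A query that failed before being registered: not cached and no longer active.
val failedId = UUID.randomUUID()
val streams = new java.util.HashMap[UUID, Query]()
val lookup = new QueryLookup(Map.empty, streams)
```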





[GitHub] [spark] WweiL commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1177137568


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread with a periodic task that does the following:
+ *   - Checks the status of the queries and drops those that expired (1 hour after being stopped).
+ *   - Keeps the associated session active by invoking the supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps support the following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at the connect server alive as long as a streaming
+ *     query is active, even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark: queries continue to run if the
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and remain
+ *     accessible from the client. This gives the client time to fetch the query status.
+ *     - If the query is restarted during this time (i.e. it has a new run id), the reference to
+ *       the previous run is dropped. As a result, only the most recent run of a logical query is
+ *       cached. This policy can be revisited to cache multiple runs of a query.
+ *   - Note that these semantics are evolving and might change before being finalized in Connect.
+ *   - Future improvements:
+ *     - Provide an API for users to access a session even after they lose the session id.
+ *       - Once a user is properly authenticated, the API could return the list of sessions that
+ *         are still alive in the connect server for that user.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {
+    queryCacheLock.synchronized {
+      val value = QueryCacheValue(
+        userId = sessionHolder.userId,
+        sessionId = sessionHolder.sessionId,
+        session = sessionHolder.session,
+        query = query,
+        expiresAtMs = None)
+
+      queryCache.put(QueryCacheKey(query.id.toString), value) match {
+        case Some(existing) => // Query is being replaced. Can happen when a query is restarted.
+          log.info(
+            s"Replacing existing query in the cache. Query Id: ${query.id}. " +
+              s"Existing value $existing, new value $value.")
+        case None =>
+          log.info(s"Adding new query to the cache. Query Id ${query.id}, value $value.")
+      }
+
+      schedulePeriodicChecks() // Starts the scheduler thread if it hasn't started.

Review Comment:
   This method also seems to call `queryCacheLock.synchronized {`? Maybe we could remove that line in `schedulePeriodicChecks` and make sure that it w?





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1176222614


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -268,6 +268,12 @@ object SparkConnectService {
   private val userSessionMapping =
     cacheBuilder(CACHE_SIZE, CACHE_TIMEOUT_SECONDS).build[SessionCacheKey, SessionHolder]()
 
+  private[connect] val streamingSessionManager =
+    new SparkConnectStreamingQueryCache(sessionKeepAliveFn = { case (userId, sessionId) =>
+      userSessionMapping.getIfPresent(userId -> sessionId)

Review Comment:
   nit: maybe use `(userId, sessionId)`, since they are meant here as a tuple, not a mapping.
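Both spellings build the same `Tuple2`, so the suggested change only affects readability. A quick check follows, assuming the key is a `(String, String)` pair as the `getIfPresent(userId -> sessionId)` call implies.

```scala
// `a -> b` comes from Predef.ArrowAssoc and simply builds the tuple `(a, b)`,
// so both spellings produce equal keys; the nit is only about readability.
val userId = "user-1"
val sessionId = "session-1"
val arrowKey: (String, String) = userId -> sessionId
val tupleKey: (String, String) = (userId, sessionId)
```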





[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178043662


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread with a periodic task that does the following:
+ *   - Checks the status of the queries and drops those that expired (1 hour after being stopped).
+ *   - Keeps the associated session active by invoking the supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps support the following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at the connect server alive as long as a streaming
+ *     query is active, even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark: queries continue to run if the
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and remain
+ *     accessible from the client. This gives the client time to fetch the query status.
+ *     - If the query is restarted during this time (i.e. it has a new run id), the reference to
+ *       the previous run is dropped. As a result, only the most recent run of a logical query is
+ *       cached. This policy can be revisited to cache multiple runs of a query.
+ *   - Note that these semantics are evolving and might change before being finalized in Connect.
+ *   - Future improvements:
+ *     - Provide an API for users to access a session even after they lose the session id.
+ *       - Once a user is properly authenticated, the API could return the list of sessions that
+ *         are still alive in the connect server for that user.

Review Comment:
   Met with Julek about these semantics and the larger session/query life cycle management. We agreed these semantics make sense and will allow improvements to overall session management. 
   E.g. we likely need to track batch queries too, similarly to streaming queries. We will likely fold this into a new session manager. Such improvements will not affect the client API or protocol. 
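The periodic task from the class comment in this hunk can be sketched as follows. That task keeps sessions alive while a query runs and drops a stopped query after its grace period. This is a minimal sketch, not the PR's implementation. `PeriodicChecker`, `TrackedQuery`, `runChecks` and `startIfNeeded` are hypothetical names. The clock is injected as a function so the logic can be exercised without waiting. A stopped query keeps pinging its session until its grace period ends.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.collection.mutable

// Hypothetical per-query state: `isActive` stands in for `StreamingQuery.isActive`, and
// `expiresAtMs` is set the first time the periodic check sees the query stopped.
final class TrackedQuery(val userId: String, val sessionId: String) {
  @volatile var isActive: Boolean = true
  var expiresAtMs: Option[Long] = None
}

final class PeriodicChecker(
    keepAlive: (String, String) => Unit, // (userId, sessionId) => Unit
    nowMs: () => Long,
    stoppedTimeoutMs: Long) {
  private val lock = new Object
  private val queries = mutable.Map.empty[String, TrackedQuery]
  private var scheduler: Option[ScheduledExecutorService] = None // guarded by lock

  def register(queryId: String, q: TrackedQuery): Unit = lock.synchronized {
    queries(queryId) = q
  }

  // One pass of the periodic task: drop stopped queries whose grace period is over,
  // start the grace period for newly stopped ones, and ping every remaining session.
  def runChecks(): Unit = lock.synchronized {
    val now = nowMs()
    val expired = queries.collect {
      case (id, q) if !q.isActive && q.expiresAtMs.exists(_ <= now) => id
    }.toList
    queries --= expired
    queries.values.foreach { q =>
      if (!q.isActive && q.expiresAtMs.isEmpty) q.expiresAtMs = Some(now + stoppedTimeoutMs)
      keepAlive(q.userId, q.sessionId)
    }
  }

  // Starts the background thread at most once, so it is safe to call on every registration.
  def startIfNeeded(periodMs: Long): Unit = lock.synchronized {
    if (scheduler.isEmpty) {
      val s = Executors.newSingleThreadScheduledExecutor()
      s.scheduleAtFixedRate(() => runChecks(), periodMs, periodMs, TimeUnit.MILLISECONDS)
      scheduler = Some(s)
    }
  }

  def shutdown(): Unit = lock.synchronized {
    scheduler.foreach(_.shutdownNow())
    scheduler = None
  }

  def trackedIds: Set[String] = lock.synchronized(queries.keySet.toSet)
}

var now = 0L
val pinged = mutable.ListBuffer.empty[(String, String)]
val checker = new PeriodicChecker(
  keepAlive = (u, s) => { pinged += ((u, s)); () },
  nowMs = () => now,
  stoppedTimeoutMs = 60L * 60 * 1000)
val q1 = new TrackedQuery("user-1", "session-1")
checker.register("q1", q1)
checker.runChecks() // q1 is active, so ("user-1", "session-1") is pinged
```

Calling `runChecks()` directly, as the usage lines do, avoids depending on scheduler timing. In a server, `startIfNeeded` would be called with a period such as the 1-minute `sessionPollingPeriod` default shown in the diff.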





[GitHub] [spark] HyukjinKwon commented on pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #40937:
URL: https://github.com/apache/spark/pull/40937#issuecomment-1523777137

   cc @HeartSaVioR too




[GitHub] [spark] WweiL commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178400574


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread with a periodic task that does the following:
+ *   - Checks the status of the queries and drops those that expired (1 hour after being stopped).
+ *   - Keeps the associated session active by invoking the supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps support the following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at the connect server alive as long as a streaming
+ *     query is active, even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark: queries continue to run if the
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and remain
+ *     accessible from the client. This gives the client time to fetch the query status.
+ *     - If the query is restarted during this time (i.e. it has a new run id), the reference to
+ *       the previous run is dropped. As a result, only the most recent run of a logical query is
+ *       cached. This policy can be revisited to cache multiple runs of a query.
+ *   - Note that these semantics are evolving and might change before being finalized in Connect.
+ *   - Future improvements:
+ *     - Provide an API for users to access a session even after they lose the session id.
+ *       - Once a user is properly authenticated, the API could return the list of sessions that
+ *         are still alive in the connect server for that user.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {
+    queryCacheLock.synchronized {
+      val value = QueryCacheValue(
+        userId = sessionHolder.userId,
+        sessionId = sessionHolder.sessionId,
+        session = sessionHolder.session,
+        query = query,
+        expiresAtMs = None)
+
+      queryCache.put(QueryCacheKey(query.id.toString), value) match {
+        case Some(existing) => // Query is being replaced. Can happen when a query is restarted.
+          log.info(
+            s"Replacing existing query in the cache. Query Id: ${query.id}. " +
+              s"Existing value $existing, new value $value.")
+        case None =>
+          log.info(s"Adding new query to the cache. Query Id ${query.id}, value $value.")
+      }
+
+      schedulePeriodicChecks() // Starts the scheduler thread if it hasn't started.

Review Comment:
   Oh, I thought acquiring a lock while already inside the same lock would block. Is that not the case in Scala?
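   
   For reference: on the JVM, the monitor taken by `synchronized` is reentrant. A thread that already holds a monitor can enter another `synchronized` block on the same object without blocking. Only other threads are excluded. Below is a minimal sketch in which `ReentrancyDemo`, `outer`, and `inner` are hypothetical stand-ins for `registerNewStreamingQuery` and `schedulePeriodicChecks`:
   
   ```scala
   object ReentrancyDemo {
     // Hypothetical stand-in for queryCacheLock.
     private val lock = new Object
   
     // Stands in for schedulePeriodicChecks(): it takes the same lock again.
     private def inner(): String = lock.synchronized {
       // This thread already owns `lock`, so entering here succeeds immediately.
       s"inner ran, holdsLock=${Thread.holdsLock(lock)}"
     }
   
     // Stands in for registerNewStreamingQuery(): it calls inner() while holding the lock.
     def outer(): String = lock.synchronized {
       inner()
     }
   }
   ```
   
   `ReentrancyDemo.outer()` returns `"inner ran, holdsLock=true"` instead of hanging. A non-reentrant primitive would deadlock in the same pattern. One example is a `java.util.concurrent.Semaphore` with a single permit.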




[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1176049156


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1733,6 +1735,7 @@ class SparkConnectPlanner(val session: SparkSession) {
 
   def process(
       command: proto.Command,
+      userId: String,

Review Comment:
   `userId` is used in `handleWriteStreamOperationStart()`, which starts a streaming query.




[GitHub] [spark] WweiL commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1177124740


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2092,6 +2097,11 @@ class SparkConnectPlanner(val session: SparkSession) {
       case path => writer.start(path)
     }
 
+    // Register the new query so that the session and query references are cached.
+    SparkConnectService.streamingSessionManager.registerNewStreamingQuery(

Review Comment:
   What happens if the query throws before this line? This would happen during query creation, which I believe is common. For example, if a wrong config is set, an IllegalArgumentException is thrown ([example](https://github.com/apache/spark/blob/b26844ce879ac0097d6e1a95da18a5c3ef3c9284/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L227-L268)).
   My guess is that the user would see the error, because the existing error-handling framework in SparkConnectService handles it. But when they then try to access the query, the `orElse` below would be triggered:
   ```
   val query = SparkConnectService.streamingSessionManager
     .findCachedQuery(id, session) // Common case: query is cached in connect session manager.
     .orElse { // Else try to find it in active streams. Mostly will not be found here.
       Option(session.streams.get(id))
     }
   ```
   Would that still return a "query ID not found" exception?
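   
   The answer depends on ordering. Registration runs only after `writer.start(...)` returns, and the query id reaches the client only after that. Below is a minimal sketch of that ordering. It uses hypothetical names (`StartThenRegister`, `startQuery`, `handleStart`, and a plain map as the registry), not the planner's actual code:
   
   ```scala
   import java.util.UUID
   import scala.collection.mutable
   import scala.util.Try
   
   object StartThenRegister {
     // Hypothetical stand-in for the streaming query cache.
     val registry: mutable.Map[UUID, String] = mutable.Map.empty
   
     // Hypothetical query start that validates options up front, the way a source
     // provider may throw IllegalArgumentException for a missing or wrong config.
     def startQuery(options: Map[String, String]): UUID = {
       require(options.contains("subscribe"), "option 'subscribe' must be specified")
       UUID.randomUUID()
     }
   
     // Mirrors the planner's ordering: start first, register second, reply with the id last.
     def handleStart(options: Map[String, String]): Try[UUID] = Try {
       val id = startQuery(options) // If this throws, nothing below runs.
       registry.put(id, "running")  // Registration only happens after a successful start.
       id                           // Only a successfully started query's id reaches the client.
     }
   }
   ```
   
   In this sketch, a failed start leaves the registry empty. Because no id was ever returned, a later lookup has nothing to find.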




[GitHub] [spark] pengzhon-db commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "pengzhon-db (via GitHub)" <gi...@apache.org>.
pengzhon-db commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178398721


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -268,6 +268,12 @@ object SparkConnectService {
   private val userSessionMapping =
     cacheBuilder(CACHE_SIZE, CACHE_TIMEOUT_SECONDS).build[SessionCacheKey, SessionHolder]()
 
+  private[connect] val streamingSessionManager =
+    new SparkConnectStreamingQueryCache(sessionKeepAliveFn = { case (userId, sessionId) =>
+      userSessionMapping.getIfPresent((userId, sessionId))
+    // getIfPresent() prevents accidental loading.

Review Comment:
   nit: I would put this comment line above line 273 (the `getIfPresent` call).
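   
   Some context for the comment being moved. The PR description says the session mapping is dropped after a period of inactivity. Assuming that is true, the keep-alive works because a read on an access-expiring cache resets the entry's idle timer. In Guava, `getIfPresent` counts as an access for `expireAfterAccess`, but it never creates an entry on a miss. Below is a minimal sketch of that behavior using a hand-rolled map and an injected clock. `IdleExpiringMap` is hypothetical and does not reflect how Guava is implemented:
   
   ```scala
   import scala.collection.mutable
   
   // Hypothetical idle-expiring map: an entry is dropped once `ttlMs` passes without any access.
   class IdleExpiringMap[K, V](ttlMs: Long, now: () => Long) {
     private val entries = mutable.Map.empty[K, (V, Long)] // value and last access time
   
     def put(key: K, value: V): Unit = entries.put(key, (value, now()))
   
     // Like getIfPresent: returns None for missing or expired keys and never creates an entry.
     // A hit refreshes the last-access time, which is what keeps a session alive.
     def getIfPresent(key: K): Option[V] = entries.get(key) match {
       case Some((v, lastAccess)) if now() - lastAccess < ttlMs =>
         entries.put(key, (v, now()))
         Some(v)
       case Some(_) =>
         entries.remove(key) // Expired: evict lazily on read.
         None
       case None => None
     }
   }
   ```
   
   With a one-hour idle timeout, polling once a minute keeps the entry alive indefinitely. An hour with no reads evicts it.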




[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178398941


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2092,6 +2097,11 @@ class SparkConnectPlanner(val session: SparkSession) {
       case path => writer.start(path)
     }
 
+    // Register the new query so that the session and query references are cached.
+    SparkConnectService.streamingSessionManager.registerNewStreamingQuery(

Review Comment:
   Yep. In that case the query id is never sent to the client, so there is no need to track it.




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178532487


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *
+ * Note that these semantics are evolving and might change before being finalized in Connect.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {

Review Comment:
   A possible problematic case is multiple remote sessions trying to run the streaming query with the same checkpoint. I'm pretty sure Spark doesn't support this, and only one of the queries will eventually survive. The others will fail eventually, preferably within a single batch execution. Still, the driver is able to run multiple streaming queries with the same checkpoint concurrently. Please make sure this won't break the key of the cache. Using runId would avoid the problem, but I guess there is a reason to use the query ID instead.
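   
   To illustrate the concern: `StreamingQuery.id` persists across restarts from the same checkpoint, while `runId` is unique to each start. Suppose two runs that share a checkpoint are registered concurrently. A map keyed by `id` alone keeps only the last one, whereas a `(id, runId)` key keeps both. Below is a minimal sketch in which the `QueryRun` case class and `CacheKeyDemo` are hypothetical stand-ins for `StreamingQuery` and the cache:
   
   ```scala
   import java.util.UUID
   import scala.collection.mutable
   
   // Hypothetical stand-in for StreamingQuery: `id` comes from the checkpoint, `runId` is per run.
   final case class QueryRun(id: UUID, runId: UUID, session: String)
   
   object CacheKeyDemo {
     val byId = mutable.Map.empty[UUID, QueryRun]
     val byIdAndRun = mutable.Map.empty[(UUID, UUID), QueryRun]
   
     // Registers in both maps; returns the entry replaced in `byId`, if any.
     def register(run: QueryRun): Option[QueryRun] = {
       byIdAndRun.put((run.id, run.runId), run)
       byId.put(run.id, run)
     }
   }
   ```
   
   Keying by `id` alone gives the "most recent run only" policy described in the Scaladoc. However, it also lets one session's run silently replace another session's run when the two share a checkpoint.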




[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178495387


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:

Review Comment:
   This is a continuation of the "outdated" comment thread about semantics: https://github.com/apache/spark/pull/40937#discussion_r1176846545
   
   @HeartSaVioR, could you take a look at this, and maybe at a bit of the code too? That would help get more review from the streaming team. cc: @HyukjinKwon
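   
   The periodic task described in the Scaladoc above can be sketched as a single pass over the cache. Each pass keeps sessions of active queries alive, stamps an expiry time on newly stopped queries, and drops entries whose expiry has passed. This is a minimal sketch under assumptions. `CachedQuery`, `PeriodicCheck`, and `periodicCheck` are hypothetical simplifications of `QueryCacheValue` and the PR's scheduled task. Time is passed in explicitly rather than read from a `Clock`. The sketch also keeps sessions alive during the grace period, since the Scaladoc says the mappings are maintained for that hour.
   
   ```scala
   import scala.collection.mutable
   
   // Hypothetical simplification of QueryCacheValue: only the fields the periodic pass needs.
   final case class CachedQuery(
       userId: String,
       sessionId: String,
       isActive: Boolean,
       expiresAtMs: Option[Long])
   
   object PeriodicCheck {
     // One pass of the periodic task. Returns the ids dropped during this pass.
     def periodicCheck(
         cache: mutable.Map[String, CachedQuery],
         nowMs: Long,
         stoppedQueryCachePeriodMs: Long,
         keepAlive: (String, String) => Unit): Seq[String] = {
       val dropped = mutable.ArrayBuffer.empty[String]
       for ((id, q) <- cache.toList) { // Iterate over a snapshot so the map can be modified.
         q.expiresAtMs match {
           case Some(expiry) if nowMs >= expiry =>
             cache.remove(id) // Stopped long enough ago: drop the reference.
             dropped += id
           case Some(_) =>
             keepAlive(q.userId, q.sessionId) // Stopped, but still within the grace period.
           case None if q.isActive =>
             keepAlive(q.userId, q.sessionId) // Running: keep the session mapping alive.
           case None =>
             // First pass that sees the query stopped: start the grace period.
             cache.put(id, q.copy(expiresAtMs = Some(nowMs + stoppedQueryCachePeriodMs)))
             keepAlive(q.userId, q.sessionId)
         }
       }
       dropped.toSeq
     }
   }
   ```
   
   A real implementation would presumably run this pass on a `ScheduledExecutorService` every `sessionPollingPeriod`, while holding the cache lock.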




[GitHub] [spark] WweiL commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1177124740


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2092,6 +2097,11 @@ class SparkConnectPlanner(val session: SparkSession) {
       case path => writer.start(path)
     }
 
+    // Register the new query so that the session and query references are cached.
+    SparkConnectService.streamingSessionManager.registerNewStreamingQuery(

Review Comment:
   What happens if the query throws before this line? This would happen during creation, which is actually common, for example when a wrong config is set.
   My guess is that the `orElse` would be triggered:
   ```
   val query = SparkConnectService.streamingSessionManager
     .findCachedQuery(id, session) // Common case: query is cached in connect session manager.
     .orElse { // Else try to find it in active streams. Mostly will not be found here.
       Option(session.streams.get(id))
     }
   ```
   Would that still return a "query ID not found" exception?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2092,6 +2097,11 @@ class SparkConnectPlanner(val session: SparkSession) {
       case path => writer.start(path)
     }
 
+    // Register the new query so that the session and query references are cached.
+    SparkConnectService.streamingSessionManager.registerNewStreamingQuery(

Review Comment:
   What happens if the query throws before this line? This would happen during query creation, which is actually common, for example when a wrong config is set.
   My guess is that the `orElse` would be triggered:
   ```
   val query = SparkConnectService.streamingSessionManager
     .findCachedQuery(id, session) // Common case: query is cached in connect session manager.
     .orElse { // Else try to find it in active streams. Mostly will not be found here.
       Option(session.streams.get(id))
     }
   ```
   Would that still return a "query ID not found" exception?




[GitHub] [spark] WweiL commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1177124740


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2092,6 +2097,11 @@ class SparkConnectPlanner(val session: SparkSession) {
       case path => writer.start(path)
     }
 
+    // Register the new query so that the session and query references are cached.
+    SparkConnectService.streamingSessionManager.registerNewStreamingQuery(

Review Comment:
   What happens if the query throws before this line? This would happen during query creation, which is actually common. For example, if a wrong config is set, an IllegalArgumentException is thrown ([example](https://github.com/apache/spark/blob/b26844ce879ac0097d6e1a95da18a5c3ef3c9284/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L227-L268)).
   My guess is that the customer would see the error, because the existing error-handling framework handles it. But when they then try to access the query, the `orElse` below would be triggered:
   ```
   val query = SparkConnectService.streamingSessionManager
     .findCachedQuery(id, session) // Common case: query is cached in connect session manager.
     .orElse { // Else try to find it in active streams. Mostly will not be found here.
       Option(session.streams.get(id))
     }
   ```
   Would that still return a "query ID not found" exception?




[GitHub] [spark] WweiL commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1177137568


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *   - Note that these semantics are evolving and might change before being finalized in Connect.
+ *   - Future improvements:
+ *     - Provide an API for a users to access session even after they lose session id.
+ *       - Once a user is properly authenticated, the API could return list of sessions that are
+ *         still alive in the connect server for that user.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {
+    queryCacheLock.synchronized {
+      val value = QueryCacheValue(
+        userId = sessionHolder.userId,
+        sessionId = sessionHolder.sessionId,
+        session = sessionHolder.session,
+        query = query,
+        expiresAtMs = None)
+
+      queryCache.put(QueryCacheKey(query.id.toString), value) match {
+        case Some(existing) => // Query is being replaced. Can happen when a query is restarted.
+          log.info(
+            s"Replacing existing query in the cache. Query Id: ${query.id}." +
+              s"Existing value $existing, new value $value.")
+        case None =>
+          log.info(s"Adding new query to the cache. Query Id ${query.id}, value $value.")
+      }
+
+      schedulePeriodicChecks() // Starts the scheduler thread if it hasn't started.

Review Comment:
   This method also seems to call `queryCacheLock.synchronized {`? Maybe we could remove that line in `schedulePeriodicChecks`?




[GitHub] [spark] WweiL commented on a diff in pull request #40937: [SPARK-42940] Improve session management for streaming queries

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1177137568


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *   - Note that these semantics are evolving and might change before being finalized in Connect.
+ *   - Future improvements:
+ *     - Provide an API for a users to access session even after they lose session id.
+ *       - Once a user is properly authenticated, the API could return list of sessions that are
+ *         still alive in the connect server for that user.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {
+    queryCacheLock.synchronized {
+      val value = QueryCacheValue(
+        userId = sessionHolder.userId,
+        sessionId = sessionHolder.sessionId,
+        session = sessionHolder.session,
+        query = query,
+        expiresAtMs = None)
+
+      queryCache.put(QueryCacheKey(query.id.toString), value) match {
+        case Some(existing) => // Query is being replaced. Can happen when a query is restarted.
+          log.info(
+            s"Replacing existing query in the cache. Query Id: ${query.id}." +
+              s"Existing value $existing, new value $value.")
+        case None =>
+          log.info(s"Adding new query to the cache. Query Id ${query.id}, value $value.")
+      }
+
+      schedulePeriodicChecks() // Starts the scheduler thread if it hasn't started.

Review Comment:
   This method also seems to call `queryCacheLock.synchronized {`? Maybe we could remove that line in `schedulePeriodicChecks` and make sure that it always runs within the lock?
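   
   One way to follow this suggestion is to have `schedulePeriodicChecks` assume that its caller already holds the lock, and to check that precondition explicitly. This is the same contract that `@GuardedBy` documents. Below is a minimal sketch in which `LockHeldByCaller`, `started`, `startCount`, and the method bodies are hypothetical. Only `Thread.holdsLock` is a real JDK call:
   
   ```scala
   object LockHeldByCaller {
     // Hypothetical stand-in for queryCacheLock.
     private val lock = new Object
     private var started = false // Guarded by `lock`.
     private var startCount = 0  // Guarded by `lock`; how many times the scheduler was started.
   
     // Precondition: the caller holds `lock`. This method has no synchronized block of its own.
     // Public only so the precondition check can be exercised in isolation.
     def schedulePeriodicChecks(): Unit = {
       if (!Thread.holdsLock(lock)) {
         throw new IllegalStateException("schedulePeriodicChecks() must be called while holding the lock")
       }
       if (!started) {
         started = true
         startCount += 1 // In the real class, this is where the scheduler thread would start.
       }
     }
   
     def registerNewStreamingQuery(): Unit = lock.synchronized {
       // ... update the cache here ...
       schedulePeriodicChecks()
     }
   
     def schedulerStarts: Int = lock.synchronized(startCount)
   }
   ```
   
   JVM monitors are reentrant, so the nested `synchronized` in the current code does not deadlock. This change mainly makes the locking contract explicit.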




[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178495387


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:

Review Comment:
   This continues the "outdated" comment thread about semantics: https://github.com/apache/spark/pull/40937#discussion_r1176846545
   
   @HeartSaVioR, could you take a look at this, and maybe at a bit of the code too? That would help get more of the streaming team reviewing. cc: @HyukjinKwon
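This is a rough sketch of one run of the periodic task described in the Scaladoc above. It uses simplified stand-ins (`FakeQuery`, `Entry` and `PeriodicCheckSketch` are hypothetical) in place of the real `StreamingQuery` and session references. It also assumes the session keeps being pinged while a stopped query is in its grace period. That is our reading of "the reference and mappings are maintained for 1 hour".

```scala
import scala.collection.mutable

// Hypothetical stand-ins; the real cache stores StreamingQuery and SparkSession references.
final case class FakeQuery(id: String, var isActive: Boolean)
final case class Entry(userId: String, sessionId: String, query: FakeQuery, expiresAtMs: Option[Long])

class PeriodicCheckSketch(
    keepAlive: (String, String) => Unit, // (userId, sessionId) => Unit
    stoppedQueryTimeoutMs: Long) {
  private val lock = new Object
  private val cache = mutable.Map.empty[String, Entry] // guarded by `lock`

  def register(userId: String, sessionId: String, q: FakeQuery): Unit = lock.synchronized {
    cache.put(q.id, Entry(userId, sessionId, q, expiresAtMs = None))
  }

  def contains(id: String): Boolean = lock.synchronized(cache.contains(id))

  // One run of the periodic task at time `nowMs`.
  def periodicCheck(nowMs: Long): Unit = lock.synchronized {
    for ((id, e) <- cache.toList) { // iterate over a snapshot so `cache` can be mutated
      e.expiresAtMs match {
        case Some(ts) if nowMs >= ts =>
          cache.remove(id) // stopped and expired: drop it
        case Some(_) =>
          keepAlive(e.userId, e.sessionId) // stopped, still within its grace period
        case None if e.query.isActive =>
          keepAlive(e.userId, e.sessionId) // running: keep its session alive
        case None =>
          // The query has just been seen as stopped: start its expiry clock.
          cache.put(id, e.copy(expiresAtMs = Some(nowMs + stoppedQueryTimeoutMs)))
          keepAlive(e.userId, e.sessionId)
      }
    }
  }
}
```

Passing the current time in as `nowMs` keeps the check deterministic to test. The PR gets the same effect by injecting a `Clock`.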





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178548438


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,200 @@
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *
+ * Note that these semantics are evolving and might change before being finalized in Connect.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {

Review Comment:
   We discussed this offline, but I am summarizing it here for other reviewers and for future reference:
   
   There is an inconsistency in how the winner is selected. In the cache, the later registration always wins the race and overwrites the earlier one. In concurrent query execution, we never know which one wins.
   
   For example, say there are three users: userA, userB and userC. They somehow (wrongly) refer to the same checkpoint and run the query at the same time, so three queries with different run IDs but the same query ID exist concurrently. Only one of them will eventually win, but each user should still be able to track the status of their own query, even if it turns out to be a loser.
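The following small sketch makes the collision concrete, using hypothetical key types. With a key of query id alone, as in the earlier `QueryCacheKey(query.id.toString)`, the last registration silently replaces the others. A composite key of query id and run id, as in the later `QueryCacheKey(query.id.toString, query.runId.toString)`, keeps one entry per run, so each user can still look up their own run.

```scala
import scala.collection.mutable

// Hypothetical key types for illustration only.
final case class ByQueryId(queryId: String)
final case class ByQueryAndRunId(queryId: String, runId: String)

object KeyCollisionSketch {
  // Three users (wrongly) start runs against the same checkpoint: same query id, new run ids.
  val runs: List[(String, String, String)] = List(
    ("userA", "q-1", "run-a"),
    ("userB", "q-1", "run-b"),
    ("userC", "q-1", "run-c"))

  // Keyed by query id only: each put overwrites the previous entry, so the last one wins.
  def byQueryId(): mutable.Map[ByQueryId, String] = {
    val cache = mutable.Map.empty[ByQueryId, String]
    runs.foreach { case (user, qid, _) => cache.put(ByQueryId(qid), user) }
    cache
  }

  // Keyed by (query id, run id): every run keeps its own entry.
  def byQueryAndRunId(): mutable.Map[ByQueryAndRunId, String] = {
    val cache = mutable.Map.empty[ByQueryAndRunId, String]
    runs.foreach { case (user, qid, rid) => cache.put(ByQueryAndRunId(qid, rid), user) }
    cache
  }
}
```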





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178532487


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,200 @@
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *
+ * Note that these semantics are evolving and might change before being finalized in Connect.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryCachePeriod: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {

Review Comment:
   A possibly problematic case is multiple remote sessions trying to run the streaming query with the same checkpoint. I'm pretty sure Spark doesn't support this, and only one of the queries will eventually survive (the others will fail), but the driver is at least able to run multiple streaming queries with the same checkpoint concurrently. Please make sure this won't break the cache key. Using runId would avoid the problem, but I guess there is a reason to use the query ID instead.





[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1178183159


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,203 @@
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status.
+ *     - During this time if the query is restarted (i.e. has a new run id), the reference to
+ *       previous run is dropped. As a result logical query has only the most recent query
+ *       reference cached. This policy can be revisited to cache multiple runs for a query.
+ *   - Note that these semantics are evolving and might change before being finalized in Connect.
+ *   - Future improvements:
+ *     - Provide an API for a users to access session even after they lose session id.
+ *       - Once a user is properly authenticated, the API could return list of sessions that are
+ *         still alive in the connect server for that user.

Review Comment:
   Agreed. Removed the 'Future improvements' section.
   I will update here if it turns out that a streaming query can be accessed across different sessions/notebooks in classic Spark.





[GitHub] [spark] rangadi commented on a diff in pull request #40937: [SPARK-42940][SS][CONNECT] Improve session management for streaming queries

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40937:
URL: https://github.com/apache/spark/pull/40937#discussion_r1180745917


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -0,0 +1,209 @@
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
+
+/**
+ * Caches Spark-Connect streaming query references and the sessions. When a query is stopped (i.e.
+ * no longer active), it is cached for 1 hour so that it is accessible from the client side. It
+ * runs a background thread to run a periodic task that does the following:
+ *   - Check the status of the queries, and drops those that expired (1 hour after being stopped).
+ *   - Keep the associated session active by invoking supplied function `sessionKeepAliveFn`.
+ *
+ * This class helps with supporting following semantics for streaming query sessions:
+ *   - Keep the session and session mapping at connect server alive as long as a streaming query
+ *     is active. Even if the client side has disconnected.
+ *     - This matches how streaming queries behave in Spark. The queries continue to run if
+ *       notebook or job session is lost.
+ *   - Once a query is stopped, the reference and mappings are maintained for 1 hour and will be
+ *     accessible from the client. This allows time for client to fetch status. If the client
+ *     continues to access the query, it stays in the cache until 1 hour of inactivity.
+ *
+ * Note that these semantics are evolving and might change before being finalized in Connect.
+ */
+private[connect] class SparkConnectStreamingQueryCache(
+    val sessionKeepAliveFn: (String, String) => Unit, // (userId, sessionId) => Unit.
+    val clock: Clock = new SystemClock(),
+    private val stoppedQueryInactivityTimeout: Duration = 1.hour, // Configurable for testing.
+    private val sessionPollingPeriod: Duration = 1.minute // Configurable for testing.
+) extends Logging {
+
+  import SparkConnectStreamingQueryCache._
+
+  def registerNewStreamingQuery(sessionHolder: SessionHolder, query: StreamingQuery): Unit = {
+    queryCacheLock.synchronized {
+      val value = QueryCacheValue(
+        userId = sessionHolder.userId,
+        sessionId = sessionHolder.sessionId,
+        session = sessionHolder.session,
+        query = query,
+        expiresAtMs = None)
+
+      queryCache.put(QueryCacheKey(query.id.toString, query.runId.toString), value) match {
+        case Some(existing) => // Query is being replaced. Not really expected.
+          log.warn(
+            s"Replacing existing query in the cache (unexpected). Query Id: ${query.id}." +
+              s"Existing value $existing, new value $value.")
+        case None =>
+          log.info(s"Adding new query to the cache. Query Id ${query.id}, value $value.")
+      }
+
+      schedulePeriodicChecks() // Starts the scheduler thread if it hasn't started.
+    }
+  }
+
+  /**
+   * Returns [[StreamingQuery]] if it is cached and session matches the cached query. It ensures
+   * the session associated with it matches the session passed into the call. If the query is
+   * inactive (i.e. it has a cache expiry time set), this access extends its expiry time. So
+   * if a client keeps accessing a query, it stays in the cache.
+   */
+  def getCachedQuery(queryId: String, runId: String, session: SparkSession):
+  Option[StreamingQuery] = {
+    val key = QueryCacheKey(queryId, runId)
+    queryCacheLock.synchronized {
+      queryCache.get(key).flatMap { v =>
+        if (v.session == session) {
+          v.expiresAtMs.foreach { _ =>
+            // Extend the expiry time as the client is accessing it.
+            val expiresAtMs = clock.getTimeMillis() + stoppedQueryInactivityTimeout.toMillis
+            queryCache.put(key, v.copy(expiresAtMs = Some(expiresAtMs)))
+          }
+          Some(v.query)
+        }
+        else None // Should be rare: maybe the client is accessing it from a different session.
+      }
+    }
+  }
+
+  // Visible for testing
+  private[service] def cacheQueryValue(queryId: String, runId: String): Option[QueryCacheValue] =

Review Comment:
   Renamed it to `getCachedValue()`. This is only accessed in tests.
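This is a simplified sketch of the access-extends-expiry behavior documented on `getCachedQuery`. It compares a plain `String` owner instead of a `SparkSession`, and takes an injected `now` function instead of Spark's `Clock`. `SlidingExpirySketch` and `CachedValue` are hypothetical names. The `peek` method plays the test-only role of `getCachedValue()`.

```scala
import scala.collection.mutable

// Hypothetical, simplified value type: `owner` stands in for the SparkSession reference.
final case class CachedValue(owner: String, payload: String, expiresAtMs: Option[Long])

class SlidingExpirySketch(inactivityTimeoutMs: Long, now: () => Long) {
  private val lock = new Object
  private val cache = mutable.Map.empty[(String, String), CachedValue] // guarded by `lock`

  def put(queryId: String, runId: String, v: CachedValue): Unit = lock.synchronized {
    cache.put((queryId, runId), v)
  }

  // Returns the payload only to the owning session. A stopped entry (expiry already set) gets
  // its expiry pushed out, so a client that keeps reading the query keeps it cached.
  def get(queryId: String, runId: String, owner: String): Option[String] = lock.synchronized {
    val key = (queryId, runId)
    cache.get(key).flatMap { v =>
      if (v.owner == owner) {
        v.expiresAtMs.foreach { _ =>
          cache.put(key, v.copy(expiresAtMs = Some(now() + inactivityTimeoutMs)))
        }
        Some(v.payload)
      } else None // a different session asked for it; leave the entry untouched
    }
  }

  // Test-only peek at the raw cached value.
  def peek(queryId: String, runId: String): Option[CachedValue] =
    lock.synchronized(cache.get((queryId, runId)))
}
```

A read from a different session returns `None` and does not extend the expiry. A running query (no expiry set) is returned without any expiry being added.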


