Posted to reviews@spark.apache.org by "juliuszsompolski (via GitHub)" <gi...@apache.org> on 2023/08/05 01:15:52 UTC

[GitHub] [spark] juliuszsompolski opened a new pull request, #42355: [WIP][CONNECT] Return from executePlan / reattachExecute handler and process stream on different thread

juliuszsompolski opened a new pull request, #42355:
URL: https://github.com/apache/spark/pull/42355

   ### What changes were proposed in this pull request?
   
   WIP
   If the stream is processed directly on the gRPC handler thread, flow-control OnReady events are queued until the handler returns. The OnReadyHandler is therefore never notified while the handler is still running, and a sender waiting for readiness inside the handler just waits until its timeout.
   If the handler instead only sets everything up and launches a separate thread to run ExecuteGrpcResponseSender.run, the handler returns immediately and further events, including OnReady, can be delivered.
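   
   The effect can be reproduced in a simplified model built only from JVM primitives. This is a sketch, not Spark or gRPC code: it assumes that gRPC delivers the events of one call (first the handler invocation, then OnReady) one at a time on a serializing executor, modeled here by a single-thread executor. `ReadySignal` and `simulate` are hypothetical names standing in for `grpcCallObserverReadySignal` and the handler; the 500 ms timeout is arbitrary.
   
   ```scala
   import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
   import java.util.concurrent.atomic.AtomicBoolean
   
   // Hypothetical stand-in for grpcCallObserverReadySignal plus an isReady() flag.
   final class ReadySignal {
     private var ready = false
   
     def markReady(): Unit = synchronized {
       ready = true
       notifyAll()
     }
   
     // Guarded wait: loops to tolerate spurious wakeups, gives up at the deadline.
     def awaitReady(timeoutMs: Long): Boolean = synchronized {
       val deadline = System.currentTimeMillis() + timeoutMs
       var remaining = timeoutMs
       while (!ready && remaining > 0) {
         wait(remaining)
         remaining = deadline - System.currentTimeMillis()
       }
       ready
     }
   }
   
   // Returns true if the response-sending code observed the OnReady event in time.
   def simulate(offload: Boolean): Boolean = {
     val callExecutor = Executors.newSingleThreadExecutor() // models one call's event queue
     val signal = new ReadySignal
     val observedReady = new AtomicBoolean(false)
     val finished = new CountDownLatch(1)
   
     def sendResponses(): Unit = {
       observedReady.set(signal.awaitReady(500))
       finished.countDown()
     }
   
     // Event 1: the RPC handler.
     callExecutor.execute(() => {
       if (offload) {
         // Hand the work to another thread so the handler returns immediately.
         val t = new Thread(new Runnable { def run(): Unit = sendResponses() }, "response-sender")
         t.setDaemon(true)
         t.start()
       } else {
         sendResponses() // blocks the only thread that could deliver OnReady
       }
     })
     // Event 2: OnReady, queued behind the handler on the same executor.
     callExecutor.execute(() => signal.markReady())
   
     finished.await()
     callExecutor.shutdown()
     callExecutor.awaitTermination(5, TimeUnit.SECONDS)
     observedReady.get()
   }
   ```
   
   With `offload = false` the wait times out, because the queued OnReady event only runs after the handler returns; with `offload = true` the handler returns at once and the waiting thread is notified.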
   
   ### Why are the changes needed?
   
   WIP
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Before:
   ```
   23/08/05 00:41:41 ERROR ExecuteGrpcResponseSender: Wait for grpcCallObserver to become ready with timeout=17317 ms.
   org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender.$anonfun$sendResponse$2(ExecuteGrpcResponseSender.scala:238)
   org.apache.spark.internal.Logging.logError(Logging.scala:76)
   org.apache.spark.internal.Logging.logError$(Logging.scala:75)
   org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender.logError(ExecuteGrpcResponseSender.scala:38)
   org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender.sendResponse(ExecuteGrpcResponseSender.scala:237)
   org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender.run(ExecuteGrpcResponseSender.scala:173)
   org.apache.spark.sql.connect.service.ExecuteHolder.attachAndRunGrpcResponseSender(ExecuteHolder.scala:121)
   org.apache.spark.sql.connect.service.SparkConnectExecutePlanHandler.handle(SparkConnectExecutePlanHandler.scala:39)
   org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:73)
   org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:743)
   org.sparkproject.connect.grpc.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
   org.sparkproject.connect.grpc.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:346)
   org.sparkproject.connect.grpc.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:860)
   org.sparkproject.connect.grpc.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
   org.sparkproject.connect.grpc.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   java.lang.Thread.run(Thread.java:748)
   ...
   23/08/05 00:41:58 ERROR ExecuteGrpcResponseSender: Deadline reached, finishing stream after index 3.
   23/08/05 00:41:58 ERROR ExecuteGrpcResponseSender: ON READY
   org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender.$anonfun$run$2(ExecuteGrpcResponseSender.scala:90)
   org.sparkproject.connect.grpc.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onReady(ServerCalls.java:206)
   org.sparkproject.connect.grpc.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:188)
   org.sparkproject.connect.grpc.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:346)
   org.sparkproject.connect.grpc.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:860)
   org.sparkproject.connect.grpc.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
   org.sparkproject.connect.grpc.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   java.lang.Thread.run(Thread.java:748)
   ```
   
   After:
   ```
   23/08/05 01:05:14 ERROR ExecuteGrpcResponseSender: Sent response index=1007.
   23/08/05 01:05:14 ERROR ExecuteGrpcResponseSender: Wait for grpcCallObserver to become ready with timeout=16988 ms.
   org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender.$anonfun$sendResponse$2(ExecuteGrpcResponseSender.scala:258)
   org.apache.spark.internal.Logging.logError(Logging.scala:76)
   org.apache.spark.internal.Logging.logError$(Logging.scala:75)
   org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender.logError(ExecuteGrpcResponseSender.scala:38)
   org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender.sendResponse(ExecuteGrpcResponseSender.scala:257)
   org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender.execute(ExecuteGrpcResponseSender.scala:193)
   org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender$$anon$1.run(ExecuteGrpcResponseSender.scala:83)
   23/08/05 01:05:14 ERROR ExecuteGrpcResponseSender: ON READY
   org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender.$anonfun$run$1(ExecuteGrpcResponseSender.scala:71)
   org.sparkproject.connect.grpc.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onReady(ServerCalls.java:206)
   org.sparkproject.connect.grpc.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.onReady(ServerCallImpl.java:385)
   org.sparkproject.connect.grpc.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1OnReady.runInContext(ServerImpl.java:933)
   org.sparkproject.connect.grpc.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
   org.sparkproject.connect.grpc.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   java.lang.Thread.run(Thread.java:748)
   23/08/05 01:05:14 ERROR ExecuteGrpcResponseSender: Sent response index=1008.
   23/08/05 01:05:14 ERROR ExecuteGrpcResponseSender: Wait for grpcCallObserver to become ready with timeout=16988 ms.
   org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender.$anonfun$sendResponse$2(ExecuteGrpcResponseSender.scala:258)
   org.apache.spark.internal.Logging.logError(Logging.scala:76)
   org.apache.spark.internal.Logging.logError$(Logging.scala:75)
   org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender.logError(ExecuteGrpcResponseSender.scala:38)
   org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender.sendResponse(ExecuteGrpcResponseSender.scala:257)
   org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender.execute(ExecuteGrpcResponseSender.scala:193)
   org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender$$anon$1.run(ExecuteGrpcResponseSender.scala:83)
   23/08/05 01:05:14 ERROR ExecuteGrpcResponseSender: ON READY
   org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender.$anonfun$run$1(ExecuteGrpcResponseSender.scala:71)
   org.sparkproject.connect.grpc.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onReady(ServerCalls.java:206)
   org.sparkproject.connect.grpc.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.onReady(ServerCallImpl.java:385)
   org.sparkproject.connect.grpc.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1OnReady.runInContext(ServerImpl.java:933)
   org.sparkproject.connect.grpc.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
   org.sparkproject.connect.grpc.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   java.lang.Thread.run(Thread.java:748)
   23/08/05 01:05:14 ERROR ExecuteGrpcResponseSender: Sent response index=1009.
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #42355: [WIP][CONNECT] Return from executePlan / reattachExecute handler and process stream on different thread

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42355:
URL: https://github.com/apache/spark/pull/42355#discussion_r1285254250


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala:
##########
@@ -58,6 +61,39 @@ private[connect] class ExecuteGrpcResponseSender[T <: MessageLite](
     detached = true
   }
 
+  def run(lastConsumedStreamIndex: Long): Unit = {
+
+    // In reattachable execution, we check if grpcCallObserver is ready for sending.
+    // See sendResponse
+    if (executeHolder.reattachable && flowControl) {
+      val grpcCallObserver = grpcObserver.asInstanceOf[ServerCallStreamObserver[T]]
+      grpcCallObserver.setOnReadyHandler(() => {
+        val e = new Exception()
+        logError(s"ON READY\n${e.getStackTrace.mkString("\n")}")
+        grpcCallObserverReadySignal.synchronized {
+          grpcCallObserverReadySignal.notifyAll()
+        }
+      })
+    }
+
+    // We run in a separate daemon thread
+    val t = new Thread {
+      override def run(): Unit = {
+        try {
+          execute(lastConsumedStreamIndex)
+        } finally {
+          if (!executeHolder.reattachable) {
+            // Non reattachable executions release here immediately.
+            // (Reattachable executions close release with ReleaseExecute RPC.)
+            executeHolder.close()
+          }
+        }
+      }
+    }
+    t.setDaemon(true) // can I not set daemon and just don't join?

Review Comment:
   If you don't set it and the application tries to terminate, it won't be able to: the JVM does not exit while a non-daemon thread is still running. You don't have to join.
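   
   For reference, the JVM exits once only daemon threads are left running, so a daemon thread can be started and never joined without blocking shutdown. A minimal sketch of that pattern, with a descriptive thread name and cleanup in `finally` as in the diff above; `startDaemon` and the thread name in the usage are hypothetical, not Spark APIs:
   
   ```scala
   // Hypothetical helper: runs `body` on a named daemon thread, always runs
   // `cleanup` afterwards, and returns without joining the thread.
   def startDaemon(name: String)(body: => Unit)(cleanup: => Unit): Thread = {
     val t = new Thread(new Runnable {
       def run(): Unit = try body finally cleanup
     }, name)
     t.setDaemon(true) // must be set before start(), else IllegalThreadStateException
     t.start()
     t
   }
   ```
   
   The caller keeps the returned `Thread` only for inspection; nothing needs to wait on it.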





[GitHub] [spark] hvanhovell commented on a diff in pull request #42355: [WIP][CONNECT] Return from executePlan / reattachExecute handler and process stream on different thread

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42355:
URL: https://github.com/apache/spark/pull/42355#discussion_r1285254552


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala:
##########
@@ -47,6 +48,8 @@ private[connect] class ExecuteGrpcResponseSender[T <: MessageLite](
   // Signal to wake up when grpcCallObserver.isReady()
   private val grpcCallObserverReadySignal = new Object
 
+  val flowControl = true

Review Comment:
   Yes... remove it, or do you intend to do something with it?





[GitHub] [spark] hvanhovell commented on a diff in pull request #42355: [WIP][CONNECT] Return from executePlan / reattachExecute handler and process stream on different thread

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42355:
URL: https://github.com/apache/spark/pull/42355#discussion_r1285254420


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala:
##########
@@ -58,6 +61,39 @@ private[connect] class ExecuteGrpcResponseSender[T <: MessageLite](
     detached = true
   }
 
+  def run(lastConsumedStreamIndex: Long): Unit = {
+
+    // In reattachable execution, we check if grpcCallObserver is ready for sending.
+    // See sendResponse
+    if (executeHolder.reattachable && flowControl) {
+      val grpcCallObserver = grpcObserver.asInstanceOf[ServerCallStreamObserver[T]]
+      grpcCallObserver.setOnReadyHandler(() => {
+        val e = new Exception()
+        logError(s"ON READY\n${e.getStackTrace.mkString("\n")}")
+        grpcCallObserverReadySignal.synchronized {
+          grpcCallObserverReadySignal.notifyAll()
+        }
+      })
+    }
+
+    // We run in a separate daemon thread
+    val t = new Thread {

Review Comment:
   We really need to document the (absolutely bonkers) reason why this is needed.





[GitHub] [spark] hvanhovell commented on pull request #42355: [WIP][CONNECT] Return from executePlan / reattachExecute handler and process stream on different thread

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #42355:
URL: https://github.com/apache/spark/pull/42355#issuecomment-1666952669

   @juliuszsompolski a couple of fun threading questions:
   1. Is `ExecuteGrpcResponseSender` thread safe? For example, is `detach()` safe?
   2. Is it possible for events to be produced in a weird order?




[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42355: [SPARK-44709][CONNECT] Run ExecuteGrpcResponseSender in reattachable execute in new thread to fix flow control

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42355:
URL: https://github.com/apache/spark/pull/42355#discussion_r1286441954


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala:
##########
@@ -85,12 +85,18 @@ private[connect] class ExecuteResponseObserver[T <: MessageLite](val executeHold
    */
   private var responseSender: Option[ExecuteGrpcResponseSender[T]] = None
 
+  // Statistics about cached responses.
+  private var cachedSizeUntilHighestConsumed = CachedSize()
+  private var cachedSizeUntilLastProduced = CachedSize()
+  private var autoRemovedSize = CachedSize()
+  private var totalSize = CachedSize()
+
   /**
    * Total size of response to be held buffered after giving out with getResponse. 0 for none, any
    * value greater than 0 will buffer the response from getResponse.
    */
   private val retryBufferSize = if (executeHolder.reattachable) {
-    SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE).toLong
+    SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE).toLong

Review Comment:
   This was a bug: the retry buffer size was read from the wrong config (the sender's max stream size).





[GitHub] [spark] hvanhovell commented on a diff in pull request #42355: [WIP][CONNECT] Return from executePlan / reattachExecute handler and process stream on different thread

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42355:
URL: https://github.com/apache/spark/pull/42355#discussion_r1285254302


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala:
##########
@@ -58,6 +61,39 @@ private[connect] class ExecuteGrpcResponseSender[T <: MessageLite](
     detached = true
   }
 
+  def run(lastConsumedStreamIndex: Long): Unit = {
+
+    // In reattachable execution, we check if grpcCallObserver is ready for sending.
+    // See sendResponse
+    if (executeHolder.reattachable && flowControl) {
+      val grpcCallObserver = grpcObserver.asInstanceOf[ServerCallStreamObserver[T]]
+      grpcCallObserver.setOnReadyHandler(() => {
+        val e = new Exception()
+        logError(s"ON READY\n${e.getStackTrace.mkString("\n")}")
+        grpcCallObserverReadySignal.synchronized {
+          grpcCallObserverReadySignal.notifyAll()
+        }
+      })
+    }
+
+    // We run in a separate daemon thread
+    val t = new Thread {

Review Comment:
   Please give the thread a name that is understandable.





[GitHub] [spark] juliuszsompolski commented on pull request #42355: [SPARK-44709][CONNECT] Run ExecuteGrpcResponseSender in reattachable execute in new thread to fix flow control

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on PR #42355:
URL: https://github.com/apache/spark/pull/42355#issuecomment-1668673016

   @hvanhovell 
   > Is the ExecuteGrpcResponseSender thread safe? For example is detach() safe?
   
   detach() is synchronized on the executeObserver it's attached to, like the other synchronized sections in the sender. I modified the code a bit to make this clearer.
   
   > Is it possible for events to be produced in a weird order?
   
   No. Execution pushes responses to executeObserver in order, and a number of assertions guard the order of events.
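   
   The two guarantees described above, mutual exclusion on the observer's monitor and in-order production, can be sketched as follows. This is a simplified model: `ResponseObserver`, `ResponseSender` and their methods are hypothetical and much smaller than the real `ExecuteResponseObserver` and `ExecuteGrpcResponseSender`, and indexes are assumed to start at 1.
   
   ```scala
   import scala.collection.mutable.ArrayBuffer
   
   // Hypothetical observer: responses are appended in order under its own monitor.
   final class ResponseObserver {
     private val responses = ArrayBuffer.empty[String]
     private var lastIndex = 0L
   
     def onNext(index: Long, response: String): Unit = synchronized {
       // Ordering invariant: indexes must be produced consecutively.
       assert(index == lastIndex + 1, s"out of order: got $index after $lastIndex")
       lastIndex = index
       responses += response
       notifyAll() // wake senders waiting for new responses
     }
   
     def get(index: Long): Option[String] = synchronized {
       if (index >= 1 && index <= lastIndex) Some(responses((index - 1).toInt)) else None
     }
   }
   
   // Hypothetical sender: its state is guarded by the observer's monitor, so a
   // detach() from another thread is seen consistently by the sending thread.
   final class ResponseSender(observer: ResponseObserver) {
     private var detached = false
   
     def detach(): Unit = observer.synchronized {
       detached = true
       observer.notifyAll() // wake a sender blocked waiting on the observer
     }
   
     def isDetached: Boolean = observer.synchronized(detached)
   }
   ```
   
   Using a single lock for both objects keeps the sender's "is there a response, am I detached" decision atomic with respect to the producer.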




[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42355: [WIP][CONNECT] Return from executePlan / reattachExecute handler and process stream on different thread

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42355:
URL: https://github.com/apache/spark/pull/42355#discussion_r1285702609


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala:
##########
@@ -34,7 +35,7 @@ import org.apache.spark.sql.connect.service.ExecuteHolder
  * @param grpcObserver
  *   the GRPC request StreamObserver
  */
-private[connect] class ExecuteGrpcResponseSender[T <: MessageLite](
+private[connect] class ExecuteGrpcResponseSender[T <: Message](

Review Comment:
   `ProtoUtils.abbreviate` requires a `Message`, which, unlike `MessageLite`, exposes the protobuf descriptor/reflection API.





[GitHub] [spark] hvanhovell commented on a diff in pull request #42355: [WIP][CONNECT] Return from executePlan / reattachExecute handler and process stream on different thread

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42355:
URL: https://github.com/apache/spark/pull/42355#discussion_r1285254488


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -82,7 +82,7 @@ object Connect {
         "Set to 0 for unlimited.")
       .version("3.5.0")
       .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("5m")
+      .createWithDefaultString("20s")

Review Comment:
   Debug code?





[GitHub] [spark] hvanhovell closed pull request #42355: [SPARK-44709][CONNECT] Run ExecuteGrpcResponseSender in reattachable execute in new thread to fix flow control

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell closed pull request #42355: [SPARK-44709][CONNECT] Run ExecuteGrpcResponseSender in reattachable execute in new thread to fix flow control
URL: https://github.com/apache/spark/pull/42355




[GitHub] [spark] grundprinzip commented on a diff in pull request #42355: [WIP][CONNECT] Return from executePlan / reattachExecute handler and process stream on different thread

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #42355:
URL: https://github.com/apache/spark/pull/42355#discussion_r1285695252


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala:
##########
@@ -34,7 +35,7 @@ import org.apache.spark.sql.connect.service.ExecuteHolder
  * @param grpcObserver
  *   the GRPC request StreamObserver
  */
-private[connect] class ExecuteGrpcResponseSender[T <: MessageLite](
+private[connect] class ExecuteGrpcResponseSender[T <: Message](

Review Comment:
   What's the reason for this particular change?





[GitHub] [spark] hvanhovell commented on a diff in pull request #42355: [WIP][CONNECT] Return from executePlan / reattachExecute handler and process stream on different thread

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42355:
URL: https://github.com/apache/spark/pull/42355#discussion_r1285254383


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala:
##########
@@ -166,7 +192,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: MessageLite](
         // There is a response available to be sent.
         val sent = sendResponse(response.get.response, deadlineTimeMillis)
         if (sent) {
-          logDebug(s"Sent response index=$nextIndex.")
+          logError(s"Sent response index=$nextIndex.")

Review Comment:
   Debug code?





[GitHub] [spark] juliuszsompolski commented on pull request #42355: [SPARK-44709][CONNECT] Run ExecuteGrpcResponseSender in reattachable execute in new thread to fix flow control

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on PR #42355:
URL: https://github.com/apache/spark/pull/42355#issuecomment-1668675937

   @hvanhovell 




[GitHub] [spark] hvanhovell commented on a diff in pull request #42355: [WIP][CONNECT] Return from executePlan / reattachExecute handler and process stream on different thread

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42355:
URL: https://github.com/apache/spark/pull/42355#discussion_r1285254340


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala:
##########
@@ -58,6 +61,39 @@ private[connect] class ExecuteGrpcResponseSender[T <: MessageLite](
     detached = true
   }
 
+  def run(lastConsumedStreamIndex: Long): Unit = {
+
+    // In reattachable execution, we check if grpcCallObserver is ready for sending.
+    // See sendResponse
+    if (executeHolder.reattachable && flowControl) {
+      val grpcCallObserver = grpcObserver.asInstanceOf[ServerCallStreamObserver[T]]
+      grpcCallObserver.setOnReadyHandler(() => {
+        val e = new Exception()
+        logError(s"ON READY\n${e.getStackTrace.mkString("\n")}")

Review Comment:
   Debug code, so remove?





[GitHub] [spark] hvanhovell commented on a diff in pull request #42355: [WIP][CONNECT] Return from executePlan / reattachExecute handler and process stream on different thread

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42355:
URL: https://github.com/apache/spark/pull/42355#discussion_r1285254599


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala:
##########
@@ -47,7 +47,7 @@ import org.apache.spark.sql.connect.service.ExecuteHolder
  * @see
  *   attachConsumer
  */
-private[connect] class ExecuteResponseObserver[T <: MessageLite](val executeHolder: ExecuteHolder)
+private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: ExecuteHolder)

Review Comment:
   Why this change?





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42355: [SPARK-44709][CONNECT] Run ExecuteGrpcResponseSender in reattachable execute in new thread to fix flow control

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42355:
URL: https://github.com/apache/spark/pull/42355#discussion_r1286442702


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -82,7 +93,7 @@ object Connect {
         "Set to 0 for unlimited.")
       .version("3.5.0")
       .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("5m")
+      .createWithDefaultString("2m")

Review Comment:
   @hvanhovell I think 2 minutes of stream time is a better default.
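   
   The option's description in the diff says "Set to 0 for unlimited." and the value is read in milliseconds, so a 2m default is 120000 ms. A minimal sketch, assuming the sender turns this setting into an absolute deadline when a stream starts and finishes the stream once that deadline passes, as the "Deadline reached, finishing stream after index 3." log line in the PR description suggests; both helper names are hypothetical:
   
   ```scala
   // Hypothetical helper: absolute deadline for one response stream.
   // A non-positive max duration means the stream is not time-limited.
   def streamDeadlineMillis(startMillis: Long, maxDurationMillis: Long): Long =
     if (maxDurationMillis <= 0) Long.MaxValue else startMillis + maxDurationMillis
   
   // Hypothetical helper: has the stream run out of time?
   def deadlineReached(nowMillis: Long, deadlineMillis: Long): Boolean =
     nowMillis >= deadlineMillis
   ```
   
   Mapping "unlimited" to `Long.MaxValue` lets the same comparison serve both cases without a special branch at every check.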





[GitHub] [spark] hvanhovell commented on pull request #42355: [SPARK-44709][CONNECT] Run ExecuteGrpcResponseSender in reattachable execute in new thread to fix flow control

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #42355:
URL: https://github.com/apache/spark/pull/42355#issuecomment-1669948605

   Merging to master/3.5

