Posted to commits@spark.apache.org by gu...@apache.org on 2022/11/11 12:18:22 UTC

[spark] branch master updated: [SPARK-41005][CONNECT][FOLLOWUP] Collect should use `submitJob` instead of `runJob`

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f614b3f699 [SPARK-41005][CONNECT][FOLLOWUP] Collect should use `submitJob` instead of `runJob`
4f614b3f699 is described below

commit 4f614b3f699d4d3924d4411c98a20d2e58b2e2e6
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Fri Nov 11 21:18:09 2022 +0900

    [SPARK-41005][CONNECT][FOLLOWUP] Collect should use `submitJob` instead of `runJob`
    
    ### What changes were proposed in this pull request?
    Use `submitJob` instead of `runJob` when collecting results in `SparkConnectStreamHandler`.
    
    ### Why are the changes needed?
    `spark.sparkContext.runJob` blocks until all partitions have finished, so no partition can be sent to the client before the whole job completes.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing Tests
    
    Closes #38614 from zhengruifeng/connect_collect_submitJob.
    
    Authored-by: Ruifeng Zheng <ru...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../sql/connect/service/SparkConnectStreamHandler.scala   | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index ffac330cd6d..55e091bd8d0 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -161,17 +161,24 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
           ()
         }
 
-        spark.sparkContext.runJob(batches, processPartition, resultHandler)
+        spark.sparkContext.submitJob(
+          rdd = batches,
+          processPartition = processPartition,
+          partitions = Seq.range(0, numPartitions),
+          resultHandler = resultHandler,
+          resultFunc = () => ())
 
         // The man thread will wait until 0-th partition is available,
-        // then send it to client and wait for next partition.
+        // then send it to client and wait for the next partition.
         var currentPartitionId = 0
         while (currentPartitionId < numPartitions) {
           val partition = signal.synchronized {
-            while (!partitions.contains(currentPartitionId)) {
+            var result = partitions.remove(currentPartitionId)
+            while (result.isEmpty) {
               signal.wait()
+              result = partitions.remove(currentPartitionId)
             }
-            partitions.remove(currentPartitionId).get
+            result.get
           }
 
           partition.foreach { case (bytes, count) =>
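
The waiting pattern in the hunk above can be illustrated without Spark. What follows is a minimal sketch, not the handler's actual code: `OrderedPartitionDemo`, `collectInOrder`, and `compute` are hypothetical names, and plain `Future`s stand in for the job that `submitJob` starts asynchronously. The names `signal`, `partitions`, and `currentPartitionId` mirror the diff. Workers may finish in any order, while the calling thread hands out results strictly by partition id.

```scala
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

object OrderedPartitionDemo {
  // Hypothetical helper: computes `numPartitions` results concurrently and
  // returns them in partition order, consuming each one as soon as it is ready.
  def collectInOrder(numPartitions: Int)(compute: Int => String): Seq[String] = {
    val signal = new Object
    val partitions = mutable.Map.empty[Int, String]
    implicit val ec: ExecutionContext = ExecutionContext.global

    // Non-blocking submission (stand-in for submitJob): each partition
    // publishes its result under the lock and wakes up the waiting thread.
    (0 until numPartitions).foreach { id =>
      Future {
        val result = compute(id)
        signal.synchronized {
          partitions(id) = result
          signal.notifyAll()
        }
      }
    }

    // The calling thread waits for partition 0, consumes it, then waits for
    // partition 1, and so on. remove() returns None until the result arrives.
    val out = mutable.ArrayBuffer.empty[String]
    var currentPartitionId = 0
    while (currentPartitionId < numPartitions) {
      val partition = signal.synchronized {
        var result = partitions.remove(currentPartitionId)
        while (result.isEmpty) {
          signal.wait()
          result = partitions.remove(currentPartitionId)
        }
        result.get
      }
      out += partition
      currentPartitionId += 1
    }
    out.toSeq
  }
}
```

Checking the map before the first `wait()` means a result that arrived before the consumer took the lock is not missed. This sketch has no failure handling: if `compute` throws for some partition, the loop waits indefinitely, so a real handler would also need to propagate job failures.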

