Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/11 04:22:50 UTC

[GitHub] [spark] HyukjinKwon opened a new pull request, #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

HyukjinKwon opened a new pull request, #38613:
URL: https://github.com/apache/spark/pull/38613

   ### What changes were proposed in this pull request?
   
   This PR is a follow-up to https://github.com/apache/spark/pull/38468. It proposes removing the notify-wait approach and introducing a new way to collect partitions in parallel and send them in order.
   
   - Previously, it actually waited until all results were stored first, and then sent them one by one in Protobuf messages (so the notify-wait was not in fact needed).
   
       In both the worst and best cases, we always collect all partitions first and then send them partition by partition.
   
   - Now, it sends Protobuf messages in order as soon as the 0th partition is available (and then sends the next one once it is available).
   
       In the worst case, we collect all partitions and then send them one by one. In the best case, we send each partition as soon as it is collected.
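
A minimal, self-contained sketch of the "send in order as partitions arrive" idea described above, with no Spark dependency. `OrderedWriter`, `onResult`, `write`, and `OrderedWriterDemo` are hypothetical names used only for illustration, and the buffering uses a map rather than the array indexing in this PR's diff.

```scala
import scala.collection.mutable

// Hypothetical helper: emits partition results in partition order, flushing
// eagerly as soon as the next expected partition is available.
final class OrderedWriter[T](write: T => Unit) {
  // Partitions that arrived before their turn.
  private val pending = mutable.Map.empty[Int, T]
  private var next = 0 // id of the next partition to write

  // Assumed to be called from a single thread, once per partition.
  def onResult(partitionId: Int, result: T): Unit = {
    pending(partitionId) = result
    // Flush every consecutive partition that is now available.
    while (pending.contains(next)) {
      write(pending.remove(next).get)
      next += 1
    }
  }
}

object OrderedWriterDemo {
  // Feeds partitions in the given arrival order and returns the send order.
  def run(arrival: Seq[Int]): List[String] = {
    val sent = mutable.ListBuffer.empty[String]
    val writer = new OrderedWriter[String](s => sent += s)
    arrival.foreach(id => writer.onResult(id, s"p$id"))
    sent.toList
  }
}
```

With an arrival order of 2, 0, 1, partition 2 is buffered, partition 0 is written immediately, and partition 1 triggers a flush of both 1 and 2, so the send order is p0, p1, p2.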
   
   
   ### Why are the changes needed?
   
   For better performance, lower memory usage, and better readability and maintainability (by removing synchronization).
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. This feature has not been released yet, and this is a performance-only fix.
   
   ### How was this patch tested?
   
   CI in this PR should test it out.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1019825048


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -56,7 +56,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
     try {
       processAsArrowBatches(request.getClientId, dataframe)
     } catch {
-      case e: Exception =>
+      case e: Throwable =>

Review Comment:
   maybe `NonFatal`?
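
For context, `scala.util.control.NonFatal` is a standard-library extractor that does not match fatal throwables such as `VirtualMachineError`, `InterruptedException`, `LinkageError`, and `ControlThrowable`. The sketch below shows the pattern under discussion; `NonFatalDemo` and `withFallback` are hypothetical names, not code from this PR.

```scala
import scala.util.control.NonFatal

object NonFatalDemo {
  // Hypothetical helper: run `primary`, and fall back only on non-fatal errors.
  // Fatal throwables are not matched and propagate to the caller.
  def withFallback[A](primary: => A)(fallback: Throwable => A): A =
    try primary
    catch {
      case NonFatal(e) => fallback(e)
    }
}
```

Compared with `case e: Throwable`, this keeps the fallback for ordinary failures while letting errors such as `OutOfMemoryError` or an interrupt surface unchanged.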





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1020130165


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -144,36 +144,10 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
             .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId)
         }
 
-        val signal = new Object
-        val partitions = collection.mutable.Map.empty[Int, Array[Batch]]
-
-        val processPartition = (iter: Iterator[Batch]) => iter.toArray
-
         // This callback is executed by the DAGScheduler thread.
-        // After fetching a partition, it inserts the partition into the Map, and then
-        // wakes up the main thread.
-        val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
-          signal.synchronized {
-            partitions(partitionId) = partition
-            signal.notify()
-          }
-          ()
-        }
-
-        spark.sparkContext.runJob(batches, processPartition, resultHandler)
-
-        // The man thread will wait until 0-th partition is available,
-        // then send it to client and wait for next partition.
-        var currentPartitionId = 0
-        while (currentPartitionId < numPartitions) {
-          val partition = signal.synchronized {
-            while (!partitions.contains(currentPartitionId)) {
-              signal.wait()
-            }
-            partitions.remove(currentPartitionId).get
-          }
-
-          partition.foreach { case (bytes, count) =>
+        def writeBatches(arrowBatches: Array[Batch]): Unit = {

Review Comment:
   Sorry, I missed your comment when I opened this PR. BTW, this is actually what PySpark's implementation does, and I was thinking that it's better to match how they work, deduplicate, and improve them together. It should work fine in most cases: the PySpark implementation has been running in production for many years, and I haven't yet heard complaints related to this.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1019826331


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -56,7 +56,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
     try {
       processAsArrowBatches(request.getClientId, dataframe)
     } catch {
-      case e: Exception =>
+      case e: Throwable =>

Review Comment:
   Hm, it's a fallback, so it should work in any case (?). I think we should remove this fallback anyway. I can revert this change for now too (see https://github.com/apache/spark/pull/38468#discussion_r1013555606).





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1019831985


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -184,9 +158,30 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
             responseObserver.onNext(response.build())
             numSent += 1
           }
+        }
+
+        // Store collection results for worst case of 1 to N-1 partitions
+        val results = new Array[Array[Batch]](numPartitions - 1)
+        var lastIndex = -1 // index of last partition written
 
-          currentPartitionId += 1
+        // Handler to eagerly write partitions in order
+        val resultHandler = (partitionId: Int, partition: Array[Batch]) => {

Review Comment:
   Nope, it doesn't (because it's guided by the index). This approach actually comes from the initial ordered implementation of collect with Arrow (which was in production for a very long time): https://github.com/apache/spark/commit/82c18c240a6913a917df3b55cc5e22649561c4dd#diff-459628811d7786c705fbb2b7a381ecd2b88f183f44ab607d43b3d33ea48d390fR3282-R3318





[GitHub] [spark] hvanhovell commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1020126889


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -184,9 +158,30 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
             responseObserver.onNext(response.build())
             numSent += 1
           }
+        }
+
+        // Store collection results for worst case of 1 to N-1 partitions
+        val results = new Array[Array[Batch]](numPartitions - 1)
+        var lastIndex = -1 // index of last partition written
 
-          currentPartitionId += 1
+        // Handler to eagerly write partitions in order
+        val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
+          // If result is from next partition in order
+          if (partitionId - 1 == lastIndex) {
+            writeBatches(partition)
+            lastIndex += 1
+            // Write stored partitions that come next in order
+            while (lastIndex < results.length && results(lastIndex) != null) {
+              writeBatches(results(lastIndex))
+              results(lastIndex) = null
+              lastIndex += 1
+            }
+          } else {
+            // Store partitions received out of order
+            results(partitionId - 1) = partition
+          }
         }
+        spark.sparkContext.runJob(batches, (iter: Iterator[Batch]) => iter.toArray, resultHandler)

Review Comment:
   Why use a thread pool if you have a thread sitting around?
   





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1020127788


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -56,7 +56,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
     try {
       processAsArrowBatches(request.getClientId, dataframe)
     } catch {
-      case e: Exception =>
+      case e: Throwable =>

Review Comment:
   Yeah, let me revert this for now. We should remove this JSON fallback anyway.





[GitHub] [spark] hvanhovell commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1020124708


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -144,36 +144,10 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
             .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId)
         }
 
-        val signal = new Object
-        val partitions = collection.mutable.Map.empty[Int, Array[Batch]]
-
-        val processPartition = (iter: Iterator[Batch]) => iter.toArray
-
         // This callback is executed by the DAGScheduler thread.
-        // After fetching a partition, it inserts the partition into the Map, and then
-        // wakes up the main thread.
-        val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
-          signal.synchronized {
-            partitions(partitionId) = partition
-            signal.notify()
-          }
-          ()
-        }
-
-        spark.sparkContext.runJob(batches, processPartition, resultHandler)
-
-        // The man thread will wait until 0-th partition is available,
-        // then send it to client and wait for next partition.
-        var currentPartitionId = 0
-        while (currentPartitionId < numPartitions) {
-          val partition = signal.synchronized {
-            while (!partitions.contains(currentPartitionId)) {
-              signal.wait()
-            }
-            partitions.remove(currentPartitionId).get
-          }
-
-          partition.foreach { case (bytes, count) =>
+        def writeBatches(arrowBatches: Array[Batch]): Unit = {

Review Comment:
   The reason I suggested using locks and the main thread to write the results is exactly what this comment is trying to convey. You don't want these operations to happen inside the DAGScheduler thread. If you keep that thread blocked on something unrelated to scheduling, you will stop all other scheduling. This is particularly bad in an environment where multiple users might be running code at the same time.
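
A rough sketch of the hand-off this comment argues for: the callback only enqueues the result and returns, and the calling thread does the reordering and sending. It uses `java.util.concurrent.LinkedBlockingQueue` in place of the `synchronized`/`wait`/`notify` code in the diff, and a plain `Thread` stands in for the scheduler thread. `HandOffDemo` and `collectInOrder` are hypothetical names, and no Spark API is involved.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

object HandOffDemo {
  // Returns the order in which partitions were "sent" by the calling thread.
  def collectInOrder(arrival: Seq[Int]): List[Int] = {
    val queue = new LinkedBlockingQueue[(Int, String)]()

    // Stand-in for the scheduler callback: it only enqueues and returns at once.
    val resultHandler = (partitionId: Int, data: String) => queue.put((partitionId, data))

    // Stand-in for the scheduler thread delivering results out of order.
    val scheduler = new Thread(new Runnable {
      def run(): Unit = arrival.foreach(id => resultHandler(id, s"p$id"))
    })
    scheduler.start()

    // The calling thread does the slow work: reorder and send.
    val pending = mutable.Map.empty[Int, String]
    val sent = mutable.ListBuffer.empty[Int]
    var next = 0
    while (next < arrival.length) {
      val (id, data) = queue.take() // blocks until a result arrives
      pending(id) = data
      while (pending.contains(next)) {
        pending.remove(next) // a real implementation would send the data here
        sent += next
        next += 1
      }
    }
    scheduler.join()
    sent.toList
  }
}
```

Because the queue is unbounded, `put` never blocks the producing thread, which is the property the comment asks for: slow writes to the client delay only the caller, not the thread that delivers results.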





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1020127951


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -56,7 +56,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
     try {
       processAsArrowBatches(request.getClientId, dataframe)
     } catch {
-      case e: Exception =>
+      case e: Throwable =>

Review Comment:
   ```suggestion
         case e: Exception =>
   ```





[GitHub] [spark] zhengruifeng commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1019830407


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -184,9 +158,30 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
             responseObserver.onNext(response.build())
             numSent += 1
           }
+        }
+
+        // Store collection results for worst case of 1 to N-1 partitions
+        val results = new Array[Array[Batch]](numPartitions - 1)
+        var lastIndex = -1 // index of last partition written
 
-          currentPartitionId += 1
+        // Handler to eagerly write partitions in order
+        val resultHandler = (partitionId: Int, partition: Array[Batch]) => {

Review Comment:
   Does it need to be synchronized?





[GitHub] [spark] hvanhovell commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1020133986


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -144,36 +144,10 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
             .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId)
         }
 
-        val signal = new Object
-        val partitions = collection.mutable.Map.empty[Int, Array[Batch]]
-
-        val processPartition = (iter: Iterator[Batch]) => iter.toArray
-
         // This callback is executed by the DAGScheduler thread.
-        // After fetching a partition, it inserts the partition into the Map, and then
-        // wakes up the main thread.
-        val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
-          signal.synchronized {
-            partitions(partitionId) = partition
-            signal.notify()
-          }
-          ()
-        }
-
-        spark.sparkContext.runJob(batches, processPartition, resultHandler)
-
-        // The man thread will wait until 0-th partition is available,
-        // then send it to client and wait for next partition.
-        var currentPartitionId = 0
-        while (currentPartitionId < numPartitions) {
-          val partition = signal.synchronized {
-            while (!partitions.contains(currentPartitionId)) {
-              signal.wait()
-            }
-            partitions.remove(currentPartitionId).get
-          }
-
-          partition.foreach { case (bytes, count) =>
+        def writeBatches(arrowBatches: Array[Batch]): Unit = {

Review Comment:
   That is not an argument against doing this properly.





[GitHub] [spark] HyukjinKwon commented on pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #38613:
URL: https://github.com/apache/spark/pull/38613#issuecomment-1311621739

   Actually, let's just go with the approach in https://github.com/apache/spark/pull/38614, which is simpler. This approach can't easily deduplicate the code anyway because of ordering.




[GitHub] [spark] zhengruifeng commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1021041882


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -144,36 +144,10 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
             .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId)
         }
 
-        val signal = new Object
-        val partitions = collection.mutable.Map.empty[Int, Array[Batch]]
-
-        val processPartition = (iter: Iterator[Batch]) => iter.toArray
-
         // This callback is executed by the DAGScheduler thread.
-        // After fetching a partition, it inserts the partition into the Map, and then
-        // wakes up the main thread.
-        val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
-          signal.synchronized {
-            partitions(partitionId) = partition
-            signal.notify()
-          }
-          ()
-        }
-
-        spark.sparkContext.runJob(batches, processPartition, resultHandler)
-
-        // The man thread will wait until 0-th partition is available,
-        // then send it to client and wait for next partition.
-        var currentPartitionId = 0
-        while (currentPartitionId < numPartitions) {
-          val partition = signal.synchronized {
-            while (!partitions.contains(currentPartitionId)) {
-              signal.wait()
-            }
-            partitions.remove(currentPartitionId).get
-          }
-
-          partition.foreach { case (bytes, count) =>
+        def writeBatches(arrowBatches: Array[Batch]): Unit = {

Review Comment:
   OK, let me add a comment for this.





[GitHub] [spark] HyukjinKwon commented on pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #38613:
URL: https://github.com/apache/spark/pull/38613#issuecomment-1311232973

   It collects all results first because the synchronous `runJob` waits for all results to arrive.
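
The point can be illustrated with a toy model. In the sketch below, `runJobBlocking` is a hypothetical stand-in for a synchronous job runner (it is not Spark's `runJob`): it invokes the handler for every partition and only then returns, so any sending loop placed after the call starts once everything has been collected.

```scala
import scala.collection.mutable.ListBuffer

object BlockingRunJobDemo {
  // Hypothetical stand-in for a synchronous job runner: it invokes `handler`
  // for every partition and only then returns to the caller.
  def runJobBlocking(numPartitions: Int)(handler: Int => Unit): Unit =
    (0 until numPartitions).foreach(handler)

  // Records the order of "collected" and "sent" events.
  def events(numPartitions: Int): List[String] = {
    val log = ListBuffer.empty[String]
    val collected = ListBuffer.empty[Int]
    runJobBlocking(numPartitions) { id =>
      collected += id
      log += s"collected $id"
    }
    // This loop cannot start before runJobBlocking has returned.
    collected.foreach(id => log += s"sent $id")
    log.toList
  }
}
```

In this model every "collected" event precedes every "sent" event, regardless of how the wait loop after the call is written.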





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1019833127


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -184,9 +158,30 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
             responseObserver.onNext(response.build())
             numSent += 1
           }
+        }
+
+        // Store collection results for worst case of 1 to N-1 partitions
+        val results = new Array[Array[Batch]](numPartitions - 1)
+        var lastIndex = -1 // index of last partition written
 
-          currentPartitionId += 1
+        // Handler to eagerly write partitions in order
+        val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
+          // If result is from next partition in order
+          if (partitionId - 1 == lastIndex) {
+            writeBatches(partition)
+            lastIndex += 1
+            // Write stored partitions that come next in order
+            while (lastIndex < results.length && results(lastIndex) != null) {
+              writeBatches(results(lastIndex))
+              results(lastIndex) = null
+              lastIndex += 1
+            }
+          } else {
+            // Store partitions received out of order
+            results(partitionId - 1) = partition
+          }
         }
+        spark.sparkContext.runJob(batches, (iter: Iterator[Batch]) => iter.toArray, resultHandler)

Review Comment:
   Hm, I just noticed the review comment. I believe this matches our current implementation in PySpark. If we should improve it, let's improve both paths together. I would prefer to match them and deduplicate the logic first.





[GitHub] [spark] cloud-fan commented on pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #38613:
URL: https://github.com/apache/spark/pull/38613#issuecomment-1311232475

   > Previously, it actually waits until all results are stored all first
   
   Really? I think the best case is also sending partitions one by one.
   
   Anyway, this PR looks good as it avoids the lock.




[GitHub] [spark] cloud-fan commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1021041413


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -144,36 +144,10 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
             .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId)
         }
 
-        val signal = new Object
-        val partitions = collection.mutable.Map.empty[Int, Array[Batch]]
-
-        val processPartition = (iter: Iterator[Batch]) => iter.toArray
-
         // This callback is executed by the DAGScheduler thread.
-        // After fetching a partition, it inserts the partition into the Map, and then
-        // wakes up the main thread.
-        val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
-          signal.synchronized {
-            partitions(partitionId) = partition
-            signal.notify()
-          }
-          ()
-        }
-
-        spark.sparkContext.runJob(batches, processPartition, resultHandler)
-
-        // The man thread will wait until 0-th partition is available,
-        // then send it to client and wait for next partition.
-        var currentPartitionId = 0
-        while (currentPartitionId < numPartitions) {
-          val partition = signal.synchronized {
-            while (!partitions.contains(currentPartitionId)) {
-              signal.wait()
-            }
-            partitions.remove(currentPartitionId).get
-          }
-
-          partition.foreach { case (bytes, count) =>
+        def writeBatches(arrowBatches: Array[Batch]): Unit = {

Review Comment:
   Good point! We should write it down as code comments. @zhengruifeng can you help with it?





[GitHub] [spark] hvanhovell commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1020162907


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -144,36 +144,10 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
             .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId)
         }
 
-        val signal = new Object
-        val partitions = collection.mutable.Map.empty[Int, Array[Batch]]
-
-        val processPartition = (iter: Iterator[Batch]) => iter.toArray
-
         // This callback is executed by the DAGScheduler thread.
-        // After fetching a partition, it inserts the partition into the Map, and then
-        // wakes up the main thread.
-        val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
-          signal.synchronized {
-            partitions(partitionId) = partition
-            signal.notify()
-          }
-          ()
-        }
-
-        spark.sparkContext.runJob(batches, processPartition, resultHandler)
-
-        // The man thread will wait until 0-th partition is available,
-        // then send it to client and wait for next partition.
-        var currentPartitionId = 0
-        while (currentPartitionId < numPartitions) {
-          val partition = signal.synchronized {
-            while (!partitions.contains(currentPartitionId)) {
-              signal.wait()
-            }
-            partitions.remove(currentPartitionId).get
-          }
-
-          partition.foreach { case (bytes, count) =>
+        def writeBatches(arrowBatches: Array[Batch]): Unit = {

Review Comment:
   We have seen in higher concurrency scenarios that this does become a problem. Throughput will plateau because the DAGScheduler is doing the wrong things. I would like to avoid that.





[GitHub] [spark] zhengruifeng commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1019829685


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -184,9 +158,30 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
             responseObserver.onNext(response.build())
             numSent += 1
           }
+        }
+
+        // Store collection results for worst case of 1 to N-1 partitions
+        val results = new Array[Array[Batch]](numPartitions - 1)
+        var lastIndex = -1 // index of last partition written
 
-          currentPartitionId += 1
+        // Handler to eagerly write partitions in order
+        val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
+          // If result is from next partition in order
+          if (partitionId - 1 == lastIndex) {
+            writeBatches(partition)
+            lastIndex += 1
+            // Write stored partitions that come next in order
+            while (lastIndex < results.length && results(lastIndex) != null) {
+              writeBatches(results(lastIndex))
+              results(lastIndex) = null
+              lastIndex += 1
+            }
+          } else {
+            // Store partitions received out of order
+            results(partitionId - 1) = partition
+          }
         }
+        spark.sparkContext.runJob(batches, (iter: Iterator[Batch]) => iter.toArray, resultHandler)

Review Comment:
   https://github.com/apache/spark/pull/38468#discussion_r1014279677
   
   Maybe we can create a thread pool (shared across `collect` invocations)?
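
One way to picture the shared thread pool idea is sketched below. This is only an illustration under stated assumptions, not code from this PR or from Spark: `SharedSenderPool`, `AsyncOrderedSender`, the pool size of 4, and the `send` callback are all hypothetical names and values. The result callback only enqueues the partition, and a pool thread does the reordering and sending, so the thread that invokes the callback returns quickly.

```scala
import java.util.concurrent.{Executors, ExecutorService, LinkedBlockingQueue, ThreadFactory}
import scala.collection.mutable

// Hypothetical pool shared by every collect invocation (size chosen arbitrarily).
object SharedSenderPool {
  private val daemonFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r)
      t.setDaemon(true) // do not keep the JVM alive because of idle senders
      t
    }
  }
  val pool: ExecutorService = Executors.newFixedThreadPool(4, daemonFactory)
}

// Hypothetical per-collect sender: results arrive in any order, are sent in order.
final class AsyncOrderedSender[T](numPartitions: Int, send: T => Unit) {
  private val queue = new LinkedBlockingQueue[(Int, T)]()

  // Intended for the scheduler callback: it only enqueues and returns.
  def onResult(partitionId: Int, partition: T): Unit =
    queue.put((partitionId, partition))

  // Runs on a pool thread until every partition has been sent.
  private val done = SharedSenderPool.pool.submit(new Runnable {
    override def run(): Unit = {
      val pending = mutable.Map.empty[Int, T]
      var next = 0 // id of the next partition to send
      while (next < numPartitions) {
        val (id, part) = queue.take()
        pending(id) = part
        // Send every buffered partition that is now contiguous.
        while (pending.contains(next)) {
          send(pending.remove(next).get)
          next += 1
        }
      }
    }
  })

  // Blocks the caller until all partitions were sent (or rethrows a failure).
  def awaitCompletion(): Unit = { done.get(); () }
}
```

In this sketch each collect occupies one pool thread for its whole duration, so the pool size bounds the number of collects that can send concurrently; a real design would need to weigh that against the cost of doing the writes on the callback thread.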





[GitHub] [spark] zhengruifeng commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1019833916


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -184,9 +158,30 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
             responseObserver.onNext(response.build())
             numSent += 1
           }
+        }
+
+        // Store collection results for worst case of 1 to N-1 partitions
+        val results = new Array[Array[Batch]](numPartitions - 1)
+        var lastIndex = -1 // index of last partition written
 
-          currentPartitionId += 1
+        // Handler to eagerly write partitions in order
+        val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
+          // If result is from next partition in order
+          if (partitionId - 1 == lastIndex) {
+            writeBatches(partition)
+            lastIndex += 1
+            // Write stored partitions that come next in order
+            while (lastIndex < results.length && results(lastIndex) != null) {
+              writeBatches(results(lastIndex))
+              results(lastIndex) = null
+              lastIndex += 1
+            }
+          } else {
+            // Store partitions received out of order
+            results(partitionId - 1) = partition
+          }
         }
+        spark.sparkContext.runJob(batches, (iter: Iterator[Batch]) => iter.toArray, resultHandler)

Review Comment:
   +1 for matching the implementations.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1019831985


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -184,9 +158,30 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
             responseObserver.onNext(response.build())
             numSent += 1
           }
+        }
+
+        // Store collection results for worst case of 1 to N-1 partitions
+        val results = new Array[Array[Batch]](numPartitions - 1)
+        var lastIndex = -1 // index of last partition written
 
-          currentPartitionId += 1
+        // Handler to eagerly write partitions in order
+        val resultHandler = (partitionId: Int, partition: Array[Batch]) => {

Review Comment:
   Nope, it doesn't (because it's guided by the index). This approach actually comes from the initial ordered implementation of collect with Arrow, which has been running in production for a very long time: https://github.com/apache/spark/commit/82c18c240a6913a917df3b55cc5e22649561c4dd#diff-459628811d7786c705fbb2b7a381ecd2b88f183f44ab607d43b3d33ea48d390fR3282-R3318.
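
The index-guided ordering can be shown in isolation. The following is a minimal sketch, not the code in this PR: `OrderedEmitter`, `emit`, and `OrderedEmitterDemo` are hypothetical names, and the sketch assumes the handler is always invoked from a single thread (the diff's own comment says the callback is executed by the DAGScheduler thread), which is why it uses no locking. Unlike the diff, it keeps one slot per partition, so zero partitions needs no special case.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper: buffers out-of-order partitions and emits each one
// as soon as every lower-numbered partition has been emitted.
final class OrderedEmitter[T](numPartitions: Int, emit: T => Unit) {
  // One slot per partition; a slot is filled only while its partition waits.
  private val pending = Array.fill[Option[T]](numPartitions)(None)
  private var next = 0 // id of the next partition to emit

  // Assumed to be called from one thread only, so no synchronization is used.
  def onResult(partitionId: Int, partition: T): Unit = {
    if (partitionId == next) {
      emit(partition)
      next += 1
      // Drain buffered partitions that are now contiguous.
      while (next < numPartitions && pending(next).isDefined) {
        emit(pending(next).get)
        pending(next) = None // drop the reference so it can be collected
        next += 1
      }
    } else {
      // Arrived early: keep it until its turn comes.
      pending(partitionId) = Some(partition)
    }
  }
}

object OrderedEmitterDemo {
  // Feeds partition ids in the given arrival order and returns the emit order.
  def run(arrivalOrder: Seq[Int]): Seq[Int] = {
    val sent = ArrayBuffer.empty[Int]
    val emitter = new OrderedEmitter[Int](arrivalOrder.length, x => { sent += x; () })
    arrivalOrder.foreach(id => emitter.onResult(id, id))
    sent.toSeq
  }
}
```

In the worst case (partition 0 arrives last) every other partition is buffered first; in the best case (partitions arrive in order) nothing is buffered at all.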





[GitHub] [spark] hvanhovell commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1020126325


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -56,7 +56,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
     try {
       processAsArrowBatches(request.getClientId, dataframe)
     } catch {
-      case e: Exception =>
+      case e: Throwable =>

Review Comment:
   You are catching `OutOfMemoryError` and other virtual machine errors... You won't be able to recover from those.
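
A common Scala idiom for this concern is `scala.util.control.NonFatal`, which matches ordinary exceptions but not `VirtualMachineError` (including `OutOfMemoryError`), `InterruptedException`, `LinkageError`, or `ControlThrowable`. The sketch below is illustrative only; `NonFatalDemo` and `runGuarded` are hypothetical names and are not part of this PR.

```scala
import scala.util.control.NonFatal

object NonFatalDemo {
  // Hypothetical wrapper: handles recoverable failures, lets fatal ones propagate.
  def runGuarded(body: => Unit): String =
    try {
      body
      "ok"
    } catch {
      // Fatal throwables such as OutOfMemoryError do not match this pattern
      // and therefore keep propagating to the caller.
      case NonFatal(e) => s"handled: ${e.getMessage}"
    }
}
```

With this pattern an `IllegalStateException` thrown by `body` is reported as handled, while an `OutOfMemoryError` escapes `runGuarded` untouched.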





[GitHub] [spark] HyukjinKwon closed pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
HyukjinKwon closed pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect
URL: https://github.com/apache/spark/pull/38613




[GitHub] [spark] hvanhovell commented on a diff in pull request #38613: [SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #38613:
URL: https://github.com/apache/spark/pull/38613#discussion_r1020133986


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -144,36 +144,10 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
             .toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId)
         }
 
-        val signal = new Object
-        val partitions = collection.mutable.Map.empty[Int, Array[Batch]]
-
-        val processPartition = (iter: Iterator[Batch]) => iter.toArray
-
         // This callback is executed by the DAGScheduler thread.
-        // After fetching a partition, it inserts the partition into the Map, and then
-        // wakes up the main thread.
-        val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
-          signal.synchronized {
-            partitions(partitionId) = partition
-            signal.notify()
-          }
-          ()
-        }
-
-        spark.sparkContext.runJob(batches, processPartition, resultHandler)
-
-        // The man thread will wait until 0-th partition is available,
-        // then send it to client and wait for next partition.
-        var currentPartitionId = 0
-        while (currentPartitionId < numPartitions) {
-          val partition = signal.synchronized {
-            while (!partitions.contains(currentPartitionId)) {
-              signal.wait()
-            }
-            partitions.remove(currentPartitionId).get
-          }
-
-          partition.foreach { case (bytes, count) =>
+        def writeBatches(arrowBatches: Array[Batch]): Unit = {

Review Comment:
   That is not an argument against doing this properly.


