Posted to commits@spark.apache.org by gu...@apache.org on 2022/11/15 01:08:50 UTC
[spark] branch master updated: [SPARK-41005][CONNECT][DOC][FOLLOW-UP] Document the reason of sending batch in main thread
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 84fabc27d68 [SPARK-41005][CONNECT][DOC][FOLLOW-UP] Document the reason of sending batch in main thread
84fabc27d68 is described below
commit 84fabc27d688601feabb42abfc7356cd743b3c38
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Tue Nov 15 10:08:35 2022 +0900
[SPARK-41005][CONNECT][DOC][FOLLOW-UP] Document the reason of sending batch in main thread
### What changes were proposed in this pull request?
Document the reason for sending batches in the main thread
### Why are the changes needed?
as per https://github.com/apache/spark/pull/38613#discussion_r1021041413
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
no, doc-only
Closes #38654 from zhengruifeng/connect_doc_collect.
Lead-authored-by: Ruifeng Zheng <ru...@apache.org>
Co-authored-by: Ruifeng Zheng <ru...@foxmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../spark/sql/connect/service/SparkConnectStreamHandler.scala | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index 55e091bd8d0..ec2db3efa96 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -168,8 +168,12 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
resultHandler = resultHandler,
resultFunc = () => ())
- // The man thread will wait until 0-th partition is available,
+ // The main thread will wait until 0-th partition is available,
// then send it to client and wait for the next partition.
+ // Different from the implementation of [[Dataset#collectAsArrowToPython]], this sends
+ // the arrow batches in the main thread, to avoid the DAGScheduler thread being
+ // blocked by work unrelated to scheduling. This is particularly important if there
+ // are multiple users or clients running code at the same time.
var currentPartitionId = 0
while (currentPartitionId < numPartitions) {
val partition = signal.synchronized {
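The pattern in the patched loop can be sketched in isolation. The following is a minimal, hypothetical Scala example, not Spark's actual SparkConnectStreamHandler code: worker threads (standing in for Spark's task-completion callbacks) deposit partition results out of order and notify a shared monitor, while the main thread waits for each partition in order and does the (potentially slow) sending itself, so no scheduler thread is ever blocked on it. The names `MainThreadSender`, `signal`, and the `batch-N` strings are invented for illustration.

```scala
object MainThreadSender {
  def main(args: Array[String]): Unit = {
    val numPartitions = 4
    val signal = new Object
    // Slot per partition, filled by workers as they "complete".
    val results = Array.fill[Option[String]](numPartitions)(None)

    // Simulate tasks finishing out of order: later partitions complete first.
    val workers = (0 until numPartitions).map { i =>
      new Thread(() => {
        Thread.sleep((numPartitions - i) * 50L)
        signal.synchronized {
          results(i) = Some(s"batch-$i")
          signal.notifyAll()
        }
      })
    }
    workers.foreach(_.start())

    // The main thread waits until the 0-th partition is available, sends it,
    // then waits for the next one, and so on. Sending happens here, in the
    // main thread, never in a worker/callback thread.
    var currentPartitionId = 0
    while (currentPartitionId < numPartitions) {
      val batch = signal.synchronized {
        // Guarded wait: re-check the condition after every wakeup.
        while (results(currentPartitionId).isEmpty) signal.wait()
        results(currentPartitionId).get
      }
      println(s"sending $batch")
      currentPartitionId += 1
    }
    workers.foreach(_.join())
  }
}
```

Even though the workers complete in reverse order, the client always receives partitions 0, 1, 2, 3 in order, because the main thread blocks on each slot before moving on.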
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org