Posted to reviews@spark.apache.org by "ueshin (via GitHub)" <gi...@apache.org> on 2023/04/15 21:37:42 UTC

[GitHub] [spark] ueshin opened a new pull request, #40806: [SPARK-43153][CONNECT] Skip Spark execution when the dataframe is local

ueshin opened a new pull request, #40806:
URL: https://github.com/apache/spark/pull/40806

   ### What changes were proposed in this pull request?
   
   Skips Spark execution when the dataframe is local.
   
   ### Why are the changes needed?
   
   When the DataFrame built in Spark Connect is local, i.e., its executed plan is a LocalTableScanExec whose rows are already materialized on the driver, we can skip Spark execution and stream the rows to the client directly.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Existing tests.
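
   In sketch form (a minimal sketch, not the actual handler code; the helper names sendBatchToClient and runAsSparkJob are hypothetical stand-ins for the handler's Arrow conversion and gRPC send), the change pattern-matches on the executed plan, because a LocalTableScanExec already holds its rows on the driver:

      import org.apache.spark.sql.DataFrame
      import org.apache.spark.sql.catalyst.InternalRow
      import org.apache.spark.sql.execution.LocalTableScanExec

      // Hypothetical stand-ins for the handler's Arrow conversion and gRPC send.
      def sendBatchToClient(rows: Iterator[InternalRow]): Unit = ???
      def runAsSparkJob(dataframe: DataFrame): Unit = ???

      def handleResult(dataframe: DataFrame): Unit =
        dataframe.queryExecution.executedPlan match {
          // Local relation: the rows are already materialized on the driver,
          // so convert and send them without submitting a Spark job.
          case LocalTableScanExec(_, rows) => sendBatchToClient(rows.iterator)
          // Anything else still goes through normal distributed execution.
          case _ => runAsSparkJob(dataframe)
        }

   The diff quoted in the review below makes this work by lifting rowToArrowConverter into a plain Iterator-to-Iterator function, so the same converter serves both the local path and mapPartitionsInternal on the distributed path.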




[GitHub] [spark] LuciferYang commented on a diff in pull request #40806: [SPARK-43153][CONNECT] Skip Spark execution when the dataframe is local

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40806:
URL: https://github.com/apache/spark/pull/40806#discussion_r1168748163


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -161,100 +161,104 @@ object SparkConnectStreamHandler {
     // Conservatively sets it to 70% because the size is estimated rather than exact.
     val maxBatchSize = (SparkEnv.get.conf.get(CONNECT_GRPC_ARROW_MAX_BATCH_SIZE) * 0.7).toLong
 
-    SQLExecution.withNewExecutionId(dataframe.queryExecution, Some("collectArrow")) {
-      val rows = dataframe.queryExecution.executedPlan.execute()
-      val numPartitions = rows.getNumPartitions
-      var numSent = 0
-
-      if (numPartitions > 0) {
-        type Batch = (Array[Byte], Long)
-
-        val batches = rows.mapPartitionsInternal(
-          SparkConnectStreamHandler
-            .rowToArrowConverter(schema, maxRecordsPerBatch, maxBatchSize, timeZoneId))
-
-        val signal = new Object
-        val partitions = new Array[Array[Batch]](numPartitions)
-        var error: Option[Throwable] = None
-
-        // This callback is executed by the DAGScheduler thread.
-        // After fetching a partition, it inserts the partition into the partitions
-        // array, and then wakes up the main thread.
-        val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
-          signal.synchronized {
-            partitions(partitionId) = partition
-            signal.notify()
-          }
-          ()
-        }
+    val rowToArrowConverter = SparkConnectStreamHandler
+      .rowToArrowConverter(schema, maxRecordsPerBatch, maxBatchSize, timeZoneId)
 
-        val future = spark.sparkContext.submitJob(
-          rdd = batches,
-          processPartition = (iter: Iterator[Batch]) => iter.toArray,
-          partitions = Seq.range(0, numPartitions),
-          resultHandler = resultHandler,
-          resultFunc = () => ())
-
-        // Collect errors and propagate them to the main thread.
-        future.onComplete { result =>
-          result.failed.foreach { throwable =>
-            signal.synchronized {
-              error = Some(throwable)
-              signal.notify()
-            }
-          }
-        }(ThreadUtils.sameThread)
-
-        // The main thread will wait until the 0-th partition is available,
-        // then send it to the client and wait for the next partition.
-        // Unlike the implementation of [[Dataset#collectAsArrowToPython]], it sends
-        // the Arrow batches in the main thread to avoid the DAGScheduler thread being
-        // blocked by tasks not related to scheduling. This is particularly important
-        // if there are multiple users or clients running code at the same time.
-        var currentPartitionId = 0
-        while (currentPartitionId < numPartitions) {
-          val partition = signal.synchronized {
-            var part = partitions(currentPartitionId)
-            while (part == null && error.isEmpty) {
-              signal.wait()
-              part = partitions(currentPartitionId)
-            }
-            partitions(currentPartitionId) = null
+    var numSent = 0
+    def sendBatch(bytes: Array[Byte], count: Long): Unit = {
+      val response = proto.ExecutePlanResponse.newBuilder().setSessionId(sessionId)
+      val batch = proto.ExecutePlanResponse.ArrowBatch
+        .newBuilder()
+        .setRowCount(count)
+        .setData(ByteString.copyFrom(bytes))
+        .build()
+      response.setArrowBatch(batch)
+      responseObserver.onNext(response.build())
+      numSent += 1
+    }
 
-            error.foreach { case other =>
-              throw other
+    dataframe.queryExecution.executedPlan match {
+      case LocalTableScanExec(_, rows) =>
+        rowToArrowConverter(rows.iterator).foreach { case (bytes, count) =>
+          sendBatch(bytes, count)
+        }
+      case _ =>
+        SQLExecution.withNewExecutionId(dataframe.queryExecution, Some("collectArrow")) {
+          val rows = dataframe.queryExecution.executedPlan.execute()
+          val numPartitions = rows.getNumPartitions
+
+          if (numPartitions > 0) {
+            type Batch = (Array[Byte], Long)
+
+            val batches = rows.mapPartitionsInternal(rowToArrowConverter)
+
+            val signal = new Object
+            val partitions = new Array[Array[Batch]](numPartitions)
+            var error: Option[Throwable] = None
+
+            // This callback is executed by the DAGScheduler thread.
+            // After fetching a partition, it inserts the partition into the partitions
+            // array, and then wakes up the main thread.
+            val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
+              signal.synchronized {
+                partitions(partitionId) = partition
+                signal.notify()
+              }
+              ()
             }
-            part
-          }
 
-          partition.foreach { case (bytes, count) =>
-            val response = proto.ExecutePlanResponse.newBuilder().setSessionId(sessionId)
-            val batch = proto.ExecutePlanResponse.ArrowBatch
-              .newBuilder()
-              .setRowCount(count)
-              .setData(ByteString.copyFrom(bytes))
-              .build()
-            response.setArrowBatch(batch)
-            responseObserver.onNext(response.build())
-            numSent += 1
+            val future = spark.sparkContext.submitJob(
+              rdd = batches,
+              processPartition = (iter: Iterator[Batch]) => iter.toArray,
+              partitions = Seq.range(0, numPartitions),
+              resultHandler = resultHandler,
+              resultFunc = () => ())
+
+            // Collect errors and propagate them to the main thread.
+            future.onComplete { result =>
+              result.failed.foreach { throwable =>
+                signal.synchronized {
+                  error = Some(throwable)
+                  signal.notify()
+                }
+              }
+            }(ThreadUtils.sameThread)
+
+            // The main thread will wait until the 0-th partition is available,
+            // then send it to the client and wait for the next partition.
+            // Unlike the implementation of [[Dataset#collectAsArrowToPython]], it sends
+            // the Arrow batches in the main thread to avoid the DAGScheduler thread being
+            // blocked by tasks not related to scheduling. This is particularly important
+            // if there are multiple users or clients running code at the same time.
+            var currentPartitionId = 0
+            while (currentPartitionId < numPartitions) {
+              val partition = signal.synchronized {
+                var part = partitions(currentPartitionId)
+                while (part == null && error.isEmpty) {
+                  signal.wait()
+                  part = partitions(currentPartitionId)
+                }
+                partitions(currentPartitionId) = null
+
+                error.foreach { case other =>

Review Comment:
   Unnecessary partial function, though it looks like this was carried over from the previous code.
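
   For context: { case other => ... } is Scala's pattern-matching anonymous function syntax, which compiles to a match on the argument; it buys nothing here because a bare variable pattern matches anything, so a plain lambda is equivalent. A minimal, self-contained illustration (toy value, no Spark):

      val error: Option[Throwable] = Some(new RuntimeException("boom"))

      // Pattern-matching anonymous function: works, but the case adds nothing.
      def rethrowWithCase(): Unit = error.foreach { case other => throw other }

      // Equivalent plain lambda, which is what the review suggests.
      def rethrowPlain(): Unit = error.foreach { other => throw other }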
   
   





[GitHub] [spark] ueshin commented on a diff in pull request #40806: [SPARK-43153][CONNECT] Skip Spark execution when the dataframe is local

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #40806:
URL: https://github.com/apache/spark/pull/40806#discussion_r1169054143


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -161,100 +161,104 @@ object SparkConnectStreamHandler {
     [... hunk identical to the one quoted in LuciferYang's comment above ...]
+
+                error.foreach { case other =>

Review Comment:
   Let me fix it, too, while we're here. Thanks.
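
   For context, the hand-off that the quoted hunk keeps in place is a plain wait/notify producer-consumer: the DAGScheduler callback stores each finished partition and wakes the main thread, which sends partitions to the client strictly in partition order. A stripped-down sketch of that pattern with toy types instead of Arrow batches (names are illustrative, not the actual handler API):

      // Producers may complete partitions in any order; the consumer emits
      // them strictly by partition id.
      val numPartitions = 4
      val signal = new Object
      val partitions = new Array[String](numPartitions)

      // Producer side: in the real code, the DAGScheduler result handler.
      def onPartitionDone(id: Int, data: String): Unit = signal.synchronized {
        partitions(id) = data
        signal.notify()
      }

      // Consumer side: in the real code, the main thread's send loop.
      def streamInOrder(send: String => Unit): Unit = {
        var current = 0
        while (current < numPartitions) {
          val part = signal.synchronized {
            while (partitions(current) == null) signal.wait()
            val p = partitions(current)
            partitions(current) = null // release the reference once handed off
            p
          }
          send(part)
          current += 1
        }
      }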





[GitHub] [spark] zhengruifeng closed pull request #40806: [SPARK-43153][CONNECT] Skip Spark execution when the dataframe is local

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng closed pull request #40806: [SPARK-43153][CONNECT] Skip Spark execution when the dataframe is local
URL: https://github.com/apache/spark/pull/40806




[GitHub] [spark] zhengruifeng commented on pull request #40806: [SPARK-43153][CONNECT] Skip Spark execution when the dataframe is local

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40806:
URL: https://github.com/apache/spark/pull/40806#issuecomment-1512593794

   merged to master

