You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/05/15 03:05:13 UTC

[GitHub] [spark] cloud-fan commented on a change in pull request #24595: [SPARK-20774][SPARK-27036][CORE] Cancel the running broadcast execution on BroadcastTimeout

cloud-fan commented on a change in pull request #24595: [SPARK-20774][SPARK-27036][CORE] Cancel the running broadcast execution on BroadcastTimeout
URL: https://github.com/apache/spark/pull/24595#discussion_r284072112
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ##########
 @@ -67,68 +70,74 @@ case class BroadcastExchangeExec(
 
   @transient
   private lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
-    // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
+    // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    Future {
-      // This will run in another thread. Set the execution id so that we can connect these jobs
-      // with the correct execution.
-      SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
-        try {
-          val beforeCollect = System.nanoTime()
-          // Use executeCollect/executeCollectIterator to avoid conversion to Scala types
-          val (numRows, input) = child.executeCollectIterator()
-          if (numRows >= 512000000) {
-            throw new SparkException(
-              s"Cannot broadcast the table with 512 million or more rows: $numRows rows")
-          }
-
-          val beforeBuild = System.nanoTime()
-          longMetric("collectTime") += NANOSECONDS.toMillis(beforeBuild - beforeCollect)
-
-          // Construct the relation.
-          val relation = mode.transform(input, Some(numRows))
-
-          val dataSize = relation match {
-            case map: HashedRelation =>
-              map.estimatedSize
-            case arr: Array[InternalRow] =>
-              arr.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
-            case _ =>
-              throw new SparkException("[BUG] BroadcastMode.transform returned unexpected type: " +
-                  relation.getClass.getName)
+    val task = new Callable[broadcast.Broadcast[Any]]() {
+      override def call(): broadcast.Broadcast[Any] = {
+        // This will run in another thread. Set the execution id so that we can connect these jobs
+        // with the correct execution.
+        SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
+          try {
+            sparkContext.setJobGroup(runId.toString, s"broadcast exchange (runId $runId)",
 
 Review comment:
   Let's add a comment to explain why we set up a job group here: AFAIK, there is no other public API that can cancel a specific job.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org