Posted to commits@spark.apache.org by hv...@apache.org on 2016/09/11 15:35:34 UTC

spark git commit: [SPARK-17415][SQL] Better error message for driver-side broadcast join OOMs

Repository: spark
Updated Branches:
  refs/heads/master 883c76318 -> 767d48076


[SPARK-17415][SQL] Better error message for driver-side broadcast join OOMs

## What changes were proposed in this pull request?

This is a trivial patch that catches any `OutOfMemoryError` thrown while building the broadcast hash relation and rethrows it wrapped in a more informative error message.
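
In essence, the body of the broadcast future is wrapped in a `try`, and an `OutOfMemoryError` is rethrown with actionable guidance while preserving the original cause. A minimal sketch of the pattern, mirroring the diff below (`buildAndBroadcast()` is a hypothetical stand-in for the collect/build/broadcast steps in `BroadcastExchangeExec`):

```scala
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.sql.internal.SQLConf

try {
  buildAndBroadcast() // hypothetical: collect rows, build the relation, broadcast it
} catch {
  case oe: OutOfMemoryError =>
    // Rethrow with a message that names the relevant configs, keeping the cause chain.
    throw new OutOfMemoryError(s"Not enough memory to build and broadcast the table to " +
      s"all worker nodes. As a workaround, you can either disable broadcast by setting " +
      s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or increase the spark driver " +
      s"memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value")
      .initCause(oe.getCause)
}
```

The two workarounds named in the message correspond to `spark.sql.autoBroadcastJoinThreshold` and `spark.driver.memory`, so a user would set e.g. `spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)` or launch with `--conf spark.driver.memory=8g` (values illustrative).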

## How was this patch tested?

Existing tests.

Author: Sameer Agarwal <sa...@cs.berkeley.edu>

Closes #14979 from sameeragarwal/broadcast-join-error.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/767d4807
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/767d4807
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/767d4807

Branch: refs/heads/master
Commit: 767d48076971f6f1e2c93ee540a9b2e5e465631b
Parents: 883c763
Author: Sameer Agarwal <sa...@cs.berkeley.edu>
Authored: Sun Sep 11 17:35:27 2016 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Sun Sep 11 17:35:27 2016 +0200

----------------------------------------------------------------------
 .../exchange/BroadcastExchangeExec.scala        | 73 +++++++++++---------
 1 file changed, 42 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/767d4807/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index a809076..7be5d31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -21,6 +21,7 @@ import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 
 import org.apache.spark.{broadcast, SparkException}
+import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -28,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPar
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -70,38 +72,47 @@ case class BroadcastExchangeExec(
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
       SQLExecution.withExecutionId(sparkContext, executionId) {
-        val beforeCollect = System.nanoTime()
-        // Note that we use .executeCollect() because we don't want to convert data to Scala types
-        val input: Array[InternalRow] = child.executeCollect()
-        if (input.length >= 512000000) {
-          throw new SparkException(
-            s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows")
+        try {
+          val beforeCollect = System.nanoTime()
+          // Note that we use .executeCollect() because we don't want to convert data to Scala types
+          val input: Array[InternalRow] = child.executeCollect()
+          if (input.length >= 512000000) {
+            throw new SparkException(
+              s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows")
+          }
+          val beforeBuild = System.nanoTime()
+          longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
+          val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
+          longMetric("dataSize") += dataSize
+          if (dataSize >= (8L << 30)) {
+            throw new SparkException(
+              s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
+          }
+
+          // Construct and broadcast the relation.
+          val relation = mode.transform(input)
+          val beforeBroadcast = System.nanoTime()
+          longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000
+
+          val broadcasted = sparkContext.broadcast(relation)
+          longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
+
+          // There are some cases we don't care about the metrics and call `SparkPlan.doExecute`
+          // directly without setting an execution id. We should be tolerant to it.
+          if (executionId != null) {
+            sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
+              executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
+          }
+
+          broadcasted
+        } catch {
+          case oe: OutOfMemoryError =>
+            throw new OutOfMemoryError(s"Not enough memory to build and broadcast the table to " +
+              s"all worker nodes. As a workaround, you can either disable broadcast by setting " +
+              s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or increase the spark driver " +
+              s"memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value")
+              .initCause(oe.getCause)
         }
-        val beforeBuild = System.nanoTime()
-        longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
-        val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
-        longMetric("dataSize") += dataSize
-        if (dataSize >= (8L << 30)) {
-          throw new SparkException(
-            s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
-        }
-
-        // Construct and broadcast the relation.
-        val relation = mode.transform(input)
-        val beforeBroadcast = System.nanoTime()
-        longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000
-
-        val broadcasted = sparkContext.broadcast(relation)
-        longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
-
-        // There are some cases we don't care about the metrics and call `SparkPlan.doExecute`
-        // directly without setting an execution id. We should be tolerant to it.
-        if (executionId != null) {
-          sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
-            executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
-        }
-
-        broadcasted
       }
     }(BroadcastExchangeExec.executionContext)
   }

