You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/09/11 15:35:34 UTC
spark git commit: [SPARK-17415][SQL] Better error message for driver-side broadcast join OOMs
Repository: spark
Updated Branches:
refs/heads/master 883c76318 -> 767d48076
[SPARK-17415][SQL] Better error message for driver-side broadcast join OOMs
## What changes were proposed in this pull request?
This is a trivial patch that catches all `OutOfMemoryError` while building the broadcast hash relation and rethrows it by wrapping it in a nice error message.
## How was this patch tested?
Existing Tests
Author: Sameer Agarwal <sa...@cs.berkeley.edu>
Closes #14979 from sameeragarwal/broadcast-join-error.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/767d4807
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/767d4807
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/767d4807
Branch: refs/heads/master
Commit: 767d48076971f6f1e2c93ee540a9b2e5e465631b
Parents: 883c763
Author: Sameer Agarwal <sa...@cs.berkeley.edu>
Authored: Sun Sep 11 17:35:27 2016 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Sun Sep 11 17:35:27 2016 +0200
----------------------------------------------------------------------
.../exchange/BroadcastExchangeExec.scala | 73 +++++++++++---------
1 file changed, 42 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/767d4807/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index a809076..7be5d31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -21,6 +21,7 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import org.apache.spark.{broadcast, SparkException}
+import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -28,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPar
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.ThreadUtils
/**
@@ -70,38 +72,47 @@ case class BroadcastExchangeExec(
// This will run in another thread. Set the execution id so that we can connect these jobs
// with the correct execution.
SQLExecution.withExecutionId(sparkContext, executionId) {
- val beforeCollect = System.nanoTime()
- // Note that we use .executeCollect() because we don't want to convert data to Scala types
- val input: Array[InternalRow] = child.executeCollect()
- if (input.length >= 512000000) {
- throw new SparkException(
- s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows")
+ try {
+ val beforeCollect = System.nanoTime()
+ // Note that we use .executeCollect() because we don't want to convert data to Scala types
+ val input: Array[InternalRow] = child.executeCollect()
+ if (input.length >= 512000000) {
+ throw new SparkException(
+ s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows")
+ }
+ val beforeBuild = System.nanoTime()
+ longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
+ val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
+ longMetric("dataSize") += dataSize
+ if (dataSize >= (8L << 30)) {
+ throw new SparkException(
+ s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
+ }
+
+ // Construct and broadcast the relation.
+ val relation = mode.transform(input)
+ val beforeBroadcast = System.nanoTime()
+ longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000
+
+ val broadcasted = sparkContext.broadcast(relation)
+ longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
+
+ // There are some cases we don't care about the metrics and call `SparkPlan.doExecute`
+ // directly without setting an execution id. We should be tolerant to it.
+ if (executionId != null) {
+ sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
+ executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
+ }
+
+ broadcasted
+ } catch {
+ case oe: OutOfMemoryError =>
+ throw new OutOfMemoryError(s"Not enough memory to build and broadcast the table to " +
+ s"all worker nodes. As a workaround, you can either disable broadcast by setting " +
+ s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or increase the spark driver " +
+ s"memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value")
+ .initCause(oe.getCause)
}
- val beforeBuild = System.nanoTime()
- longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
- val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
- longMetric("dataSize") += dataSize
- if (dataSize >= (8L << 30)) {
- throw new SparkException(
- s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
- }
-
- // Construct and broadcast the relation.
- val relation = mode.transform(input)
- val beforeBroadcast = System.nanoTime()
- longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000
-
- val broadcasted = sparkContext.broadcast(relation)
- longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
-
- // There are some cases we don't care about the metrics and call `SparkPlan.doExecute`
- // directly without setting an execution id. We should be tolerant to it.
- if (executionId != null) {
- sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
- executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
- }
-
- broadcasted
}
}(BroadcastExchangeExec.executionContext)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org