You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/12/17 00:35:03 UTC

spark git commit: [SPARK-4269][SQL] make wait time configurable in BroadcastHashJoin

Repository: spark
Updated Branches:
  refs/heads/master a66c23e13 -> fa66ef6c9


[SPARK-4269][SQL] make wait time configurable in BroadcastHashJoin

BroadcastHashJoin currently uses a hard-coded value (5 minutes) when waiting for the execution and broadcast of the small table.
In my opinion, this should be a configurable value, since the broadcast may take longer than 5 minutes in some cases, such as in a busy or congested network environment.

Author: Jacky Li <ja...@huawei.com>

Closes #3133 from jackylk/timeout-config and squashes the following commits:

733ac08 [Jacky Li] add spark.sql.broadcastTimeout in SQLConf.scala
557acd4 [Jacky Li] switch to sqlContext.getConf
81a5e20 [Jacky Li] make wait time configurable in BroadcastHashJoin


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa66ef6c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa66ef6c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa66ef6c

Branch: refs/heads/master
Commit: fa66ef6c97e87c9255b67b03836a4ba50598ebae
Parents: a66c23e
Author: Jacky Li <ja...@huawei.com>
Authored: Tue Dec 16 15:34:59 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Dec 16 15:34:59 2014 -0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/sql/SQLConf.scala    |  7 +++++++
 .../spark/sql/execution/joins/BroadcastHashJoin.scala    | 11 ++++++++++-
 2 files changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fa66ef6c/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 9697beb..f5abf71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -38,6 +38,7 @@ private[spark] object SQLConf {
   val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
 
   val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
+  val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout"
 
   // Options that control which operators can be chosen by the query planner.  These should be
   // considered hints and may be ignored by future versions of Spark SQL.
@@ -148,6 +149,12 @@ private[sql] trait SQLConf {
   private[spark] def columnNameOfCorruptRecord: String =
     getConf(COLUMN_NAME_OF_CORRUPT_RECORD, "_corrupt_record")
 
+  /**
+   * Timeout in seconds for the broadcast wait time in hash join
+   */
+  private[spark] def broadcastTimeout: Int =
+    getConf(BROADCAST_TIMEOUT, (5 * 60).toString).toInt
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */

http://git-wip-us.apache.org/repos/asf/spark/blob/fa66ef6c/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 5cf2a78..fbe1d06 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -42,6 +42,15 @@ case class BroadcastHashJoin(
     right: SparkPlan)
   extends BinaryNode with HashJoin {
 
+  val timeout = {
+    val timeoutValue = sqlContext.broadcastTimeout
+    if (timeoutValue < 0) {
+      Duration.Inf
+    } else {
+      timeoutValue.seconds
+    }
+  }
+
   override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
 
   override def requiredChildDistribution =
@@ -56,7 +65,7 @@ case class BroadcastHashJoin(
   }
 
   override def execute() = {
-    val broadcastRelation = Await.result(broadcastFuture, 5.minute)
+    val broadcastRelation = Await.result(broadcastFuture, timeout)
 
     streamedPlan.execute().mapPartitions { streamedIter =>
       hashJoin(streamedIter, broadcastRelation.value)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org