You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@spark.apache.org by rx...@apache.org on 2015/09/22 08:36:47 UTC
spark git commit: [SPARK-10577] [PYSPARK] DataFrame hint for broadcast join
Repository: spark
Updated Branches:
refs/heads/master bf20d6c9f -> 0180b849d
[SPARK-10577] [PYSPARK] DataFrame hint for broadcast join
https://issues.apache.org/jira/browse/SPARK-10577
Author: Jian Feng <jz...@gmail.com>
Closes #8801 from Jianfeng-chs/master.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0180b849
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0180b849
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0180b849
Branch: refs/heads/master
Commit: 0180b849dbaf191826231eda7dfaaf146a19602b
Parents: bf20d6c
Author: Jian Feng <jz...@gmail.com>
Authored: Mon Sep 21 23:36:41 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Sep 21 23:36:41 2015 -0700
----------------------------------------------------------------------
python/pyspark/sql/functions.py | 9 +++++++++
python/pyspark/sql/tests.py | 18 ++++++++++++++++++
2 files changed, 27 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0180b849/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 26b8662..fa04f4c 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -29,6 +29,7 @@ from pyspark.rdd import _prepare_for_python_RDD, ignore_unicode_prefix
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.sql.types import StringType
from pyspark.sql.column import Column, _to_java_column, _to_seq
+from pyspark.sql.dataframe import DataFrame
def _create_function(name, doc=""):
@@ -189,6 +190,14 @@ def approxCountDistinct(col, rsd=None):
return Column(jc)
+@since(1.6)
+def broadcast(df):
+ """Marks a DataFrame as small enough for use in broadcast joins."""
+
+ sc = SparkContext._active_spark_context
+ return DataFrame(sc._jvm.functions.broadcast(df._jdf), df.sql_ctx)
+
+
@since(1.4)
def coalesce(*cols):
"""Returns the first column that is not null.
http://git-wip-us.apache.org/repos/asf/spark/blob/0180b849/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 3e680f1..645133b 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1075,6 +1075,24 @@ class SQLTests(ReusedPySparkTestCase):
self.assertRaises(TypeError, foo)
+ # add test for SPARK-10577 (test broadcast join hint)
+ def test_functions_broadcast(self):
+ from pyspark.sql.functions import broadcast
+
+ df1 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))
+ df2 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))
+
+ # equijoin - should be converted into broadcast join
+ plan1 = df1.join(broadcast(df2), "key")._jdf.queryExecution().executedPlan()
+ self.assertEqual(1, plan1.toString().count("BroadcastHashJoin"))
+
+ # no join key -- should not be a broadcast join
+ plan2 = df1.join(broadcast(df2))._jdf.queryExecution().executedPlan()
+ self.assertEqual(0, plan2.toString().count("BroadcastHashJoin"))
+
+ # planner should not crash without a join
+ broadcast(df1)._jdf.queryExecution().executedPlan()
+
class HiveContextSQLTests(ReusedPySparkTestCase):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org