You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/10/14 21:23:30 UTC

spark git commit: [SPARK-10577] [PYSPARK] DataFrame hint for broadcast join

Repository: spark
Updated Branches:
  refs/heads/branch-1.5 f36624983 -> 30eea40ff


[SPARK-10577] [PYSPARK] DataFrame hint for broadcast join

https://issues.apache.org/jira/browse/SPARK-10577

Author: Jian Feng <jz...@gmail.com>

Closes #8801 from Jianfeng-chs/master.

(cherry picked from commit 0180b849dbaf191826231eda7dfaaf146a19602b)
Signed-off-by: Reynold Xin <rx...@databricks.com>

Conflicts:
	python/pyspark/sql/tests.py


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30eea40f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30eea40f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30eea40f

Branch: refs/heads/branch-1.5
Commit: 30eea40fff97391b8ee3201dd7c6ea7440521386
Parents: f366249
Author: Jian Feng <jz...@gmail.com>
Authored: Mon Sep 21 23:36:41 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Oct 14 12:23:22 2015 -0700

----------------------------------------------------------------------
 python/pyspark/sql/functions.py |  9 +++++++++
 python/pyspark/sql/tests.py     | 27 +++++++++++++++++++++++++++
 2 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/30eea40f/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 4b74a50..3c631a0 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -30,6 +30,7 @@ from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
 from pyspark.sql import since
 from pyspark.sql.types import StringType
 from pyspark.sql.column import Column, _to_java_column, _to_seq
+from pyspark.sql.dataframe import DataFrame
 
 
 def _create_function(name, doc=""):
@@ -190,6 +191,14 @@ def approxCountDistinct(col, rsd=None):
     return Column(jc)
 
 
+@since(1.6)
+def broadcast(df):
+    """Marks a DataFrame as small enough for use in broadcast joins."""
+
+    sc = SparkContext._active_spark_context
+    return DataFrame(sc._jvm.functions.broadcast(df._jdf), df.sql_ctx)
+
+
 @since(1.4)
 def coalesce(*cols):
     """Returns the first column that is not null.

http://git-wip-us.apache.org/repos/asf/spark/blob/30eea40f/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 6b647f3..14414b3 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1056,6 +1056,33 @@ class SQLTests(ReusedPySparkTestCase):
         keys = self.df.withColumn("key", self.df.key).select("key").collect()
         self.assertEqual([r.key for r in keys], list(range(100)))
 
+    # regression test for SPARK-10417
+    def test_column_iterator(self):
+
+        def foo():
+            for x in self.df.key:
+                break
+
+        self.assertRaises(TypeError, foo)
+
+    # add test for SPARK-10577 (test broadcast join hint)
+    def test_functions_broadcast(self):
+        from pyspark.sql.functions import broadcast
+
+        df1 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))
+        df2 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))
+
+        # equijoin - should be converted into broadcast join
+        plan1 = df1.join(broadcast(df2), "key")._jdf.queryExecution().executedPlan()
+        self.assertEqual(1, plan1.toString().count("BroadcastHashJoin"))
+
+        # no join key -- should not be a broadcast join
+        plan2 = df1.join(broadcast(df2))._jdf.queryExecution().executedPlan()
+        self.assertEqual(0, plan2.toString().count("BroadcastHashJoin"))
+
+        # planner should not crash without a join
+        broadcast(df1)._jdf.queryExecution().executedPlan()
+
 
 class HiveContextSQLTests(ReusedPySparkTestCase):
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org