You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2023/03/06 02:11:05 UTC

[spark] branch master updated: [SPARK-42557][CONNECT] Add Broadcast to functions

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 94b50e0977f [SPARK-42557][CONNECT] Add Broadcast to functions
94b50e0977f is described below

commit 94b50e0977fe10d14c890f08b811e2a4021638a7
Author: Jiaan Geng <be...@163.com>
AuthorDate: Sun Mar 5 22:10:51 2023 -0400

    [SPARK-42557][CONNECT] Add Broadcast to functions
    
    ### What changes were proposed in this pull request?
    Currently, the connect functions missing the broadcast API. This PR want add this API to connect's functions.
    
    ### Why are the changes needed?
    Add the broadcast function to connect's functions.scala.
    
    ### Does this PR introduce _any_ user-facing change?
     'No'.
    New feature.
    
    ### How was this patch tested?
    New test cases.
    
    Closes #40275 from beliefer/SPARK-42557.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Herman van Hovell <he...@databricks.com>
---
 .../scala/org/apache/spark/sql/functions.scala     |  16 ++++++++++
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |  16 +++++++++-
 .../apache/spark/sql/PlanGenerationTestSuite.scala |   4 +++
 .../explain-results/test_broadcast.explain         |   5 ++++
 .../query-tests/queries/test_broadcast.json        |  33 +++++++++++++++++++++
 .../query-tests/queries/test_broadcast.proto.bin   | Bin 0 -> 122 bytes
 6 files changed, 73 insertions(+), 1 deletion(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 386219a699c..76a27686bfd 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1228,6 +1228,22 @@ object functions {
   def map_from_arrays(keys: Column, values: Column): Column =
     Column.fn("map_from_arrays", keys, values)
 
+  /**
+   * Marks a DataFrame as small enough for use in broadcast joins.
+   *
+   * The following example marks the right DataFrame for broadcast hash join using `joinKey`.
+   * {{{
+   *   // left and right are DataFrames
+   *   left.join(broadcast(right), "joinKey")
+   * }}}
+   *
+   * @group normal_funcs
+   * @since 3.4.0
+   */
+  def broadcast[T](df: Dataset[T]): Dataset[T] = {
+    df.hint("broadcast")
+  }
+
   /**
    * Returns the first column that is not null, or null if all inputs are null.
    *
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 79902e769c6..089645a2d8d 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -28,7 +28,7 @@ import org.scalactic.TolerantNumerics
 
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}
-import org.apache.spark.sql.functions.{aggregate, array, col, count, lit, rand, sequence, shuffle, struct, transform, udf}
+import org.apache.spark.sql.functions.{aggregate, array, broadcast, col, count, lit, rand, sequence, shuffle, struct, transform, udf}
 import org.apache.spark.sql.types._
 
 class ClientE2ETestSuite extends RemoteSparkSession {
@@ -500,6 +500,20 @@ class ClientE2ETestSuite extends RemoteSparkSession {
     assert(joined2.schema.catalogString === "struct<id:bigint,a:double>")
   }
 
+  test("broadcast join") {
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
+    try {
+      val left = spark.range(100).select(col("id"), rand(10).as("a"))
+      val right = spark.range(100).select(col("id"), rand(12).as("a"))
+      val joined =
+        left.join(broadcast(right), left("id") === right("id")).select(left("id"), right("a"))
+      assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
+      testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+    } finally {
+      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+    }
+  }
+
   test("test temp view") {
     try {
       spark.range(100).createTempView("test1")
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index e8921ca776d..3b69b02df5c 100755
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -1978,6 +1978,10 @@ class PlanGenerationTestSuite
     simple.groupBy(Column("id")).pivot("a").agg(functions.count(Column("b")))
   }
 
+  test("test broadcast") {
+    left.join(fn.broadcast(right), "id")
+  }
+
   test("function lit") {
     simple.select(
       fn.lit(fn.col("id")),
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/test_broadcast.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/test_broadcast.explain
new file mode 100644
index 00000000000..8c86098c265
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/test_broadcast.explain
@@ -0,0 +1,5 @@
+'Project [id#0L, a#0, b#0, a#0, payload#0]
++- 'Join Inner, (id#0L = id#0L)
+   :- LocalRelation <empty>, [id#0L, a#0, b#0]
+   +- ResolvedHint (strategy=broadcast)
+      +- LocalRelation <empty>, [a#0, id#0L, payload#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/test_broadcast.json b/connector/connect/common/src/test/resources/query-tests/queries/test_broadcast.json
new file mode 100644
index 00000000000..54094286425
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/test_broadcast.json
@@ -0,0 +1,33 @@
+{
+  "common": {
+    "planId": "3"
+  },
+  "join": {
+    "left": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "right": {
+      "common": {
+        "planId": "2"
+      },
+      "hint": {
+        "input": {
+          "common": {
+            "planId": "1"
+          },
+          "localRelation": {
+            "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+          }
+        },
+        "name": "broadcast"
+      }
+    },
+    "joinType": "JOIN_TYPE_INNER",
+    "usingColumns": ["id"]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/test_broadcast.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/test_broadcast.proto.bin
new file mode 100644
index 00000000000..96c87594c69
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/test_broadcast.proto.bin differ


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org