Posted to reviews@spark.apache.org by "beliefer (via GitHub)" <gi...@apache.org> on 2023/03/04 02:34:46 UTC

[GitHub] [spark] beliefer opened a new pull request, #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

beliefer opened a new pull request, #40275:
URL: https://github.com/apache/spark/pull/40275

   ### What changes were proposed in this pull request?
   Currently, the Connect functions are missing the `broadcast` API. This PR adds this API to Connect's functions.
   
   
   ### Why are the changes needed?
   Adds the `broadcast` function to Connect's functions.scala.
   
   
   ### Does this PR introduce _any_ user-facing change?
    'No'.
   This is a new feature.
   
   
   ### How was this patch tested?
   New test cases.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40275:
URL: https://github.com/apache/spark/pull/40275#discussion_r1125244539


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -1228,6 +1228,22 @@ object functions {
   def map_from_arrays(keys: Column, values: Column): Column =
     Column.fn("map_from_arrays", keys, values)
 
+  /**
+   * Marks a DataFrame as small enough for use in broadcast joins.
+   *
+   * The following example marks the right DataFrame for broadcast hash join using `joinKey`.
+   * {{{
+   *   // left and right are DataFrames
+   *   left.join(broadcast(right), "joinKey")
+   * }}}
+   *
+   * @group normal_funcs
+   * @since 3.4.0
+   */
+  def broadcast[T](df: Dataset[T]): Dataset[T] = {
+    df.hint("broadcast")

Review Comment:
   is this enough? :)





[GitHub] [spark] hvanhovell commented on a diff in pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40275:
URL: https://github.com/apache/spark/pull/40275#discussion_r1125245064


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##########
@@ -495,6 +495,14 @@ class ClientE2ETestSuite extends RemoteSparkSession {
     assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
   }
 
+  test("broadcast join") {
+    val left = spark.range(100).select(col("id"), rand(10).as("a"))
+    val right = spark.range(100).select(col("id"), rand(12).as("a"))
+    val joined =
+      left.join(broadcast(right), left("id") === right("id")).select(left("id"), right("a"))

Review Comment:
   can you check if we actually broadcast the right table?
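   One common way to verify this end-to-end, and what the PR's test ultimately does, is to capture the output of `joined.explain()` and look for `BroadcastHashJoin`. A minimal, Spark-free sketch of such a capture helper (`capturedStdOut` is a hypothetical name; the suite's actual helper is `testCapturedStdOut`):

```scala
import java.io.{ByteArrayOutputStream, PrintStream}

// Hypothetical stand-in for the suite's testCapturedStdOut helper: run a
// block, capture everything it prints to stdout, and return it as a string
// so the caller can assert that an expected fragment (e.g. the operator
// name printed by explain()) appears.
def capturedStdOut(block: => Unit): String = {
  val buffer = new ByteArrayOutputStream()
  Console.withOut(new PrintStream(buffer, true, "UTF-8")) { block }
  buffer.toString("UTF-8")
}

// With Spark this would wrap joined.explain(); a plain println stands in here.
val plan = capturedStdOut { println("== Physical Plan ==\nBroadcastHashJoin ...") }
assert(plan.contains("BroadcastHashJoin"))
```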





[GitHub] [spark] beliefer commented on a diff in pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #40275:
URL: https://github.com/apache/spark/pull/40275#discussion_r1125652836


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -1228,6 +1228,22 @@ object functions {
   def map_from_arrays(keys: Column, values: Column): Column =
     Column.fn("map_from_arrays", keys, values)
 
+  /**
+   * Marks a DataFrame as small enough for use in broadcast joins.
+   *
+   * The following example marks the right DataFrame for broadcast hash join using `joinKey`.
+   * {{{
+   *   // left and right are DataFrames
+   *   left.join(broadcast(right), "joinKey")
+   * }}}
+   *
+   * @group normal_funcs
+   * @since 3.4.0
+   */
+  def broadcast[T](df: Dataset[T]): Dataset[T] = {
+    df.hint("broadcast")

Review Comment:
   Yes.





[GitHub] [spark] beliefer commented on pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #40275:
URL: https://github.com/apache/spark/pull/40275#issuecomment-1455280706

   ping @HyukjinKwon @zhengruifeng @dongjoon-hyun 




[GitHub] [spark] LuciferYang commented on a diff in pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40275:
URL: https://github.com/apache/spark/pull/40275#discussion_r1125591994


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##########
@@ -489,17 +489,31 @@ class ClientE2ETestSuite extends RemoteSparkSession {
   }
 
   test("ambiguous joins") {
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
     val left = spark.range(100).select(col("id"), rand(10).as("a"))
     val right = spark.range(100).select(col("id"), rand(12).as("a"))
     val joined = left.join(right, left("id") === right("id")).select(left("id"), right("a"))
     assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
+    testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
 
     val joined2 = left
       .join(right, left.colRegex("id") === right.colRegex("id"))
       .select(left("id"), right("a"))
     assert(joined2.schema.catalogString === "struct<id:bigint,a:double>")
   }
 
+  test("broadcast join") {
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
+    val left = spark.range(100).select(col("id"), rand(10).as("a"))
+    val right = spark.range(100).select(col("id"), rand(12).as("a"))
+    val joined =
+      left.join(broadcast(right), left("id") === right("id")).select(left("id"), right("a"))
+    assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
+    testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

Review Comment:
   Should we move `SQLHelper` to the client module later? `withSQLConf` and the other helper functions would be more useful.
   



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##########
@@ -489,17 +489,31 @@ class ClientE2ETestSuite extends RemoteSparkSession {
   }
 
   test("ambiguous joins") {
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
     val left = spark.range(100).select(col("id"), rand(10).as("a"))
     val right = spark.range(100).select(col("id"), rand(12).as("a"))
     val joined = left.join(right, left("id") === right("id")).select(left("id"), right("a"))
     assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
+    testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
 
     val joined2 = left
       .join(right, left.colRegex("id") === right.colRegex("id"))
       .select(left("id"), right("a"))
     assert(joined2.schema.catalogString === "struct<id:bigint,a:double>")
   }
 
+  test("broadcast join") {
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
+    val left = spark.range(100).select(col("id"), rand(10).as("a"))
+    val right = spark.range(100).select(col("id"), rand(12).as("a"))
+    val joined =
+      left.join(broadcast(right), left("id") === right("id")).select(left("id"), right("a"))
+    assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
+    testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

Review Comment:
   Should we copy `SQLHelper` to the client module later? `withSQLConf` and the other helper functions would be more useful.
   





[GitHub] [spark] hvanhovell commented on a diff in pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40275:
URL: https://github.com/apache/spark/pull/40275#discussion_r1125474623


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##########
@@ -489,17 +489,31 @@ class ClientE2ETestSuite extends RemoteSparkSession {
   }
 
   test("ambiguous joins") {
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
     val left = spark.range(100).select(col("id"), rand(10).as("a"))
     val right = spark.range(100).select(col("id"), rand(12).as("a"))
     val joined = left.join(right, left("id") === right("id")).select(left("id"), right("a"))
     assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
+    testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
 
     val joined2 = left
       .join(right, left.colRegex("id") === right.colRegex("id"))
       .select(left("id"), right("a"))
     assert(joined2.schema.catalogString === "struct<id:bigint,a:double>")
   }
 
+  test("broadcast join") {
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
+    val left = spark.range(100).select(col("id"), rand(10).as("a"))
+    val right = spark.range(100).select(col("id"), rand(12).as("a"))
+    val joined =
+      left.join(broadcast(right), left("id") === right("id")).select(left("id"), right("a"))
+    assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
+    testCapturedStdOut(joined.explain(), "BroadcastHashJoin")

Review Comment:
   For later: we should have a better way to get the plan.





[GitHub] [spark] beliefer commented on a diff in pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #40275:
URL: https://github.com/apache/spark/pull/40275#discussion_r1125651656


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##########
@@ -489,17 +489,31 @@ class ClientE2ETestSuite extends RemoteSparkSession {
   }
 
   test("ambiguous joins") {
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
     val left = spark.range(100).select(col("id"), rand(10).as("a"))
     val right = spark.range(100).select(col("id"), rand(12).as("a"))
     val joined = left.join(right, left("id") === right("id")).select(left("id"), right("a"))
     assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
+    testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
 
     val joined2 = left
       .join(right, left.colRegex("id") === right.colRegex("id"))
       .select(left("id"), right("a"))
     assert(joined2.schema.catalogString === "struct<id:bigint,a:double>")
   }
 
+  test("broadcast join") {
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
+    val left = spark.range(100).select(col("id"), rand(10).as("a"))
+    val right = spark.range(100).select(col("id"), rand(12).as("a"))
+    val joined =
+      left.join(broadcast(right), left("id") === right("id")).select(left("id"), right("a"))
+    assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
+    testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

Review Comment:
   @LuciferYang It's a good idea. Let's do it later.





[GitHub] [spark] beliefer commented on pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #40275:
URL: https://github.com/apache/spark/pull/40275#issuecomment-1455071084

   @LuciferYang I want to support something similar to `withSQLConf`.




[GitHub] [spark] hvanhovell closed pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell closed pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions
URL: https://github.com/apache/spark/pull/40275




[GitHub] [spark] beliefer commented on pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #40275:
URL: https://github.com/apache/spark/pull/40275#issuecomment-1455357573

   @hvanhovell @LuciferYang Thank you.




[GitHub] [spark] hvanhovell commented on a diff in pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40275:
URL: https://github.com/apache/spark/pull/40275#discussion_r1125474986


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##########
@@ -489,17 +489,31 @@ class ClientE2ETestSuite extends RemoteSparkSession {
   }
 
   test("ambiguous joins") {
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
     val left = spark.range(100).select(col("id"), rand(10).as("a"))
     val right = spark.range(100).select(col("id"), rand(12).as("a"))
     val joined = left.join(right, left("id") === right("id")).select(left("id"), right("a"))
     assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
+    testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
 
     val joined2 = left
       .join(right, left.colRegex("id") === right.colRegex("id"))
       .select(left("id"), right("a"))
     assert(joined2.schema.catalogString === "struct<id:bigint,a:double>")
   }
 
+  test("broadcast join") {
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
+    val left = spark.range(100).select(col("id"), rand(10).as("a"))
+    val right = spark.range(100).select(col("id"), rand(12).as("a"))
+    val joined =
+      left.join(broadcast(right), left("id") === right("id")).select(left("id"), right("a"))
+    assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
+    testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

Review Comment:
   Prefer to use try...finally when you are resetting confs. That way failing tests do not influence others.
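   As a sketch of what such a try...finally reset looks like outside Spark (the `conf` map and `withConf` name below are hypothetical stand-ins for `spark.conf` and `SQLHelper.withSQLConf`):

```scala
import scala.collection.mutable

// Hypothetical stand-in for spark.conf: a plain map of string settings.
val conf = mutable.Map("spark.sql.autoBroadcastJoinThreshold" -> "10MB")

// withSQLConf-style helper: apply the given pairs, run the block, then
// restore the previous values in a finally clause so that a failing test
// cannot leak its settings into the tests that run after it.
def withConf[T](pairs: (String, String)*)(block: => T): T = {
  val previous = pairs.map { case (k, _) => k -> conf.get(k) }
  pairs.foreach { case (k, v) => conf(k) = v }
  try block
  finally previous.foreach {
    case (k, Some(v)) => conf(k) = v
    case (k, None)    => conf.remove(k)
  }
}

withConf("spark.sql.autoBroadcastJoinThreshold" -> "-1") {
  assert(conf("spark.sql.autoBroadcastJoinThreshold") == "-1")
}
// Restored after the block, whether it completed or threw.
assert(conf("spark.sql.autoBroadcastJoinThreshold") == "10MB")
```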





[GitHub] [spark] beliefer commented on a diff in pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #40275:
URL: https://github.com/apache/spark/pull/40275#discussion_r1125652564


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##########
@@ -489,17 +489,31 @@ class ClientE2ETestSuite extends RemoteSparkSession {
   }
 
   test("ambiguous joins") {
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
     val left = spark.range(100).select(col("id"), rand(10).as("a"))
     val right = spark.range(100).select(col("id"), rand(12).as("a"))
     val joined = left.join(right, left("id") === right("id")).select(left("id"), right("a"))
     assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
+    testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
 
     val joined2 = left
       .join(right, left.colRegex("id") === right.colRegex("id"))
       .select(left("id"), right("a"))
     assert(joined2.schema.catalogString === "struct<id:bigint,a:double>")
   }
 
+  test("broadcast join") {
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
+    val left = spark.range(100).select(col("id"), rand(10).as("a"))
+    val right = spark.range(100).select(col("id"), rand(12).as("a"))
+    val joined =
+      left.join(broadcast(right), left("id") === right("id")).select(left("id"), right("a"))
+    assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
+    testCapturedStdOut(joined.explain(), "BroadcastHashJoin")

Review Comment:
   Yeah.





[GitHub] [spark] beliefer commented on pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #40275:
URL: https://github.com/apache/spark/pull/40275#issuecomment-1455071764

   @LuciferYang Thank you.




[GitHub] [spark] hvanhovell commented on pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #40275:
URL: https://github.com/apache/spark/pull/40275#issuecomment-1455321694

   Merging.

