You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "sunchao (via GitHub)" <gi...@apache.org> on 2023/09/07 17:08:57 UTC

[GitHub] [spark] sunchao commented on a diff in pull request #42306: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys

sunchao commented on code in PR #42306:
URL: https://github.com/apache/spark/pull/42306#discussion_r1305837139


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1510,6 +1510,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS =
+    buildConf("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")

Review Comment:
   Maybe we should also mention that this also requires `spark.sql.requireAllClusterKeysForDistribution` to be `false` as well, even though by default it is already false.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -674,7 +711,8 @@ case class HashShuffleSpec(
 
 case class KeyGroupedShuffleSpec(
     partitioning: KeyGroupedPartitioning,
-    distribution: ClusteredDistribution) extends ShuffleSpec {
+    distribution: ClusteredDistribution,
+    joinKeyPositions: Option[Seq[Int]] = None) extends ShuffleSpec {

Review Comment:
   we can add some comments for `KeyGroupedShuffleSpec` to explain what is this for, otherwise it's a bit hard to understand.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -344,7 +344,14 @@ case class KeyGroupedPartitioning(
           } else {
             // We'll need to find leaf attributes from the partition expressions first.
             val attributes = expressions.flatMap(_.collectLeaves())
-            attributes.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+
+            if (SQLConf.get.getConf(
+              SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS)) {
+              requiredClustering.exists(x => attributes.exists(_.semanticEquals(x))) &&
+                expressions.forall(_.collectLeaves().size == 1)

Review Comment:
   this deserves some comments since otherwise it's a bit confusing why we need it.



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala:
##########
@@ -1276,4 +1279,266 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
       }
     }
   }
+
+  test("SPARK-44647: test join key is subset of cluster key " +
+      "with push values and partially-clustered") {
+    val table1 = "tab1e1"
+    val table2 = "table2"
+    val partition = Array(identity("id"), identity("data"))
+    createTable(table1, schema, partition)
+    sql(s"INSERT INTO testcat.ns.$table1 VALUES " +
+        "(1, 'aa', cast('2020-01-01' as timestamp)), " +
+        "(2, 'bb', cast('2020-01-01' as timestamp)), " +
+        "(2, 'cc', cast('2020-01-01' as timestamp)), " +
+        "(3, 'dd', cast('2020-01-01' as timestamp)), " +
+        "(3, 'dd', cast('2020-01-01' as timestamp)), " +
+        "(3, 'ee', cast('2020-01-01' as timestamp)), " +
+        "(3, 'ee', cast('2020-01-01' as timestamp))")
+
+    createTable(table2, schema, partition)
+    sql(s"INSERT INTO testcat.ns.$table2 VALUES " +
+        "(4, 'zz', cast('2020-01-01' as timestamp)), " +
+        "(4, 'zz', cast('2020-01-01' as timestamp)), " +
+        "(3, 'yy', cast('2020-01-01' as timestamp)), " +
+        "(3, 'yy', cast('2020-01-01' as timestamp)), " +
+        "(3, 'xx', cast('2020-01-01' as timestamp)), " +
+        "(3, 'xx', cast('2020-01-01' as timestamp)), " +
+        "(2, 'ww', cast('2020-01-01' as timestamp))")
+
+    Seq(true, false).foreach { pushDownValues =>
+      Seq(true, false).foreach { partiallyClustered =>
+        Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys =>
+
+          withSQLConf(
+            SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+            SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString,
+            SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
+                partiallyClustered.toString,
+            SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key ->
+                allowJoinKeysSubsetOfPartitionKeys.toString) {
+
+            val df = sql("SELECT t1.id AS id, t1.data AS t1data, t2.data AS t2data " +
+                s"FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2 " +
+                "ON t1.id = t2.id ORDER BY t1.id, t1data, t2data")
+
+            // Currently SPJ for case where join key not same as partition key
+            // only supported when push-part-values enabled

Review Comment:
   nit: the comment is out-dated



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -355,7 +355,14 @@ case class KeyGroupedPartitioning(
           } else {
             // We'll need to find leaf attributes from the partition expressions first.
             val attributes = expressions.flatMap(_.collectLeaves())
-            attributes.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+
+            if (SQLConf.get.getConf(
+              SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS)) {
+              requiredClustering.forall(x => attributes.exists(_.semanticEquals(x))) &&
+                  expressions.forall(_.collectLeaves().size == 1)

Review Comment:
   this should be guaranteed currently - it might be better to have this invariant check somewhere else like when constructing a `KeyGroupedPartitioning`, but OK to leave it here for now



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -355,7 +355,14 @@ case class KeyGroupedPartitioning(
           } else {
             // We'll need to find leaf attributes from the partition expressions first.
             val attributes = expressions.flatMap(_.collectLeaves())
-            attributes.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+
+            if (SQLConf.get.getConf(

Review Comment:
   nit: we can just use `SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys` - it's shorter



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1530,6 +1530,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS =
+    buildConf("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")
+      .doc("Whether to allow storage-partition join in the case where join keys are" +
+        "a subset of the partition keys of the source tables.  At planning time, " +
+        "Spark will group the partitions by only those keys that are in the join keys." +
+        "This is currently enabled only if spark.sql.sources.v2.bucketing.pushPartValues.enabled " +

Review Comment:
   nit: this is out-dated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org