Posted to reviews@spark.apache.org by "szehon-ho (via GitHub)" <gi...@apache.org> on 2024/02/27 00:41:17 UTC

[PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

szehon-ho opened a new pull request, #45267:
URL: https://github.com/apache/spark/pull/45267

   
     ### What changes were proposed in this pull request?
   -- Allow SPJ between 'compatible' bucket functions
   -- Add a mechanism to define 'reducible' functions: a function whose output can be 'reduced' to another function's output for all inputs.
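
   For illustration, a minimal sketch of the 'reducible' relationship (hypothetical names, with plain modulo standing in for Spark's hash-based bucketing):

   ```scala
   // 'reducible': there exists a reducer r such that r(f_source(x)) == f_target(x)
   object ReducibleSketch {
     def fSource(x: Long): Int = (x % 4).toInt // e.g. bucket(4, col)
     def fTarget(x: Long): Int = (x % 2).toInt // e.g. bucket(2, col)
     def r(bucketId: Int): Int = bucketId % 2  // reducer between the two

     def main(args: Array[String]): Unit =
       assert((0L until 100L).forall(x => r(fSource(x)) == fTarget(x)))
   }
   ```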
   
     ### Why are the changes needed?
   -- SPJ currently applies only if the partition transform expressions on both sides are identical.
   
     ### Does this PR introduce _any_ user-facing change?
   No
   
     ### How was this patch tested?
   Added new tests in KeyGroupedPartitioningSuite
   
     ### Was this patch authored or co-authored using generative AI tooling?
   No




Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1505905928


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala:
##########
@@ -1310,6 +1314,312 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
+  test("SPARK-47094: Support compatible buckets") {
+    val table1 = "tab1e1"
+    val table2 = "table2"
+
+    Seq(
+      ((2, 4), (4, 2)),
+      ((4, 2), (2, 4)),
+      ((2, 2), (4, 6)),
+      ((6, 2), (2, 2))).foreach {
+      case ((table1buckets1, table1buckets2), (table2buckets1, table2buckets2)) =>
+        catalog.clearTables()
+
+        val partition1 = Array(bucket(table1buckets1, "store_id"),
+          bucket(table1buckets2, "dept_id"))
+        val partition2 = Array(bucket(table2buckets1, "store_id"),
+          bucket(table2buckets2, "dept_id"))
+
+        Seq((table1, partition1), (table2, partition2)).foreach { case (tab, part) =>
+          createTable(tab, schema2, part)
+          val insertStr = s"INSERT INTO testcat.ns.$tab VALUES " +
+            "(0, 0, 'aa'), " +
+            "(0, 0, 'ab'), " + // duplicate partition key
+            "(0, 1, 'ac'), " +
+            "(0, 2, 'ad'), " +
+            "(0, 3, 'ae'), " +
+            "(0, 4, 'af'), " +
+            "(0, 5, 'ag'), " +
+            "(1, 0, 'ah'), " +
+            "(1, 0, 'ai'), " + // duplicate partition key
+            "(1, 1, 'aj'), " +
+            "(1, 2, 'ak'), " +
+            "(1, 3, 'al'), " +
+            "(1, 4, 'am'), " +
+            "(1, 5, 'an'), " +
+            "(2, 0, 'ao'), " +
+            "(2, 0, 'ap'), " + // duplicate partition key
+            "(2, 1, 'aq'), " +
+            "(2, 2, 'ar'), " +
+            "(2, 3, 'as'), " +
+            "(2, 4, 'at'), " +
+            "(2, 5, 'au'), " +
+            "(3, 0, 'av'), " +
+            "(3, 0, 'aw'), " + // duplicate partition key
+            "(3, 1, 'ax'), " +
+            "(3, 2, 'ay'), " +
+            "(3, 3, 'az'), " +
+            "(3, 4, 'ba'), " +
+            "(3, 5, 'bb'), " +
+            "(4, 0, 'bc'), " +
+            "(4, 0, 'bd'), " + // duplicate partition key
+            "(4, 1, 'be'), " +
+            "(4, 2, 'bf'), " +
+            "(4, 3, 'bg'), " +
+            "(4, 4, 'bh'), " +
+            "(4, 5, 'bi'), " +
+            "(5, 0, 'bj'), " +
+            "(5, 0, 'bk'), " + // duplicate partition key
+            "(5, 1, 'bl'), " +
+            "(5, 2, 'bm'), " +
+            "(5, 3, 'bn'), " +
+            "(5, 4, 'bo'), " +
+            "(5, 5, 'bp')"
+
+            // additional unmatched partitions to test push down
+            val finalStr = if (tab == table1) {
+              insertStr ++ ", (8, 0, 'xa'), (8, 8, 'xx')"
+            } else {
+              insertStr ++ ", (9, 0, 'ya'), (9, 9, 'yy')"
+            }
+
+            sql(finalStr)
+        }
+
+        Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys =>
+          withSQLConf(
+            SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+            SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
+            SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
+            SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key ->
+              allowJoinKeysSubsetOfPartitionKeys.toString,
+            SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
+            val df = sql(
+              s"""
+                 |${selectWithMergeJoinHint("t1", "t2")}
+                 |t1.store_id, t1.dept_id, t1.data, t2.data
+                 |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+                 |ON t1.store_id = t2.store_id AND t1.dept_id = t2.dept_id
+                 |ORDER BY t1.store_id, t1.dept_id, t1.data, t2.data
+                 |""".stripMargin)
+
+            val shuffles = collectShuffles(df.queryExecution.executedPlan)
+            assert(shuffles.isEmpty, "SPJ should be triggered")
+
+            val scans = collectScans(df.queryExecution.executedPlan).map(_.inputRDD.
+              partitions.length)
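+            // compatible-but-unequal bucket counts coalesce to the smaller
+            // count per join key, so the expected partition count is the
+            // product of the per-key minimums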
+            val expectedBuckets = Math.min(table1buckets1, table2buckets1) *
+              Math.min(table1buckets2, table2buckets2)
+            assert(scans == Seq(expectedBuckets, expectedBuckets))
+
+            checkAnswer(df, Seq(
+              Row(0, 0, "aa", "aa"),
+              Row(0, 0, "aa", "ab"),
+              Row(0, 0, "ab", "aa"),
+              Row(0, 0, "ab", "ab"),
+              Row(0, 1, "ac", "ac"),
+              Row(0, 2, "ad", "ad"),
+              Row(0, 3, "ae", "ae"),
+              Row(0, 4, "af", "af"),
+              Row(0, 5, "ag", "ag"),
+              Row(1, 0, "ah", "ah"),
+              Row(1, 0, "ah", "ai"),
+              Row(1, 0, "ai", "ah"),
+              Row(1, 0, "ai", "ai"),
+              Row(1, 1, "aj", "aj"),
+              Row(1, 2, "ak", "ak"),
+              Row(1, 3, "al", "al"),
+              Row(1, 4, "am", "am"),
+              Row(1, 5, "an", "an"),
+              Row(2, 0, "ao", "ao"),
+              Row(2, 0, "ao", "ap"),
+              Row(2, 0, "ap", "ao"),
+              Row(2, 0, "ap", "ap"),
+              Row(2, 1, "aq", "aq"),
+              Row(2, 2, "ar", "ar"),
+              Row(2, 3, "as", "as"),
+              Row(2, 4, "at", "at"),
+              Row(2, 5, "au", "au"),
+              Row(3, 0, "av", "av"),
+              Row(3, 0, "av", "aw"),
+              Row(3, 0, "aw", "av"),
+              Row(3, 0, "aw", "aw"),
+              Row(3, 1, "ax", "ax"),
+              Row(3, 2, "ay", "ay"),
+              Row(3, 3, "az", "az"),
+              Row(3, 4, "ba", "ba"),
+              Row(3, 5, "bb", "bb"),
+              Row(4, 0, "bc", "bc"),
+              Row(4, 0, "bc", "bd"),
+              Row(4, 0, "bd", "bc"),
+              Row(4, 0, "bd", "bd"),
+              Row(4, 1, "be", "be"),
+              Row(4, 2, "bf", "bf"),
+              Row(4, 3, "bg", "bg"),
+              Row(4, 4, "bh", "bh"),
+              Row(4, 5, "bi", "bi"),
+              Row(5, 0, "bj", "bj"),
+              Row(5, 0, "bj", "bk"),
+              Row(5, 0, "bk", "bj"),
+              Row(5, 0, "bk", "bk"),
+              Row(5, 1, "bl", "bl"),
+              Row(5, 2, "bm", "bm"),
+              Row(5, 3, "bn", "bn"),
+              Row(5, 4, "bo", "bo"),
+              Row(5, 5, "bp", "bp")
+            ))
+          }
+        }
+    }
+  }
+
+  test("SPARK-47094: Support compatible buckets with less join keys than partition keys") {
+    val table1 = "tab1e1"
+    val table2 = "table2"
+
+    Seq((2, 4), (4, 2), (2, 6), (6, 2)).foreach {
+      case (table1buckets, table2buckets) =>
+        catalog.clearTables()
+
+        val partition1 = Array(identity("data"),
+          bucket(table1buckets, "dept_id"))
+        val partition2 = Array(bucket(3, "store_id"),
+          bucket(table2buckets, "dept_id"))
+
+        createTable(table1, schema2, partition1)
+        sql(s"INSERT INTO testcat.ns.$table1 VALUES " +
+          "(0, 0, 'aa'), " +
+          "(1, 0, 'ab'), " +
+          "(2, 1, 'ac'), " +
+          "(3, 2, 'ad'), " +
+          "(4, 3, 'ae'), " +
+          "(5, 4, 'af'), " +
+          "(6, 5, 'ag'), " +
+
+          // value without other side match
+          "(6, 6, 'xx')"
+        )
+
+        createTable(table2, schema2, partition2)
+        sql(s"INSERT INTO testcat.ns.$table2 VALUES " +
+          "(6, 0, '01'), " +
+          "(5, 1, '02'), " + // duplicate partition key
+          "(5, 1, '03'), " +
+          "(4, 2, '04'), " +
+          "(3, 3, '05'), " +
+          "(2, 4, '06'), " +
+          "(1, 5, '07'), " +
+
+          // value without other side match
+          "(7, 7, '99')"
+        )
+
+
+        withSQLConf(
+          SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+          SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
+          SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
+          SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true",
+          SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
+          val df = sql(
+            s"""
+               |${selectWithMergeJoinHint("t1", "t2")}
+               |t1.store_id, t2.store_id, t1.dept_id, t2.dept_id, t1.data, t2.data
+               |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+               |ON t1.dept_id = t2.dept_id
+               |ORDER BY t1.store_id, t1.dept_id, t1.data, t2.data
+               |""".stripMargin)
+
+          val shuffles = collectShuffles(df.queryExecution.executedPlan)
+          assert(shuffles.isEmpty, "SPJ should be triggered")
+
+          val scans = collectScans(df.queryExecution.executedPlan).map(_.inputRDD.
+            partitions.length)
+
+          val expectedBuckets = Math.min(table1buckets, table2buckets)

Review Comment:
   Looks like the bucket numbers are always coalesced to the smaller one in the current impl?
   
   It might not be the desired behavior when the coalesced bucket number is extremely small, like 1 or 2 or 4...
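   
   To put (hypothetical) numbers on the concern: joining `bucket(512, x)` with `bucket(2, x)` would coalesce the join down to min(512, 2) = 2 partitions:
   
   ```scala
   // Illustrative only: the current impl coalesces to the smaller bucket count,
   // so a lopsided pairing collapses join parallelism.
   def coalescedBuckets(left: Int, right: Int): Int = math.min(left, right)
   
   coalescedBuckets(512, 2) // == 2 -> the join runs with only 2 scan partitions
   ```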





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1521439073


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -635,6 +636,22 @@ trait ShuffleSpec {
    */
   def createPartitioning(clustering: Seq[Expression]): Partitioning =
     throw SparkUnsupportedOperationException()
+
+  /**
+   * Return a set of [[Reducer]] for the partition expressions of this shuffle spec,
+   * on the partition expressions of another shuffle spec.
+   * <p>
+   * A [[Reducer]] exists for a partition expression function of this shuffle spec if it is
+   * 'reducible' on the corresponding partition expression function of the other shuffle spec.
+   * <p>
+   * If a value is returned, there must be one Option[[Reducer]] per partition expression.
+   * A None value in the set indicates that the particular partition expression is not reducible
+   * on the corresponding expression on the other shuffle spec.
+   * <p>
+   * Returning none also indicates that none of the partition expressions can be reduced on the
+   * corresponding expression on the other shuffle spec.
+   */
+  def reducers(spec: ShuffleSpec): Option[Seq[Option[Reducer[_]]]] = None

Review Comment:
   > For instance, geo bucketing functions as well. 
   
   emm, of course, geo bucketing makes a lot of sense.
   
   > I think could be `hours(col)` vs `days(col)`
   
   I'm not sure about this use case. Theoretically, we can "reduce" the former into the latter; however, it seems impractical to me to "reduce" hours into days. Suppose we have table A with the `hours(col)` partition transform and table B with the `days(col)` partition transform, and we are going to join with `A.col = B.col`. If A's hourly partitions are reduced to daily partitions, we need to process ~24 times the partition data in one task, which might already be too big from table A's perspective?





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1529561359


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+import scala.Option;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * there exists a reducer function r(x) such that r(f_source(x)) = f_target(x) for all input x.
+ *
+ * <p>
+ * Examples:
+ * <ul>
+ *    <li>Bucket functions
+ *    <ul>
+ *        <li>f_source(x) = bucket(4, x)</li>
+ *        <li>f_target(x) = bucket(2, x)</li>
+ *        <li>r(x) = x % 2</li>
+ *    </ul>
+ *    <li>Date functions
+ *    <ul>
+ *        <li>f_source(x) = hours(x)</li>
+ *        <li>f_target(x) = days(x)</li>
+ *        <li>r(x) = x / 24</li>
+ *     </ul>
+ * </ul>
+ * @param <I> reducer function input type
+ * @param <O> reducer function output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<I, O> {
+
+    /**
+     * If this function is 'reducible' on another function, return the {@link Reducer} function.
+     * <p>
+     * Example:
+     * <ul>
+     *     <li>this_function = bucket(4, x)
+     *     <li>other function = bucket(2, x)
+     * </ul>
+     * Invoke with arguments
+     * <ul>
+     *     <li>other = bucket</li>
+     *     <li>this param = Int(4)</li>
+     *     <li>other param = Int(2)</li>
+     * </ul>
+     * @param other the other function
+     * @param thisParam param for this function

Review Comment:
   OK, I made two methods with a default for both, so that implementations can pick one to override.
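   
   For reference, a sketch of the "two methods, each with a default" pattern in Scala (illustrative names and signatures, not the final API):
   
   ```scala
   trait ReducerSketch[I, O] { def reduce(arg: I): O }
   
   trait ReducibleSketch[I, O] {
     // variant for parameterized functions, e.g. bucket(numBuckets, col);
     // the default (None) means "not reducible on the other function"
     def reducer(
         thisParam: Any,
         other: ReducibleSketch[_, _],
         otherParam: Any): Option[ReducerSketch[I, O]] = None
   
     // variant for unparameterized functions, e.g. hours(col) vs days(col)
     def reducer(other: ReducibleSketch[_, _]): Option[ReducerSketch[I, O]] = None
   }
   ```
   
   An implementation overrides whichever variant applies and inherits the other default.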





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #45267:
URL: https://github.com/apache/spark/pull/45267#issuecomment-2035996293

   @sunchao thanks, can you take another look? I think the failed test is unrelated (looks like an error in PySpark uploading the test report).




Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1530840650


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+import scala.Option;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * there exists a reducer function r(x) such that r(f_source(x)) = f_target(x) for all input x.
+ *
+ * <p>
+ * Examples:
+ * <ul>
+ *    <li>Bucket functions
+ *    <ul>
+ *        <li>f_source(x) = bucket(4, x)</li>
+ *        <li>f_target(x) = bucket(2, x)</li>
+ *        <li>r(x) = x % 2</li>
+ *    </ul>
+ *    <li>Date functions
+ *    <ul>
+ *        <li>f_source(x) = hours(x)</li>
+ *        <li>f_target(x) = days(x)</li>
+ *        <li>r(x) = x / 24</li>
+ *     </ul>
+ * </ul>
+ * @param <I> reducer function input type
+ * @param <O> reducer function output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<I, O> {
+
+    /**
+     * If this function is 'reducible' on another function, return the {@link Reducer} function.
+     * <p>
+     * Example:
+     * <ul>
+     *     <li>this_function = bucket(4, x)
+     *     <li>other function = bucket(2, x)
+     * </ul>
+     * Invoke with arguments
+     * <ul>
+     *     <li>other = bucket</li>
+     *     <li>this param = Int(4)</li>
+     *     <li>other param = Int(2)</li>
+     * </ul>
+     * @param other the other function
+     * @param thisParam param for this function

Review Comment:
   Actually, on second thought, we do plan to add geo partition transforms, for example XZ2 ordering (https://github.com/spatialx-project/geolake/blob/main/api/src/main/java/org/apache/iceberg/transforms/ExtendedZCurve.java), which needs a resolution parameter,
   
   as well as multi-arg bucket transforms: https://docs.google.com/document/d/1aDoZqRgvDOOUVAGhvKZbp5vFstjsAMY4EFCyjlxpaaw/
   
   So I added the type back as Object in the latest PR; hope that is OK.
   
   Note: I tried for a while to make the method type-parameterized, like
   
   ```
   <T> Option<Reducer<I, O>> reducer(ReducibleFunction<I, O> other, T numBuckets,
                                     T otherNumBuckets);
   ```
   
    but I was not able to override it successfully in Bucket in Scala.





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1529047571


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A 'reducer' for output of user-defined functions.
+ *
+ * A user_defined function f_source(x) is 'reducible' on another user_defined function f_target(x),
+ * if there exists a 'reducer' r(x) such that r(f_source(x)) = f_target(x) for all input x.
+ * @param <I> reducer input type

Review Comment:
   nit: leave a blank line before the first `@param`



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A 'reducer' for output of user-defined functions.
+ *
+ * A user_defined function f_source(x) is 'reducible' on another user_defined function f_target(x),

Review Comment:
   nit: `user_defined` -> `user defined`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -846,6 +881,20 @@ case class KeyGroupedShuffleSpec(
   }
 }
 
+object KeyGroupedShuffleSpec {
+  def reducePartitionValue(row: InternalRow,
+                           expressions: Seq[Expression],

Review Comment:
   nit: indentation



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+import scala.Option;

Review Comment:
   It's not a good idea to have `scala.Option` in a Java interface.



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala:
##########
@@ -85,6 +85,23 @@ object BucketFunction extends ScalarFunction[Int] {
   override def produceResult(input: InternalRow): Int = {
     (input.getLong(1) % input.getInt(0)).toInt
   }
+
+  override def reducer(func: ReducibleFunction[_, _],
+                       thisNumBuckets: Option[_],

Review Comment:
   nit: indentation



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -505,11 +506,28 @@ case class EnsureRequirements(
           }
         }
 
-        // Now we need to push-down the common partition key to the scan in each child
-        newLeft = populatePartitionValues(left, mergedPartValues, leftSpec.joinKeyPositions,
-          applyPartialClustering, replicateLeftSide)
-        newRight = populatePartitionValues(right, mergedPartValues, rightSpec.joinKeyPositions,
-          applyPartialClustering, replicateRightSide)
+        // in case of compatible but not identical partition expressions, we apply 'reduce'
+        // transforms to group one side's partitions as well as the common partition values
+        val leftReducers = leftSpec.reducers(rightSpec)
+        val rightReducers = rightSpec.reducers(leftSpec)
+
+        if (leftReducers.isDefined || rightReducers.isDefined) {

Review Comment:
   looks like we should be able to support the case where the numbers of buckets are not divisible but have a greatest common divisor?
   such as `bucket(16, x)` vs `bucket(12, y)`? in this case, since the common divisor is 4, we can have reducers for both `x` and `y`, dividing the bucket counts by 4 and 3 respectively.
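   
   A sketch of that idea in Scala (illustrative, not this PR's code): both sides reduce their bucket ids modulo gcd(16, 12) = 4, since hash % 16 % 4 == hash % 4 == hash % 12 % 4:
   
   ```scala
   object GcdReducerSketch {
     @scala.annotation.tailrec
     def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)
   
     // reducers for both sides when a useful common divisor exists
     def bucketReducers(left: Int, right: Int): Option[(Int => Int, Int => Int)] = {
       val g = gcd(left, right)
       if (g > 1) Some(((b: Int) => b % g, (b: Int) => b % g)) else None
     }
   
     def main(args: Array[String]): Unit = {
       val (r1, r2) = bucketReducers(16, 12).get
       assert((0 until 1000).forall(h => r1(h % 16) == r2(h % 12)))
     }
   }
   ```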



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -527,25 +545,38 @@ case class EnsureRequirements(
         joinType == LeftAnti || joinType == LeftOuter
   }
 
-  // Populate the common partition values down to the scan nodes
-  private def populatePartitionValues(
+  // Populate the common partition information down to the scan nodes
+  private def populateCommonPartitionInfo(
       plan: SparkPlan,
       values: Seq[(InternalRow, Int)],
       joinKeyPositions: Option[Seq[Int]],
+      reducers: Option[Seq[Option[Reducer[_, _]]]],
       applyPartialClustering: Boolean,
       replicatePartitions: Boolean): SparkPlan = plan match {
     case scan: BatchScanExec =>
       scan.copy(
         spjParams = scan.spjParams.copy(
           commonPartitionValues = Some(values),
           joinKeyPositions = joinKeyPositions,
+          reducers = reducers,
           applyPartialClustering = applyPartialClustering,
           replicatePartitions = replicatePartitions
         )
       )
     case node =>
-      node.mapChildren(child => populatePartitionValues(
-        child, values, joinKeyPositions, applyPartialClustering, replicatePartitions))
+      node.mapChildren(child => populateCommonPartitionInfo(
+        child, values, joinKeyPositions, reducers, applyPartialClustering, replicatePartitions))
+  }
+
+  private def reduceCommonPartValues(commonPartValues: Seq[(InternalRow, Int)],
+                                     expressions: Seq[Expression],

Review Comment:
   nit: indentation



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+import scala.Option;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * there exists a reducer function r(x) such that r(f_source(x)) = f_target(x) for all input x.
+ *
+ * <p>
+ * Examples:
+ * <ul>
+ *    <li>Bucket functions
+ *    <ul>
+ *        <li>f_source(x) = bucket(4, x)</li>
+ *        <li>f_target(x) = bucket(2, x)</li>
+ *        <li>r(x) = x % 2</li>
+ *    </ul>
+ *    <li>Date functions
+ *    <ul>
+ *        <li>f_source(x) = hours(x)</li>
+ *        <li>f_target(x) = days(x)</li>
+ *        <li>r(x) = x / 24</li>
+ *     </ul>
+ * </ul>
+ * @param <I> reducer function input type
+ * @param <O> reducer function output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<I, O> {
+
+    /**
+     * If this function is 'reducible' on another function, return the {@link Reducer} function.
+     * <p>
+     * Example:
+     * <ul>
+     *     <li>this_function = bucket(4, x)
+     *     <li>other function = bucket(2, x)
+     * </ul>
+     * Invoke with arguments
+     * <ul>
+     *     <li>other = bucket</li>
+     *     <li>this param = Int(4)</li>
+     *     <li>other param = Int(2)</li>
+     * </ul>
+     * @param other the other function
+     * @param thisParam param for this function

Review Comment:
   It is unclear to me what this should be beyond the bucketing case. Should we add a separate method just for the special case of bucketing?
   
   ```java
       Option<Reducer<I, O>> reducer(ReducibleFunction<I, O> other);
   
       Option<Reducer<I, O>> bucketReducer(ReducibleFunction<I, O> other, int numBuckets,
                                     int otherNumBuckets);
   ```



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A 'reducer' for output of user-defined functions.

Review Comment:
   Maybe point out where it is used, i.e., `ReducibleFunction`. We can also add a `@see` pointing to it.



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A 'reducer' for output of user-defined functions.
+ *
+ * A user_defined function f_source(x) is 'reducible' on another user_defined function f_target(x),
+ * if there exists a 'reducer' r(x) such that r(f_source(x)) = f_target(x) for all input x.
+ * @param <I> reducer input type
+ * @param <O> reducer output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface Reducer<I, O> {
+    O reduce(I arg1);

Review Comment:
   nit: `arg1` -> `arg`?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -833,10 +834,44 @@ case class KeyGroupedShuffleSpec(
     (left, right) match {
       case (_: LeafExpression, _: LeafExpression) => true
       case (left: TransformExpression, right: TransformExpression) =>
-        left.isSameFunction(right)
+        if (SQLConf.get.v2BucketingPushPartValuesEnabled &&
+          !SQLConf.get.v2BucketingPartiallyClusteredDistributionEnabled &&
+          SQLConf.get.v2BucketingAllowCompatibleTransforms) {
+          left.isCompatible(right)
+        } else {
+          left.isSameFunction(right)
+        }
       case _ => false
     }
 
+  /**
+   * Return a set of [[Reducer]] for the partition expressions of this shuffle spec,
+   * on the partition expressions of another shuffle spec.
+   * <p>
+   * A [[Reducer]] exists for a partition expression function of this shuffle spec if it is
+   * 'reducible' on the corresponding partition expression function of the other shuffle spec.
+   * <p>
+   * If a value is returned, there must be one Option[[Reducer]] per partition expression.
+   * A None value in the set indicates that the particular partition expression is not reducible
+   * on the corresponding expression on the other shuffle spec.
+   * <p>
+   * Returning none also indicates that none of the partition expressions can be reduced on the
+   * corresponding expression on the other shuffle spec.
+   */
+  def reducers(other: ShuffleSpec): Option[Seq[Option[Reducer[_, _]]]] = {

Review Comment:
   I think the input can only be `KeyGroupedShuffleSpec` so we can just use it instead of `ShuffleSpec`.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -833,10 +834,44 @@ case class KeyGroupedShuffleSpec(
     (left, right) match {
       case (_: LeafExpression, _: LeafExpression) => true
       case (left: TransformExpression, right: TransformExpression) =>
-        left.isSameFunction(right)
+        if (SQLConf.get.v2BucketingPushPartValuesEnabled &&
+          !SQLConf.get.v2BucketingPartiallyClusteredDistributionEnabled &&
+          SQLConf.get.v2BucketingAllowCompatibleTransforms) {
+          left.isCompatible(right)
+        } else {
+          left.isSameFunction(right)
+        }
       case _ => false
     }
 
+  /**
+   * Return a set of [[Reducer]] for the partition expressions of this shuffle spec,
+   * on the partition expressions of another shuffle spec.
+   * <p>
+   * A [[Reducer]] exists for a partition expression function of this shuffle spec if it is
+   * 'reducible' on the corresponding partition expression function of the other shuffle spec.
+   * <p>
+   * If a value is returned, there must be one Option[[Reducer]] per partition expression.

Review Comment:
   nit: `Option[[Reducer]]` is not rendered properly





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1529561359


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+import scala.Option;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * there exists a reducer function r(x) such that r(f_source(x)) = f_target(x) for all input x.
+ *
+ * <p>
+ * Examples:
+ * <ul>
+ *    <li>Bucket functions
+ *    <ul>
+ *        <li>f_source(x) = bucket(4, x)</li>
+ *        <li>f_target(x) = bucket(2, x)</li>
+ *        <li>r(x) = x % 2</li>
+ *    </ul>
+ *    <li>Date functions
+ *    <ul>
+ *        <li>f_source(x) = hours(x)</li>
+ *        <li>f_target(x) = days(x)</li>
+ *        <li>r(x) = x / 24</li>
+ *     </ul>
+ * </ul>
+ * @param <I> reducer function input type
+ * @param <O> reducer function output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<I, O> {
+
+    /**
+     * If this function is 'reducible' on another function, return the {@link Reducer} function.
+     * <p>
+     * Example:
+     * <ul>
+     *     <li>this_function = bucket(4, x)
+     *     <li>other function = bucket(2, x)
+     * </ul>
+     * Invoke with arguments
+     * <ul>
+     *     <li>other = bucket</li>
+     *     <li>this param = Int(4)</li>
+     *     <li>other param = Int(2)</li>
+     * </ul>
+     * @param other the other function
+     * @param thisParam param for this function

Review Comment:
   OK, I made a default impl for both so that implementations can pick one to override.





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1538195196


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * <ul>
+ *   <li> There exists a reducer function r(x) such that r(f_source(x)) = f_target(x)
+ *        for all input x, or </li>
+ *   <li> More generally, there exists reducer functions r1(x) and r2(x) such that
+ *        r1(f_source(x)) = r2(f_target(x)) for all input x. </li>
+ * </ul>
+ * <p>
+ * Examples:
+ * <ul>
+ *    <li>Bucket functions where one side has reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(4, x)</li>
+ *        <li>f_target(x) = bucket(2, x)</li>
+ *        <li>r(x) = x % 2</li>
+ *    </ul>
+ *
+ *    <li>Bucket functions where both sides have reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(16, x)</li>
+ *        <li>f_target(x) = bucket(12, x)</li>
+ *        <li>r1(x) = x % 4</li>
+ *        <li>r2(x) = x % 4</li>
+ *    </ul>
+ *
+ *    <li>Date functions
+ *    <ul>
+ *        <li>f_source(x) = hours(x)</li>
+ *        <li>f_target(x) = days(x)</li>
+ *        <li>r(x) = x / 24</li>
+ *     </ul>
+ * </ul>
+ * @param <I> reducer function input type
+ * @param <O> reducer function output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<I, O> {
+
+  /**
+   * This method is for parameterized functions.
+   *
+   * If this parameterized function is 'reducible' on another bucket function,
+   * return the {@link Reducer} function.
+   * <p>
+   * Example to return reducer for reducing f_source = bucket(4, x) on f_target = bucket(2, x)
+   * <ul>
+   *     <li>thisFunction = bucket</li>
+   *     <li>thisParam = Int(4)</li>
+   *     <li>otherFunction = bucket</li>
+   *     <li>otherParam = Int(2)</li>
+   * </ul>
+   *
+   * @param thisParam parameter for this function

Review Comment:
   Yeah, I think you are right; it requires more changes to Spark (V2ExpressionUtil, etc.) to add these. Let me revert back to bucketReducer(), and later, if we figure that out, we can make it just call the general case.





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1536300235


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * <ul>
+ *   <li> There exists a reducer function r(x) such that r(f_source(x)) = f_target(x)
+ *        for all input x, or </li>
+ *   <li> More generally, there exists reducer functions r1(x) and r2(x) such that
+ *        r1(f_source(x)) = r2(f_target(x)) for all input x. </li>
+ * </ul>
+ * <p>
+ * Examples:
+ * <ul>
+ *    <li>Bucket functions where one side has reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(4, x)</li>
+ *        <li>f_target(x) = bucket(2, x)</li>
+ *        <li>r(x) = x % 2</li>
+ *    </ul>
+ *
+ *    <li>Bucket functions where both sides have reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(16, x)</li>
+ *        <li>f_target(x) = bucket(12, x)</li>
+ *        <li>r1(x) = x % 4</li>
+ *        <li>r2(x) = x % 4</li>
+ *    </ul>
+ *
+ *    <li>Date functions
+ *    <ul>
+ *        <li>f_source(x) = hours(x)</li>
+ *        <li>f_target(x) = days(x)</li>
+ *        <li>r(x) = x / 24</li>
+ *     </ul>
+ * </ul>
+ * @param <I> reducer function input type
+ * @param <O> reducer function output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<I, O> {
+
+  /**
+   * This method is for parameterized functions.
+   *
+   * If this parameterized function is 'reducible' on another bucket function,
+   * return the {@link Reducer} function.
+   * <p>
+   * Example to return reducer for reducing f_source = bucket(4, x) on f_target = bucket(2, x)
+   * <ul>
+   *     <li>thisFunction = bucket</li>
+   *     <li>thisParam = Int(4)</li>
+   *     <li>otherFunction = bucket</li>
+   *     <li>otherParam = Int(2)</li>
+   * </ul>
+   *
+   * @param thisParam parameter for this function

Review Comment:
   I was thinking of a list; would that work or cause any issues?





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1520267117


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -635,6 +636,22 @@ trait ShuffleSpec {
    */
   def createPartitioning(clustering: Seq[Expression]): Partitioning =
     throw SparkUnsupportedOperationException()
+
+  /**
+   * Return a set of [[Reducer]] for the partition expressions of this shuffle spec,
+   * on the partition expressions of another shuffle spec.
+   * <p>
+   * A [[Reducer]] exists for a partition expression function of this shuffle spec if it is
+   * 'reducible' on the corresponding partition expression function of the other shuffle spec.
+   * <p>
+   * If a value is returned, there must be one Option[[Reducer]] per partition expression.
+   * A None value in the set indicates that the particular partition expression is not reducible
+   * on the corresponding expression on the other shuffle spec.
+   * <p>
+   * Returning none also indicates that none of the partition expressions can be reduced on the
+   * corresponding expression on the other shuffle spec.
+   */
+  def reducers(spec: ShuffleSpec): Option[Seq[Option[Reducer[_]]]] = None

Review Comment:
   One use case I think could be `hours(col)` vs `days(col)`; in this case we can "reduce" the former into the latter.





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1503537598


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1537,6 +1537,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS =
+    buildConf("spark.sql.sources.v2.bucketing.allow.enabled")

Review Comment:
   Thanks, yeah, it is a copy-and-paste error.





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1523706702


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+import scala.Option;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * there exists a reducer function r(x) such that r(f_source(x)) = f_target(x) for all input x.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<T, A> extends ScalarFunction<T> {
+
+    /**
+     * If this function is 'reducible' on another function, return the {@link Reducer} function.
+     * @param other other function
+     * @param thisArgument argument for this function instance

Review Comment:
   Added javadocs



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -635,6 +636,22 @@ trait ShuffleSpec {
    */
   def createPartitioning(clustering: Seq[Expression]): Partitioning =
     throw SparkUnsupportedOperationException()
+
+  /**
+   * Return a set of [[Reducer]] for the partition expressions of this shuffle spec,
+   * on the partition expressions of another shuffle spec.
+   * <p>
+   * A [[Reducer]] exists for a partition expression function of this shuffle spec if it is
+   * 'reducible' on the corresponding partition expression function of the other shuffle spec.
+   * <p>
+   * If a value is returned, there must be one Option[[Reducer]] per partition expression.
+   * A None value in the set indicates that the particular partition expression is not reducible
+   * on the corresponding expression on the other shuffle spec.
+   * <p>
+   * Returning none also indicates that none of the partition expressions can be reduced on the
+   * corresponding expression on the other shuffle spec.
+   */
+  def reducers(spec: ShuffleSpec): Option[Seq[Option[Reducer[_]]]] = None

Review Comment:
   done



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+import scala.Option;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * there exists a reducer function r(x) such that r(f_source(x)) = f_target(x) for all input x.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<T, A> extends ScalarFunction<T> {

Review Comment:
   Added javadocs





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1521729517


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -635,6 +636,22 @@ trait ShuffleSpec {
    */
   def createPartitioning(clustering: Seq[Expression]): Partitioning =
     throw SparkUnsupportedOperationException()
+
+  /**
+   * Return a set of [[Reducer]] for the partition expressions of this shuffle spec,
+   * on the partition expressions of another shuffle spec.
+   * <p>
+   * A [[Reducer]] exists for a partition expression function of this shuffle spec if it is
+   * 'reducible' on the corresponding partition expression function of the other shuffle spec.
+   * <p>
+   * If a value is returned, there must be one Option[[Reducer]] per partition expression.
+   * A None value in the set indicates that the particular partition expression is not reducible
+   * on the corresponding expression on the other shuffle spec.
+   * <p>
+   * Returning none also indicates that none of the partition expressions can be reduced on the
+   * corresponding expression on the other shuffle spec.
+   */
+  def reducers(spec: ShuffleSpec): Option[Seq[Option[Reducer[_]]]] = None

Review Comment:
   This is just one potential example. It could be useful, for instance, if the table with `hours(col)` has much less data than the other side (but not small enough to trigger BHJ). The `Reducer` here allows data sources to specify relationships between transforms beyond the bucketing case.
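   
   A minimal sketch of such a relationship beyond bucketing, assuming hour and day ordinals are represented as Ints and that `Reducer` exposes a single `reduce` method as used elsewhere in this PR (the object name below is made up for illustration):
   
   ```scala
   import org.apache.spark.sql.connector.catalog.functions.Reducer
   
   // Sketch only: an hour ordinal floor-divided by 24 yields the day ordinal,
   // i.e. r(hours(ts)) = days(ts) for all timestamps ts.
   object HoursToDaysReducer extends Reducer[Int, Int] {
     override def reduce(hourOrdinal: Int): Int = Math.floorDiv(hourOrdinal, 24)
   }
   ```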





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1536551512


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * <ul>
+ *   <li> There exists a reducer function r(x) such that r(f_source(x)) = f_target(x)
+ *        for all input x, or </li>
+ *   <li> More generally, there exists reducer functions r1(x) and r2(x) such that
+ *        r1(f_source(x)) = r2(f_target(x)) for all input x. </li>
+ * </ul>
+ * <p>
+ * Examples:
+ * <ul>
+ *    <li>Bucket functions where one side has reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(4, x)</li>
+ *        <li>f_target(x) = bucket(2, x)</li>
+ *        <li>r(x) = x % 2</li>
+ *    </ul>
+ *
+ *    <li>Bucket functions where both sides have reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(16, x)</li>
+ *        <li>f_target(x) = bucket(12, x)</li>
+ *        <li>r1(x) = x % 4</li>
+ *        <li>r2(x) = x % 4</li>
+ *    </ul>
+ *
+ *    <li>Date functions
+ *    <ul>
+ *        <li>f_source(x) = days(x)</li>
+ *        <li>f_target(x) = hours(x)</li>
+ *        <li>r(x) = x / 24</li>
+ *     </ul>
+ * </ul>
+ * @param <I> reducer function input type
+ * @param <O> reducer function output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<I, O> {
+
+  /**
+   * This method is for parameterized functions.
+   *
+   * If this parameterized function is 'reducible' on another bucket function,
+   * return the {@link Reducer} function.
+   * <p>
+   * Example to return reducer for reducing f_source = bucket(4, x) on f_target = bucket(2, x)
+   * <ul>
+   *     <li>thisFunction = bucket</li>
+   *     <li>thisParam = Int(4)</li>
+   *     <li>otherFunction = bucket</li>
+   *     <li>otherParam = Int(2)</li>
+   * </ul>
+   *
+   * @param thisParam parameter for this function

Review Comment:
   I'm thinking whether we can use `InternalRow` for this, which is consistent with the v2 function API (see `ScalarFunction` and `AggregateFunction`) and can support multiple parameters.
   
   We need to define a clear contract between Spark and the callers of this method though: **what should a caller expect to see from `thisParam` and `otherParam`**. 
   
   For `bucket` it is quite clear, but I'm not sure how this will work with the geohash function that you pointed out. In Spark, we need a way for the data source provider to pass the `resolution` of the function to Spark via the v2 catalog, and then pass it back to the data source via `reducer`. Any idea?
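   
   One hypothetical shape such an `InternalRow`-based variant could take (a sketch only; the trait name and overload below are made up and not part of this PR):
   
   ```scala
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.connector.catalog.functions.{Reducer, ReducibleFunction}
   
   // Sketch only: parameters packed into an InternalRow, mirroring how
   // ScalarFunction#produceResult(InternalRow) receives its arguments.
   trait RowParameterizedReducibleFunction[I, O] extends ReducibleFunction[I, O] {
     def reducer(
         thisParams: InternalRow,
         otherFunction: ReducibleFunction[_, _],
         otherParams: InternalRow): Reducer[I, O]
   }
   ```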





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1530840650


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+import scala.Option;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * there exists a reducer function r(x) such that r(f_source(x)) = f_target(x) for all input x.
+ *
+ * <p>
+ * Examples:
+ * <ul>
+ *    <li>Bucket functions
+ *    <ul>
+ *        <li>f_source(x) = bucket(4, x)</li>
+ *        <li>f_target(x) = bucket(2, x)</li>
+ *        <li>r(x) = x / 2</li>
+ *    </ul>
+ *    <li>Date functions
+ *    <ul>
+ *        <li>f_source(x) = days(x)</li>
+ *        <li>f_target(x) = hours(x)</li>
+ *        <li>r(x) = x / 24</li>
+ *     </ul>
+ * </ul>
+ * @param <I> reducer function input type
+ * @param <O> reducer function output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<I, O> {
+
+    /**
+     * If this function is 'reducible' on another function, return the {@link Reducer} function.
+     * <p>
+     * Example:
+     * <ul>
+     *     <li>this_function = bucket(4, x)
+     *     <li>other function = bucket(2, x)
+     * </ul>
+     * Invoke with arguments
+     * <ul>
+     *     <li>other = bucket</li>
+     *     <li>this param = Int(4)</li>
+     *     <li>other param = Int(2)</li>
+     * </ul>
+     * @param other the other function
+     * @param thisParam param for this function

Review Comment:
   Actually, on second thought, we do plan to add geo partition transforms, for example XZ2 ordering (https://github.com/spatialx-project/geolake/blob/main/api/src/main/java/org/apache/iceberg/transforms/ExtendedZCurve.java), which needs a 'resolution' parameter,
   
   as well as multi-arg bucket transforms: https://docs.google.com/document/d/1aDoZqRgvDOOUVAGhvKZbp5vFstjsAMY4EFCyjlxpaaw/ 
   
   So I added back the type to Object in the latest PR.
   
   This is one approach; we could probably reuse int in some of the other cases, but it seemed cleaner to have a generic type.  Let me know what you think.
   
   Note: I tried for a while to make the method type-parameterized, like
   
   ```
   <T> Option<Reducer<I, O>> reducer(ReducibleFunction<I, O> other, T numBuckets,
                                     T otherNumBuckets);
   ```
   
   but was not able to override it successfully in `Bucket` in Scala.





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1535145417


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -846,6 +879,20 @@ case class KeyGroupedShuffleSpec(
   }
 }
 
+object KeyGroupedShuffleSpec {
+  def reducePartitionValue(row: InternalRow,

Review Comment:
   nit: we usually follow this style:
   
   ```scala
     def reducePartitionValue(
         row: InternalRow,
         expressions: Seq[Expression],
         reducers: Seq[Option[Reducer[_, _]]]):
       InternalRowComparableWrapper = {
       val partitionVals = row.toSeq(expressions.map(_.dataType))
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1541,6 +1541,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS =
+    buildConf("spark.sql.sources.v2.bucketing.allowCompatibleTransforms.enabled")
+      .doc("Whether to allow storage-partition join in the case where the partition transforms" +

Review Comment:
   nit: space after `transforms`



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * <ul>
+ *   <li> There exists a reducer function r(x) such that r(f_source(x)) = f_target(x)
+ *        for all input x, or </li>
+ *   <li> More generally, there exists reducer functions r1(x) and r2(x) such that
+ *        r1(f_source(x)) = r2(f_target(x)) for all input x. </li>
+ * </ul>
+ * <p>
+ * Examples:
+ * <ul>
+ *    <li>Bucket functions where one side has reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(4, x)</li>
+ *        <li>f_target(x) = bucket(2, x)</li>
+ *        <li>r(x) = x % 2</li>
+ *    </ul>
+ *
+ *    <li>Bucket functions where both sides have reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(16, x)</li>
+ *        <li>f_target(x) = bucket(12, x)</li>
+ *        <li>r1(x) = x % 4</li>
+ *        <li>r2(x) = x % 4</li>
+ *    </ul>
+ *
+ *    <li>Date functions
+ *    <ul>
+ *        <li>f_source(x) = days(x)</li>
+ *        <li>f_target(x) = hours(x)</li>
+ *        <li>r(x) = x / 24</li>
+ *     </ul>
+ * </ul>
+ * @param <I> reducer function input type
+ * @param <O> reducer function output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<I, O> {
+
+  /**
+   * This method is for parameterized functions.
+   *
+   * If this parameterized function is 'reducible' on another bucket function,
+   * return the {@link Reducer} function.
+   * <p>
+   * Example to return reducer for reducing f_source = bucket(4, x) on f_target = bucket(2, x)
+   * <ul>
+   *     <li>thisFunction = bucket</li>
+   *     <li>thisParam = Int(4)</li>
+   *     <li>otherFunction = bucket</li>
+   *     <li>otherParam = Int(2)</li>
+   * </ul>
+   *
+   * @param thisParam parameter for this function
+   * @param otherFunction the other parameterized function
+   * @param otherParam parameter for the other function
+   * @return a reduction function if it is reducible, null if not
+   */
+  default Reducer<I, O> reducer(
+    Object thisParam,
+    ReducibleFunction<?, ?> otherFunction,

Review Comment:
   `ReducibleFunction<I, O> otherFunction` did not work? Just wondering why.



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * <ul>
+ *   <li> There exists a reducer function r(x) such that r(f_source(x)) = f_target(x)
+ *        for all input x, or </li>
+ *   <li> More generally, there exists reducer functions r1(x) and r2(x) such that
+ *        r1(f_source(x)) = r2(f_target(x)) for all input x. </li>
+ * </ul>
+ * <p>
+ * Examples:
+ * <ul>
+ *    <li>Bucket functions where one side has reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(4, x)</li>
+ *        <li>f_target(x) = bucket(2, x)</li>
+ *        <li>r(x) = x % 2</li>
+ *    </ul>
+ *
+ *    <li>Bucket functions where both sides have reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(16, x)</li>
+ *        <li>f_target(x) = bucket(12, x)</li>
+ *        <li>r1(x) = x % 4</li>
+ *        <li>r2(x) = x % 4</li>
+ *    </ul>
+ *
+ *    <li>Date functions
+ *    <ul>
+ *        <li>f_source(x) = days(x)</li>
+ *        <li>f_target(x) = hours(x)</li>
+ *        <li>r(x) = x / 24</li>
+ *     </ul>
+ * </ul>
+ * @param <I> reducer function input type
+ * @param <O> reducer function output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<I, O> {
+
+  /**
+   * This method is for parameterized functions.
+   *
+   * If this parameterized function is 'reducible' on another bucket function,
+   * return the {@link Reducer} function.
+   * <p>
+   * Example to return reducer for reducing f_source = bucket(4, x) on f_target = bucket(2, x)
+   * <ul>
+   *     <li>thisFunction = bucket</li>
+   *     <li>thisParam = Int(4)</li>
+   *     <li>otherFunction = bucket</li>
+   *     <li>otherParam = Int(2)</li>
+   * </ul>
+   *
+   * @param thisParam parameter for this function

Review Comment:
   Curious: what will `thisParam` and `otherParam` be if there are multiple parameters?





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1538195196


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * <ul>
+ *   <li> There exists a reducer function r(x) such that r(f_source(x)) = f_target(x)
+ *        for all input x, or </li>
+ *   <li> More generally, there exists reducer functions r1(x) and r2(x) such that
+ *        r1(f_source(x)) = r2(f_target(x)) for all input x. </li>
+ * </ul>
+ * <p>
+ * Examples:
+ * <ul>
+ *    <li>Bucket functions where one side has reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(4, x)</li>
+ *        <li>f_target(x) = bucket(2, x)</li>
+ *        <li>r(x) = x % 2</li>
+ *    </ul>
+ *
+ *    <li>Bucket functions where both sides have reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(16, x)</li>
+ *        <li>f_target(x) = bucket(12, x)</li>
+ *        <li>r1(x) = x % 4</li>
+ *        <li>r2(x) = x % 4</li>
+ *    </ul>
+ *
+ *    <li>Date functions
+ *    <ul>
+ *        <li>f_source(x) = days(x)</li>
+ *        <li>f_target(x) = hours(x)</li>
+ *        <li>r(x) = x / 24</li>
+ *     </ul>
+ * </ul>
+ * @param <I> reducer function input type
+ * @param <O> reducer function output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<I, O> {
+
+  /**
+   * This method is for parameterized functions.
+   *
+   * If this parameterized function is 'reducible' on another bucket function,
+   * return the {@link Reducer} function.
+   * <p>
+   * Example to return reducer for reducing f_source = bucket(4, x) on f_target = bucket(2, x)
+   * <ul>
+   *     <li>thisFunction = bucket</li>
+   *     <li>thisParam = Int(4)</li>
+   *     <li>otherFunction = bucket</li>
+   *     <li>otherParam = Int(2)</li>
+   * </ul>
+   *
+   * @param thisParam parameter for this function

Review Comment:
   Yeah, I think you are right; it requires more changes to Spark, `V2ExpressionUtil`, etc. to add these.  Let me revert back to `bucketReducer()`, and later, if we figure that out, we can make this method invoke the general API.
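   
   For reference, a minimal sketch of what an implementation of `bucketReducer()` could look like. Assumptions: the gcd-based reduction from the interface's javadoc examples (bucket(16) and bucket(12) both reduce via x % gcd(16, 12) = x % 4), a `Reducer` with a single `reduce` method, and made-up names `SketchBucketFunction`/`BucketReducer`:
   
   ```scala
   import org.apache.spark.sql.connector.catalog.functions.{Reducer, ReducibleFunction}
   
   // Sketch only: reduce a bucket id modulo the gcd of the two bucket counts.
   class BucketReducer(divisor: Int) extends Reducer[Int, Int] {
     override def reduce(bucketId: Int): Int = bucketId % divisor
   }
   
   object SketchBucketFunction extends ReducibleFunction[Int, Int] {
     override def bucketReducer(
         thisNumBuckets: Int,
         otherBucketFunction: ReducibleFunction[_, _],
         otherNumBuckets: Int): Reducer[Int, Int] = {
       val divisor = BigInt(thisNumBuckets).gcd(BigInt(otherNumBuckets)).toInt
       // Null means no reduction is needed on this side
       // (this side's bucket count already equals the gcd).
       if (divisor == thisNumBuckets) null else new BucketReducer(divisor)
     }
   }
   ```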





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1521800234


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala:
##########
@@ -1310,6 +1314,312 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
+  test("SPARK-47094: Support compatible buckets") {
+    val table1 = "tab1e1"
+    val table2 = "table2"
+
+    Seq(
+      ((2, 4), (4, 2)),
+      ((4, 2), (2, 4)),
+      ((2, 2), (4, 6)),
+      ((6, 2), (2, 2))).foreach {
+      case ((table1buckets1, table1buckets2), (table2buckets1, table2buckets2)) =>
+        catalog.clearTables()
+
+        val partition1 = Array(bucket(table1buckets1, "store_id"),
+          bucket(table1buckets2, "dept_id"))
+        val partition2 = Array(bucket(table2buckets1, "store_id"),
+          bucket(table2buckets2, "dept_id"))
+
+        Seq((table1, partition1), (table2, partition2)).foreach { case (tab, part) =>
+          createTable(tab, schema2, part)
+          val insertStr = s"INSERT INTO testcat.ns.$tab VALUES " +
+            "(0, 0, 'aa'), " +
+            "(0, 0, 'ab'), " + // duplicate partition key
+            "(0, 1, 'ac'), " +
+            "(0, 2, 'ad'), " +
+            "(0, 3, 'ae'), " +
+            "(0, 4, 'af'), " +
+            "(0, 5, 'ag'), " +
+            "(1, 0, 'ah'), " +
+            "(1, 0, 'ai'), " + // duplicate partition key
+            "(1, 1, 'aj'), " +
+            "(1, 2, 'ak'), " +
+            "(1, 3, 'al'), " +
+            "(1, 4, 'am'), " +
+            "(1, 5, 'an'), " +
+            "(2, 0, 'ao'), " +
+            "(2, 0, 'ap'), " + // duplicate partition key
+            "(2, 1, 'aq'), " +
+            "(2, 2, 'ar'), " +
+            "(2, 3, 'as'), " +
+            "(2, 4, 'at'), " +
+            "(2, 5, 'au'), " +
+            "(3, 0, 'av'), " +
+            "(3, 0, 'aw'), " + // duplicate partition key
+            "(3, 1, 'ax'), " +
+            "(3, 2, 'ay'), " +
+            "(3, 3, 'az'), " +
+            "(3, 4, 'ba'), " +
+            "(3, 5, 'bb'), " +
+            "(4, 0, 'bc'), " +
+            "(4, 0, 'bd'), " + // duplicate partition key
+            "(4, 1, 'be'), " +
+            "(4, 2, 'bf'), " +
+            "(4, 3, 'bg'), " +
+            "(4, 4, 'bh'), " +
+            "(4, 5, 'bi'), " +
+            "(5, 0, 'bj'), " +
+            "(5, 0, 'bk'), " + // duplicate partition key
+            "(5, 1, 'bl'), " +
+            "(5, 2, 'bm'), " +
+            "(5, 3, 'bn'), " +
+            "(5, 4, 'bo'), " +
+            "(5, 5, 'bp')"
+
+            // additional unmatched partitions to test push down
+            val finalStr = if (tab == table1) {
+              insertStr ++ ", (8, 0, 'xa'), (8, 8, 'xx')"
+            } else {
+              insertStr ++ ", (9, 0, 'ya'), (9, 9, 'yy')"
+            }
+
+            sql(finalStr)
+        }
+
+        Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys =>
+          withSQLConf(
+            SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+            SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
+            SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
+            SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key ->
+              allowJoinKeysSubsetOfPartitionKeys.toString,
+            SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
+            val df = sql(
+              s"""
+                 |${selectWithMergeJoinHint("t1", "t2")}
+                 |t1.store_id, t1.dept_id, t1.data, t2.data
+                 |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+                 |ON t1.store_id = t2.store_id AND t1.dept_id = t2.dept_id
+                 |ORDER BY t1.store_id, t1.dept_id, t1.data, t2.data
+                 |""".stripMargin)
+
+            val shuffles = collectShuffles(df.queryExecution.executedPlan)
+            assert(shuffles.isEmpty, "SPJ should be triggered")
+
+            val scans = collectScans(df.queryExecution.executedPlan).map(_.inputRDD.
+              partitions.length)
+            val expectedBuckets = Math.min(table1buckets1, table2buckets1) *
+              Math.min(table1buckets2, table2buckets2)
+            assert(scans == Seq(expectedBuckets, expectedBuckets))
+
+            checkAnswer(df, Seq(
+              Row(0, 0, "aa", "aa"),
+              Row(0, 0, "aa", "ab"),
+              Row(0, 0, "ab", "aa"),
+              Row(0, 0, "ab", "ab"),
+              Row(0, 1, "ac", "ac"),
+              Row(0, 2, "ad", "ad"),
+              Row(0, 3, "ae", "ae"),
+              Row(0, 4, "af", "af"),
+              Row(0, 5, "ag", "ag"),
+              Row(1, 0, "ah", "ah"),
+              Row(1, 0, "ah", "ai"),
+              Row(1, 0, "ai", "ah"),
+              Row(1, 0, "ai", "ai"),
+              Row(1, 1, "aj", "aj"),
+              Row(1, 2, "ak", "ak"),
+              Row(1, 3, "al", "al"),
+              Row(1, 4, "am", "am"),
+              Row(1, 5, "an", "an"),
+              Row(2, 0, "ao", "ao"),
+              Row(2, 0, "ao", "ap"),
+              Row(2, 0, "ap", "ao"),
+              Row(2, 0, "ap", "ap"),
+              Row(2, 1, "aq", "aq"),
+              Row(2, 2, "ar", "ar"),
+              Row(2, 3, "as", "as"),
+              Row(2, 4, "at", "at"),
+              Row(2, 5, "au", "au"),
+              Row(3, 0, "av", "av"),
+              Row(3, 0, "av", "aw"),
+              Row(3, 0, "aw", "av"),
+              Row(3, 0, "aw", "aw"),
+              Row(3, 1, "ax", "ax"),
+              Row(3, 2, "ay", "ay"),
+              Row(3, 3, "az", "az"),
+              Row(3, 4, "ba", "ba"),
+              Row(3, 5, "bb", "bb"),
+              Row(4, 0, "bc", "bc"),
+              Row(4, 0, "bc", "bd"),
+              Row(4, 0, "bd", "bc"),
+              Row(4, 0, "bd", "bd"),
+              Row(4, 1, "be", "be"),
+              Row(4, 2, "bf", "bf"),
+              Row(4, 3, "bg", "bg"),
+              Row(4, 4, "bh", "bh"),
+              Row(4, 5, "bi", "bi"),
+              Row(5, 0, "bj", "bj"),
+              Row(5, 0, "bj", "bk"),
+              Row(5, 0, "bk", "bj"),
+              Row(5, 0, "bk", "bk"),
+              Row(5, 1, "bl", "bl"),
+              Row(5, 2, "bm", "bm"),
+              Row(5, 3, "bn", "bn"),
+              Row(5, 4, "bo", "bo"),
+              Row(5, 5, "bp", "bp")
+            ))
+          }
+        }
+    }
+  }
+
+  test("SPARK-47094: Support compatible buckets with less join keys than partition keys") {
+    val table1 = "tab1e1"
+    val table2 = "table2"
+
+    Seq((2, 4), (4, 2), (2, 6), (6, 2)).foreach {
+      case (table1buckets, table2buckets) =>
+        catalog.clearTables()
+
+        val partition1 = Array(identity("data"),
+          bucket(table1buckets, "dept_id"))
+        val partition2 = Array(bucket(3, "store_id"),
+          bucket(table2buckets, "dept_id"))
+
+        createTable(table1, schema2, partition1)
+        sql(s"INSERT INTO testcat.ns.$table1 VALUES " +
+          "(0, 0, 'aa'), " +
+          "(1, 0, 'ab'), " +
+          "(2, 1, 'ac'), " +
+          "(3, 2, 'ad'), " +
+          "(4, 3, 'ae'), " +
+          "(5, 4, 'af'), " +
+          "(6, 5, 'ag'), " +
+
+          // value without other side match
+          "(6, 6, 'xx')"
+        )
+
+        createTable(table2, schema2, partition2)
+        sql(s"INSERT INTO testcat.ns.$table2 VALUES " +
+          "(6, 0, '01'), " +
+          "(5, 1, '02'), " + // duplicate partition key
+          "(5, 1, '03'), " +
+          "(4, 2, '04'), " +
+          "(3, 3, '05'), " +
+          "(2, 4, '06'), " +
+          "(1, 5, '07'), " +
+
+          // value without other side match
+          "(7, 7, '99')"
+        )
+
+
+        withSQLConf(
+          SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+          SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
+          SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
+          SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true",
+          SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
+          val df = sql(
+            s"""
+               |${selectWithMergeJoinHint("t1", "t2")}
+               |t1.store_id, t2.store_id, t1.dept_id, t2.dept_id, t1.data, t2.data
+               |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+               |ON t1.dept_id = t2.dept_id
+               |ORDER BY t1.store_id, t1.dept_id, t1.data, t2.data
+               |""".stripMargin)
+
+          val shuffles = collectShuffles(df.queryExecution.executedPlan)
+          assert(shuffles.isEmpty, "SPJ should be triggered")
+
+          val scans = collectScans(df.queryExecution.executedPlan).map(_.inputRDD.
+            partitions.length)
+
+          val expectedBuckets = Math.min(table1buckets, table2buckets)

Review Comment:
   Yes, I think it will be very useful to make partially clustered distribution work with the feature introduced in this PR.





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1523707057


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -829,20 +846,59 @@ case class KeyGroupedShuffleSpec(
     }
   }
 
+  override def canCreatePartitioning: Boolean = SQLConf.get.v2BucketingShuffleEnabled &&
+    // Only support partition expressions are AttributeReference for now
+    partitioning.expressions.forall(_.isInstanceOf[AttributeReference])
+
+  override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
+    KeyGroupedPartitioning(clustering, partitioning.numPartitions, partitioning.partitionValues)
+  }
+
+  override def reducers(other: ShuffleSpec): Option[Seq[Option[Reducer[_]]]] = {
+    other match {
+      case otherSpec: KeyGroupedShuffleSpec =>
+        val results = partitioning.expressions.zip(otherSpec.partitioning.expressions).map {
+          case (e1: TransformExpression, e2: TransformExpression)

Review Comment:
   done





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1523710658


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -829,20 +846,59 @@ case class KeyGroupedShuffleSpec(
     }
   }
 
+  override def canCreatePartitioning: Boolean = SQLConf.get.v2BucketingShuffleEnabled &&

Review Comment:
   Done, I think I was trying to move the private method to the bottom.





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1538630610


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * <ul>
+ *   <li> There exists a reducer function r(x) such that r(f_source(x)) = f_target(x)
+ *        for all input x, or </li>
+ *   <li> More generally, there exists reducer functions r1(x) and r2(x) such that
+ *        r1(f_source(x)) = r2(f_target(x)) for all input x. </li>
+ * </ul>
+ * <p>
+ * Examples:
+ * <ul>
+ *    <li>Bucket functions where one side has reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(4, x)</li>
+ *        <li>f_target(x) = bucket(2, x)</li>
+ *        <li>r(x) = x % 2</li>
+ *    </ul>
+ *
+ *    <li>Bucket functions where both sides have reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(16, x)</li>
+ *        <li>f_target(x) = bucket(12, x)</li>
+ *        <li>r1(x) = x % 4</li>
+ *        <li>r2(x) = x % 4</li>
+ *    </ul>
+ *
+ *    <li>Date functions
+ *    <ul>
+ *        <li>f_source(x) = days(x)</li>
+ *        <li>f_target(x) = hours(x)</li>
+ *        <li>r(x) = x / 24</li>
+ *     </ul>
+ * </ul>
+ * @param <I> reducer function input type
+ * @param <O> reducer function output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<I, O> {
+
+  /**
+   * This method is for parameterized functions.
+   *
+   * If this parameterized function is 'reducible' on another bucket function,
+   * return the {@link Reducer} function.
+   * <p>
+   * Example to return reducer for reducing f_source = bucket(4, x) on f_target = bucket(2, x)
+   * <ul>
+   *     <li>thisBucketFunction = bucket</li>
+   *     <li>thisNumBuckets = 4</li>
+   *     <li>otherBucketFunction = bucket</li>
+   *     <li>otherNumBuckets = 2</li>
+   * </ul>
+   *
+   * @param thisNumBuckets parameter for this function
+   * @param otherBucketFunction the other parameterized function
+   * @param otherNumBuckets parameter for the other function
+   * @return a reduction function if it is reducible, null if not
+   */
+  default Reducer<I, O> bucketReducer(
+    int thisNumBuckets,
+    ReducibleFunction<?, ?> otherBucketFunction,
+    int otherNumBuckets) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * This method is for all other functions.
+   *
+   * If this function is 'reducible' on another function, return the {@link Reducer} function.
+   * <p>
+   * Example of reducing f_source = days(x) on f_target = hours(x)
+   * <ul>
+   *     <li>thisFunction = days</li>
+   *     <li>otherFunction = hours</li>
+   * </ul>
+   *
+   * @param otherFunction the other function
+   * @return a reduction function if it is reducible, null if not.
+   */
+  default Reducer<I, O> bucketReducer(ReducibleFunction<?, ?> otherFunction) {

Review Comment:
   Should this just be `reducer`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -527,25 +545,38 @@ case class EnsureRequirements(
         joinType == LeftAnti || joinType == LeftOuter
   }
 
-  // Populate the common partition values down to the scan nodes
-  private def populatePartitionValues(
+  // Populate the common partition information down to the scan nodes
+  private def populateCommonPartitionInfo(
       plan: SparkPlan,
       values: Seq[(InternalRow, Int)],
       joinKeyPositions: Option[Seq[Int]],
+      reducers: Option[Seq[Option[Reducer[_, _]]]],
       applyPartialClustering: Boolean,
       replicatePartitions: Boolean): SparkPlan = plan match {
     case scan: BatchScanExec =>
       scan.copy(
         spjParams = scan.spjParams.copy(
           commonPartitionValues = Some(values),
           joinKeyPositions = joinKeyPositions,
+          reducers = reducers,
           applyPartialClustering = applyPartialClustering,
           replicatePartitions = replicatePartitions
         )
       )
     case node =>
-      node.mapChildren(child => populatePartitionValues(
-        child, values, joinKeyPositions, applyPartialClustering, replicatePartitions))
+      node.mapChildren(child => populateCommonPartitionInfo(
+        child, values, joinKeyPositions, reducers, applyPartialClustering, replicatePartitions))
+  }
+
+  private def reduceCommonPartValues(commonPartValues: Seq[(InternalRow, Int)],

Review Comment:
   ditto, style



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * <ul>
+ *   <li> There exists a reducer function r(x) such that r(f_source(x)) = f_target(x)
+ *        for all input x, or </li>
+ *   <li> More generally, there exists reducer functions r1(x) and r2(x) such that
+ *        r1(f_source(x)) = r2(f_target(x)) for all input x. </li>
+ * </ul>
+ * <p>
+ * Examples:
+ * <ul>
+ *    <li>Bucket functions where one side has reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(4, x)</li>
+ *        <li>f_target(x) = bucket(2, x)</li>
+ *        <li>r(x) = x % 2</li>
+ *    </ul>
+ *
+ *    <li>Bucket functions where both sides have reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(16, x)</li>
+ *        <li>f_target(x) = bucket(12, x)</li>
+ *        <li>r1(x) = x % 4</li>
+ *        <li>r2(x) = x % 4</li>
+ *    </ul>
+ *
+ *    <li>Date functions
+ *    <ul>
+ *        <li>f_source(x) = days(x)</li>
+ *        <li>f_target(x) = hours(x)</li>
+ *        <li>r(x) = x / 24</li>
+ *     </ul>
+ * </ul>
+ * @param <I> reducer function input type
+ * @param <O> reducer function output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<I, O> {
+
+  /**
+   * This method is for parameterized functions.
+   *
+   * If this parameterized function is 'reducible' on another bucket function,
+   * return the {@link Reducer} function.
+   * <p>
+   * Example to return reducer for reducing f_source = bucket(4, x) on f_target = bucket(2, x)
+   * <ul>
+   *     <li>thisBucketFunction = bucket</li>
+   *     <li>thisNumBuckets = 4</li>
+   *     <li>otherBucketFunction = bucket</li>
+   *     <li>otherNumBuckets = 2</li>
+   * </ul>
+   *
+   * @param thisNumBuckets parameter for this function
+   * @param otherBucketFunction the other parameterized function
+   * @param otherNumBuckets parameter for the other function
+   * @return a reduction function if it is reducible, null if not
+   */
+  default Reducer<I, O> bucketReducer(
+    int thisNumBuckets,

Review Comment:
   I think we may need 4 space indentation here too



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -846,6 +879,21 @@ case class KeyGroupedShuffleSpec(
   }
 }
 
+object KeyGroupedShuffleSpec {
+  def reducePartitionValue(
+    row: InternalRow,

Review Comment:
   nit: indentation is off; it should be:
   ```scala
     def reducePartitionValue(
         row: InternalRow,
         expressions: Seq[Expression],
         reducers: Seq[Option[Reducer[_, _]]]):
       InternalRowComparableWrapper = {
       val partitionVals = row.toSeq(expressions.map(_.dataType))
       val reducedRow = partitionVals.zip(reducers).map{
         case (v, Some(reducer: Reducer[Any, Any])) => reducer.reduce(v)
         case (v, _) => v
   ```
   
   4 space indentation for method parameters, see https://github.com/databricks/scala-style-guide?tab=readme-ov-file#indent



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala:
##########
@@ -85,6 +85,34 @@ object BucketFunction extends ScalarFunction[Int] {
   override def produceResult(input: InternalRow): Int = {
     (input.getLong(1) % input.getInt(0)).toInt
   }
+
+  override def bucketReducer(

Review Comment:
   ditto





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on PR #45267:
URL: https://github.com/apache/spark/pull/45267#issuecomment-2040912874

   Thanks, merged to master!




Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao closed pull request #45267: [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal
URL: https://github.com/apache/spark/pull/45267




Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1505984397


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -635,6 +636,22 @@ trait ShuffleSpec {
    */
   def createPartitioning(clustering: Seq[Expression]): Partitioning =
     throw SparkUnsupportedOperationException()
+
+  /**
+   * Return a set of [[Reducer]] for the partition expressions of this shuffle spec,
+   * on the partition expressions of another shuffle spec.
+   * <p>
+   * A [[Reducer]] exists for a partition expression function of this shuffle spec if it is
+   * 'reducible' on the corresponding partition expression function of the other shuffle spec.
+   * <p>
+   * If a value is returned, there must be one Option[[Reducer]] per partition expression.
+   * A None value in the set indicates that the particular partition expression is not reducible
+   * on the corresponding expression on the other shuffle spec.
+   * <p>
+   * Returning none also indicates that none of the partition expressions can be reduced on the
+   * corresponding expression on the other shuffle spec.
+   */
+  def reducers(spec: ShuffleSpec): Option[Seq[Option[Reducer[_]]]] = None

Review Comment:
   Seems a bit over-engineered to introduce this, `Reducer`, and `ReducibleFunction` just to check that bucket numbers are compatible and to reduce bucket numbers.
   
   Do you have any use case in mind where partition transforms other than the bucket transform could leverage this?
   If not, could these classes be used internally rather than exposed as a public API?





Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1521751659


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -635,6 +636,22 @@ trait ShuffleSpec {
    */
   def createPartitioning(clustering: Seq[Expression]): Partitioning =
     throw SparkUnsupportedOperationException()
+
+  /**
+   * Return a set of [[Reducer]] for the partition expressions of this shuffle spec,
+   * on the partition expressions of another shuffle spec.
+   * <p>
+   * A [[Reducer]] exists for a partition expression function of this shuffle spec if it is
+   * 'reducible' on the corresponding partition expression function of the other shuffle spec.
+   * <p>
+   * If a value is returned, there must be one Option[[Reducer]] per partition expression.
+   * A None value in the set indicates that the particular partition expression is not reducible
+   * on the corresponding expression on the other shuffle spec.
+   * <p>
+   * Returning none also indicates that none of the partition expressions can be reduced on the
+   * corresponding expression on the other shuffle spec.
+   */
+  def reducers(spec: ShuffleSpec): Option[Seq[Option[Reducer[_]]]] = None

Review Comment:
   > The Reducer here allows data sources to specify relationships between transforms beyond the bucketing case.
   
   Yeah, I can see the potential usage now. However, it's still hard for developers to correctly understand what `Reducer` actually means (it's mathematically clear, though) and how it works. Maybe we should add some concrete examples to the JavaDoc of the `Reducer` class. It would also be great to demonstrate a use case beyond bucketing in the tests, but I think that's optional.
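   
   A minimal sketch of what one such concrete example could look like, assuming the single-method `Reducer[I, O]` shape discussed in this thread (`BucketModReducer` is a made-up name for illustration, not the PR's code):
   
   ```scala
   import org.apache.spark.sql.connector.catalog.functions.Reducer
   
   // Hypothetical reducer mapping ids produced by bucket(n * k, x) down to the
   // ids bucket(k, x) would produce, e.g. r(x) = x % 2 for bucket(4) -> bucket(2).
   case class BucketModReducer(divisor: Int) extends Reducer[Int, Int] {
     override def reduce(bucketId: Int): Int = bucketId % divisor
   }
   ```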



Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1536299907


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * <ul>
+ *   <li> There exists a reducer function r(x) such that r(f_source(x)) = f_target(x)
+ *        for all input x, or </li>
+ *   <li> More generally, there exists reducer functions r1(x) and r2(x) such that
+ *        r1(f_source(x)) = r2(f_target(x)) for all input x. </li>
+ * </ul>
+ * <p>
+ * Examples:
+ * <ul>
+ *    <li>Bucket functions where one side has reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(4, x)</li>
+ *        <li>f_target(x) = bucket(2, x)</li>
+ *        <li>r(x) = x % 2</li>
+ *    </ul>
+ *
+ *    <li>Bucket functions where both sides have reducer
+ *    <ul>
+ *        <li>f_source(x) = bucket(16, x)</li>
+ *        <li>f_target(x) = bucket(12, x)</li>
+ *        <li>r1(x) = x % 4</li>
+ *        <li>r2(x) = x % 4</li>
+ *    </ul>
+ *
+ *    <li>Date functions
+ *    <ul>
+ *        <li>f_source(x) = days(x)</li>
+ *        <li>f_target(x) = hours(x)</li>
+ *        <li>r(x) = x / 24</li>
+ *     </ul>
+ * </ul>
+ * @param <I> reducer function input type
+ * @param <O> reducer function output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<I, O> {
+
+  /**
+   * This method is for parameterized functions.
+   *
+   * If this parameterized function is 'reducible' on another bucket function,
+   * return the {@link Reducer} function.
+   * <p>
+   * Example to return reducer for reducing f_source = bucket(4, x) on f_target = bucket(2, x)
+   * <ul>
+   *     <li>thisFunction = bucket</li>
+   *     <li>thisParam = Int(4)</li>
+   *     <li>otherFunction = bucket</li>
+   *     <li>otherParam = Int(2)</li>
+   * </ul>
+   *
+   * @param thisParam parameter for this function
+   * @param otherFunction the other parameterized function
+   * @param otherParam parameter for the other function
+   * @return a reduction function if it is reducible, null if not
+   */
+  default Reducer<I, O> reducer(
+    Object thisParam,
+    ReducibleFunction<?, ?> otherFunction,

Review Comment:
   Yea, I get an error like:
   
   ```
   /Users/szehon/repos/apache-spark/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala:105:42: type mismatch;
   [error]  found   : org.apache.spark.sql.connector.catalog.functions.ReducibleFunction[_$5,_$6] where type _$6, type _$5
   [error]  required: org.apache.spark.sql.connector.catalog.functions.ReducibleFunction[_$3,_$4]
   [error]         thisFunction.reducer(numBuckets, otherFunction, otherNumBuckets)
   [error]                                          ^
   [error] /Users/szehon/repos/apache-spark/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala:106:38: type mismatch;
   [error]  found   : org.apache.spark.sql.connector.catalog.functions.ReducibleFunction[_$5,_$6] where type _$6, type _$5
   ```
   
   But actually there is no requirement that they have the exact same input and output types; in theory the other function could have different types.
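   
   A sketch of a call site that the wildcard-typed signature quoted above would allow; the helper name and types here are hypothetical, mirroring the `TransformExpression` call shown in the error:
   
   ```scala
   import org.apache.spark.sql.connector.catalog.functions.{Reducer, ReducibleFunction}
   
   // The other function is typed with independent existentials, so this compiles
   // even when its input/output types differ from this function's.
   def resolveReducer(
       thisFunction: ReducibleFunction[Int, Int],
       numBuckets: Int,
       otherFunction: ReducibleFunction[_, _],
       otherNumBuckets: Int): Option[Reducer[Int, Int]] =
     Option(thisFunction.reducer(numBuckets, otherFunction, otherNumBuckets))
   ```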



Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #45267:
URL: https://github.com/apache/spark/pull/45267#issuecomment-2016084578

   @sunchao thanks! Addressed the review comments. Let me know if supporting a generic function parameter is too messy and we want to switch back to just bucket for the first cut.


Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1520187304


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+import scala.Option;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * there exists a reducer function r(x) such that r(f_source(x)) = f_target(x) for all input x.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<T, A> extends ScalarFunction<T> {
+
+    /**
+     * If this function is 'reducible' on another function, return the {@link Reducer} function.
+     * @param other other function
+     * @param thisArgument argument for this function instance

Review Comment:
   the arguments here are a bit confusing - as a caller of this function, how do I know what I should pass here?



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+import scala.Option;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * there exists a reducer function r(x) such that r(f_source(x)) = f_target(x) for all input x.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<T, A> extends ScalarFunction<T> {

Review Comment:
   what is `T` and what is `A`? should `Reducer` also have two type parameters?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -635,6 +636,22 @@ trait ShuffleSpec {
    */
   def createPartitioning(clustering: Seq[Expression]): Partitioning =
     throw SparkUnsupportedOperationException()
+
+  /**
+   * Return a set of [[Reducer]] for the partition expressions of this shuffle spec,
+   * on the partition expressions of another shuffle spec.
+   * <p>
+   * A [[Reducer]] exists for a partition expression function of this shuffle spec if it is
+   * 'reducible' on the corresponding partition expression function of the other shuffle spec.
+   * <p>
+   * If a value is returned, there must be one Option[[Reducer]] per partition expression.
+   * A None value in the set indicates that the particular partition expression is not reducible
+   * on the corresponding expression on the other shuffle spec.
+   * <p>
+   * Returning none also indicates that none of the partition expressions can be reduced on the
+   * corresponding expression on the other shuffle spec.
+   */
+  def reducers(spec: ShuffleSpec): Option[Seq[Option[Reducer[_]]]] = None

Review Comment:
   maybe this should belong to `KeyGroupedShuffleSpec`? I'm not sure whether this API is meaningful for other shuffle specs.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -829,20 +846,59 @@ case class KeyGroupedShuffleSpec(
     }
   }
 
+  override def canCreatePartitioning: Boolean = SQLConf.get.v2BucketingShuffleEnabled &&

Review Comment:
   nit: this seems like an unnecessary change



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -829,20 +846,59 @@ case class KeyGroupedShuffleSpec(
     }
   }
 
+  override def canCreatePartitioning: Boolean = SQLConf.get.v2BucketingShuffleEnabled &&
+    // Only support partition expressions are AttributeReference for now
+    partitioning.expressions.forall(_.isInstanceOf[AttributeReference])
+
+  override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
+    KeyGroupedPartitioning(clustering, partitioning.numPartitions, partitioning.partitionValues)
+  }
+
+  override def reducers(other: ShuffleSpec): Option[Seq[Option[Reducer[_]]]] = {
+    other match {
+      case otherSpec: KeyGroupedShuffleSpec =>
+        val results = partitioning.expressions.zip(otherSpec.partitioning.expressions).map {
+          case (e1: TransformExpression, e2: TransformExpression)

Review Comment:
   maybe it's better to move this logic into `TransformExpression` itself, e.g., add a `TransformExpression.reducer` method.
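   
   A rough sketch of that suggestion as a method on `TransformExpression`; the body below is assumed for illustration (including the `numBucketsOpt` field and the final parameter order of `reducer`), not taken from the PR:
   
   ```scala
   // Hypothetical TransformExpression helper: returns a Reducer if this
   // transform's function is reducible on the other transform's function.
   def reducer(other: TransformExpression): Option[Reducer[_, _]] =
     (function, other.function) match {
       case (f: ReducibleFunction[_, _], o: ReducibleFunction[_, _]) =>
         val thisParam = numBucketsOpt.map(Int.box).orNull
         val otherParam = other.numBucketsOpt.map(Int.box).orNull
         Option(f.reducer(thisParam, o, otherParam))
       case _ => None
     }
   ```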



Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #45267:
URL: https://github.com/apache/spark/pull/45267#issuecomment-2008733057

   @sunchao can you take another look? I guess the open question is how to support other potential parameterized transforms in the future; let me know your thoughts.


Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on PR #45267:
URL: https://github.com/apache/spark/pull/45267#issuecomment-1965888375

   Thanks @szehon-ho! Will take a look in a few days.


Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #45267:
URL: https://github.com/apache/spark/pull/45267#issuecomment-2019159327

   @sunchao if you have time for another look: I reverted to a bucket-specific argument type and method, and will worry about other parameterized transforms later.


Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "himadripal (via GitHub)" <gi...@apache.org>.
himadripal commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1503514183


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1537,6 +1537,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS =
+    buildConf("spark.sql.sources.v2.bucketing.allow.enabled")

Review Comment:
   maybe rename `spark.sql.sources.v2.bucketing.allow.enabled` to `spark.sql.sources.v2.bucketing.allowCompatibleTransform.enabled`
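   
   For reference, a sketch of the renamed conf following the `buildConf` pattern quoted above; the doc string, version, and default are illustrative, not from the PR:
   
   ```scala
   val V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS =
     buildConf("spark.sql.sources.v2.bucketing.allowCompatibleTransform.enabled")
       .doc("Whether to allow storage-partition join when the partition transforms " +
         "of the two join sides are compatible but not identical, e.g. bucket(4, col) " +
         "joined with bucket(2, col).")
       .version("4.0.0")
       .booleanConf
       .createWithDefault(false)
   ```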



Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "himadripal (via GitHub)" <gi...@apache.org>.
himadripal commented on PR #45267:
URL: https://github.com/apache/spark/pull/45267#issuecomment-1965681861

   I was trying this test case:
   
   ```scala
   val partition1 = Array(Expressions.years("ts"),
     bucket(2, "id"))
   val partition2 = Array(Expressions.days("ts"),
     bucket(4, "id"))
   ```
   
   This throws an exception when partiallyClustered = false and allowCompatibleTransform = true.
   
   Exception:
   
   ```
   class java.lang.Integer cannot be cast to class java.lang.Long (java.lang.Integer and java.lang.Long are in module java.base of loader 'bootstrap')
   java.lang.ClassCastException: class java.lang.Integer cannot be cast to class java.lang.Long (java.lang.Integer and java.lang.Long are in module java.base of loader 'bootstrap')
   	at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:103)
   	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
   ```


Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1529562057


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -505,11 +506,28 @@ case class EnsureRequirements(
           }
         }
 
-        // Now we need to push-down the common partition key to the scan in each child
-        newLeft = populatePartitionValues(left, mergedPartValues, leftSpec.joinKeyPositions,
-          applyPartialClustering, replicateLeftSide)
-        newRight = populatePartitionValues(right, mergedPartValues, rightSpec.joinKeyPositions,
-          applyPartialClustering, replicateRightSide)
+        // in case of compatible but not identical partition expressions, we apply 'reduce'
+        // transforms to group one side's partitions as well as the common partition values
+        val leftReducers = leftSpec.reducers(rightSpec)
+        val rightReducers = rightSpec.reducers(leftSpec)
+
+        if (leftReducers.isDefined || rightReducers.isDefined) {

Review Comment:
   Yea, or in other words both sides mod by the gcd, i.e. bucket(16, x) % 4 and bucket(12, y) % 4.
   
   Implemented and added a test case in the latest patch.
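   
   A small worked sketch of the gcd reduction described above (illustrative only, not the PR's code):
   
   ```scala
   // When neither bucket count divides the other, both sides can still be
   // reduced to gcd(n1, n2) buckets, since a bucket id behaves like hash % n.
   @annotation.tailrec
   def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)
   
   val (n1, n2) = (16, 12)
   val g = gcd(n1, n2)                  // 4
   val reduceLeft: Int => Int = _ % g   // applied to bucket(16, x) ids
   val reduceRight: Int => Int = _ % g  // applied to bucket(12, y) ids
   // After reduction both sides are grouped on 4 buckets, so their partition
   // values can be matched without introducing a shuffle.
   ```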



Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #45267:
URL: https://github.com/apache/spark/pull/45267#issuecomment-1967323627

   Thanks @himadripal, I was able to modify one of my tests to reproduce it as well, and it should be fixed now.


Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1530840650


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+import scala.Option;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * there exists a reducer function r(x) such that r(f_source(x)) = f_target(x) for all input x.
+ *
+ * <p>
+ * Examples:
+ * <ul>
+ *    <li>Bucket functions
+ *    <ul>
+ *        <li>f_source(x) = bucket(4, x)</li>
+ *        <li>f_target(x) = bucket(2, x)</li>
+ *        <li>r(x) = x / 2</li>
+ *    </ul>
+ *    <li>Date functions
+ *    <ul>
+ *        <li>f_source(x) = days(x)</li>
+ *        <li>f_target(x) = hours(x)</li>
+ *        <li>r(x) = x / 24</li>
+ *     </ul>
+ * </ul>
+ * @param <I> reducer function input type
+ * @param <O> reducer function output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<I, O> {
+
+    /**
+     * If this function is 'reducible' on another function, return the {@link Reducer} function.
+     * <p>
+     * Example:
+     * <ul>
+     *     <li>this_function = bucket(4, x)
+     *     <li>other function = bucket(2, x)
+     * </ul>
+     * Invoke with arguments
+     * <ul>
+     *     <li>other = bucket</li>
+     *     <li>this param = Int(4)</li>
+     *     <li>other param = Int(2)</li>
+     * </ul>
+     * @param other the other function
+     * @param thisParam param for this function

Review Comment:
   Actually, on second thought, we do plan to add geo partition transforms, for example xz2 ordering https://github.com/spatialx-project/geolake/blob/main/api/src/main/java/org/apache/iceberg/transforms/ExtendedZCurve.java which needs a 'resolution' parameter,
   
   as well as multi-arg bucket transforms: https://docs.google.com/document/d/1aDoZqRgvDOOUVAGhvKZbp5vFstjsAMY4EFCyjlxpaaw/ 
   
   So I changed the parameter type back to Object in the latest PR.
   
   This is one approach; we could probably re-use int for some of these, but it seemed cleaner to have a generic type. Let me know what you think.
   
   Note, I tried for a while to make the method type-parameterized, like
   
   ```
   <T> Option<Reducer<I, O>> reducer(ReducibleFunction<I, O> other, T numBuckets,
                                     T otherNumBuckets);
   ```
   
   but was not able to override it successfully in Bucket in Scala.
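   
   For illustration, a hedged sketch of how a bucket function might implement the Object-typed variant, using the parameter order quoted earlier in this thread; the `BucketFunction` class, the divisibility check, and `BucketModReducer` are assumptions, not the PR's exact code:
   
   ```scala
   import org.apache.spark.sql.connector.catalog.functions.{Reducer, ReducibleFunction}
   
   // Hypothetical connector-side bucket function sketch.
   case class BucketFunction(numBuckets: Int) extends ReducibleFunction[Int, Int] {
     override def reducer(
         thisParam: AnyRef,
         otherFunction: ReducibleFunction[_, _],
         otherParam: AnyRef): Reducer[Int, Int] = {
       val thisBuckets = thisParam.asInstanceOf[Int]
       val otherBuckets = otherParam.asInstanceOf[Int]
       // Reducible only when the other side is also a bucket function and its
       // bucket count divides this one's.
       otherFunction match {
         case _: BucketFunction if thisBuckets % otherBuckets == 0 =>
           BucketModReducer(otherBuckets) // reducer sketch from earlier in the thread
         case _ => null // not reducible
       }
     }
   }
   ```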



Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1506338476


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala:
##########
@@ -1310,6 +1314,312 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
+  test("SPARK-47094: Support compatible buckets") {
+    val table1 = "tab1e1"
+    val table2 = "table2"
+
+    Seq(
+      ((2, 4), (4, 2)),
+      ((4, 2), (2, 4)),
+      ((2, 2), (4, 6)),
+      ((6, 2), (2, 2))).foreach {
+      case ((table1buckets1, table1buckets2), (table2buckets1, table2buckets2)) =>
+        catalog.clearTables()
+
+        val partition1 = Array(bucket(table1buckets1, "store_id"),
+          bucket(table1buckets2, "dept_id"))
+        val partition2 = Array(bucket(table2buckets1, "store_id"),
+          bucket(table2buckets2, "dept_id"))
+
+        Seq((table1, partition1), (table2, partition2)).foreach { case (tab, part) =>
+          createTable(tab, schema2, part)
+          val insertStr = s"INSERT INTO testcat.ns.$tab VALUES " +
+            "(0, 0, 'aa'), " +
+            "(0, 0, 'ab'), " + // duplicate partition key
+            "(0, 1, 'ac'), " +
+            "(0, 2, 'ad'), " +
+            "(0, 3, 'ae'), " +
+            "(0, 4, 'af'), " +
+            "(0, 5, 'ag'), " +
+            "(1, 0, 'ah'), " +
+            "(1, 0, 'ai'), " + // duplicate partition key
+            "(1, 1, 'aj'), " +
+            "(1, 2, 'ak'), " +
+            "(1, 3, 'al'), " +
+            "(1, 4, 'am'), " +
+            "(1, 5, 'an'), " +
+            "(2, 0, 'ao'), " +
+            "(2, 0, 'ap'), " + // duplicate partition key
+            "(2, 1, 'aq'), " +
+            "(2, 2, 'ar'), " +
+            "(2, 3, 'as'), " +
+            "(2, 4, 'at'), " +
+            "(2, 5, 'au'), " +
+            "(3, 0, 'av'), " +
+            "(3, 0, 'aw'), " + // duplicate partition key
+            "(3, 1, 'ax'), " +
+            "(3, 2, 'ay'), " +
+            "(3, 3, 'az'), " +
+            "(3, 4, 'ba'), " +
+            "(3, 5, 'bb'), " +
+            "(4, 0, 'bc'), " +
+            "(4, 0, 'bd'), " + // duplicate partition key
+            "(4, 1, 'be'), " +
+            "(4, 2, 'bf'), " +
+            "(4, 3, 'bg'), " +
+            "(4, 4, 'bh'), " +
+            "(4, 5, 'bi'), " +
+            "(5, 0, 'bj'), " +
+            "(5, 0, 'bk'), " + // duplicate partition key
+            "(5, 1, 'bl'), " +
+            "(5, 2, 'bm'), " +
+            "(5, 3, 'bn'), " +
+            "(5, 4, 'bo'), " +
+            "(5, 5, 'bp')"
+
+            // additional unmatched partitions to test push down
+            val finalStr = if (tab == table1) {
+              insertStr ++ ", (8, 0, 'xa'), (8, 8, 'xx')"
+            } else {
+              insertStr ++ ", (9, 0, 'ya'), (9, 9, 'yy')"
+            }
+
+            sql(finalStr)
+        }
+
+        Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys =>
+          withSQLConf(
+            SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+            SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
+            SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
+            SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key ->
+              allowJoinKeysSubsetOfPartitionKeys.toString,
+            SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
+            val df = sql(
+              s"""
+                 |${selectWithMergeJoinHint("t1", "t2")}
+                 |t1.store_id, t1.dept_id, t1.data, t2.data
+                 |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+                 |ON t1.store_id = t2.store_id AND t1.dept_id = t2.dept_id
+                 |ORDER BY t1.store_id, t1.dept_id, t1.data, t2.data
+                 |""".stripMargin)
+
+            val shuffles = collectShuffles(df.queryExecution.executedPlan)
+            assert(shuffles.isEmpty, "SPJ should be triggered")
+
+            val scans = collectScans(df.queryExecution.executedPlan).map(_.inputRDD.
+              partitions.length)
+            val expectedBuckets = Math.min(table1buckets1, table2buckets1) *
+              Math.min(table1buckets2, table2buckets2)
+            assert(scans == Seq(expectedBuckets, expectedBuckets))
+
+            checkAnswer(df, Seq(
+              Row(0, 0, "aa", "aa"),
+              Row(0, 0, "aa", "ab"),
+              Row(0, 0, "ab", "aa"),
+              Row(0, 0, "ab", "ab"),
+              Row(0, 1, "ac", "ac"),
+              Row(0, 2, "ad", "ad"),
+              Row(0, 3, "ae", "ae"),
+              Row(0, 4, "af", "af"),
+              Row(0, 5, "ag", "ag"),
+              Row(1, 0, "ah", "ah"),
+              Row(1, 0, "ah", "ai"),
+              Row(1, 0, "ai", "ah"),
+              Row(1, 0, "ai", "ai"),
+              Row(1, 1, "aj", "aj"),
+              Row(1, 2, "ak", "ak"),
+              Row(1, 3, "al", "al"),
+              Row(1, 4, "am", "am"),
+              Row(1, 5, "an", "an"),
+              Row(2, 0, "ao", "ao"),
+              Row(2, 0, "ao", "ap"),
+              Row(2, 0, "ap", "ao"),
+              Row(2, 0, "ap", "ap"),
+              Row(2, 1, "aq", "aq"),
+              Row(2, 2, "ar", "ar"),
+              Row(2, 3, "as", "as"),
+              Row(2, 4, "at", "at"),
+              Row(2, 5, "au", "au"),
+              Row(3, 0, "av", "av"),
+              Row(3, 0, "av", "aw"),
+              Row(3, 0, "aw", "av"),
+              Row(3, 0, "aw", "aw"),
+              Row(3, 1, "ax", "ax"),
+              Row(3, 2, "ay", "ay"),
+              Row(3, 3, "az", "az"),
+              Row(3, 4, "ba", "ba"),
+              Row(3, 5, "bb", "bb"),
+              Row(4, 0, "bc", "bc"),
+              Row(4, 0, "bc", "bd"),
+              Row(4, 0, "bd", "bc"),
+              Row(4, 0, "bd", "bd"),
+              Row(4, 1, "be", "be"),
+              Row(4, 2, "bf", "bf"),
+              Row(4, 3, "bg", "bg"),
+              Row(4, 4, "bh", "bh"),
+              Row(4, 5, "bi", "bi"),
+              Row(5, 0, "bj", "bj"),
+              Row(5, 0, "bj", "bk"),
+              Row(5, 0, "bk", "bj"),
+              Row(5, 0, "bk", "bk"),
+              Row(5, 1, "bl", "bl"),
+              Row(5, 2, "bm", "bm"),
+              Row(5, 3, "bn", "bn"),
+              Row(5, 4, "bo", "bo"),
+              Row(5, 5, "bp", "bp")
+            ))
+          }
+        }
+    }
+  }
+
+  test("SPARK-47094: Support compatible buckets with less join keys than partition keys") {
+    val table1 = "tab1e1"
+    val table2 = "table2"
+
+    Seq((2, 4), (4, 2), (2, 6), (6, 2)).foreach {
+      case (table1buckets, table2buckets) =>
+        catalog.clearTables()
+
+        val partition1 = Array(identity("data"),
+          bucket(table1buckets, "dept_id"))
+        val partition2 = Array(bucket(3, "store_id"),
+          bucket(table2buckets, "dept_id"))
+
+        createTable(table1, schema2, partition1)
+        sql(s"INSERT INTO testcat.ns.$table1 VALUES " +
+          "(0, 0, 'aa'), " +
+          "(1, 0, 'ab'), " +
+          "(2, 1, 'ac'), " +
+          "(3, 2, 'ad'), " +
+          "(4, 3, 'ae'), " +
+          "(5, 4, 'af'), " +
+          "(6, 5, 'ag'), " +
+
+          // value without other side match
+          "(6, 6, 'xx')"
+        )
+
+        createTable(table2, schema2, partition2)
+        sql(s"INSERT INTO testcat.ns.$table2 VALUES " +
+          "(6, 0, '01'), " +
+          "(5, 1, '02'), " + // duplicate partition key
+          "(5, 1, '03'), " +
+          "(4, 2, '04'), " +
+          "(3, 3, '05'), " +
+          "(2, 4, '06'), " +
+          "(1, 5, '07'), " +
+
+          // value without other side match
+          "(7, 7, '99')"
+        )
+
+
+        withSQLConf(
+          SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+          SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
+          SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
+          SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true",
+          SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
+          val df = sql(
+            s"""
+               |${selectWithMergeJoinHint("t1", "t2")}
+               |t1.store_id, t2.store_id, t1.dept_id, t2.dept_id, t1.data, t2.data
+               |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+               |ON t1.dept_id = t2.dept_id
+               |ORDER BY t1.store_id, t1.dept_id, t1.data, t2.data
+               |""".stripMargin)
+
+          val shuffles = collectShuffles(df.queryExecution.executedPlan)
+          assert(shuffles.isEmpty, "SPJ should be triggered")
+
+          val scans = collectScans(df.queryExecution.executedPlan).map(_.inputRDD.
+            partitions.length)
+
+          val expectedBuckets = Math.min(table1buckets, table2buckets)

Review Comment:
   Yes, in fact it is definitely a follow-up to do this with 'partiallyClustered' (which is currently not enabled). Today that mode 'duplicates' partitions for the partially-clustered side, and I think it can be used to enable the same for compatible transforms.



Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1506339765


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -635,6 +636,22 @@ trait ShuffleSpec {
    */
   def createPartitioning(clustering: Seq[Expression]): Partitioning =
     throw SparkUnsupportedOperationException()
+
+  /**
+   * Return a set of [[Reducer]] for the partition expressions of this shuffle spec,
+   * on the partition expressions of another shuffle spec.
+   * <p>
+   * A [[Reducer]] exists for a partition expression function of this shuffle spec if it is
+   * 'reducible' on the corresponding partition expression function of the other shuffle spec.
+   * <p>
+   * If a value is returned, there must be one Option[[Reducer]] per partition expression.
+   * A None value in the set indicates that the particular partition expression is not reducible
+   * on the corresponding expression on the other shuffle spec.
+   * <p>
+   * Returning none also indicates that none of the partition expressions can be reduced on the
+   * corresponding expression on the other shuffle spec.
+   */
+  def reducers(spec: ShuffleSpec): Option[Seq[Option[Reducer[_]]]] = None

Review Comment:
   Hi @advancedxy, yea, making this a generic mechanism definitely increases the scope. But there are actually many use cases: the main example is that even Iceberg's bucket function is not the same as Spark's and would need to implement this somehow, and the same goes for anything the user registers in the function catalog, for instance geo bucketing functions.
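   
   A concrete illustration of the point; this assumes Iceberg-style hash-then-modulo bucketing and is not code from this PR:
   
   ```scala
   // Because the bucket id is hash % numBuckets, a modulo reducer still works
   // across implementations whenever one bucket count divides the other:
   // icebergStyleBucket(16, h) % 4 == icebergStyleBucket(4, h) for all h.
   def icebergStyleBucket(numBuckets: Int, hashed: Int): Int =
     (hashed & Integer.MAX_VALUE) % numBuckets
   ```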



Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #45267:
URL: https://github.com/apache/spark/pull/45267#issuecomment-2043308510

   Thanks @sunchao and all for the review!

