Posted to reviews@spark.apache.org by "sunchao (via GitHub)" <gi...@apache.org> on 2024/03/11 18:37:08 UTC

Re: [PR] [SPARK-47094][SQL] SPJ : Dynamically rebalance number of buckets when they are not equal [spark]

sunchao commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1520187304


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+import scala.Option;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * there exists a reducer function r(x) such that r(f_source(x)) = f_target(x) for all input x.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<T, A> extends ScalarFunction<T> {
+
+    /**
+     * If this function is 'reducible' on another function, return the {@link Reducer} function.
+     * @param other other function
+     * @param thisArgument argument for this function instance

Review Comment:
   The arguments here are a bit confusing: as a caller of this function, how do I know what I should pass here?
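
   To make the question concrete, here is a minimal sketch of what the reducibility definition in the Javadoc could mean for a bucket function, assuming the argument in question is the bucket count. Every name below (`BucketReducerSketch`, `bucket`, `reducer`) is hypothetical and not taken from the PR, and the hash is simplified to a floor-mod.

```java
import java.util.Optional;
import java.util.function.IntUnaryOperator;

// Hypothetical sketch, not the PR's API: a bucket function parameterized by
// its bucket count, plus a reducer that maps its output onto a coarser one.
class BucketReducerSketch {

    // Simplified stand-in for a bucket transform: f(x) = x mod numBuckets.
    static int bucket(int numBuckets, int x) {
        return Math.floorMod(x, numBuckets);
    }

    // Returns r such that r(bucket(thisNumBuckets, x)) == bucket(otherNumBuckets, x)
    // for all x. In this sketch, r is provided only when otherNumBuckets
    // divides thisNumBuckets; otherwise the result is empty.
    static Optional<IntUnaryOperator> reducer(int thisNumBuckets, int otherNumBuckets) {
        if (thisNumBuckets <= 0 || otherNumBuckets <= 0
                || thisNumBuckets % otherNumBuckets != 0) {
            return Optional.empty();
        }
        return Optional.of(v -> v % otherNumBuckets);
    }

    public static void main(String[] args) {
        IntUnaryOperator r = reducer(4, 2).orElseThrow();
        // Check r(f_source(x)) == f_target(x) over a small range of inputs.
        for (int x = -8; x <= 8; x++) {
            if (r.applyAsInt(bucket(4, x)) != bucket(2, x)) {
                throw new AssertionError("reduction failed for x=" + x);
            }
        }
        // 3 does not divide 4, so this sketch offers no reducer.
        System.out.println(reducer(4, 3).isPresent());
    }
}
```

   In this reading, the caller passes the parameter that distinguishes one instance of the function from another (here, the bucket count) for both sides.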



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+import scala.Option;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * there exists a reducer function r(x) such that r(f_source(x)) = f_target(x) for all input x.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<T, A> extends ScalarFunction<T> {

Review Comment:
   What is `T` and what is `A`? Should `Reducer` also have two type parameters?
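
   One possible reading of the two-parameter question, sketched under assumptions: if a reducer maps the output of the source function to the output of the target function, it naturally has an input type and an output type. `TypedReducer`, `I`, `O`, and the hours-to-days example are hypothetical and do not come from the PR.

```java
// Hypothetical sketch: a reducer typed by the source function's output (I)
// and the target function's output (O). Not the PR's actual interface.
interface TypedReducer<I, O> {
    O reduce(I value);
}

class TypedReducerSketch {

    // Example: an "hours since epoch" value reduced to "days since epoch".
    // floorDiv keeps the result correct for values before the epoch.
    static final TypedReducer<Long, Integer> HOURS_TO_DAYS =
        h -> (int) Math.floorDiv(h, 24L);

    public static void main(String[] args) {
        System.out.println(HOURS_TO_DAYS.reduce(49L));  // 49 hours falls in day 2
        System.out.println(HOURS_TO_DAYS.reduce(-1L));  // one hour before epoch is day -1
    }
}
```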



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -635,6 +636,22 @@ trait ShuffleSpec {
    */
   def createPartitioning(clustering: Seq[Expression]): Partitioning =
     throw SparkUnsupportedOperationException()
+
+  /**
+   * Return a set of [[Reducer]] for the partition expressions of this shuffle spec,
+   * on the partition expressions of another shuffle spec.
+   * <p>
+   * A [[Reducer]] exists for a partition expression function of this shuffle spec if it is
+   * 'reducible' on the corresponding partition expression function of the other shuffle spec.
+   * <p>
+   * If a value is returned, there must be one Option[[Reducer]] per partition expression.
+   * A None value in the set indicates that the particular partition expression is not reducible
+   * on the corresponding expression on the other shuffle spec.
+   * <p>
+   * Returning none also indicates that none of the partition expressions can be reduced on the
+   * corresponding expression on the other shuffle spec.
+   */
+  def reducers(spec: ShuffleSpec): Option[Seq[Option[Reducer[_]]]] = None

Review Comment:
   Maybe this should belong to `KeyGroupedShuffleSpec`? I'm not sure whether this API is meaningful for other shuffle specs.
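
   For reference, the nested-option contract described in the Scaladoc above can be sketched in Java as follows. This is an illustration only: `ReducersContractSketch` and `reduceKey` are hypothetical names, partition values are simplified to `int`, and leaving a value unchanged when its reducer is empty is an assumption of this sketch, not something the PR states.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.IntUnaryOperator;

// Hypothetical sketch of consuming an "optional list of optional reducers".
class ReducersContractSketch {

    // Outer empty: no expression is reducible, so the key is returned as-is.
    // Otherwise there must be exactly one entry per partition expression;
    // an empty entry means that position has no reducer (assumed: unchanged).
    static int[] reduceKey(int[] key, Optional<List<Optional<IntUnaryOperator>>> reducers) {
        if (!reducers.isPresent()) {
            return key.clone();
        }
        List<Optional<IntUnaryOperator>> rs = reducers.get();
        if (rs.size() != key.length) {
            throw new IllegalArgumentException("expected one reducer slot per partition expression");
        }
        int[] out = new int[key.length];
        for (int i = 0; i < key.length; i++) {
            final int v = key[i];
            out[i] = rs.get(i).map(r -> r.applyAsInt(v)).orElse(v);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Optional<IntUnaryOperator>> rs = Arrays.asList(
            Optional.<IntUnaryOperator>of(v -> v % 2),  // first expression is reducible
            Optional.<IntUnaryOperator>empty());        // second expression is not
        System.out.println(Arrays.toString(reduceKey(new int[] {3, 7}, Optional.of(rs))));
    }
}
```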



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -829,20 +846,59 @@ case class KeyGroupedShuffleSpec(
     }
   }
 
+  override def canCreatePartitioning: Boolean = SQLConf.get.v2BucketingShuffleEnabled &&

Review Comment:
   nit: this seems like an unnecessary change.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -829,20 +846,59 @@ case class KeyGroupedShuffleSpec(
     }
   }
 
+  override def canCreatePartitioning: Boolean = SQLConf.get.v2BucketingShuffleEnabled &&
+    // Only support partition expressions are AttributeReference for now
+    partitioning.expressions.forall(_.isInstanceOf[AttributeReference])
+
+  override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
+    KeyGroupedPartitioning(clustering, partitioning.numPartitions, partitioning.partitionValues)
+  }
+
+  override def reducers(other: ShuffleSpec): Option[Seq[Option[Reducer[_]]]] = {
+    other match {
+      case otherSpec: KeyGroupedShuffleSpec =>
+        val results = partitioning.expressions.zip(otherSpec.partitioning.expressions).map {
+          case (e1: TransformExpression, e2: TransformExpression)

Review Comment:
   Maybe it's better to move this logic into `TransformExpression` itself, e.g., by adding a `TransformExpression.reducer` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

