Posted to reviews@spark.apache.org by "szehon-ho (via GitHub)" <gi...@apache.org> on 2023/09/01 07:31:19 UTC

[GitHub] [spark] szehon-ho commented on a diff in pull request #42757: [SPARK-45036][SQL] SPJ: Simplify the logic to handle partially clustered distribution

szehon-ho commented on code in PR #42757:
URL: https://github.com/apache/spark/pull/42757#discussion_r1312674140


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##########
@@ -137,81 +136,63 @@ case class BatchScanExec(
 
       outputPartitioning match {
         case p: KeyGroupedPartitioning =>
-          if (conf.v2BucketingPushPartValuesEnabled &&
-              conf.v2BucketingPartiallyClusteredDistributionEnabled) {
-            assert(filteredPartitions.forall(_.size == 1),
-              "Expect partitions to be not grouped when " +
-                  s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
-                  "is enabled")
-
-            val groupedPartitions = groupPartitions(finalPartitions.map(_.head),
-              groupSplits = true).get
-
-            // This means the input partitions are not grouped by partition values. We'll need to
-            // check `groupByPartitionValues` and decide whether to group and replicate splits
-            // within a partition.
-            if (spjParams.commonPartitionValues.isDefined &&
-              spjParams.applyPartialClustering) {
-              // A mapping from the common partition values to how many splits the partition
-              // should contain.
-              val commonPartValuesMap = spjParams.commonPartitionValues
+          val groupedPartitions = filteredPartitions.map(splits => {
+            assert(splits.nonEmpty && splits.head.isInstanceOf[HasPartitionKey])
+            (splits.head.asInstanceOf[HasPartitionKey].partitionKey(), splits)
+          })
+
+          // This means the input partitions are not grouped by partition values. We'll need to

Review Comment:
   Nit: can we clarify what 'This' refers to? For example: `When partially clustered, the input partitions are not grouped by partition values`.
   
   Nit: `groupByPartitionValues` never seems to be defined anywhere. Can we fix the reference? Does it refer to `groupedPartitions`?
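   
   For context on the second nit, here is a minimal sketch of the shape of the new `groupedPartitions` value in the hunk above. It is an illustration under stated assumptions, not the PR's code: `InputPartition` and `HasPartitionKey` are simplified local stand-ins for the Spark interfaces, the key is a `String` rather than an `InternalRow`, and `Split`, `GroupingSketch`, and `keyed` are hypothetical names.

```scala
// Simplified stand-ins for Spark's InputPartition / HasPartitionKey (assumption:
// the real partitionKey() returns an InternalRow; a String is used here for brevity).
trait InputPartition
trait HasPartitionKey extends InputPartition {
  def partitionKey(): String
}

// Hypothetical split type used only for this illustration.
final case class Split(key: String, id: Int) extends HasPartitionKey {
  override def partitionKey(): String = key
}

object GroupingSketch {
  // Pair each list of splits with the partition key of its first split,
  // mirroring the (key, splits) tuples built for `groupedPartitions` in the diff.
  def keyed(filtered: Seq[Seq[InputPartition]]): Seq[(String, Seq[InputPartition])] =
    filtered.map { splits =>
      require(splits.nonEmpty && splits.head.isInstanceOf[HasPartitionKey])
      (splits.head.asInstanceOf[HasPartitionKey].partitionKey(), splits)
    }

  def main(args: Array[String]): Unit = {
    val filtered = Seq(Seq(Split("a", 0), Split("a", 1)), Seq(Split("b", 2)))
    keyed(filtered).foreach { case (k, parts) =>
      println(s"$k -> ${parts.size} split(s)")
    }
  }
}
```

   In this sketch each element is a (key, splits) pair, which suggests that `groupedPartitions` is the name the stale comment should point to; whether that is what the author intended is the open question above.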



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -327,11 +327,14 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
  * @param numPartitions the number of partitions
  * @param partitionValues the values for the cluster keys of the distribution, must be

Review Comment:
   What do you think about adding 'final cluster keys' to the Javadoc, to make it even clearer?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala:
##########
@@ -217,3 +210,19 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
     }
   }
 }
+
+/**
+ * A key-grouped Spark partition, which could consist of multiple input splits
+ *
+ * @param value the partition value shared by all the input splits
+ * @param parts the input splits that are grouped into a single Spark partition
+ */
+private[v2] case class KeyGroupedPartition(value: InternalRow, parts: Seq[InputPartition])
+
+/**
+ * Information about key-grouped partitions, which contains a list of grouped partitions as well
+ * as the original input partitions before the grouping.
+ */
+private[v2] case class KeyGroupedPartitionInfo(

Review Comment:
   The name reads as if it refers to info about a single `KeyGroupedPartition`. How about `KeyGroupedPartitionInfos`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org