You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "alexeykudinkin (via GitHub)" <gi...@apache.org> on 2023/02/06 23:51:01 UTC

[GitHub] [hudi] alexeykudinkin opened a new pull request, #7872: [DNM] Cleaning up `Partitioner`s hierarchy

alexeykudinkin opened a new pull request, #7872:
URL: https://github.com/apache/hudi/pull/7872

   ### Change Logs
   
   > NOTE: This PR is a follow-up for #5328 and #7396
   
   This PR implements following changes:
   
    - Avoids unnecessary transformations (for ex, string concatenation to create sorting key)
    - Avoids unnecessary coalescing of the incoming RDD/Dataset in case target number of Spark partitions doesn't change
    - Cleans up code that not needed anymore
   
   ### Impact
   
   Improves partitioners performance by allowing to avoid unnecessary conversion, coalescing
   
   ### Risk level (write none, low medium or high below)
   
   Low
   
   ### Documentation Update
   
   N/A
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] Zouxxyy commented on a diff in pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "Zouxxyy (via GitHub)" <gi...@apache.org>.
Zouxxyy commented on code in PR #7872:
URL: https://github.com/apache/hudi/pull/7872#discussion_r1105540267


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java:
##########
@@ -19,43 +19,70 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import scala.collection.JavaConverters;
 
 import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner.getOrderByColumnNames;
 
 /**
- * A partitioner that does sorting based on specified column values for each spark partitions.
+ * A partitioner that does local sorting for each RDD partition based on the tuple of
+ * values of the columns configured for ordering.
  */
-public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {
+public class RowCustomColumnsSortPartitioner extends RepartitioningBulkInsertPartitionerBase<Dataset<Row>> {
 
-  private final String[] sortColumnNames;
+  private final String[] orderByColumnNames;
 
-  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
-    this.sortColumnNames = getSortColumnName(config);
+  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = getOrderByColumnNames(config);
+
+    checkState(orderByColumnNames.length > 0);
   }
 
-  public RowCustomColumnsSortPartitioner(String[] columnNames) {
-    this.sortColumnNames = columnNames;
+  public RowCustomColumnsSortPartitioner(String[] columnNames, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = columnNames;
+
+    checkState(orderByColumnNames.length > 0);
   }
 
   @Override
-  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
-    final String[] sortColumns = this.sortColumnNames;
-    return records.sort(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns)
-        .coalesce(outputSparkPartitions);
+  public Dataset<Row> repartitionRecords(Dataset<Row> dataset, int targetPartitionNumHint) {
+    Dataset<Row> repartitionedDataset;
+
+    // NOTE: In case of partitioned table even "global" ordering (across all RDD partitions) could
+    //       not change table's partitioning and therefore there's no point in doing global sorting
+    //       across "physical" partitions, and instead we can reduce total amount of data being
+    //       shuffled by doing do "local" sorting:
+    //          - First, re-partitioning dataset such that "logical" partitions are aligned w/
+    //          "physical" ones
+    //          - Sorting locally w/in RDD ("logical") partitions
+    //
+    //       Non-partitioned tables will be globally sorted.
+    if (isPartitionedTable) {
+      repartitionedDataset = dataset.repartition(handleTargetPartitionNumHint(targetPartitionNumHint),
+          new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+    } else {
+      repartitionedDataset = tryCoalesce(dataset, targetPartitionNumHint);

Review Comment:
   In addition, I wonder if `coalesce` can meet our needs. For example, if we want to modify the FG containing N files into M (M>N) files (using `cluster`), shuffle needs to happen anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7872:
URL: https://github.com/apache/hudi/pull/7872#issuecomment-1420345455

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972",
       "triggerID" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14973",
       "triggerID" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "triggerType" : "PUSH"
     }, {
       "hash" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14977",
       "triggerID" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8f42e8c18690c8ae76121c714c2c0cda21841264 UNKNOWN
   * acc613f516996ade644c77c0c18397a8dc653ff3 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14977) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7872:
URL: https://github.com/apache/hudi/pull/7872#discussion_r1106129776


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java:
##########
@@ -31,38 +29,15 @@
  * <p>

Review Comment:
   Good catch!



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java:
##########
@@ -19,43 +19,70 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import scala.collection.JavaConverters;
 
 import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner.getOrderByColumnNames;
 
 /**
- * A partitioner that does sorting based on specified column values for each spark partitions.
+ * A partitioner that does local sorting for each RDD partition based on the tuple of
+ * values of the columns configured for ordering.
  */
-public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {
+public class RowCustomColumnsSortPartitioner extends RepartitioningBulkInsertPartitionerBase<Dataset<Row>> {
 
-  private final String[] sortColumnNames;
+  private final String[] orderByColumnNames;
 
-  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
-    this.sortColumnNames = getSortColumnName(config);
+  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = getOrderByColumnNames(config);
+
+    checkState(orderByColumnNames.length > 0);
   }
 
-  public RowCustomColumnsSortPartitioner(String[] columnNames) {
-    this.sortColumnNames = columnNames;
+  public RowCustomColumnsSortPartitioner(String[] columnNames, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = columnNames;
+
+    checkState(orderByColumnNames.length > 0);
   }
 
   @Override
-  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
-    final String[] sortColumns = this.sortColumnNames;
-    return records.sort(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns)
-        .coalesce(outputSparkPartitions);
+  public Dataset<Row> repartitionRecords(Dataset<Row> dataset, int targetPartitionNumHint) {
+    Dataset<Row> repartitionedDataset;
+
+    // NOTE: In case of partitioned table even "global" ordering (across all RDD partitions) could
+    //       not change table's partitioning and therefore there's no point in doing global sorting
+    //       across "physical" partitions, and instead we can reduce total amount of data being
+    //       shuffled by doing do "local" sorting:

Review Comment:
   This is also used for custom ordering by the users



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] Zouxxyy commented on a diff in pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "Zouxxyy (via GitHub)" <gi...@apache.org>.
Zouxxyy commented on code in PR #7872:
URL: https://github.com/apache/hudi/pull/7872#discussion_r1105531548


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java:
##########
@@ -19,43 +19,70 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import scala.collection.JavaConverters;
 
 import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner.getOrderByColumnNames;
 
 /**
- * A partitioner that does sorting based on specified column values for each spark partitions.
+ * A partitioner that does local sorting for each RDD partition based on the tuple of
+ * values of the columns configured for ordering.
  */
-public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {
+public class RowCustomColumnsSortPartitioner extends RepartitioningBulkInsertPartitionerBase<Dataset<Row>> {
 
-  private final String[] sortColumnNames;
+  private final String[] orderByColumnNames;
 
-  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
-    this.sortColumnNames = getSortColumnName(config);
+  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = getOrderByColumnNames(config);
+
+    checkState(orderByColumnNames.length > 0);
   }
 
-  public RowCustomColumnsSortPartitioner(String[] columnNames) {
-    this.sortColumnNames = columnNames;
+  public RowCustomColumnsSortPartitioner(String[] columnNames, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = columnNames;
+
+    checkState(orderByColumnNames.length > 0);
   }
 
   @Override
-  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
-    final String[] sortColumns = this.sortColumnNames;
-    return records.sort(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns)
-        .coalesce(outputSparkPartitions);
+  public Dataset<Row> repartitionRecords(Dataset<Row> dataset, int targetPartitionNumHint) {
+    Dataset<Row> repartitionedDataset;
+
+    // NOTE: In case of partitioned table even "global" ordering (across all RDD partitions) could
+    //       not change table's partitioning and therefore there's no point in doing global sorting
+    //       across "physical" partitions, and instead we can reduce total amount of data being
+    //       shuffled by doing do "local" sorting:

Review Comment:
   As far as I know, RowCustomColumnsSortPartitioner will only be used in `cluster`. At this time, the files in the same FG should already be in one physical partition.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java:
##########
@@ -31,38 +29,15 @@
  * <p>

Review Comment:
   `enforceNumOutputPartitions` has been deleted, maybe the expression here can be modified



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/SparkBulkInsertPartitionerBase.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.HoodieUnsafeUtils$;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession$;
+
+public abstract class SparkBulkInsertPartitionerBase<T> implements BulkInsertPartitioner<T> {
+
+  protected static Dataset<Row> tryCoalesce(Dataset<Row> dataset, int targetPartitionNumHint) {
+    // NOTE: In case incoming [[Dataset]]'s partition count matches the target one,
+    //       we short-circuit coalescing altogether (since this isn't done by Spark itself)
+    if (targetPartitionNumHint > 0 && targetPartitionNumHint != HoodieUnsafeUtils$.MODULE$.getNumPartitions(dataset)) {
+      return dataset.coalesce(targetPartitionNumHint);
+    }
+
+    return dataset;
+  }
+
+  protected static <T> JavaRDD<HoodieRecord<T>> tryCoalesce(JavaRDD<HoodieRecord<T>> records,
+                                                            int targetPartitionNumHint) {
+    // NOTE: In case incoming [[RDD]]'s partition count matches the target one,
+    //       we short-circuit coalescing altogether (since this isn't done by Spark itself)
+    if (targetPartitionNumHint > 0 && targetPartitionNumHint != records.getNumPartitions()) {

Review Comment:
   When `targetPartitionNumHint` > `records.getNumPartitions` coalesce is meaningless, maybe we can use `targetPartitionNumHint !< records.getNumPartitions()` as the condition
   
   In addition, I actually have a question, if `targetPartitionNumHint > records.getNumPartitions`, should we use `repartition` instead of `coalesce`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7872: [DNM] Cleaning up `Partitioner`s hierarchy

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7872:
URL: https://github.com/apache/hudi/pull/7872#issuecomment-1419974437

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972",
       "triggerID" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0adf87bfd552f023d926865f6854587ae0d48a6f Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7872:
URL: https://github.com/apache/hudi/pull/7872#issuecomment-1423355371

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972",
       "triggerID" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14973",
       "triggerID" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "triggerType" : "PUSH"
     }, {
       "hash" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14977",
       "triggerID" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bb3bd527c1c20fb046c23cd4d34e218fb7a06f82",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bb3bd527c1c20fb046c23cd4d34e218fb7a06f82",
       "triggerType" : "PUSH"
     }, {
       "hash" : "de96182a55e0574c96d2b384734a4808c8ba6399",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15005",
       "triggerID" : "de96182a55e0574c96d2b384734a4808c8ba6399",
       "triggerType" : "PUSH"
     }, {
       "hash" : "20cba2df6bd792c5173b2ef7780ca093e2fac2b5",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15041",
       "triggerID" : "20cba2df6bd792c5173b2ef7780ca093e2fac2b5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8f42e8c18690c8ae76121c714c2c0cda21841264 UNKNOWN
   * bb3bd527c1c20fb046c23cd4d34e218fb7a06f82 UNKNOWN
   * de96182a55e0574c96d2b384734a4808c8ba6399 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15005) 
   * 20cba2df6bd792c5173b2ef7780ca093e2fac2b5 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15041) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7872: [MINOR] Cleaning up `Partitioner`s hierarchy

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7872:
URL: https://github.com/apache/hudi/pull/7872#issuecomment-1420025187

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972",
       "triggerID" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0adf87bfd552f023d926865f6854587ae0d48a6f Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972) 
   * 8f42e8c18690c8ae76121c714c2c0cda21841264 UNKNOWN
   * e41a12221335f304cba9c34b1d51fdf682d88608 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7872:
URL: https://github.com/apache/hudi/pull/7872#issuecomment-1423652742

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972",
       "triggerID" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14973",
       "triggerID" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "triggerType" : "PUSH"
     }, {
       "hash" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14977",
       "triggerID" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bb3bd527c1c20fb046c23cd4d34e218fb7a06f82",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bb3bd527c1c20fb046c23cd4d34e218fb7a06f82",
       "triggerType" : "PUSH"
     }, {
       "hash" : "de96182a55e0574c96d2b384734a4808c8ba6399",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15005",
       "triggerID" : "de96182a55e0574c96d2b384734a4808c8ba6399",
       "triggerType" : "PUSH"
     }, {
       "hash" : "20cba2df6bd792c5173b2ef7780ca093e2fac2b5",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15041",
       "triggerID" : "20cba2df6bd792c5173b2ef7780ca093e2fac2b5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8f42e8c18690c8ae76121c714c2c0cda21841264 UNKNOWN
   * bb3bd527c1c20fb046c23cd4d34e218fb7a06f82 UNKNOWN
   * 20cba2df6bd792c5173b2ef7780ca093e2fac2b5 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15041) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7872:
URL: https://github.com/apache/hudi/pull/7872#issuecomment-1421759096

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972",
       "triggerID" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14973",
       "triggerID" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "triggerType" : "PUSH"
     }, {
       "hash" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14977",
       "triggerID" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bb3bd527c1c20fb046c23cd4d34e218fb7a06f82",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bb3bd527c1c20fb046c23cd4d34e218fb7a06f82",
       "triggerType" : "PUSH"
     }, {
       "hash" : "de96182a55e0574c96d2b384734a4808c8ba6399",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15005",
       "triggerID" : "de96182a55e0574c96d2b384734a4808c8ba6399",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8f42e8c18690c8ae76121c714c2c0cda21841264 UNKNOWN
   * acc613f516996ade644c77c0c18397a8dc653ff3 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14977) 
   * bb3bd527c1c20fb046c23cd4d34e218fb7a06f82 UNKNOWN
   * de96182a55e0574c96d2b384734a4808c8ba6399 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15005) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7872:
URL: https://github.com/apache/hudi/pull/7872#issuecomment-1423318544

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972",
       "triggerID" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14973",
       "triggerID" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "triggerType" : "PUSH"
     }, {
       "hash" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14977",
       "triggerID" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bb3bd527c1c20fb046c23cd4d34e218fb7a06f82",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bb3bd527c1c20fb046c23cd4d34e218fb7a06f82",
       "triggerType" : "PUSH"
     }, {
       "hash" : "de96182a55e0574c96d2b384734a4808c8ba6399",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15005",
       "triggerID" : "de96182a55e0574c96d2b384734a4808c8ba6399",
       "triggerType" : "PUSH"
     }, {
       "hash" : "20cba2df6bd792c5173b2ef7780ca093e2fac2b5",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "20cba2df6bd792c5173b2ef7780ca093e2fac2b5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8f42e8c18690c8ae76121c714c2c0cda21841264 UNKNOWN
   * bb3bd527c1c20fb046c23cd4d34e218fb7a06f82 UNKNOWN
   * de96182a55e0574c96d2b384734a4808c8ba6399 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15005) 
   * 20cba2df6bd792c5173b2ef7780ca093e2fac2b5 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] Zouxxyy commented on a diff in pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "Zouxxyy (via GitHub)" <gi...@apache.org>.
Zouxxyy commented on code in PR #7872:
URL: https://github.com/apache/hudi/pull/7872#discussion_r1105531548


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java:
##########
@@ -19,43 +19,70 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import scala.collection.JavaConverters;
 
 import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner.getOrderByColumnNames;
 
 /**
- * A partitioner that does sorting based on specified column values for each spark partitions.
+ * A partitioner that does local sorting for each RDD partition based on the tuple of
+ * values of the columns configured for ordering.
  */
-public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {
+public class RowCustomColumnsSortPartitioner extends RepartitioningBulkInsertPartitionerBase<Dataset<Row>> {
 
-  private final String[] sortColumnNames;
+  private final String[] orderByColumnNames;
 
-  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
-    this.sortColumnNames = getSortColumnName(config);
+  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = getOrderByColumnNames(config);
+
+    checkState(orderByColumnNames.length > 0);
   }
 
-  public RowCustomColumnsSortPartitioner(String[] columnNames) {
-    this.sortColumnNames = columnNames;
+  public RowCustomColumnsSortPartitioner(String[] columnNames, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = columnNames;
+
+    checkState(orderByColumnNames.length > 0);
   }
 
   @Override
-  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
-    final String[] sortColumns = this.sortColumnNames;
-    return records.sort(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns)
-        .coalesce(outputSparkPartitions);
+  public Dataset<Row> repartitionRecords(Dataset<Row> dataset, int targetPartitionNumHint) {
+    Dataset<Row> repartitionedDataset;
+
+    // NOTE: In case of partitioned table even "global" ordering (across all RDD partitions) could
+    //       not change table's partitioning and therefore there's no point in doing global sorting
+    //       across "physical" partitions, and instead we can reduce total amount of data being
+    //       shuffled by doing do "local" sorting:

Review Comment:
   As far as I know, RowCustomColumnsSortPartitioner will only be used in `cluster`. At this time, the files in the same `clusteringGroup` should already be in one physical partition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7872: [MINOR] Cleaning up `Partitioner`s hierarchy

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7872:
URL: https://github.com/apache/hudi/pull/7872#issuecomment-1420030292

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972",
       "triggerID" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14973",
       "triggerID" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0adf87bfd552f023d926865f6854587ae0d48a6f Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972) 
   * 8f42e8c18690c8ae76121c714c2c0cda21841264 UNKNOWN
   * e41a12221335f304cba9c34b1d51fdf682d88608 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14973) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] Zouxxyy commented on a diff in pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "Zouxxyy (via GitHub)" <gi...@apache.org>.
Zouxxyy commented on code in PR #7872:
URL: https://github.com/apache/hudi/pull/7872#discussion_r1105540267


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java:
##########
@@ -19,43 +19,70 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import scala.collection.JavaConverters;
 
 import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner.getOrderByColumnNames;
 
 /**
- * A partitioner that does sorting based on specified column values for each spark partitions.
+ * A partitioner that does local sorting for each RDD partition based on the tuple of
+ * values of the columns configured for ordering.
  */
-public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {
+public class RowCustomColumnsSortPartitioner extends RepartitioningBulkInsertPartitionerBase<Dataset<Row>> {
 
-  private final String[] sortColumnNames;
+  private final String[] orderByColumnNames;
 
-  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
-    this.sortColumnNames = getSortColumnName(config);
+  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = getOrderByColumnNames(config);
+
+    checkState(orderByColumnNames.length > 0);
   }
 
-  public RowCustomColumnsSortPartitioner(String[] columnNames) {
-    this.sortColumnNames = columnNames;
+  public RowCustomColumnsSortPartitioner(String[] columnNames, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = columnNames;
+
+    checkState(orderByColumnNames.length > 0);
   }
 
   @Override
-  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
-    final String[] sortColumns = this.sortColumnNames;
-    return records.sort(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns)
-        .coalesce(outputSparkPartitions);
+  public Dataset<Row> repartitionRecords(Dataset<Row> dataset, int targetPartitionNumHint) {
+    Dataset<Row> repartitionedDataset;
+
+    // NOTE: In case of partitioned table even "global" ordering (across all RDD partitions) could
+    //       not change table's partitioning and therefore there's no point in doing global sorting
+    //       across "physical" partitions, and instead we can reduce total amount of data being
+    //       shuffled by doing do "local" sorting:
+    //          - First, re-partitioning dataset such that "logical" partitions are aligned w/
+    //          "physical" ones
+    //          - Sorting locally w/in RDD ("logical") partitions
+    //
+    //       Non-partitioned tables will be globally sorted.
+    if (isPartitionedTable) {
+      repartitionedDataset = dataset.repartition(handleTargetPartitionNumHint(targetPartitionNumHint),
+          new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+    } else {
+      repartitionedDataset = tryCoalesce(dataset, targetPartitionNumHint);

Review Comment:
   In addition, I wonder if `coalesce` can meet our needs. For example, if we want to modify the `clusteringGroup` containing N logical partitions into M (M>N) (using `cluster`), shuffle needs to happen anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] Zouxxyy commented on a diff in pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "Zouxxyy (via GitHub)" <gi...@apache.org>.
Zouxxyy commented on code in PR #7872:
URL: https://github.com/apache/hudi/pull/7872#discussion_r1105540267


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java:
##########
@@ -19,43 +19,70 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import scala.collection.JavaConverters;
 
 import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner.getOrderByColumnNames;
 
 /**
- * A partitioner that does sorting based on specified column values for each spark partitions.
+ * A partitioner that does local sorting for each RDD partition based on the tuple of
+ * values of the columns configured for ordering.
  */
-public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {
+public class RowCustomColumnsSortPartitioner extends RepartitioningBulkInsertPartitionerBase<Dataset<Row>> {
 
-  private final String[] sortColumnNames;
+  private final String[] orderByColumnNames;
 
-  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
-    this.sortColumnNames = getSortColumnName(config);
+  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = getOrderByColumnNames(config);
+
+    checkState(orderByColumnNames.length > 0);
   }
 
-  public RowCustomColumnsSortPartitioner(String[] columnNames) {
-    this.sortColumnNames = columnNames;
+  public RowCustomColumnsSortPartitioner(String[] columnNames, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = columnNames;
+
+    checkState(orderByColumnNames.length > 0);
   }
 
   @Override
-  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
-    final String[] sortColumns = this.sortColumnNames;
-    return records.sort(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns)
-        .coalesce(outputSparkPartitions);
+  public Dataset<Row> repartitionRecords(Dataset<Row> dataset, int targetPartitionNumHint) {
+    Dataset<Row> repartitionedDataset;
+
+    // NOTE: In case of partitioned table even "global" ordering (across all RDD partitions) could
+    //       not change table's partitioning and therefore there's no point in doing global sorting
+    //       across "physical" partitions, and instead we can reduce total amount of data being
+    //       shuffled by doing do "local" sorting:
+    //          - First, re-partitioning dataset such that "logical" partitions are aligned w/
+    //          "physical" ones
+    //          - Sorting locally w/in RDD ("logical") partitions
+    //
+    //       Non-partitioned tables will be globally sorted.
+    if (isPartitionedTable) {
+      repartitionedDataset = dataset.repartition(handleTargetPartitionNumHint(targetPartitionNumHint),
+          new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+    } else {
+      repartitionedDataset = tryCoalesce(dataset, targetPartitionNumHint);

Review Comment:
   In addition, I wonder if `coalesce` can meet our needs. For example, if we want to modify the `clusteringGroup` containing N logical partitions into M (M>N) (using `cluster`), shuffle needs to happen anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7872:
URL: https://github.com/apache/hudi/pull/7872#issuecomment-1421533208

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972",
       "triggerID" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14973",
       "triggerID" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "triggerType" : "PUSH"
     }, {
       "hash" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14977",
       "triggerID" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bb3bd527c1c20fb046c23cd4d34e218fb7a06f82",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bb3bd527c1c20fb046c23cd4d34e218fb7a06f82",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8f42e8c18690c8ae76121c714c2c0cda21841264 UNKNOWN
   * acc613f516996ade644c77c0c18397a8dc653ff3 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14977) 
   * bb3bd527c1c20fb046c23cd4d34e218fb7a06f82 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7872: [MINOR] Cleaning up `Partitioner`s hierarchy

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7872:
URL: https://github.com/apache/hudi/pull/7872#issuecomment-1420018306

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972",
       "triggerID" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0adf87bfd552f023d926865f6854587ae0d48a6f Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972) 
   * 8f42e8c18690c8ae76121c714c2c0cda21841264 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7872: [MINOR] Cleaning up `Partitioner`s hierarchy

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7872:
URL: https://github.com/apache/hudi/pull/7872#issuecomment-1420097466

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972",
       "triggerID" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14973",
       "triggerID" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "triggerType" : "PUSH"
     }, {
       "hash" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14977",
       "triggerID" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8f42e8c18690c8ae76121c714c2c0cda21841264 UNKNOWN
   * e41a12221335f304cba9c34b1d51fdf682d88608 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14973) 
   * acc613f516996ade644c77c0c18397a8dc653ff3 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14977) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7872:
URL: https://github.com/apache/hudi/pull/7872#issuecomment-1421739846

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972",
       "triggerID" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14973",
       "triggerID" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "triggerType" : "PUSH"
     }, {
       "hash" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14977",
       "triggerID" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bb3bd527c1c20fb046c23cd4d34e218fb7a06f82",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bb3bd527c1c20fb046c23cd4d34e218fb7a06f82",
       "triggerType" : "PUSH"
     }, {
       "hash" : "de96182a55e0574c96d2b384734a4808c8ba6399",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "de96182a55e0574c96d2b384734a4808c8ba6399",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8f42e8c18690c8ae76121c714c2c0cda21841264 UNKNOWN
   * acc613f516996ade644c77c0c18397a8dc653ff3 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14977) 
   * bb3bd527c1c20fb046c23cd4d34e218fb7a06f82 UNKNOWN
   * de96182a55e0574c96d2b384734a4808c8ba6399 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7872:
URL: https://github.com/apache/hudi/pull/7872#issuecomment-1421936664

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972",
       "triggerID" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14973",
       "triggerID" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "triggerType" : "PUSH"
     }, {
       "hash" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14977",
       "triggerID" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bb3bd527c1c20fb046c23cd4d34e218fb7a06f82",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bb3bd527c1c20fb046c23cd4d34e218fb7a06f82",
       "triggerType" : "PUSH"
     }, {
       "hash" : "de96182a55e0574c96d2b384734a4808c8ba6399",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15005",
       "triggerID" : "de96182a55e0574c96d2b384734a4808c8ba6399",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8f42e8c18690c8ae76121c714c2c0cda21841264 UNKNOWN
   * bb3bd527c1c20fb046c23cd4d34e218fb7a06f82 UNKNOWN
   * de96182a55e0574c96d2b384734a4808c8ba6399 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15005) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7872: [MINOR] Cleaning up `Partitioner`s hierarchy

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7872:
URL: https://github.com/apache/hudi/pull/7872#issuecomment-1420055816

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972",
       "triggerID" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14973",
       "triggerID" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8f42e8c18690c8ae76121c714c2c0cda21841264 UNKNOWN
   * e41a12221335f304cba9c34b1d51fdf682d88608 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14973) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] Zouxxyy commented on a diff in pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "Zouxxyy (via GitHub)" <gi...@apache.org>.
Zouxxyy commented on code in PR #7872:
URL: https://github.com/apache/hudi/pull/7872#discussion_r1105540207


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java:
##########
@@ -19,43 +19,70 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import scala.collection.JavaConverters;
 
 import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner.getOrderByColumnNames;
 
 /**
- * A partitioner that does sorting based on specified column values for each spark partitions.
+ * A partitioner that does local sorting for each RDD partition based on the tuple of
+ * values of the columns configured for ordering.
  */
-public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {
+public class RowCustomColumnsSortPartitioner extends RepartitioningBulkInsertPartitionerBase<Dataset<Row>> {
 
-  private final String[] sortColumnNames;
+  private final String[] orderByColumnNames;
 
-  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
-    this.sortColumnNames = getSortColumnName(config);
+  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = getOrderByColumnNames(config);
+
+    checkState(orderByColumnNames.length > 0);
   }
 
-  public RowCustomColumnsSortPartitioner(String[] columnNames) {
-    this.sortColumnNames = columnNames;
+  public RowCustomColumnsSortPartitioner(String[] columnNames, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = columnNames;
+
+    checkState(orderByColumnNames.length > 0);
   }
 
   @Override
-  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
-    final String[] sortColumns = this.sortColumnNames;
-    return records.sort(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns)
-        .coalesce(outputSparkPartitions);
+  public Dataset<Row> repartitionRecords(Dataset<Row> dataset, int targetPartitionNumHint) {
+    Dataset<Row> repartitionedDataset;
+
+    // NOTE: In case of partitioned table even "global" ordering (across all RDD partitions) could
+    //       not change table's partitioning and therefore there's no point in doing global sorting
+    //       across "physical" partitions, and instead we can reduce total amount of data being
+    //       shuffled by doing do "local" sorting:
+    //          - First, re-partitioning dataset such that "logical" partitions are aligned w/
+    //          "physical" ones
+    //          - Sorting locally w/in RDD ("logical") partitions
+    //
+    //       Non-partitioned tables will be globally sorted.
+    if (isPartitionedTable) {
+      repartitionedDataset = dataset.repartition(handleTargetPartitionNumHint(targetPartitionNumHint),
+          new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+    } else {
+      repartitionedDataset = tryCoalesce(dataset, targetPartitionNumHint);

Review Comment:
   In addition, I wonder if `coalesce` can meet our needs. For example, if we want to modify the FG containing N files into M (M>N) files (using cluster), shuffle needs to happen anyway.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java:
##########
@@ -19,43 +19,70 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import scala.collection.JavaConverters;
 
 import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner.getOrderByColumnNames;
 
 /**
- * A partitioner that does sorting based on specified column values for each spark partitions.
+ * A partitioner that does local sorting for each RDD partition based on the tuple of
+ * values of the columns configured for ordering.
  */
-public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {
+public class RowCustomColumnsSortPartitioner extends RepartitioningBulkInsertPartitionerBase<Dataset<Row>> {
 
-  private final String[] sortColumnNames;
+  private final String[] orderByColumnNames;
 
-  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
-    this.sortColumnNames = getSortColumnName(config);
+  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = getOrderByColumnNames(config);
+
+    checkState(orderByColumnNames.length > 0);
   }
 
-  public RowCustomColumnsSortPartitioner(String[] columnNames) {
-    this.sortColumnNames = columnNames;
+  public RowCustomColumnsSortPartitioner(String[] columnNames, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = columnNames;
+
+    checkState(orderByColumnNames.length > 0);
   }
 
   @Override
-  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
-    final String[] sortColumns = this.sortColumnNames;
-    return records.sort(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns)
-        .coalesce(outputSparkPartitions);
+  public Dataset<Row> repartitionRecords(Dataset<Row> dataset, int targetPartitionNumHint) {
+    Dataset<Row> repartitionedDataset;
+
+    // NOTE: In case of partitioned table even "global" ordering (across all RDD partitions) could
+    //       not change table's partitioning and therefore there's no point in doing global sorting
+    //       across "physical" partitions, and instead we can reduce total amount of data being
+    //       shuffled by doing do "local" sorting:
+    //          - First, re-partitioning dataset such that "logical" partitions are aligned w/
+    //          "physical" ones
+    //          - Sorting locally w/in RDD ("logical") partitions
+    //
+    //       Non-partitioned tables will be globally sorted.
+    if (isPartitionedTable) {
+      repartitionedDataset = dataset.repartition(handleTargetPartitionNumHint(targetPartitionNumHint),
+          new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+    } else {
+      repartitionedDataset = tryCoalesce(dataset, targetPartitionNumHint);

Review Comment:
   In addition, I wonder if `coalesce` can meet our needs. For example, if we want to modify the FG containing N files into M (M>N) files (using cluster), shuffle needs to happen anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7872: [DNM] Cleaning up `Partitioner`s hierarchy

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7872:
URL: https://github.com/apache/hudi/pull/7872#issuecomment-1419979711

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972",
       "triggerID" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0adf87bfd552f023d926865f6854587ae0d48a6f Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7872: [DNM] Cleaning up `Partitioner`s hierarchy

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7872:
URL: https://github.com/apache/hudi/pull/7872#issuecomment-1419969188

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0adf87bfd552f023d926865f6854587ae0d48a6f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7872: [MINOR] Cleaning up `Partitioner`s hierarchy

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7872:
URL: https://github.com/apache/hudi/pull/7872#issuecomment-1420086693

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14972",
       "triggerID" : "0adf87bfd552f023d926865f6854587ae0d48a6f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8f42e8c18690c8ae76121c714c2c0cda21841264",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14973",
       "triggerID" : "e41a12221335f304cba9c34b1d51fdf682d88608",
       "triggerType" : "PUSH"
     }, {
       "hash" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "acc613f516996ade644c77c0c18397a8dc653ff3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8f42e8c18690c8ae76121c714c2c0cda21841264 UNKNOWN
   * e41a12221335f304cba9c34b1d51fdf682d88608 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14973) 
   * acc613f516996ade644c77c0c18397a8dc653ff3 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] Zouxxyy commented on a diff in pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "Zouxxyy (via GitHub)" <gi...@apache.org>.
Zouxxyy commented on code in PR #7872:
URL: https://github.com/apache/hudi/pull/7872#discussion_r1105540267


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java:
##########
@@ -19,43 +19,70 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import scala.collection.JavaConverters;
 
 import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner.getOrderByColumnNames;
 
 /**
- * A partitioner that does sorting based on specified column values for each spark partitions.
+ * A partitioner that does local sorting for each RDD partition based on the tuple of
+ * values of the columns configured for ordering.
  */
-public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {
+public class RowCustomColumnsSortPartitioner extends RepartitioningBulkInsertPartitionerBase<Dataset<Row>> {
 
-  private final String[] sortColumnNames;
+  private final String[] orderByColumnNames;
 
-  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
-    this.sortColumnNames = getSortColumnName(config);
+  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = getOrderByColumnNames(config);
+
+    checkState(orderByColumnNames.length > 0);
   }
 
-  public RowCustomColumnsSortPartitioner(String[] columnNames) {
-    this.sortColumnNames = columnNames;
+  public RowCustomColumnsSortPartitioner(String[] columnNames, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = columnNames;
+
+    checkState(orderByColumnNames.length > 0);
   }
 
   @Override
-  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
-    final String[] sortColumns = this.sortColumnNames;
-    return records.sort(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns)
-        .coalesce(outputSparkPartitions);
+  public Dataset<Row> repartitionRecords(Dataset<Row> dataset, int targetPartitionNumHint) {
+    Dataset<Row> repartitionedDataset;
+
+    // NOTE: In case of partitioned table even "global" ordering (across all RDD partitions) could
+    //       not change table's partitioning and therefore there's no point in doing global sorting
+    //       across "physical" partitions, and instead we can reduce total amount of data being
+    //       shuffled by doing do "local" sorting:
+    //          - First, re-partitioning dataset such that "logical" partitions are aligned w/
+    //          "physical" ones
+    //          - Sorting locally w/in RDD ("logical") partitions
+    //
+    //       Non-partitioned tables will be globally sorted.
+    if (isPartitionedTable) {
+      repartitionedDataset = dataset.repartition(handleTargetPartitionNumHint(targetPartitionNumHint),
+          new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+    } else {
+      repartitionedDataset = tryCoalesce(dataset, targetPartitionNumHint);

Review Comment:
   In addition, I wonder if `coalesce` can meet our needs. For example, if we want to modify the `clusteringGroup` containing N logical partitions into M (M>N) files (using `cluster`), shuffle needs to happen anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] Zouxxyy commented on a diff in pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "Zouxxyy (via GitHub)" <gi...@apache.org>.
Zouxxyy commented on code in PR #7872:
URL: https://github.com/apache/hudi/pull/7872#discussion_r1105540267


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java:
##########
@@ -19,43 +19,70 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import scala.collection.JavaConverters;
 
 import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner.getOrderByColumnNames;
 
 /**
- * A partitioner that does sorting based on specified column values for each spark partitions.
+ * A partitioner that does local sorting for each RDD partition based on the tuple of
+ * values of the columns configured for ordering.
  */
-public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {
+public class RowCustomColumnsSortPartitioner extends RepartitioningBulkInsertPartitionerBase<Dataset<Row>> {
 
-  private final String[] sortColumnNames;
+  private final String[] orderByColumnNames;
 
-  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
-    this.sortColumnNames = getSortColumnName(config);
+  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = getOrderByColumnNames(config);
+
+    checkState(orderByColumnNames.length > 0);
   }
 
-  public RowCustomColumnsSortPartitioner(String[] columnNames) {
-    this.sortColumnNames = columnNames;
+  public RowCustomColumnsSortPartitioner(String[] columnNames, HoodieTableConfig tableConfig) {
+    super(tableConfig);
+    this.orderByColumnNames = columnNames;
+
+    checkState(orderByColumnNames.length > 0);
   }
 
   @Override
-  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
-    final String[] sortColumns = this.sortColumnNames;
-    return records.sort(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns)
-        .coalesce(outputSparkPartitions);
+  public Dataset<Row> repartitionRecords(Dataset<Row> dataset, int targetPartitionNumHint) {
+    Dataset<Row> repartitionedDataset;
+
+    // NOTE: In case of partitioned table even "global" ordering (across all RDD partitions) could
+    //       not change table's partitioning and therefore there's no point in doing global sorting
+    //       across "physical" partitions, and instead we can reduce total amount of data being
+    //       shuffled by doing do "local" sorting:
+    //          - First, re-partitioning dataset such that "logical" partitions are aligned w/
+    //          "physical" ones
+    //          - Sorting locally w/in RDD ("logical") partitions
+    //
+    //       Non-partitioned tables will be globally sorted.
+    if (isPartitionedTable) {
+      repartitionedDataset = dataset.repartition(handleTargetPartitionNumHint(targetPartitionNumHint),
+          new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+    } else {
+      repartitionedDataset = tryCoalesce(dataset, targetPartitionNumHint);

Review Comment:
   In addition, I wonder if `coalesce` can meet our needs. For example, if we want to modify the `clusteringGroup` containing N partitions into M (M>N) files (using `cluster`), shuffle needs to happen anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] Zouxxyy commented on a diff in pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "Zouxxyy (via GitHub)" <gi...@apache.org>.
Zouxxyy commented on code in PR #7872:
URL: https://github.com/apache/hudi/pull/7872#discussion_r1105505809


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/SparkBulkInsertPartitionerBase.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.HoodieUnsafeUtils$;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession$;
+
+public abstract class SparkBulkInsertPartitionerBase<T> implements BulkInsertPartitioner<T> {
+
+  protected static Dataset<Row> tryCoalesce(Dataset<Row> dataset, int targetPartitionNumHint) {
+    // NOTE: In case incoming [[Dataset]]'s partition count matches the target one,
+    //       we short-circuit coalescing altogether (since this isn't done by Spark itself)
+    if (targetPartitionNumHint > 0 && targetPartitionNumHint != HoodieUnsafeUtils$.MODULE$.getNumPartitions(dataset)) {
+      return dataset.coalesce(targetPartitionNumHint);
+    }
+
+    return dataset;
+  }
+
+  protected static <T> JavaRDD<HoodieRecord<T>> tryCoalesce(JavaRDD<HoodieRecord<T>> records,
+                                                            int targetPartitionNumHint) {
+    // NOTE: In case incoming [[RDD]]'s partition count matches the target one,
+    //       we short-circuit coalescing altogether (since this isn't done by Spark itself)
+    if (targetPartitionNumHint > 0 && targetPartitionNumHint != records.getNumPartitions()) {

Review Comment:
   When `targetPartitionNumHint` > `records.getNumPartitions` `coalesce` is meaningless, maybe we can use `targetPartitionNumHint < records.getNumPartitions()` as the condition
   
   In addition, I actually have a question, if `targetPartitionNumHint > records.getNumPartitions`, should we use `repartition` instead of `coalesce`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] Zouxxyy commented on a diff in pull request #7872: [HUDI-5716] Cleaning up `Partitioner`s hierarchy

Posted by "Zouxxyy (via GitHub)" <gi...@apache.org>.
Zouxxyy commented on code in PR #7872:
URL: https://github.com/apache/hudi/pull/7872#discussion_r1105505809


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/SparkBulkInsertPartitionerBase.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.HoodieUnsafeUtils$;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession$;
+
+public abstract class SparkBulkInsertPartitionerBase<T> implements BulkInsertPartitioner<T> {
+
+  protected static Dataset<Row> tryCoalesce(Dataset<Row> dataset, int targetPartitionNumHint) {
+    // NOTE: In case incoming [[Dataset]]'s partition count matches the target one,
+    //       we short-circuit coalescing altogether (since this isn't done by Spark itself)
+    if (targetPartitionNumHint > 0 && targetPartitionNumHint != HoodieUnsafeUtils$.MODULE$.getNumPartitions(dataset)) {
+      return dataset.coalesce(targetPartitionNumHint);
+    }
+
+    return dataset;
+  }
+
+  protected static <T> JavaRDD<HoodieRecord<T>> tryCoalesce(JavaRDD<HoodieRecord<T>> records,
+                                                            int targetPartitionNumHint) {
+    // NOTE: In case incoming [[RDD]]'s partition count matches the target one,
+    //       we short-circuit coalescing altogether (since this isn't done by Spark itself)
+    if (targetPartitionNumHint > 0 && targetPartitionNumHint != records.getNumPartitions()) {

Review Comment:
   When `targetPartitionNumHint` > `records.getNumPartitions` `coalesce` is meaningless, maybe we can use `targetPartitionNumHint !< records.getNumPartitions()` as the condition
   
   In addition, I actually have a question, if `targetPartitionNumHint > records.getNumPartitions`, should we use `repartition` instead of `coalesce`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org