Posted to commits@hudi.apache.org by fo...@apache.org on 2023/02/11 02:33:27 UTC

[hudi] branch release-0.12.1 updated (2a3b0b5af8a -> 45bef56db55)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a change to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git


    from 2a3b0b5af8a add DeleteFsFileProcedure
     new d8043d48ea0 fix metatable hbase-site
     new ebf99a7e84f [HUDI-5278] Support more conf to cluster procedure (#7304)
     new 9a9fd985f2f improve BackupInvalidParquetProcedure
     new 0344765bdf3 [HUDI-5671] BucketIndexPartitioner partition algorithm skew (#7815)
     new 3c4a403a0a3 [HUDI-5318] Fix partition pruning for clustering scheduling (#7366)
     new b0a0912cfaf [HUDI-5326] Fix clustering group building in SparkSizeBasedClusteringPlanStrategy (#7372)
     new 2c8586f5a3b fix enable metrics on
     new 5e2db20bb20 fix Zhiyan metrics reporter
     new cca17a599d6 [MINOR] improve RunClusteringProcedure with partition selected
     new 59094436c9d [HUDI-5343] HoodieFlinkStreamer supports async clustering for append mode (#7403)
     new 09df7305557 [HUDI-5515] Fix concurrency conflict in ClusteringOperator with latency marker (#7625)
     new 94ee9a133ad [HUDI-5543] Description of clustering.plan.partition.filter.mode supports DAY_ROLLING strategy (#7656)
     new 700717c7344 [HUDI-5235] Clustering target size should larger than small file limit (#7232)
     new ee779fe86fe [HUDI-5341] CleanPlanner retains earliest commits must not be later than earliest pending commit (#7568)
     new e44285447a1 improve getCommitInstantsToArchive data duplication
     new c265ddcc4b5 [HUDI-5506] StreamWriteOperatorCoordinator may not recommit with partial uncommitted write metadata event (#7611)
     new 108c6afd308 [HUDI-4968] Update misleading read.streaming.skip_compaction/skip_clustering config (#6856)
     new 0f056e52e58 [HUDI-5286] UnsupportedOperationException throws when enabling filesystem retry (#7313)
     new 7588b918de2 add DropPartitionsProcedure
     new 45bef56db55 [HUDI-5495] add some property to table config

The 20 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .idea/vcs.xml                                      |   1 -
 .../apache/hudi/client/HoodieTimelineArchiver.java |  54 ++--
 .../apache/hudi/config/HoodieClusteringConfig.java |  56 +++-
 .../hudi/table/action/clean/CleanPlanner.java      |  19 +-
 .../PartitionAwareClusteringPlanStrategy.java      |  26 +-
 .../TestPartitionAwareClusteringPlanStrategy.java  |   2 +-
 .../SparkSizeBasedClusteringPlanStrategy.java      |  14 +-
 .../SparkSortAndSizeExecutionStrategy.java         |   6 +-
 .../TestSparkSizeBasedClusteringPlanStrategy.java  |  94 ++++++
 .../common/fs/HoodieRetryWrapperFileSystem.java    |   5 +
 .../hudi/common/table/HoodieTableMetaClient.java   |  19 ++
 .../apache/hudi/common/util/ClusteringUtils.java   |  37 +++
 hudi-common/src/main/resources/hbase-site.xml      |   2 +-
 .../fs/TestFSUtilsWithRetryWrapperEnable.java      |  14 +
 .../hudi/common/util/TestClusteringUtils.java      |  69 +++++
 .../apache/hudi/configuration/FlinkOptions.java    |  21 +-
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |   8 +-
 .../sink/bucket/BucketStreamWriteFunction.java     |   5 +-
 .../hudi/sink/clustering/ClusteringOperator.java   |  63 ++--
 .../sink/clustering/FlinkClusteringConfig.java     |  37 ++-
 .../hudi/sink/compact/FlinkCompactionConfig.java   |  30 +-
 .../sink/partitioner/BucketIndexPartitioner.java   |   5 +-
 .../apache/hudi/streamer/FlinkStreamerConfig.java  |  53 +++-
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  |  21 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  57 ++--
 .../sink/TestStreamWriteOperatorCoordinator.java   |  16 +
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   4 +-
 .../procedures/BackupInvalidParquetProcedure.scala |  23 +-
 .../procedures/DropPartitionsProcedure.scala       | 118 +++++++
 .../hudi/command/procedures/HoodieProcedures.scala |   1 +
 .../procedures/RunClusteringProcedure.scala        | 168 +++++++---
 .../hudi/functional/TestLayoutOptimization.scala   |   3 +-
 .../TestBackupInvalidParquetProcedure.scala        |  19 +-
 .../hudi/procedure/TestClusteringProcedure.scala   | 339 ++++++++++++++++++++-
 .../procedure/TestDropPartitionsProcedure.scala    |  67 ++++
 35 files changed, 1263 insertions(+), 213 deletions(-)
 create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java
 create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DropPartitionsProcedure.scala
 create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDropPartitionsProcedure.scala


[hudi] 17/20: [HUDI-4968] Update misleading read.streaming.skip_compaction/skip_clustering config (#6856)

Posted by fo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 108c6afd308421f79ead6fc2524704db6b33161e
Author: voonhous <vo...@gmail.com>
AuthorDate: Wed Feb 8 14:40:55 2023 +0800

    [HUDI-4968] Update misleading read.streaming.skip_compaction/skip_clustering config (#6856)
    
    (cherry picked from commit 0dbc3450a95ae84985ecfccee76afc9c2d64b536)
---
 .../org/apache/hudi/configuration/FlinkOptions.java     | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index b885791daeb..e5f11444452 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -27,6 +27,8 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodiePayloadProps;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -273,10 +275,10 @@ public class FlinkOptions extends HoodieConfig {
       .key("read.streaming.skip_compaction")
       .booleanType()
       .defaultValue(false)// default read as batch
-      .withDescription("Whether to skip compaction instants for streaming read,\n"
-          + "there are two cases that this option can be used to avoid reading duplicates:\n"
-          + "1) you are definitely sure that the consumer reads faster than any compaction instants, "
-          + "usually with delta time compaction strategy that is long enough, for e.g, one week;\n"
+      .withDescription("Whether to skip compaction instants and avoid reading compacted base files for streaming read to improve read performance.\n"
+          + "There are two cases that this option can be used to avoid reading duplicates:\n"
+          + "1) you are definitely sure that the consumer reads [faster than/completes before] any compaction instants "
+          + "when " + HoodieCompactionConfig.PRESERVE_COMMIT_METADATA.key() + " is set to false.\n"
           + "2) changelog mode is enabled, this option is a solution to keep data integrity");
 
   // this option is experimental
@@ -284,8 +286,11 @@ public class FlinkOptions extends HoodieConfig {
       .key("read.streaming.skip_clustering")
       .booleanType()
       .defaultValue(false)
-      .withDescription("Whether to skip clustering instants for streaming read,\n"
-          + "to avoid reading duplicates");
+      .withDescription("Whether to skip clustering instants to avoid reading base files of clustering operations for streaming read "
+          + "to improve read performance.\n"
+          + "This option toggled to true to avoid duplicates when: \n"
+          + "1) you are definitely sure that the consumer reads [faster than/completes before] any clustering instants "
+          + "when " + HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key() + " is set to false.\n");
 
   public static final String START_COMMIT_EARLIEST = "earliest";
   public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
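
For context on how these reader options are used (illustration only, not part of the commit): a minimal Flink SQL sketch in Scala that enables streaming read and toggles the two options whose documentation is updated above. The table name, schema, and path are hypothetical; only the option keys come from FlinkOptions.

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    // Minimal streaming TableEnvironment; a real job would normally target a cluster.
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tableEnv = TableEnvironment.create(settings)

    // Hypothetical table definition; read.streaming.skip_compaction and
    // read.streaming.skip_clustering are the options documented in this commit.
    tableEnv.executeSql(
      """
        |CREATE TABLE hudi_stream_source (
        |  uuid STRING,
        |  name STRING,
        |  ts TIMESTAMP(3)
        |) WITH (
        |  'connector' = 'hudi',
        |  'path' = 'hdfs:///tmp/hudi/hudi_stream_source',
        |  'table.type' = 'MERGE_ON_READ',
        |  'read.streaming.enabled' = 'true',
        |  'read.streaming.skip_compaction' = 'true',
        |  'read.streaming.skip_clustering' = 'true'
        |)
        |""".stripMargin)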


[hudi] 06/20: [HUDI-5326] Fix clustering group building in SparkSizeBasedClusteringPlanStrategy (#7372)

Posted by fo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b0a0912cfaf869234453e89d0d0ccf60c8dde98b
Author: Zouxxyy <zo...@alibaba-inc.com>
AuthorDate: Wed Jan 11 09:52:42 2023 +0800

    [HUDI-5326] Fix clustering group building in SparkSizeBasedClusteringPlanStrategy (#7372)
    
    [HUDI-5326] Fix clustering group building in SparkSizeBasedClusteringPlanStrategy
    
    (cherry picked from commit b9c32fb943595605a1026d568f0c1d084a3751c3)
---
 .../SparkSizeBasedClusteringPlanStrategy.java      | 14 +++-
 .../SparkSortAndSizeExecutionStrategy.java         |  6 +-
 .../TestSparkSizeBasedClusteringPlanStrategy.java  | 94 ++++++++++++++++++++++
 .../hudi/functional/TestLayoutOptimization.scala   |  3 +-
 .../hudi/procedure/TestClusteringProcedure.scala   |  2 +
 5 files changed, 112 insertions(+), 7 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
index 6629569d096..07e58c05fa4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
@@ -72,11 +72,19 @@ public class SparkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<
 
     List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
     List<FileSlice> currentGroup = new ArrayList<>();
+
+    // Sort fileSlices before dividing, which makes dividing more compact
+    List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
+    sortedFileSlices.sort((o1, o2) -> (int)
+        ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())
+            - (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
+
     long totalSizeSoFar = 0;
 
-    for (FileSlice currentSlice : fileSlices) {
+    for (FileSlice currentSlice : sortedFileSlices) {
+      long currentSize = currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
       // check if max size is reached and create new group, if needed.
-      if (totalSizeSoFar >= writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
+      if (totalSizeSoFar + currentSize > writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
         int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
         LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
             + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
@@ -88,7 +96,7 @@ public class SparkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<
       // Add to the current file-group
       currentGroup.add(currentSlice);
       // assume each file group size is ~= parquet.max.file.size
-      totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
+      totalSizeSoFar += currentSize;
     }
 
     if (!currentGroup.isEmpty()) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
index 35c8f288bc8..e33b2ef7960 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
@@ -68,7 +68,7 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
         .withBulkInsertParallelism(numOutputGroups)
         .withProps(getWriteConfig().getProps()).build();
 
-    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
+    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringMaxBytesInGroup()));
 
     return HoodieDatasetBulkInsertHelper.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
         getRowPartitioner(strategyParams, schema), numOutputGroups, shouldPreserveHoodieMetadata);
@@ -88,7 +88,9 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
     HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
         .withBulkInsertParallelism(numOutputGroups)
         .withProps(getWriteConfig().getProps()).build();
-    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
+
+    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringMaxBytesInGroup()));
+
     return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(),
         newConfig, false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata));
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java
new file mode 100644
index 00000000000..99cafdceea4
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkSizeBasedClusteringPlanStrategy.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.mockito.Mock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class TestSparkSizeBasedClusteringPlanStrategy {
+
+  @Mock
+  HoodieSparkCopyOnWriteTable table;
+  @Mock
+  HoodieSparkEngineContext context;
+
+  @Test
+  public void testBuildClusteringGroup() {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath("")
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withClusteringPlanStrategyClass(SparkSizeBasedClusteringPlanStrategy.class.getName())
+            .withClusteringMaxBytesInGroup(2000)
+            .withClusteringTargetFileMaxBytes(1000)
+            .withClusteringPlanSmallFileLimit(500)
+            .build())
+        .build();
+
+    SparkSizeBasedClusteringPlanStrategy planStrategy = new SparkSizeBasedClusteringPlanStrategy(table, context, config);
+
+    ArrayList<FileSlice> fileSlices = new ArrayList<>();
+    fileSlices.add(createFileSlice(200));
+    fileSlices.add(createFileSlice(200));
+    fileSlices.add(createFileSlice(300));
+    fileSlices.add(createFileSlice(300));
+    fileSlices.add(createFileSlice(400));
+    fileSlices.add(createFileSlice(400));
+    fileSlices.add(createFileSlice(400));
+    fileSlices.add(createFileSlice(400));
+
+    Stream<HoodieClusteringGroup> clusteringGroupStream = planStrategy.buildClusteringGroupsForPartition("p0", fileSlices);
+    List<HoodieClusteringGroup> clusteringGroups = clusteringGroupStream.collect(Collectors.toList());
+
+    // FileSlices will be divided into two clusteringGroups
+    Assertions.assertEquals(2, clusteringGroups.size());
+
+    // First group: 400, 400, 400, 400, 300, and they will be merged into 2 files
+    Assertions.assertEquals(5, clusteringGroups.get(0).getSlices().size());
+    Assertions.assertEquals(2, clusteringGroups.get(0).getNumOutputFileGroups());
+
+    // Second group: 300, 200, 200, and they will be merged into 1 file
+    Assertions.assertEquals(3, clusteringGroups.get(1).getSlices().size());
+    Assertions.assertEquals(1, clusteringGroups.get(1).getNumOutputFileGroups());
+  }
+
+  private FileSlice createFileSlice(long baseFileSize) {
+    String fileId = FSUtils.createNewFileId(FSUtils.createNewFileIdPfx(), 0);
+    FileSlice fs = new FileSlice("p0", "001", fileId);
+    HoodieBaseFile f = new HoodieBaseFile(fileId);
+    f.setFileLen(baseFileSize);
+    fs.setBaseFile(f);
+    return fs;
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
index 7d54959d2ea..b87c813a0fa 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
@@ -111,8 +111,7 @@ class TestLayoutOptimization extends HoodieClientTestBase {
       .option("hoodie.clustering.inline.max.commits", "1")
       .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
       .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
-      .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
-      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L))
+      .option("hoodie.clustering.plan.strategy.max.bytes.per.group", "2147483648")
       .option(DataSourceWriteOptions.ENABLE_ROW_WRITER.key(), clusteringAsRow)
       .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key(), layoutOptimizationStrategy)
       .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_SPATIAL_CURVE_BUILD_METHOD.key(), curveCompositionStrategy)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index c10c56f3350..c9fbb0f5d9f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -572,6 +572,7 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
            | hoodie.parquet.max.file.size=${avgSize * avgCount},
            | hoodie.parquet.small.file.limit=0,
            | hoodie.clustering.plan.strategy.target.file.max.bytes=${avgSize * avgCount},
+           | hoodie.clustering.plan.strategy.max.bytes.per.group=${4 * avgSize * avgCount},
            | hoodie.metadata.enable=true,
            | hoodie.metadata.index.column.stats.enable=true
            |")""".stripMargin)
@@ -588,6 +589,7 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
            | hoodie.parquet.max.file.size=${avgSize * avgCount},
            | hoodie.parquet.small.file.limit=0,
            | hoodie.clustering.plan.strategy.target.file.max.bytes=${avgSize * avgCount},
+           | hoodie.clustering.plan.strategy.max.bytes.per.group=${4 * avgSize * avgCount},
            | hoodie.metadata.enable=true,
            | hoodie.metadata.index.column.stats.enable=true
            |")""".stripMargin)


[hudi] 02/20: [HUDI-5278] Support more conf to cluster procedure (#7304)

Posted by fo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ebf99a7e84f2c937a968a5a51f48945919c07015
Author: KnightChess <98...@qq.com>
AuthorDate: Wed Nov 30 09:02:07 2022 +0800

    [HUDI-5278] Support more conf to cluster procedure (#7304)
---
 .../apache/hudi/config/HoodieClusteringConfig.java |  52 ++++
 .../hudi/config/metrics/HoodieMetricsConfig.java   |   2 +-
 .../procedures/RunClusteringProcedure.scala        | 143 ++++++++---
 .../hudi/procedure/TestClusteringProcedure.scala   | 269 ++++++++++++++++++++-
 4 files changed, 432 insertions(+), 34 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 1180845a6ed..8db88178f8d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -680,5 +680,57 @@ public class HoodieClusteringConfig extends HoodieConfig {
 
       return enumValue;
     }
+
+    public String getValue() {
+      return value;
+    }
+  }
+
+  public enum ClusteringOperator {
+
+    /**
+     * only schedule the clustering plan
+     */
+    SCHEDULE("schedule"),
+
+    /**
+     * only execute then pending clustering plans
+     */
+    EXECUTE("execute"),
+
+    /**
+     * schedule cluster first, and execute all pending clustering plans
+     */
+    SCHEDULE_AND_EXECUTE("scheduleandexecute");
+
+    private static final Map<String, ClusteringOperator> VALUE_TO_ENUM_MAP =
+            TypeUtils.getValueToEnumMap(ClusteringOperator.class, e -> e.value);
+
+    private final String value;
+
+    ClusteringOperator(String value) {
+      this.value = value;
+    }
+
+    @Nonnull
+    public static ClusteringOperator fromValue(String value) {
+      ClusteringOperator enumValue = VALUE_TO_ENUM_MAP.get(value);
+      if (enumValue == null) {
+        throw new HoodieException(String.format("Invalid value (%s)", value));
+      }
+      return enumValue;
+    }
+
+    public boolean isSchedule() {
+      return this != ClusteringOperator.EXECUTE;
+    }
+
+    public boolean isExecute() {
+      return this != ClusteringOperator.SCHEDULE;
+    }
+
+    public String getValue() {
+      return value;
+    }
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
index 787819be120..050b0519ee6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
@@ -47,7 +47,7 @@ public class HoodieMetricsConfig extends HoodieConfig {
 
   public static final ConfigProperty<Boolean> TURN_METRICS_ON = ConfigProperty
       .key(METRIC_PREFIX + ".on")
-      .defaultValue(true)
+      .defaultValue(false)
       .sinceVersion("0.5.0")
       .withDocumentation("Turn on/off metrics reporting. off by default.");
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index 18ea636c057..d34c0b0d7b7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -18,11 +18,13 @@
 package org.apache.spark.sql.hudi.command.procedures
 
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
-import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption}
+import org.apache.hudi.common.util.{ClusteringUtils, StringUtils, Option => HOption}
 import org.apache.hudi.config.HoodieClusteringConfig
+import org.apache.hudi.config.HoodieClusteringConfig.{ClusteringOperator, LayoutOptimizationStrategy}
 import org.apache.hudi.exception.HoodieClusteringException
 import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, HoodieFileIndex}
 import org.apache.spark.internal.Logging
@@ -32,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.execution.datasources.FileStatusCache
 import org.apache.spark.sql.types._
 
+import java.util.Locale
 import java.util.function.Supplier
 import scala.collection.JavaConverters._
 
@@ -49,7 +52,12 @@ class RunClusteringProcedure extends BaseProcedure
     ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
     ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None),
     ProcedureParameter.optional(3, "order", DataTypes.StringType, None),
-    ProcedureParameter.optional(4, "show_involved_partition", DataTypes.BooleanType, false)
+    ProcedureParameter.optional(4, "show_involved_partition", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(5, "op", DataTypes.StringType, None),
+    ProcedureParameter.optional(6, "order_strategy", DataTypes.StringType, None),
+    // params => key=value, key2=value2
+    ProcedureParameter.optional(7, "options", DataTypes.StringType, None),
+    ProcedureParameter.optional(8, "instants", DataTypes.StringType, None)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -71,6 +79,10 @@ class RunClusteringProcedure extends BaseProcedure
     val predicate = getArgValueOrDefault(args, PARAMETERS(2))
     val orderColumns = getArgValueOrDefault(args, PARAMETERS(3))
     val showInvolvedPartitions = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Boolean]
+    val op = getArgValueOrDefault(args, PARAMETERS(5))
+    val orderStrategy = getArgValueOrDefault(args, PARAMETERS(6))
+    val options = getArgValueOrDefault(args, PARAMETERS(7))
+    val instantsStr = getArgValueOrDefault(args, PARAMETERS(8))
 
     val basePath: String = getBasePath(tableName, tablePath)
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
@@ -99,41 +111,98 @@ class RunClusteringProcedure extends BaseProcedure
         logInfo("No order columns")
     }
 
+    orderStrategy match {
+      case Some(o) =>
+        val strategy = LayoutOptimizationStrategy.fromValue(o.asInstanceOf[String])
+        conf = conf ++ Map(
+          HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key() -> strategy.getValue
+        )
+      case _ =>
+        logInfo("No order strategy")
+    }
+
+    options match {
+      case Some(p) =>
+        val paramPairs = StringUtils.split(p.asInstanceOf[String], ",").asScala
+        paramPairs.foreach{ pair =>
+          val values = StringUtils.split(pair, "=")
+          conf = conf ++ Map(values.get(0) -> values.get(1))
+        }
+      case _ =>
+        logInfo("No options")
+    }
+
     // Get all pending clustering instants
     var pendingClustering = ClusteringUtils.getAllPendingClusteringPlans(metaClient)
       .iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f)
-    logInfo(s"Pending clustering instants: ${pendingClustering.mkString(",")}")
 
-    val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf)
-    val instantTime = HoodieActiveTimeline.createNewInstantTime
-    if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
-      pendingClustering ++= Seq(instantTime)
+    var operator: ClusteringOperator = ClusteringOperator.SCHEDULE_AND_EXECUTE
+    pendingClustering = instantsStr match {
+      case Some(inst) =>
+        op match {
+          case Some(o) =>
+            if (!ClusteringOperator.EXECUTE.name().equalsIgnoreCase(o.asInstanceOf[String])) {
+              throw new HoodieClusteringException("specific instants only can be used in 'execute' op or not specific op")
+            }
+          case _ =>
+            logInfo("No op and set it to EXECUTE with instants specified.")
+        }
+        operator = ClusteringOperator.EXECUTE
+        checkAndFilterPendingInstants(pendingClustering, inst.asInstanceOf[String])
+      case _ =>
+        logInfo("No specific instants")
+        op match {
+          case Some(o) =>
+            operator = ClusteringOperator.fromValue(o.asInstanceOf[String].toLowerCase(Locale.ROOT))
+          case _ =>
+            logInfo("No op, use default scheduleAndExecute")
+        }
+        pendingClustering
     }
-    logInfo(s"Clustering instants to run: ${pendingClustering.mkString(",")}.")
-
-    val startTs = System.currentTimeMillis()
-    pendingClustering.foreach(client.cluster(_, true))
-    logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
-      s" time cost: ${System.currentTimeMillis() - startTs}ms.")
-
-    val clusteringInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
-      .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && pendingClustering.contains(p.getTimestamp))
-      .toSeq
-      .sortBy(f => f.getTimestamp)
-      .reverse
-
-    val clusteringPlans = clusteringInstants.map(instant =>
-      ClusteringUtils.getClusteringPlan(metaClient, instant)
-    )
-
-    if (showInvolvedPartitions) {
-      clusteringPlans.map { p =>
-        Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
-          p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala))
+
+    logInfo(s"Pending clustering instants: ${pendingClustering.mkString(",")}")
+
+    var client: SparkRDDWriteClient[_] = null
+    try {
+      client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf)
+      if (operator.isSchedule) {
+        val instantTime = HoodieActiveTimeline.createNewInstantTime
+        if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
+          pendingClustering ++= Seq(instantTime)
+        }
+      }
+      logInfo(s"Clustering instants to run: ${pendingClustering.mkString(",")}.")
+
+      if (operator.isExecute) {
+        val startTs = System.currentTimeMillis()
+        pendingClustering.foreach(client.cluster(_, true))
+        logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
+          s" time cost: ${System.currentTimeMillis() - startTs}ms.")
+      }
+
+      val clusteringInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
+        .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && pendingClustering.contains(p.getTimestamp))
+        .toSeq
+        .sortBy(f => f.getTimestamp)
+        .reverse
+
+      val clusteringPlans = clusteringInstants.map(instant =>
+        ClusteringUtils.getClusteringPlan(metaClient, instant)
+      )
+
+      if (showInvolvedPartitions) {
+        clusteringPlans.map { p =>
+          Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
+            p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala))
+        }
+      } else {
+        clusteringPlans.map { p =>
+          Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(), p.get().getLeft.getState.name(), "*")
+        }
       }
-    } else {
-      clusteringPlans.map { p =>
-        Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(), p.get().getLeft.getState.name(), "*")
+    } finally {
+      if (client != null) {
+        client.close()
       }
     }
   }
@@ -174,6 +243,16 @@ class RunClusteringProcedure extends BaseProcedure
     })
   }
 
+  private def checkAndFilterPendingInstants(pendingInstants: Seq[String], instantStr: String): Seq[String] = {
+    val instants = StringUtils.split(instantStr, ",").asScala
+    val pendingSet = pendingInstants.toSet
+    val noneInstants = instants.filter(ins => !pendingSet.contains(ins))
+    if (noneInstants.nonEmpty) {
+      throw new HoodieClusteringException(s"specific ${noneInstants.mkString(",")} instants is not exist")
+    }
+    instants.sortBy(f => f)
+  }
+
 }
 
 object RunClusteringProcedure {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 456f9c5066a..e488811c0db 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -19,11 +19,21 @@
 
 package org.apache.spark.sql.hudi.procedure
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceWriteOptions.{OPERATION, RECORDKEY_FIELD}
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.util.{Option => HOption}
-import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers}
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.{DataSourceReadOptions, HoodieCLIUtils, HoodieDataSourceHelpers, HoodieFileIndex}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
+import org.apache.spark.sql.types.{DataTypes, Metadata, StringType, StructField, StructType}
+import org.apache.spark.sql.{Dataset, Row}
 
+import java.util
 import scala.collection.JavaConverters.asScalaIteratorConverter
 
 class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
@@ -385,4 +395,261 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
       }
     }
   }
+
+  test("Test Call run_clustering Procedure with specific instants") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  c1 int,
+           |  c2 string,
+           |  c3 double
+           |) using hudi
+           | options (
+           |  primaryKey = 'c1',
+           |  type = 'cow',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.index.column.stats.enable = 'true',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.datasource.write.operation = 'insert'
+           | )
+           | location '$basePath'
+     """.stripMargin)
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+
+      val conf = new Configuration
+      val metaClient = HoodieTableMetaClient.builder.setConf(conf).setBasePath(basePath).build
+      val instants = metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.iterator().asScala.map(_.getTimestamp).toSeq
+      assert(2 == instants.size)
+
+      checkExceptionContain(
+        s"call run_clustering(table => '$tableName', instants => '000000, ${instants.head}')"
+      )("specific 000000 instants is not exist")
+      metaClient.reloadActiveTimeline()
+      assert(0 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+      assert(2 == metaClient.getActiveTimeline.filterPendingReplaceTimeline.getInstants.count())
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      // specific instants will not schedule new cluster plan
+      spark.sql(s"call run_clustering(table => '$tableName', instants => '${instants.mkString(",")}')")
+      metaClient.reloadActiveTimeline()
+      assert(2 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+      assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline.getInstants.count())
+
+      // test with operator schedule
+      checkExceptionContain(
+      s"call run_clustering(table => '$tableName', instants => '000000', op => 'schedule')"
+      )("specific instants only can be used in 'execute' op or not specific op")
+
+      // test with operator scheduleAndExecute
+      checkExceptionContain(
+        s"call run_clustering(table => '$tableName', instants => '000000', op => 'scheduleAndExecute')"
+      )("specific instants only can be used in 'execute' op or not specific op")
+
+      // test with operator execute
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+      metaClient.reloadActiveTimeline()
+      val instants2 = metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.iterator().asScala.map(_.getTimestamp).toSeq
+      spark.sql(s"call run_clustering(table => '$tableName', instants => '${instants2.mkString(",")}', op => 'execute')")
+      metaClient.reloadActiveTimeline()
+      assert(3 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+      assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline.getInstants.count())
+    }
+  }
+
+  test("Test Call run_clustering Procedure op") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  c1 int,
+           |  c2 string,
+           |  c3 double
+           |) using hudi
+           | options (
+           |  primaryKey = 'c1',
+           |  type = 'cow',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.index.column.stats.enable = 'true',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.datasource.write.operation = 'insert'
+           | )
+           | location '$basePath'
+     """.stripMargin)
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate"-> "false"))
+      val conf = new Configuration
+      val metaClient = HoodieTableMetaClient.builder.setConf(conf).setBasePath(basePath).build
+      assert(0 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+      assert(metaClient.getActiveTimeline.filterPendingReplaceTimeline().empty())
+
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+      metaClient.reloadActiveTimeline()
+      assert(0 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+      assert(1 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.count())
+
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'execute')")
+      metaClient.reloadActiveTimeline()
+      assert(1 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+      assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.count())
+
+      spark.sql(s"call run_clustering(table => '$tableName')")
+      metaClient.reloadActiveTimeline()
+      assert(2 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+      assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.count())
+
+      spark.sql(s"call run_clustering(table => '$tableName')")
+      metaClient.reloadActiveTimeline()
+      assert(3 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.count())
+      assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.count())
+
+      checkExceptionContain(s"call run_clustering(table => '$tableName', op => 'null')")("Invalid value")
+    }
+  }
+
+  test("Test Call run_clustering Procedure Order Strategy") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+      val metadataOpts = Map(
+        HoodieMetadataConfig.ENABLE.key -> "true",
+        HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" ,
+        DataSourceReadOptions.ENABLE_DATA_SKIPPING.key() -> "true"
+      )
+
+      val queryOpts = metadataOpts ++ Map(
+        "path" -> basePath,
+        DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL
+      )
+
+      val dataFilterC2 = EqualTo(AttributeReference("c2", StringType, nullable = false)(), Literal("foo23"))
+      val dataFilterC3 = EqualTo(AttributeReference("c3", StringType, nullable = false)(), Literal("bar23"))
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  c1 int,
+           |  c2 string,
+           |  c3 double
+           |) using hudi
+           | options (
+           |  primaryKey = 'c1',
+           |  type = 'cow',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.index.column.stats.enable = 'true',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.datasource.write.operation = 'insert'
+           | )
+           | location '$basePath'
+     """.stripMargin)
+
+      val fileNum = 20
+      val numRecords = 400000
+
+      // insert records
+      writeRecords(fileNum, numRecords, 0, basePath,  metadataOpts ++ Map("hoodie.avro.schema.validate"-> "false"))
+      val conf = new Configuration
+      val metaClient = HoodieTableMetaClient.builder.setConf(conf).setBasePath(basePath).build
+      val avgSize = avgRecord(metaClient.getActiveTimeline)
+      val avgCount = Math.ceil(1.0 * numRecords / fileNum).toLong
+
+      spark.sql(
+        s"""call run_clustering(table => '$tableName', order => 'c2,c3', order_strategy => 'linear', options => "
+           | hoodie.copyonwrite.record.size.estimate=$avgSize,
+           | hoodie.parquet.max.file.size=${avgSize * avgCount},
+           | hoodie.parquet.small.file.limit=0,
+           | hoodie.clustering.plan.strategy.target.file.max.bytes=${avgSize * avgCount},
+           | hoodie.metadata.enable=true,
+           | hoodie.metadata.index.column.stats.enable=true
+           |")""".stripMargin)
+
+      metaClient.reloadActiveTimeline()
+      val fileIndex1 = HoodieFileIndex(spark, metaClient, None, queryOpts)
+      val orderAllFiles = fileIndex1.allFiles.size
+      val c2OrderFilterCount = fileIndex1.listFiles(Seq(), Seq(dataFilterC2)).head.files.size
+      val c3OrderFilterCount = fileIndex1.listFiles(Seq(), Seq(dataFilterC3)).head.files.size
+
+      spark.sql(
+        s"""call run_clustering(table => '$tableName', order => 'c2,c3', order_strategy => 'z-order', options => "
+           | hoodie.copyonwrite.record.size.estimate=$avgSize,
+           | hoodie.parquet.max.file.size=${avgSize * avgCount},
+           | hoodie.parquet.small.file.limit=0,
+           | hoodie.clustering.plan.strategy.target.file.max.bytes=${avgSize * avgCount},
+           | hoodie.metadata.enable=true,
+           | hoodie.metadata.index.column.stats.enable=true
+           |")""".stripMargin)
+
+      metaClient.reloadActiveTimeline()
+      val fileIndex2 = HoodieFileIndex(spark, metaClient, None, queryOpts)
+      val ZOrderAllFiles = fileIndex2.allFiles.size
+      val c2ZOrderFilterCount = fileIndex2.listFiles(Seq(), Seq(dataFilterC2)).head.files.size
+      val c3ZOrderFilterCount = fileIndex2.listFiles(Seq(), Seq(dataFilterC3)).head.files.size
+
+      assert((1.0 * c2OrderFilterCount / orderAllFiles) < (1.0 * c2ZOrderFilterCount / ZOrderAllFiles))
+      assert((1.0 * c3OrderFilterCount / orderAllFiles) > (1.0 * c3ZOrderFilterCount / ZOrderAllFiles))
+    }
+  }
+
+  def avgRecord(commitTimeline: HoodieTimeline): Long = {
+    var totalByteSize = 0L
+    var totalRecordsCount = 0L
+    commitTimeline.getReverseOrderedInstants.toArray.foreach(instant => {
+      val commitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(instant.asInstanceOf[HoodieInstant]).get, classOf[HoodieCommitMetadata])
+      totalByteSize = totalByteSize + commitMetadata.fetchTotalBytesWritten()
+      totalRecordsCount = totalRecordsCount + commitMetadata.fetchTotalRecordsWritten()
+    })
+
+    Math.ceil((1.0 * totalByteSize) / totalRecordsCount).toLong
+  }
+
+  def writeRecords(files: Int, numRecords: Int, partitions: Int, location: String, options: Map[String, String]): Unit = {
+    val records = new util.ArrayList[Row](numRecords)
+    val rowDimension = Math.ceil(Math.sqrt(numRecords)).toInt
+
+    val data = Stream.range(0, rowDimension, 1)
+      .flatMap(x => Stream.range(0, rowDimension, 1).map(y => Pair.of(x, y)))
+
+    if (partitions > 0) {
+      data.foreach { i =>
+        records.add(Row(i.getLeft % partitions, "foo" + i.getLeft, "bar" + i.getRight))
+      }
+    } else {
+      data.foreach { i =>
+        records.add(Row(i.getLeft, "foo" + i.getLeft, "bar" + i.getRight))
+      }
+    }
+
+    val struct = StructType(Array[StructField](
+      StructField("c1", DataTypes.IntegerType, nullable = true, Metadata.empty),
+      StructField("c2", DataTypes.StringType, nullable = true, Metadata.empty),
+      StructField("c3", DataTypes.StringType, nullable = true, Metadata.empty)
+    ))
+
+    // files can not effect for hudi
+    val df = spark.createDataFrame(records, struct).repartition(files)
+    writeDF(df, location, options)
+  }
+
+  def writeDF(df: Dataset[Row], location: String, options: Map[String, String]): Unit = {
+    df.select("c1", "c2", "c3")
+      .sortWithinPartitions("c1", "c2")
+      .write
+      .format("hudi")
+      .option(OPERATION.key(), WriteOperationType.INSERT.value())
+      .option(RECORDKEY_FIELD.key(), "c1")
+      .options(options)
+      .mode("append").save(location)
+  }
 }
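
As a usage illustration (not part of the commit), the calls below exercise the new procedure parameters from an existing `spark` session with the Hudi SQL extensions enabled; the table name hudi_tbl and the instant timestamp are placeholders.

    // Only schedule a clustering plan.
    spark.sql("call run_clustering(table => 'hudi_tbl', op => 'schedule')")

    // Execute specific pending instants; allowed only with op => 'execute' or with
    // no op at all, as enforced by the procedure above.
    spark.sql("call run_clustering(table => 'hudi_tbl', op => 'execute', instants => '20230211023327000')")

    // Schedule and execute in one call, picking a layout strategy and passing extra
    // write configs through the comma-separated options string.
    spark.sql(
      """call run_clustering(table => 'hudi_tbl', order => 'c2,c3', order_strategy => 'z-order', options => "
        | hoodie.parquet.small.file.limit=0,
        | hoodie.clustering.plan.strategy.target.file.max.bytes=134217728
        |")""".stripMargin)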


[hudi] 04/20: [HUDI-5671] BucketIndexPartitioner partition algorithm skew (#7815)

Posted by fo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 0344765bdf3a65a4fe2ce37a2d5ec3f5b3561429
Author: luokey <85...@qq.com>
AuthorDate: Fri Feb 3 00:56:07 2023 -0500

    [HUDI-5671] BucketIndexPartitioner partition algorithm skew (#7815)
    
    (cherry picked from commit 3282caa22420f3012698ec8cce376ad986034300)
---
 .../java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java  | 5 +++--
 .../org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java     | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 7af12487587..72f99422a8e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -135,8 +135,9 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
    * (partition + curBucket) % numPartitions == this taskID belongs to this task.
    */
   public boolean isBucketToLoad(int bucketNumber, String partition) {
-    int globalHash = ((partition + bucketNumber).hashCode()) & Integer.MAX_VALUE;
-    return BucketIdentifier.mod(globalHash, parallelism) == taskID;
+    final int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) % parallelism;
+    int globalIndex = partitionIndex + bucketNumber;
+    return BucketIdentifier.mod(globalIndex, parallelism)  == taskID;
   }
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
index 5fa3d1ab9a0..4e0c08b1046 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
@@ -42,7 +42,8 @@ public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<
   @Override
   public int partition(HoodieKey key, int numPartitions) {
     int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields, bucketNum);
-    int globalHash = (key.getPartitionPath() + curBucket).hashCode() & Integer.MAX_VALUE;
-    return BucketIdentifier.mod(globalHash, numPartitions);
+    int partitionIndex = (key.getPartitionPath().hashCode() & Integer.MAX_VALUE) % numPartitions;
+    int globalIndex = partitionIndex + curBucket;
+    return BucketIdentifier.mod(globalIndex, numPartitions);
   }
 }
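
To see the effect of the change (illustration only, not Hudi code): the old scheme hashed the concatenated string of partition path and bucket number, which gives no guarantee that the buckets of one partition spread evenly over tasks, while the new scheme offsets a per-partition index by the bucket number so consecutive buckets rotate round-robin across tasks. The partition path, task count, and bucket range below are hypothetical, and % stands in for BucketIdentifier.mod (equivalent here because the operands are non-negative).

    val numTasks = 4
    val partitionPath = "dt=2023-02-11"

    // Old assignment: string hash of partition + bucket number.
    def oldAssign(bucket: Int): Int =
      ((partitionPath + bucket).hashCode & Integer.MAX_VALUE) % numTasks

    // New assignment: per-partition offset plus bucket number, modulo task count,
    // mirroring BucketIndexPartitioner/BucketStreamWriteFunction after this commit.
    def newAssign(bucket: Int): Int = {
      val partitionIndex = (partitionPath.hashCode & Integer.MAX_VALUE) % numTasks
      (partitionIndex + bucket) % numTasks
    }

    (0 until 8).foreach { b =>
      println(s"bucket=$b old=${oldAssign(b)} new=${newAssign(b)}")
    }
    // newAssign visits tasks in a rotating pattern (e.g. 2,3,0,1,2,3,...), so the
    // buckets of a partition are spread evenly; oldAssign offers no such guarantee.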


[hudi] 05/20: [HUDI-5318] Fix partition pruning for clustering scheduling (#7366)

Posted by fo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 3c4a403a0a3fdb51027c0f2f76106322132513ff
Author: StreamingFlames <18...@163.com>
AuthorDate: Wed Dec 14 09:10:50 2022 +0800

    [HUDI-5318] Fix partition pruning for clustering scheduling (#7366)
    
    Co-authored-by: Nicholas Jiang <pr...@163.com>
    (cherry picked from commit 6de923cfdfdfcc4d265e3af5e12749295c29bb1c)
---
 .../PartitionAwareClusteringPlanStrategy.java      | 24 ++++----
 .../TestPartitionAwareClusteringPlanStrategy.java  |  2 +-
 .../hudi/procedure/TestClusteringProcedure.scala   | 66 ++++++++++++++++++++++
 3 files changed, 78 insertions(+), 14 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 5d62ef39023..8aafa6d28c4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -71,11 +71,18 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
     HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
     LOG.info("Scheduling clustering for " + metaClient.getBasePath());
     HoodieWriteConfig config = getWriteConfig();
-    List<String> partitionPaths = FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath());
 
-    // get matched partitions if set
-    partitionPaths = getMatchedPartitions(config, partitionPaths);
-    // filter the partition paths if needed to reduce list status
+    String partitionSelected = config.getClusteringPartitionSelected();
+    List<String> partitionPaths;
+
+    if (StringUtils.isNullOrEmpty(partitionSelected)) {
+      // get matched partitions if set
+      partitionPaths = getRegexPatternMatchedPartitions(config, FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath()));
+      // filter the partition paths if needed to reduce list status
+    } else {
+      partitionPaths = Arrays.asList(partitionSelected.split(","));
+    }
+
     partitionPaths = filterPartitionPaths(partitionPaths);
 
     if (partitionPaths.isEmpty()) {
@@ -114,15 +121,6 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
         .build());
   }
 
-  public List<String> getMatchedPartitions(HoodieWriteConfig config, List<String> partitionPaths) {
-    String partitionSelected = config.getClusteringPartitionSelected();
-    if (!StringUtils.isNullOrEmpty(partitionSelected)) {
-      return Arrays.asList(partitionSelected.split(","));
-    } else {
-      return getRegexPatternMatchedPartitions(config, partitionPaths);
-    }
-  }
-
   public List<String> getRegexPatternMatchedPartitions(HoodieWriteConfig config, List<String> partitionPaths) {
     String pattern = config.getClusteringPartitionFilterRegexPattern();
     if (!StringUtils.isNullOrEmpty(pattern)) {
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
index 440bc956153..a053a961105 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
@@ -71,7 +71,7 @@ public class TestPartitionAwareClusteringPlanStrategy {
     fakeTimeBasedPartitionsPath.add("20210719");
     fakeTimeBasedPartitionsPath.add("20210721");
 
-    List list = strategyTestRegexPattern.getMatchedPartitions(hoodieWriteConfig, fakeTimeBasedPartitionsPath);
+    List list = strategyTestRegexPattern.getRegexPatternMatchedPartitions(hoodieWriteConfig, fakeTimeBasedPartitionsPath);
     assertEquals(2, list.size());
     assertTrue(list.contains("20210721"));
     assertTrue(list.contains("20210723"));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index e488811c0db..c10c56f3350 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.config.HoodieClusteringConfig
 import org.apache.hudi.{DataSourceReadOptions, HoodieCLIUtils, HoodieDataSourceHelpers, HoodieFileIndex}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.types.{DataTypes, Metadata, StringType, StructField, StructType}
@@ -602,6 +603,71 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
+  test("Test Call run_clustering with partition selected config") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | options (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by(ts)
+           | location '$basePath'
+     """.stripMargin)
+
+      // Test clustering with the PARTITION_SELECTED config set, scheduling only a subset of the partitions
+      {
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1011)")
+        spark.sql(s"set ${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010")
+        // Do clustering; only the selected partition (ts=1010) should be scheduled
+        val result = spark.sql(s"call run_clustering(table => '$tableName', show_involved_partition => true)")
+          .collect()
+          .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
+        assertResult(1)(result.length)
+        assertResult("ts=1010")(result(0)(3))
+
+        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+          Seq(1, "a1", 10.0, 1010),
+          Seq(2, "a2", 10.0, 1010),
+          Seq(3, "a3", 10.0, 1011)
+        )
+      }
+
+      // Test clustering with the PARTITION_SELECTED config set, scheduling all partitions
+      {
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1010)")
+        spark.sql(s"insert into $tableName values(5, 'a5', 10, 1011)")
+        spark.sql(s"insert into $tableName values(6, 'a6', 10, 1012)")
+        spark.sql(s"set ${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010,ts=1011,ts=1012")
+        val result = spark.sql(s"call run_clustering(table => '$tableName', show_involved_partition => true)")
+          .collect()
+          .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
+        assertResult(1)(result.length)
+        assertResult("ts=1010,ts=1011,ts=1012")(result(0)(3))
+
+        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+          Seq(1, "a1", 10.0, 1010),
+          Seq(2, "a2", 10.0, 1010),
+          Seq(3, "a3", 10.0, 1011),
+          Seq(4, "a4", 10.0, 1010),
+          Seq(5, "a5", 10.0, 1011),
+          Seq(6, "a6", 10.0, 1012)
+        )
+      }
+    }
+  }
+
   def avgRecord(commitTimeline: HoodieTimeline): Long = {
     var totalByteSize = 0L
     var totalRecordsCount = 0L
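
A minimal usage sketch of the partition-selected scheduling exercised by the test above. The config key and the run_clustering procedure come from this patch; the table name and partition values below are hypothetical, and the snippet assumes a spark-shell with Hudi on the classpath.

    import org.apache.hudi.config.HoodieClusteringConfig

    // Restrict clustering scheduling to an explicit, comma-separated list of partitions.
    spark.sql(s"set ${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010,ts=1011")

    // Schedule and run clustering; only the selected partitions are involved.
    spark.sql("call run_clustering(table => 'hudi_tbl', show_involved_partition => true)").show(false)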


[hudi] 15/20: improve getCommitInstantsToArchive data duplication

Posted by fo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e44285447a1a7eb6f3b3d49e00d653cb910b19d0
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Tue Feb 7 22:16:09 2023 +0800

    improve getCommitInstantsToArchive data duplication
---
 .../apache/hudi/client/HoodieTimelineArchiver.java | 60 +++++++++++++---------
 1 file changed, 35 insertions(+), 25 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index 974440f165c..0f815681722 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -398,7 +398,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
         }).flatMap(Collection::stream);
   }
 
-  private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
+  private Stream<HoodieInstant> getCommitInstantsToArchive() {
     // TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
     // with logic above to avoid Stream.concat
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
@@ -413,11 +413,9 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
             .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
             .filterInflights().firstInstant();
 
-    // NOTE: We cannot have any holes in the commit timeline.
-    // We cannot archive any commits which are made after the first savepoint present,
-    // unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
+    // We cannot have any holes in the commit timeline. We cannot archive any commits which are
+    // made after the first savepoint present.
     Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
-    Set<String> savepointTimestamps = table.getSavepointTimestamps();
     if (!commitTimeline.empty() && commitTimeline.countInstants() > maxInstantsToKeep) {
       // For Merge-On-Read table, inline or async compaction is enabled
       // We need to make sure that there are enough delta commits in the active timeline
@@ -431,45 +429,57 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
               table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax())
               : Option.empty();
 
-      // The clustering commit instant can not be archived unless we ensure that the replaced files have been cleaned,
-      // without the replaced files metadata on the timeline, the fs view would expose duplicates for readers.
-      Option<HoodieInstant> oldestInstantToRetainForClustering =
-          ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient());
-
       // Actually do the commits
       Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstants()
           .filter(s -> {
-            if (config.shouldArchiveBeyondSavepoint()) {
-              // skip savepoint commits and proceed further
-              return !savepointTimestamps.contains(s.getTimestamp());
-            } else {
-              // if no savepoint present, then don't filter
-              // stop at first savepoint commit
-              return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
-            }
+            // if no savepoint present, then don't filter
+            return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
           }).filter(s -> {
-            // Ensure commits >= the oldest pending compaction commit is retained
+            // Ensure commits >= the oldest pending compaction/replace commit are retained
             return oldestPendingCompactionAndReplaceInstant
-                .map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
+                .map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
                 .orElse(true);
           }).filter(s -> {
             // We need this to ensure that when multiple writers are performing conflict resolution, eligible instants don't
             // get archived, i.e, instants after the oldestInflight are retained on the timeline
             if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.LAZY) {
               return oldestInflightCommitInstant.map(instant ->
-                      compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
+                      HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
                   .orElse(true);
             }
             return true;
           }).filter(s ->
               oldestInstantToRetainForCompaction.map(instantToRetain ->
-                      compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
-                  .orElse(true)
-          ).filter(s ->
-              oldestInstantToRetainForClustering.map(instantToRetain ->
                       HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
                   .orElse(true)
           );
+
+      // When inline or async clustering is enabled, we need to keep a committed instant in the active timeline
+      // so that, after archiving, file slices produced by a pending clustering can still be checked for commit status
+      // via {@code HoodieFileGroup#isFileSliceCommitted(slice)}
+      boolean isOldestPendingReplaceInstant =
+          oldestPendingCompactionAndReplaceInstant.map(instant ->
+              HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())).orElse(false);
+      if (isOldestPendingReplaceInstant) {
+        List<HoodieInstant> instantsToArchive = instantToArchiveStream.collect(Collectors.toList());
+        Option<HoodieInstant> latestInstantRetainForReplace = Option.fromJavaOptional(
+            instantsToArchive.stream()
+                .filter(s -> HoodieTimeline.compareTimestamps(
+                    s.getTimestamp(),
+                    LESSER_THAN,
+                    oldestPendingCompactionAndReplaceInstant.get().getTimestamp()))
+                .reduce((i1, i2) -> i2));
+        if (latestInstantRetainForReplace.isPresent()) {
+          LOG.info(String.format(
+              "Retaining the archived instant %s before the inflight replacecommit %s.",
+              latestInstantRetainForReplace.get().getTimestamp(),
+              oldestPendingCompactionAndReplaceInstant.get().getTimestamp()));
+        }
+        instantToArchiveStream = instantsToArchive.stream()
+            .filter(s -> latestInstantRetainForReplace.map(instant -> s.compareTo(instant) != 0)
+                .orElse(true));
+      }
+
       return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
     } else {
       return Stream.empty();
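
A minimal Scala sketch of the retention rule added above, with simplified types rather than the HoodieTimelineArchiver API: when the oldest pending instant is a replacecommit, the latest archive candidate strictly earlier than it is held back, so the active timeline keeps a committed instant against which pending clustering file slices can be validated.

    case class Instant(timestamp: String, action: String)

    // candidates: instants already eligible for archiving, in timeline order
    // oldestPendingReplace: the oldest pending replacecommit, if any
    def dropRetainedBeforePendingReplace(candidates: List[Instant],
                                         oldestPendingReplace: Option[Instant]): List[Instant] =
      oldestPendingReplace match {
        case Some(pending) =>
          // the latest candidate strictly before the pending replacecommit stays on the active timeline
          val retained = candidates.filter(_.timestamp < pending.timestamp).lastOption
          candidates.filterNot(c => retained.contains(c))
        case None => candidates
      }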


[hudi] 03/20: improve BackupInvalidParquetProcedure

Posted by fo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 9a9fd985f2f7d251af848b7034c5b4b1bc0ac8c9
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Sat Feb 4 15:03:35 2023 +0800

    improve BackupInvalidParquetProcedure
---
 .../procedures/BackupInvalidParquetProcedure.scala | 23 +++++++++++++++++-----
 .../TestBackupInvalidParquetProcedure.scala        | 19 +++++++++++-------
 2 files changed, 30 insertions(+), 12 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
index 5c1234b7a27..f7b50bdc3d4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
@@ -31,10 +31,14 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 
 import java.util.function.Supplier
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.jdk.CollectionConverters.seqAsJavaListConverter
 
 class BackupInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "path", DataTypes.StringType, None)
+    ProcedureParameter.required(0, "path", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "is_partition", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(2, "parallelism", DataTypes.IntegerType, 100)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -50,17 +54,26 @@ class BackupInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder
     super.checkArgs(PARAMETERS, args)
 
     val srcPath = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val isPartition = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
+    val parallelism = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
+
     val backupPath = new Path(srcPath, ".backup").toString
     val fs = FSUtils.getFs(backupPath, jsc.hadoopConfiguration())
     fs.mkdirs(new Path(backupPath))
 
-    val partitionPaths: java.util.List[String] = FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc), srcPath, false, false)
-    val javaRdd: JavaRDD[String] = jsc.parallelize(partitionPaths, partitionPaths.size())
     val serHadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration())
+    val partitionPaths: java.util.List[Path] = if (isPartition) {
+      List(new Path(srcPath)).asJava
+    } else {
+      FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc), srcPath, false, false)
+        .map(part => FSUtils.getPartitionPath(srcPath, part))
+        .toList.asJava
+    }
+    val javaRdd: JavaRDD[Path] = jsc.parallelize(partitionPaths, partitionPaths.size())
     val invalidParquetCount = javaRdd.rdd.map(part => {
       val fs = FSUtils.getFs(new Path(srcPath), serHadoopConf.get())
-      FSUtils.getAllDataFilesInPartition(fs, FSUtils.getPartitionPath(srcPath, part))
-    }).flatMap(_.toList)
+      FSUtils.getAllDataFilesInPartition(fs, part)
+    }).flatMap(_.toList).repartition(parallelism)
       .filter(status => {
         val filePath = status.getPath
         var isInvalid = false
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBackupInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBackupInvalidParquetProcedure.scala
index 2e54f40fb3f..e97574afc85 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBackupInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBackupInvalidParquetProcedure.scala
@@ -55,17 +55,23 @@ class TestBackupInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
       out1.write(1)
       out1.close()
 
-      val invalidPath2 = new Path(basePath, "ts=1500/2.parquet")
-      val out2 = fs.create(invalidPath2)
-      out2.write(1)
-      out2.close()
+      assertResult(1) {
+        spark.sql(
+          s"""call show_invalid_parquet(path => '$basePath')""".stripMargin)
+          .collect().length
+      }
 
       val result1 = spark.sql(
-        s"""call show_invalid_parquet(path => '$basePath')""".stripMargin).collect()
-      assertResult(2) {
+        s"""call backup_invalid_parquet(path => '$basePath/ts=1500', is_partition => true)""".stripMargin).collect()
+      assertResult(1) {
         result1.length
       }
 
+      val invalidPath2 = new Path(basePath, "ts=1500/2.parquet")
+      val out2 = fs.create(invalidPath2)
+      out2.write(1)
+      out2.close()
+
       val result2 = spark.sql(
         s"""call backup_invalid_parquet(path => '$basePath')""".stripMargin).collect()
       assertResult(1) {
@@ -77,7 +83,6 @@ class TestBackupInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
       assertResult(0) {
         result3.length
       }
-
     }
   }
 }
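
A usage sketch for the extended procedure; the paths below are hypothetical, while the parameters path, is_partition and parallelism are the ones declared above. Invalid files are backed up under <path>/.backup, as created by the procedure.

    // Scan every partition of the table for corrupt parquet files
    spark.sql("call backup_invalid_parquet(path => '/tmp/hudi/tbl', parallelism => 100)").show(false)

    // Scan a single partition directory only
    spark.sql("call backup_invalid_parquet(path => '/tmp/hudi/tbl/ts=1500', is_partition => true)").show(false)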


[hudi] 08/20: fix Zhiyan metrics reporter

Posted by fo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 5e2db20bb20537754408bbf690caf25bbf587cd9
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Mon Feb 6 17:15:58 2023 +0800

    fix Zhiyan metrics reporter
---
 .../src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala         | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7123eda3f44..286989944b7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -93,7 +93,7 @@ object HoodieSparkSqlWriter {
       originKeyGeneratorClassName, parameters)
     //validate datasource and tableconfig keygen are the same
     validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
-    val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
+    val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "default")
     val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
       s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
     assert(!StringUtils.isNullOrEmpty(hoodieConfig.getString(HoodieWriteConfig.TBL_NAME)),
@@ -186,7 +186,6 @@ object HoodieSparkSqlWriter {
       // scalastyle:off
       if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) &&
         operation == WriteOperationType.BULK_INSERT) {
-        parameters.put(HoodieWriteConfig.DATABASE_NAME.key(), databaseName)
         val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
           basePath, path, instantTime, partitionColumns)
         return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
@@ -568,7 +567,6 @@ object HoodieSparkSqlWriter {
     }
     val params: mutable.Map[String, String] = collection.mutable.Map(parameters.toSeq: _*)
     params(HoodieWriteConfig.AVRO_SCHEMA_STRING.key) = schema.toString
-    val dbName = parameters.getOrElse(HoodieWriteConfig.DATABASE_NAME.key(), "default")
     val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
     val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
       val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)


[hudi] 10/20: [HUDI-5343] HoodieFlinkStreamer supports async clustering for append mode (#7403)

Posted by fo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 59094436c9d6fae5b729a38e1d192993446ed504
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Wed Dec 28 20:14:51 2022 +0800

    [HUDI-5343] HoodieFlinkStreamer supports async clustering for append mode (#7403)
    
    (cherry picked from commit f2b2ec9539d97bae3952c00085cdfc6c786e9239)
---
 .../sink/clustering/FlinkClusteringConfig.java     | 37 +++++++--------
 .../hudi/sink/compact/FlinkCompactionConfig.java   | 30 ++++++------
 .../apache/hudi/streamer/FlinkStreamerConfig.java  | 53 ++++++++++++++++++++--
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  | 21 +++++++--
 4 files changed, 95 insertions(+), 46 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
index 3bbae38e00e..739cb29052a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
@@ -54,70 +54,67 @@ public class FlinkClusteringConfig extends Configuration {
   // ------------------------------------------------------------------------
   //  Clustering Options
   // ------------------------------------------------------------------------
-  @Parameter(names = {"--clustering-delta-commits"}, description = "Max delta commits needed to trigger clustering, default 4 commits", required = false)
+  @Parameter(names = {"--clustering-delta-commits"}, description = "Max delta commits needed to trigger clustering, default 1 commit")
   public Integer clusteringDeltaCommits = 1;
 
-  @Parameter(names = {"--clustering-tasks"}, description = "Parallelism of tasks that do actual clustering, default is -1", required = false)
+  @Parameter(names = {"--clustering-tasks"}, description = "Parallelism of tasks that do actual clustering, default is -1")
   public Integer clusteringTasks = -1;
 
-  @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB.", required = false)
+  @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB.")
   public Integer compactionMaxMemory = 100;
 
   @Parameter(names = {"--clean-retain-commits"},
       description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
-          + "This also directly translates into how much you can incrementally pull on this table, default 10",
-      required = false)
+          + "This also directly translates into how much you can incrementally pull on this table, default 10")
   public Integer cleanRetainCommits = 10;
 
   @Parameter(names = {"--archive-min-commits"},
-      description = "Min number of commits to keep before archiving older commits into a sequential log, default 20.",
-      required = false)
+      description = "Min number of commits to keep before archiving older commits into a sequential log, default 20.")
   public Integer archiveMinCommits = 20;
 
   @Parameter(names = {"--archive-max-commits"},
-      description = "Max number of commits to keep before archiving older commits into a sequential log, default 30.",
-      required = false)
+      description = "Max number of commits to keep before archiving older commits into a sequential log, default 30.")
   public Integer archiveMaxCommits = 30;
 
   @Parameter(names = {"--schedule", "-sc"}, description = "Schedule the clustering plan in this job.\n"
-      + "Default is false", required = false)
+      + "Default is false")
   public Boolean schedule = false;
 
   @Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time")
   public String clusteringInstantTime = null;
 
-  @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, disabled by default", required = false)
+  @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, disabled by default")
   public Boolean cleanAsyncEnable = false;
 
-  @Parameter(names = {"--plan-strategy-class"}, description = "Config to provide a strategy class to generator clustering plan", required = false)
+  @Parameter(names = {"--plan-strategy-class"}, description = "Config to provide a strategy class to generator clustering plan")
   public String planStrategyClass = FlinkSizeBasedClusteringPlanStrategy.class.getName();
 
-  @Parameter(names = {"--plan-partition-filter-mode"}, description = "Partition filter mode used in the creation of clustering plan", required = false)
+  @Parameter(names = {"--plan-partition-filter-mode"}, description = "Partition filter mode used in the creation of clustering plan")
   public String planPartitionFilterMode = "NONE";
 
-  @Parameter(names = {"--target-file-max-bytes"}, description = "Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB", required = false)
+  @Parameter(names = {"--target-file-max-bytes"}, description = "Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB")
   public Long targetFileMaxBytes = 1024 * 1024 * 1024L;
 
-  @Parameter(names = {"--small-file-limit"}, description = "Files smaller than the size specified here are candidates for clustering, default 600 MB", required = false)
+  @Parameter(names = {"--small-file-limit"}, description = "Files smaller than the size specified here are candidates for clustering, default 600 MB")
   public Long smallFileLimit = 600L;
 
-  @Parameter(names = {"--skip-from-latest-partitions"}, description = "Number of partitions to skip from latest when choosing partitions to create ClusteringPlan, default 0", required = false)
+  @Parameter(names = {"--skip-from-latest-partitions"}, description = "Number of partitions to skip from latest when choosing partitions to create ClusteringPlan, default 0")
   public Integer skipFromLatestPartitions = 0;
 
-  @Parameter(names = {"--sort-columns"}, description = "Columns to sort the data by when clustering.", required = false)
+  @Parameter(names = {"--sort-columns"}, description = "Columns to sort the data by when clustering.")
   public String sortColumns = "";
 
-  @Parameter(names = {"--max-num-groups"}, description = "Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism. default 30", required = false)
+  @Parameter(names = {"--max-num-groups"}, description = "Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism. default 30")
   public Integer maxNumGroups = 30;
 
-  @Parameter(names = {"--target-partitions"}, description = "Number of partitions to list to create ClusteringPlan, default 2", required = false)
+  @Parameter(names = {"--target-partitions"}, description = "Number of partitions to list to create ClusteringPlan, default 2")
   public Integer targetPartitions = 2;
 
   public static final String SEQ_FIFO = "FIFO";
   public static final String SEQ_LIFO = "LIFO";
   @Parameter(names = {"--seq"}, description = "Clustering plan execution sequence, two options are supported:\n"
       + "1). FIFO: execute the oldest plan first;\n"
-      + "2). LIFO: execute the latest plan first, by default FIFO", required = false)
+      + "2). LIFO: execute the latest plan first, by default FIFO")
   public String clusteringSeq = SEQ_FIFO;
 
   @Parameter(names = {"--service"}, description = "Flink Clustering runs in service mode, disable by default")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
index 449b0684615..0308d246333 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
@@ -53,55 +53,51 @@ public class FlinkCompactionConfig extends Configuration {
           + "'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n"
           + "'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n"
           + "'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\n"
-          + "Default is 'num_commits'",
-      required = false)
+          + "Default is 'num_commits'")
   public String compactionTriggerStrategy = NUM_COMMITS;
 
-  @Parameter(names = {"--compaction-delta-commits"}, description = "Max delta commits needed to trigger compaction, default 5 commits", required = false)
+  @Parameter(names = {"--compaction-delta-commits"}, description = "Max delta commits needed to trigger compaction, default 1 commit")
   public Integer compactionDeltaCommits = 1;
 
-  @Parameter(names = {"--compaction-delta-seconds"}, description = "Max delta seconds time needed to trigger compaction, default 1 hour", required = false)
+  @Parameter(names = {"--compaction-delta-seconds"}, description = "Max delta seconds time needed to trigger compaction, default 1 hour")
   public Integer compactionDeltaSeconds = 3600;
 
-  @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default", required = false)
+  @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default")
   public Boolean cleanAsyncEnable = false;
 
   @Parameter(names = {"--clean-retain-commits"},
       description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
-          + "This also directly translates into how much you can incrementally pull on this table, default 10",
-      required = false)
+          + "This also directly translates into how much you can incrementally pull on this table, default 10")
   public Integer cleanRetainCommits = 10;
 
   @Parameter(names = {"--archive-min-commits"},
-      description = "Min number of commits to keep before archiving older commits into a sequential log, default 20.",
-      required = false)
+      description = "Min number of commits to keep before archiving older commits into a sequential log, default 20.")
   public Integer archiveMinCommits = 20;
 
   @Parameter(names = {"--archive-max-commits"},
-      description = "Max number of commits to keep before archiving older commits into a sequential log, default 30.",
-      required = false)
+      description = "Max number of commits to keep before archiving older commits into a sequential log, default 30.")
   public Integer archiveMaxCommits = 30;
 
-  @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB.", required = false)
+  @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB.")
   public Integer compactionMaxMemory = 100;
 
-  @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write) for batching compaction, default 512000M.", required = false)
+  @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write) for batching compaction, default 512000M.")
   public Long compactionTargetIo = 512000L;
 
-  @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is -1", required = false)
+  @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is -1")
   public Integer compactionTasks = -1;
 
   @Parameter(names = {"--schedule", "-sc"}, description = "Not recommended. Schedule the compaction plan in this job.\n"
       + "There is a risk of losing data when scheduling compaction outside the writer job.\n"
       + "Scheduling compaction in the writer job and only let this job do the compaction execution is recommended.\n"
-      + "Default is false", required = false)
+      + "Default is false")
   public Boolean schedule = false;
 
   public static final String SEQ_FIFO = "FIFO";
   public static final String SEQ_LIFO = "LIFO";
   @Parameter(names = {"--seq"}, description = "Compaction plan execution sequence, two options are supported:\n"
       + "1). FIFO: execute the oldest plan first;\n"
-      + "2). LIFO: execute the latest plan first, by default LIFO", required = false)
+      + "2). LIFO: execute the latest plan first, by default LIFO")
   public String compactionSeq = SEQ_FIFO;
 
   @Parameter(names = {"--service"}, description = "Flink Compaction runs in service mode, disable by default")
@@ -126,7 +122,7 @@ public class FlinkCompactionConfig extends Configuration {
       + "It's only effective for 'instants' plan selection strategy.")
   public String compactionPlanInstant;
 
-  @Parameter(names = {"--spillable_map_path"}, description = "Default file path prefix for spillable map.", required = false)
+  @Parameter(names = {"--spillable_map_path"}, description = "Default file path prefix for spillable map.")
   public String spillableMapPath = HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue();
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index b2f72aed7da..5df71c64221 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.streamer;
 
+import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
 import org.apache.hudi.client.utils.OperationConverter;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -239,8 +240,8 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--compaction-async-enabled"}, description = "Async Compaction, enabled by default for MOR")
   public Boolean compactionAsyncEnabled = true;
 
-  @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is 10")
-  public Integer compactionTasks = 10;
+  @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is -1")
+  public Integer compactionTasks = -1;
 
   @Parameter(names = {"--compaction-trigger-strategy"},
       description = "Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n"
@@ -250,8 +251,8 @@ public class FlinkStreamerConfig extends Configuration {
           + "Default is 'num_commits'")
   public String compactionTriggerStrategy = FlinkOptions.NUM_COMMITS;
 
-  @Parameter(names = {"--compaction-delta-commits"}, description = "Max delta commits needed to trigger compaction, default 5 commits")
-  public Integer compactionDeltaCommits = 5;
+  @Parameter(names = {"--compaction-delta-commits"}, description = "Max delta commits needed to trigger compaction, default 1 commit")
+  public Integer compactionDeltaCommits = 1;
 
   @Parameter(names = {"--compaction-delta-seconds"}, description = "Max delta seconds time needed to trigger compaction, default 1 hour")
   public Integer compactionDeltaSeconds = 3600;
@@ -262,6 +263,39 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write), default 500 GB")
   public Long compactionTargetIo = 512000L;
 
+  @Parameter(names = {"--clustering-async-enabled"}, description = "Async Clustering, disable by default")
+  public Boolean clusteringAsyncEnabled = false;
+
+  @Parameter(names = {"--clustering-tasks"}, description = "Parallelism of tasks that do actual clustering, default is -1")
+  public Integer clusteringTasks = -1;
+
+  @Parameter(names = {"--clustering-delta-commits"}, description = "Max delta commits needed to trigger clustering, default 1 commit")
+  public Integer clusteringDeltaCommits = 1;
+
+  @Parameter(names = {"--plan-strategy-class"}, description = "Config to provide a strategy class to generator clustering plan")
+  public String planStrategyClass = FlinkSizeBasedClusteringPlanStrategy.class.getName();
+
+  @Parameter(names = {"--plan-partition-filter-mode"}, description = "Partition filter mode used in the creation of clustering plan")
+  public String planPartitionFilterMode = "NONE";
+
+  @Parameter(names = {"--target-file-max-bytes"}, description = "Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB")
+  public Long targetFileMaxBytes = 1024 * 1024 * 1024L;
+
+  @Parameter(names = {"--small-file-limit"}, description = "Files smaller than the size specified here are candidates for clustering, default 600 MB")
+  public Long smallFileLimit = 600L;
+
+  @Parameter(names = {"--skip-from-latest-partitions"}, description = "Number of partitions to skip from latest when choosing partitions to create ClusteringPlan, default 0")
+  public Integer skipFromLatestPartitions = 0;
+
+  @Parameter(names = {"--sort-columns"}, description = "Columns to sort the data by when clustering.")
+  public String sortColumns = "";
+
+  @Parameter(names = {"--max-num-groups"}, description = "Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism. default 30")
+  public Integer maxNumGroups = 30;
+
+  @Parameter(names = {"--target-partitions"}, description = "Number of partitions to list to create ClusteringPlan, default 2")
+  public Integer targetPartitions = 2;
+
   @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default")
   public Boolean cleanAsyncEnabled = true;
 
@@ -406,6 +440,17 @@ public class FlinkStreamerConfig extends Configuration {
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds);
     conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
     conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo);
+    conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, config.clusteringAsyncEnabled);
+    conf.setInteger(FlinkOptions.CLUSTERING_TASKS, config.clusteringTasks);
+    conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, config.clusteringDeltaCommits);
+    conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS, config.planStrategyClass);
+    conf.setString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME, config.planPartitionFilterMode);
+    conf.setLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, config.targetFileMaxBytes);
+    conf.setLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, config.smallFileLimit);
+    conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, config.skipFromLatestPartitions);
+    conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, config.sortColumns);
+    conf.setInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS, config.maxNumGroups);
+    conf.setInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS, config.targetPartitions);
     conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnabled);
     conf.setString(FlinkOptions.CLEAN_POLICY, config.cleanPolicy);
     conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index b08eb570ce0..b1249334510 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -99,12 +99,23 @@ public class HoodieFlinkStreamer {
     }
 
     OptionsInference.setupSinkTasks(conf, env.getParallelism());
-    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, dataStream);
-    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
-    if (OptionsResolver.needsAsyncCompaction(conf)) {
-      Pipelines.compact(conf, pipeline);
+    DataStream<Object> pipeline;
+    // Append mode
+    if (OptionsResolver.isAppendMode(conf)) {
+      pipeline = Pipelines.append(conf, rowType, dataStream, false);
+      if (OptionsResolver.needsAsyncClustering(conf)) {
+        Pipelines.cluster(conf, rowType, pipeline);
+      } else {
+        Pipelines.dummySink(pipeline);
+      }
     } else {
-      Pipelines.clean(conf, pipeline);
+      DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, dataStream);
+      pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
+      if (OptionsResolver.needsAsyncCompaction(conf)) {
+        Pipelines.compact(conf, pipeline);
+      } else {
+        Pipelines.clean(conf, pipeline);
+      }
     }
 
     String jobName = cfg.targetDatabaseName.isEmpty() ? cfg.targetTableName :
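
The new streamer flags map onto the existing Flink clustering options; below is a short Scala sketch of the equivalent programmatic configuration, mirroring the fromFlinkStreamerConfig additions above. The values are the patch defaults, and the snippet is illustrative rather than a complete job setup.

    import org.apache.flink.configuration.Configuration
    import org.apache.hudi.configuration.FlinkOptions

    val conf = new Configuration()
    conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true)                                   // --clustering-async-enabled
    conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1)                                      // --clustering-delta-commits
    conf.setInteger(FlinkOptions.CLUSTERING_TASKS, -1)                                             // --clustering-tasks
    conf.setLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, 1024L * 1024 * 1024) // --target-file-max-bytes
    conf.setLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, 600L)                     // --small-file-limit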


[hudi] 13/20: [HUDI-5235] Clustering target size should larger than small file limit (#7232)

Posted by fo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 700717c7344128f60a42795006b12daa063fc6d1
Author: zhuanshenbsj1 <34...@users.noreply.github.com>
AuthorDate: Tue Jan 24 13:14:45 2023 +0800

    [HUDI-5235] Clustering target size should larger than small file limit (#7232)
    
    * target size should larger than small file limit
    
    (cherry picked from commit 4f6b831ea11d8a99e828b4cb37dd770f8075dd43)
---
 .../java/org/apache/hudi/sink/clustering/ClusteringOperator.java    | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 308679e78a6..a414729724e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -132,6 +132,12 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
     // override max parquet file size in conf
     this.conf.setLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(),
         this.conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES));
+
+    // the target size should be larger than the small file limit, so cap the small file limit at the target size
+    this.conf.setLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT.key(),
+        this.conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES) > this.conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT)
+          ? this.conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT)
+            : this.conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES));
   }
 
   @Override
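
The effect of the override above is simply to clamp the small-file limit at the clustering target file size; a one-line sketch, using the defaults from FlinkClusteringConfig as placeholder values:

    val targetFileMaxBytes = 1024L * 1024 * 1024  // CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES
    val smallFileLimit     = 600L                 // CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT
    // keep the smaller of the two, so the small-file limit never exceeds the target size
    val effectiveSmallFileLimit = math.min(smallFileLimit, targetFileMaxBytes)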


[hudi] 19/20: add DropPartitionsProcedure

Posted by fo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 7588b918de23f12e098fdde6acac97dfd5c559c2
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Fri Feb 10 15:22:23 2023 +0800

    add DropPartitionsProcedure
---
 .../procedures/BackupInvalidParquetProcedure.scala |   2 +-
 .../procedures/DropPartitionsProcedure.scala       | 118 +++++++++++++++++++++
 .../hudi/command/procedures/HoodieProcedures.scala |   1 +
 .../procedure/TestDropPartitionsProcedure.scala    |  67 ++++++++++++
 4 files changed, 187 insertions(+), 1 deletion(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
index f7b50bdc3d4..e0963baa471 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 
 import java.util.function.Supplier
 import scala.collection.JavaConversions.asScalaBuffer
-import scala.jdk.CollectionConverters.seqAsJavaListConverter
+import scala.collection.JavaConverters.seqAsJavaListConverter
 
 class BackupInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DropPartitionsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DropPartitionsProcedure.scala
new file mode 100644
index 00000000000..b194305e6c5
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DropPartitionsProcedure.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.ValidationUtils.checkArgument
+import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.{resolveExpr, splitPartitionAndDataPredicates}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.execution.datasources.FileStatusCache
+import org.apache.spark.sql.types._
+
+import java.util.function.Supplier
+import scala.collection.JavaConverters._
+
+class DropPartitionsProcedure extends BaseProcedure
+  with ProcedureBuilder
+  with PredicateHelper
+  with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
+    ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None),
+    ProcedureParameter.optional(3, "selected_partitions", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.BooleanType, nullable = true, Metadata.empty),
+    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+    val predicate = getArgValueOrDefault(args, PARAMETERS(2))
+    val parts = getArgValueOrDefault(args, PARAMETERS(3))
+
+    val basePath: String = getBasePath(tableName, tablePath)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val selectedPartitions: String = (parts, predicate) match {
+      case (_, Some(p)) => prunePartition(metaClient, p.asInstanceOf[String])
+      case (Some(o), _) => o.asInstanceOf[String]
+      case _ => ""
+    }
+
+    val rows: java.util.List[Row] = new java.util.ArrayList[Row]()
+    var partitionPaths: java.util.List[String] = new java.util.ArrayList[String]()
+    if (selectedPartitions.nonEmpty) {
+      partitionPaths = selectedPartitions.split(",").toList.asJava
+      logInfo(s"Drop partitions : $selectedPartitions")
+    } else {
+      logInfo("No partition to drop")
+    }
+
+    partitionPaths.asScala.foreach(part => {
+      val dropSql = s"ALTER TABLE ${metaClient.getTableConfig.getTableName} DROP PARTITION ($part)"
+      logInfo(s"dropSql: $dropSql")
+      spark.sql(dropSql)
+      rows.add(Row(true, part))
+    })
+
+    rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
+  }
+
+  override def build: Procedure = new DropPartitionsProcedure()
+
+  def prunePartition(metaClient: HoodieTableMetaClient, predicate: String): String = {
+    val options = Map(QUERY_TYPE.key() -> QUERY_TYPE_SNAPSHOT_OPT_VAL, "path" -> metaClient.getBasePath)
+    val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, None, options,
+      FileStatusCache.getOrCreate(sparkSession))
+
+    // Resolve partition predicates
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val tableSchema = AvroConversionUtils.convertAvroSchemaToStructType(schemaResolver.getTableAvroSchema)
+    val condition = resolveExpr(sparkSession, predicate, tableSchema)
+    val partitionColumns = metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
+    val (partitionPredicates, dataPredicates) = splitPartitionAndDataPredicates(
+      sparkSession, splitConjunctivePredicates(condition).toArray, partitionColumns)
+    checkArgument(dataPredicates.isEmpty, "Only partition predicates are allowed")
+
+    // Get all partitions and prune partition by predicates
+    val prunedPartitions = hoodieFileIndex.getPartitionPaths(partitionPredicates)
+    prunedPartitions.map(path => path.getPath.replaceAll("/", ",")).toSet.mkString(",")
+  }
+}
+
+object DropPartitionsProcedure {
+  val NAME = "drop_partitions"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new DropPartitionsProcedure
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 5d945ecbfdb..f54db97b227 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -91,6 +91,7 @@ object HoodieProcedures {
       ,(HelpProcedure.NAME, HelpProcedure.builder)
       ,(DeleteRollbackInstantProcedure.NAME, DeleteRollbackInstantProcedure.builder)
       ,(DeleteFsFileProcedure.NAME, DeleteFsFileProcedure.builder)
+      ,(DropPartitionsProcedure.NAME, DropPartitionsProcedure.builder)
     )
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDropPartitionsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDropPartitionsProcedure.scala
new file mode 100644
index 00000000000..a6fb2be45ee
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestDropPartitionsProcedure.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+class TestDropPartitionsProcedure extends HoodieSparkProcedureTestBase {
+
+  test("Test Call drop_partitions Procedure With single-partition Pruning") {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = '$tableType',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(ts)
+             | location '$basePath'
+       """.stripMargin)
+
+        // Test partition pruning with single predicate
+        var resultA: Array[Seq[Any]] = Array.empty
+
+        {
+          spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+          spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+          spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+
+          checkException(
+            s"call drop_partitions(table => '$tableName', predicate => 'ts <= 1001L and id = 10')"
+          )("Only partition predicates are allowed")
+
+          // Do table drop partitions with partition predicate
+          resultA = spark.sql(s"call drop_partitions(table => '$tableName', predicate => 'ts <= 1001L')")
+            .collect()
+            .map(row => Seq(row.getBoolean(0), row.getString(1)))
+          assertResult(2)(resultA.length)
+        }
+      }
+    }
+  }
+}
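
A usage sketch for the new procedure; the table name, predicate and partition values are hypothetical, while both invocation styles follow the parameters declared in DropPartitionsProcedure above.

    // Drop the partitions matched by a partition-only predicate
    spark.sql("call drop_partitions(table => 'hudi_tbl', predicate => 'ts <= 1001L')").show(false)

    // Or pass the partition paths explicitly as a comma-separated list
    spark.sql("call drop_partitions(table => 'hudi_tbl', selected_partitions => 'ts=1000,ts=1001')").show(false)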


[hudi] 09/20: [MINOR] improve RunClusteringProcedure with partition selected

Posted by fo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit cca17a599d6c571297c91c5a8d9a9c790d0ce258
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Tue Feb 7 14:01:21 2023 +0800

    [MINOR] improve RunClusteringProcedure with partition selected
---
 .../PartitionAwareClusteringPlanStrategy.java      |  2 ++
 .../procedures/RunClusteringProcedure.scala        | 31 +++++++++++++---------
 .../hudi/procedure/TestClusteringProcedure.scala   | 12 ++++-----
 3 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 8aafa6d28c4..03d83dd12ea 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -73,6 +73,7 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
     HoodieWriteConfig config = getWriteConfig();
 
     String partitionSelected = config.getClusteringPartitionSelected();
+    LOG.info("Scheduling clustering partitionSelected: " + partitionSelected);
     List<String> partitionPaths;
 
     if (StringUtils.isNullOrEmpty(partitionSelected)) {
@@ -84,6 +85,7 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
     }
 
     partitionPaths = filterPartitionPaths(partitionPaths);
+    LOG.info("Scheduling clustering partitionPaths: " + partitionPaths);
 
     if (partitionPaths.isEmpty()) {
       // In case no partitions could be picked, return no clustering plan
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index d34c0b0d7b7..449e03c2e48 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.command.procedures
 
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.common.util.{ClusteringUtils, StringUtils, Option => HOption}
@@ -57,7 +57,8 @@ class RunClusteringProcedure extends BaseProcedure
     ProcedureParameter.optional(6, "order_strategy", DataTypes.StringType, None),
     // params => key=value, key2=value2
     ProcedureParameter.optional(7, "options", DataTypes.StringType, None),
-    ProcedureParameter.optional(8, "instants", DataTypes.StringType, None)
+    ProcedureParameter.optional(8, "instants", DataTypes.StringType, None),
+    ProcedureParameter.optional(9, "selected_partitions", DataTypes.StringType, None)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -83,20 +84,26 @@ class RunClusteringProcedure extends BaseProcedure
     val orderStrategy = getArgValueOrDefault(args, PARAMETERS(6))
     val options = getArgValueOrDefault(args, PARAMETERS(7))
     val instantsStr = getArgValueOrDefault(args, PARAMETERS(8))
+    val parts = getArgValueOrDefault(args, PARAMETERS(9))
 
     val basePath: String = getBasePath(tableName, tablePath)
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
     var conf: Map[String, String] = Map.empty
-    predicate match {
-      case Some(p) =>
-        val prunedPartitions = prunePartition(metaClient, p.asInstanceOf[String])
-        conf = conf ++ Map(
-          HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key() -> "SELECTED_PARTITIONS",
-          HoodieClusteringConfig.PARTITION_SELECTED.key() -> prunedPartitions
-        )
-        logInfo(s"Partition predicates: $p, partition selected: $prunedPartitions")
-      case _ =>
-        logInfo("No partition predicates")
+
+    val selectedPartitions: String = (parts, predicate) match {
+      case (_, Some(p)) => prunePartition(metaClient, p.asInstanceOf[String])
+      case (Some(o), _) => o.asInstanceOf[String]
+      case _ => ""
+    }
+
+    if (selectedPartitions.nonEmpty) {
+      conf = conf ++ Map(
+        HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key() -> "SELECTED_PARTITIONS",
+        HoodieClusteringConfig.PARTITION_SELECTED.key() -> selectedPartitions
+      )
+      logInfo(s"Partition selected: $selectedPartitions")
+    } else {
+      logInfo("No partition selected")
     }
 
     // Construct sort column info
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index c9fbb0f5d9f..b21dbcad70d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -605,7 +605,7 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
-  test("Test Call run_clustering with partition selected config") {
+  test("Test Call run_clustering with partition selected") {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = s"${tmp.getCanonicalPath}/$tableName"
@@ -631,9 +631,9 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
         spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010)")
         spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010)")
         spark.sql(s"insert into $tableName values(3, 'a3', 10, 1011)")
-        spark.sql(s"set ${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010")
         // Do
-        val result = spark.sql(s"call run_clustering(table => '$tableName', show_involved_partition => true)")
+        val result = spark.sql(s"call run_clustering(table => '$tableName', " +
+          s"selected_partitions => 'ts=1010', show_involved_partition => true)")
           .collect()
           .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
         assertResult(1)(result.length)
@@ -646,13 +646,13 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
         )
       }
 
-      // Test clustering with PARTITION_SELECTED config set, choose all partitions to schedule
+      // Test clustering with PARTITION_SELECTED, choose all partitions to schedule
       {
         spark.sql(s"insert into $tableName values(4, 'a4', 10, 1010)")
         spark.sql(s"insert into $tableName values(5, 'a5', 10, 1011)")
         spark.sql(s"insert into $tableName values(6, 'a6', 10, 1012)")
-        spark.sql(s"set ${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010,ts=1011,ts=1012")
-        val result = spark.sql(s"call run_clustering(table => '$tableName', show_involved_partition => true)")
+        val result = spark.sql(s"call run_clustering(table => '$tableName', " +
+          s"selected_partitions => 'ts=1010,ts=1011,ts=1012', show_involved_partition => true)")
           .collect()
           .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
         assertResult(1)(result.length)
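
Note: with the new selected_partitions argument the partitions are passed straight to the procedure instead of setting HoodieClusteringConfig.PARTITION_SELECTED through a separate `set` statement, as the test changes above show; when both predicate and selected_partitions are supplied, the pattern match in RunClusteringProcedure gives the predicate precedence. A sketch via the Java Spark API (table name and partition values are placeholders):

import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RunClusteringExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("run-clustering-example")
        .master("local[1]")
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
        .getOrCreate();

    // Cluster only the listed partitions and report which ones were involved.
    List<Row> result = spark
        .sql("call run_clustering(table => 'hudi_tbl', "
            + "selected_partitions => 'ts=1010,ts=1011', show_involved_partition => true)")
        .collectAsList();
    result.forEach(row ->
        System.out.println(row.getString(0) + " " + row.getInt(1) + " " + row.getString(3)));

    spark.stop();
  }
}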


[hudi] 11/20: [HUDI-5515] Fix concurrency conflict in ClusteringOperator with latency marker (#7625)


commit 09df730555737dd6ce91151467d28d3f1c73d112
Author: Thinker313 <47...@users.noreply.github.com>
AuthorDate: Tue Jan 10 10:09:19 2023 +0800

    [HUDI-5515] Fix concurrency conflict in ClusteringOperator with latency marker (#7625)
    
    Co-authored-by: coder_wang <le...@gmail.com>
    
    (cherry picked from commit 2d1dd2a8ab11feb025d021d94d5fd6f2bfa9c66f)
---
 .../hudi/sink/clustering/ClusteringOperator.java   | 57 ++++++++++++----------
 1 file changed, 31 insertions(+), 26 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index e30a3577f01..308679e78a6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -18,6 +18,32 @@
 
 package org.apache.hudi.sink.clustering;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
+import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
@@ -47,32 +73,6 @@ import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.StreamerUtil;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
-import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
-import org.apache.flink.table.runtime.generated.RecordComparator;
-import org.apache.flink.table.runtime.operators.TableStreamOperator;
-import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
-import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
-import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
-import org.apache.flink.table.runtime.util.StreamRecordCollector;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -157,6 +157,11 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
     collector = new StreamRecordCollector<>(output);
   }
 
+  @Override
+  public void processLatencyMarker(LatencyMarker latencyMarker) {
+    // no need to propagate the latencyMarker
+  }
+
   @Override
   public void processElement(StreamRecord<ClusteringPlanEvent> element) throws Exception {
     ClusteringPlanEvent event = element.getValue();


[hudi] 14/20: [HUDI-5341] CleanPlanner retains earliest commits must not be later than earliest pending commit (#7568)


commit ee779fe86fef3ecf2144e72a7e90c75d15ad1ded
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Fri Jan 6 20:29:29 2023 +0800

    [HUDI-5341] CleanPlanner retains earliest commits must not be later than earliest pending commit (#7568)
    
    (cherry picked from commit f745e6457353804359b20575c597b38507237aba)
---
 .../apache/hudi/client/HoodieTimelineArchiver.java | 16 ++++-
 .../hudi/table/action/clean/CleanPlanner.java      | 19 +++++-
 .../apache/hudi/common/util/ClusteringUtils.java   | 37 ++++++++++++
 .../hudi/common/util/TestClusteringUtils.java      | 69 ++++++++++++++++++++++
 4 files changed, 137 insertions(+), 4 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index 2992f4abd4c..974440f165c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -44,6 +44,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.FileIOUtils;
@@ -397,7 +398,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
         }).flatMap(Collection::stream);
   }
 
-  private Stream<HoodieInstant> getCommitInstantsToArchive() {
+  private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
     // TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
     // with logic above to avoid Stream.concat
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
@@ -430,6 +431,11 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
               table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax())
               : Option.empty();
 
+      // The clustering commit instant can not be archived unless we ensure that the replaced files have been cleaned,
+      // without the replaced files metadata on the timeline, the fs view would expose duplicates for readers.
+      Option<HoodieInstant> oldestInstantToRetainForClustering =
+          ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient());
+
       // Actually do the commits
       Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstants()
           .filter(s -> {
@@ -442,7 +448,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
               return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
             }
           }).filter(s -> {
-            // Ensure commits >= oldest pending compaction commit is retained
+            // Ensure commits >= the oldest pending compaction commit is retained
             return oldestPendingCompactionAndReplaceInstant
                 .map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
                 .orElse(true);
@@ -459,6 +465,10 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
               oldestInstantToRetainForCompaction.map(instantToRetain ->
                       compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
                   .orElse(true)
+          ).filter(s ->
+              oldestInstantToRetainForClustering.map(instantToRetain ->
+                      HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
+                  .orElse(true)
           );
       return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
     } else {
@@ -466,7 +476,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
     }
   }
 
-  private Stream<HoodieInstant> getInstantsToArchive() {
+  private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
     Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
     if (config.isMetastoreEnabled()) {
       return Stream.empty();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 64e69b1d2a9..737388645b4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -465,7 +465,24 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
     int hoursRetained = config.getCleanerHoursRetained();
     if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
         && commitTimeline.countInstants() > commitsRetained) {
-      earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
+      Option<HoodieInstant> earliestPendingCommits = hoodieTable.getMetaClient()
+          .getActiveTimeline()
+          .getCommitsTimeline()
+          .filter(s -> !s.isCompleted()).firstInstant();
+      if (earliestPendingCommits.isPresent()) {
+        // Earliest commit to retain must not be later than the earliest pending commit
+        earliestCommitToRetain =
+            commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained).map(nthInstant -> {
+              if (nthInstant.compareTo(earliestPendingCommits.get()) <= 0) {
+                return Option.of(nthInstant);
+              } else {
+                return commitTimeline.findInstantsBefore(earliestPendingCommits.get().getTimestamp()).lastInstant();
+              }
+            }).orElse(Option.empty());
+      } else {
+        earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants()
+            - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
+      }
     } else if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
       Instant instant = Instant.now();
       ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 9d741a03f82..17b8672094f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -223,4 +224,40 @@ public class ClusteringUtils {
   public static boolean isPendingClusteringInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
     return getClusteringPlan(metaClient, instant).isPresent();
   }
+
+  /**
+   * Checks whether the latest clustering instant has a subsequent cleaning action. Returns
+   * the clustering instant if there is such cleaning action or empty.
+   *
+   * @param activeTimeline The active timeline
+   * @param metaClient     The meta client
+   * @return the oldest instant to retain for clustering
+   */
+  public static Option<HoodieInstant> getOldestInstantToRetainForClustering(
+      HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient) throws IOException {
+    HoodieTimeline replaceTimeline = activeTimeline.getCompletedReplaceTimeline();
+    if (!replaceTimeline.empty()) {
+      Option<HoodieInstant> cleanInstantOpt =
+          activeTimeline.getCleanerTimeline().filter(instant -> !instant.isCompleted()).firstInstant();
+      if (cleanInstantOpt.isPresent()) {
+        // The first clustering instant of which timestamp is greater than or equal to the earliest commit to retain of
+        // the clean metadata.
+        HoodieInstant cleanInstant = cleanInstantOpt.get();
+        String earliestCommitToRetain =
+            CleanerUtils.getCleanerPlan(metaClient,
+                    cleanInstant.isRequested()
+                        ? cleanInstant
+                        : HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp()))
+                .getEarliestInstantToRetain().getTimestamp();
+        return StringUtils.isNullOrEmpty(earliestCommitToRetain)
+            ? Option.empty()
+            : replaceTimeline.filter(instant ->
+                HoodieTimeline.compareTimestamps(instant.getTimestamp(),
+                    HoodieTimeline.GREATER_THAN_OR_EQUALS,
+                    earliestCommitToRetain))
+            .firstInstant();
+      }
+    }
+    return Option.empty();
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index a5d45d1184f..edd877a28fd 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -18,17 +18,21 @@
 
 package org.apache.hudi.common.util;
 
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -43,6 +47,7 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 /**
  * Tests for {@link ClusteringUtils}.
@@ -114,6 +119,70 @@ public class TestClusteringUtils extends HoodieCommonTestHarness {
     assertEquals(requestedClusteringPlan, inflightClusteringPlan);
   }
 
+  @Test
+  public void testGetOldestInstantToRetainForClustering() throws IOException {
+    String partitionPath1 = "partition1";
+    List<String> fileIds1 = new ArrayList<>();
+    fileIds1.add(UUID.randomUUID().toString());
+    String clusterTime1 = "1";
+    HoodieInstant requestedInstant1 = createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1);
+    HoodieInstant inflightInstant1 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant1, Option.empty());
+    HoodieInstant completedInstant1 = metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant1, Option.empty());
+    List<String> fileIds2 = new ArrayList<>();
+    fileIds2.add(UUID.randomUUID().toString());
+    fileIds2.add(UUID.randomUUID().toString());
+    String clusterTime2 = "2";
+    HoodieInstant requestedInstant2 = createRequestedReplaceInstant(partitionPath1, clusterTime2, fileIds2);
+    HoodieInstant inflightInstant2 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant2, Option.empty());
+    metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant2, Option.empty());
+    List<String> fileIds3 = new ArrayList<>();
+    fileIds3.add(UUID.randomUUID().toString());
+    fileIds3.add(UUID.randomUUID().toString());
+    fileIds3.add(UUID.randomUUID().toString());
+    String clusterTime3 = "3";
+    HoodieInstant requestedInstant3 = createRequestedReplaceInstant(partitionPath1, clusterTime3, fileIds3);
+    HoodieInstant inflightInstant3 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant3, Option.empty());
+    HoodieInstant completedInstant3 = metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant3, Option.empty());
+    metaClient.reloadActiveTimeline();
+    Option<HoodieInstant> actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
+    assertFalse(actual.isPresent());
+    // test first uncompleted clean instant is requested.
+    String cleanTime1 = "4";
+    HoodieInstant requestedInstant4 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime1);
+    HoodieCleanerPlan cleanerPlan1 = HoodieCleanerPlan.newBuilder()
+        .setEarliestInstantToRetainBuilder(HoodieActionInstant.newBuilder()
+            .setAction(completedInstant1.getAction())
+            .setTimestamp(completedInstant1.getTimestamp())
+            .setState(completedInstant1.getState().name()))
+        .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
+        .setFilesToBeDeletedPerPartition(new HashMap<>())
+        .setVersion(CleanPlanV2MigrationHandler.VERSION)
+        .build();
+    metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant4, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan1));
+    metaClient.reloadActiveTimeline();
+    actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
+    assertEquals(clusterTime1, actual.get().getTimestamp());
+    HoodieInstant inflightInstant4 = metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant4, Option.empty());
+    metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant4, Option.empty());
+    // test first uncompleted clean instant is inflight.
+    String cleanTime2 = "5";
+    HoodieInstant requestedInstant5 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, cleanTime2);
+    HoodieCleanerPlan cleanerPlan2 = HoodieCleanerPlan.newBuilder()
+        .setEarliestInstantToRetainBuilder(HoodieActionInstant.newBuilder()
+            .setAction(completedInstant3.getAction())
+            .setTimestamp(completedInstant3.getTimestamp())
+            .setState(completedInstant3.getState().name()))
+        .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
+        .setFilesToBeDeletedPerPartition(new HashMap<>())
+        .setVersion(CleanPlanV2MigrationHandler.VERSION)
+        .build();
+    metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant5, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan2));
+    metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant5, Option.empty());
+    metaClient.reloadActiveTimeline();
+    actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
+    assertEquals(clusterTime3, actual.get().getTimestamp());
+  }
+
   private void validateClusteringInstant(List<String> fileIds, String partitionPath,
                                          String expectedInstantTime, Map<HoodieFileGroupId, HoodieInstant> fileGroupToInstantMap) {
     for (String fileId : fileIds) {
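
Note: the cleaner change above can be summarized as "take the Nth-from-last completed commit as usual, but never retain past the earliest pending commit"; the same commit also keeps a completed replacecommit from being archived until a cleaner plan's earliestInstantToRetain has moved past it, so the file-system view never exposes the replaced files as duplicates. A simplified, self-contained illustration of the retention rule on plain sorted timestamp strings follows; it is not the Hudi timeline API, just the arithmetic.

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class EarliestCommitToRetainSketch {

  // completed must be sorted ascending, mirroring a completed-commits timeline.
  static Optional<String> earliestCommitToRetain(List<String> completed,
                                                 Optional<String> earliestPending,
                                                 int commitsRetained) {
    if (completed.size() <= commitsRetained) {
      return Optional.empty();
    }
    String nth = completed.get(completed.size() - commitsRetained);
    if (!earliestPending.isPresent() || nth.compareTo(earliestPending.get()) <= 0) {
      return Optional.of(nth);
    }
    // Cap at the last completed commit strictly before the earliest pending one.
    return completed.stream()
        .filter(ts -> ts.compareTo(earliestPending.get()) < 0)
        .reduce((first, second) -> second);
  }

  public static void main(String[] args) {
    List<String> completed = Arrays.asList("001", "002", "003", "004", "005");
    // Retaining 2 commits would normally give "004", but a pending commit at "003" caps it at "002".
    System.out.println(earliestCommitToRetain(completed, Optional.of("003"), 2)); // Optional[002]
    System.out.println(earliestCommitToRetain(completed, Optional.empty(), 2));   // Optional[004]
  }
}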


[hudi] 16/20: [HUDI-5506] StreamWriteOperatorCoordinator may not recommit with partial uncommitted write metadata event (#7611)


commit c265ddcc4b5bb307de319cb303ac249c85f787c2
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Fri Jan 6 20:32:42 2023 +0800

    [HUDI-5506] StreamWriteOperatorCoordinator may not recommit with partial uncommitted write metadata event (#7611)
    
    (cherry picked from commit 3e49e4c26dae1080e2b1a9389a75f56464c167a5)
---
 .../apache/hudi/sink/StreamWriteOperatorCoordinator.java |  8 ++++++--
 .../hudi/sink/TestStreamWriteOperatorCoordinator.java    | 16 ++++++++++++++++
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index f4acc2e83ad..8a913bf4298 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -397,7 +397,7 @@ public class StreamWriteOperatorCoordinator
     HoodieTimeline completedTimeline =
         StreamerUtil.createMetaClient(conf).getActiveTimeline().filterCompletedInstants();
     executor.execute(() -> {
-      if (instant.equals("") || completedTimeline.containsInstant(instant)) {
+      if (instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT) || completedTimeline.containsInstant(instant)) {
         // the last instant committed successfully
         reset();
       } else {
@@ -415,7 +415,11 @@ public class StreamWriteOperatorCoordinator
     this.eventBuffer[event.getTaskID()] = event;
     if (Arrays.stream(eventBuffer).allMatch(evt -> evt != null && evt.isBootstrap())) {
       // start to initialize the instant.
-      initInstant(event.getInstantTime());
+      final String instant = Arrays.stream(eventBuffer)
+          .filter(evt -> evt.getWriteStatuses().size() > 0)
+          .findFirst().map(WriteMetadataEvent::getInstantTime)
+          .orElse(WriteMetadataEvent.BOOTSTRAP_INSTANT);
+      initInstant(instant);
     }
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index d5d35f7494f..5e712363ac9 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -164,6 +164,22 @@ public class TestStreamWriteOperatorCoordinator {
     assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant));
   }
 
+  @Test
+  public void testRecommitWithPartialUncommittedEvents() {
+    final CompletableFuture<byte[]> future = new CompletableFuture<>();
+    coordinator.checkpointCoordinator(1, future);
+    String instant = coordinator.getInstant();
+    String lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+    assertNull(lastCompleted, "Returns early for empty write results");
+    WriteMetadataEvent event1 = createOperatorEvent(0, instant, "par1", false, 0.2);
+    event1.setBootstrap(true);
+    WriteMetadataEvent event2 = WriteMetadataEvent.emptyBootstrap(1);
+    coordinator.handleEventFromOperator(0, event1);
+    coordinator.handleEventFromOperator(1, event2);
+    lastCompleted = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+    assertThat("Recommits the instant with partial uncommitted events", lastCompleted, is(instant));
+  }
+
   @Test
   public void testHiveSyncInvoked() throws Exception {
     // reset


[hudi] 07/20: fix enable metrics on


commit 2c8586f5a3b4f1a88340676f1e207bc85c5a740c
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Sun Feb 5 10:38:35 2023 +0800

    fix enable metrics on
---
 .../main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
index 050b0519ee6..787819be120 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
@@ -47,7 +47,7 @@ public class HoodieMetricsConfig extends HoodieConfig {
 
   public static final ConfigProperty<Boolean> TURN_METRICS_ON = ConfigProperty
       .key(METRIC_PREFIX + ".on")
-      .defaultValue(false)
+      .defaultValue(true)
       .sinceVersion("0.5.0")
       .withDocumentation("Turn on/off metrics reporting. off by default.");
 


[hudi] 12/20: [HUDI-5543] Description of clustering.plan.partition.filter.mode supports DAY_ROLLING strategy (#7656)


commit 94ee9a133adf72c5c79b89ea2c9ce601d6b7f494
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Fri Jan 13 10:59:26 2023 +0800

    [HUDI-5543] Description of clustering.plan.partition.filter.mode supports DAY_ROLLING strategy (#7656)
    
    (cherry picked from commit 669e567672557ffa6d58fd37918f6b37eb842d39)
---
 .../src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java  | 4 +++-
 .../src/main/java/org/apache/hudi/configuration/FlinkOptions.java     | 4 +++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 8db88178f8d..6e3224c3e50 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -153,7 +153,9 @@ public class HoodieClusteringConfig extends HoodieConfig {
           + "RECENT_DAYS: keep a continuous range of partitions, worked together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '"
           + PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST.key() + "."
           + "SELECTED_PARTITIONS: keep partitions that are in the specified range ['" + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '"
-          + PARTITION_FILTER_END_PARTITION.key() + "'].");
+          + PARTITION_FILTER_END_PARTITION.key() + "']."
+          + "DAY_ROLLING: clustering partitions on a rolling basis by the hour to avoid clustering all partitions each time, "
+          + "which strategy sorts the partitions asc and chooses the partition of which index is divided by 24 and the remainder is equal to the current hour.");
 
   public static final ConfigProperty<String> PLAN_STRATEGY_MAX_BYTES_PER_OUTPUT_FILEGROUP = ConfigProperty
       .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "max.bytes.per.group")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 68f22c6f7e7..b885791daeb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -702,7 +702,9 @@ public class FlinkOptions extends HoodieConfig {
           + "RECENT_DAYS: keep a continuous range of partitions, worked together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '"
           + PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST.key() + "."
           + "SELECTED_PARTITIONS: keep partitions that are in the specified range ['" + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '"
-          + PARTITION_FILTER_END_PARTITION.key() + "'].");
+          + PARTITION_FILTER_END_PARTITION.key() + "']."
+          + "DAY_ROLLING: clustering partitions on a rolling basis by the hour to avoid clustering all partitions each time, "
+          + "which strategy sorts the partitions asc and chooses the partition of which index is divided by 24 and the remainder is equal to the current hour.");
 
   public static final ConfigOption<Long> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
       .key("clustering.plan.strategy.target.file.max.bytes")
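
Note: the DAY_ROLLING wording added above describes a simple rule: sort the partitions ascending and, at hour h of the day, schedule only the partitions whose index modulo 24 equals h, so a full pass over all partitions is spread across 24 hourly runs. A rough standalone sketch of that rule (an illustration only, not the actual Hudi filter implementation):

import java.time.LocalTime;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DayRollingSketch {

  static List<String> dayRolling(List<String> partitions, int hourOfDay) {
    List<String> sorted = partitions.stream().sorted().collect(Collectors.toList());
    return IntStream.range(0, sorted.size())
        .filter(i -> i % 24 == hourOfDay)
        .mapToObj(sorted::get)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> partitions = IntStream.range(0, 48)
        .mapToObj(i -> String.format("part=%03d", i))
        .collect(Collectors.toList());
    // At 10 o'clock only the partitions at index 10 and 34 are picked,
    // so every partition gets clustered once over a full day.
    System.out.println(dayRolling(partitions, LocalTime.now().getHour()));
  }
}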


[hudi] 18/20: [HUDI-5286] UnsupportedOperationException throws when enabling filesystem retry (#7313)


commit 0f056e52e583828a7fbbb21454e60d1a9b74d48d
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Tue Nov 29 10:20:41 2022 +0800

    [HUDI-5286] UnsupportedOperationException throws when enabling filesystem retry (#7313)
    
    (cherry picked from commit e88b4748127c45f287e45a11dda9183e29ad87d5)
---
 .../hudi/common/fs/HoodieRetryWrapperFileSystem.java       |  5 +++++
 .../hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java  | 14 ++++++++++++++
 2 files changed, 19 insertions(+)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieRetryWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieRetryWrapperFileSystem.java
index 075f811a42e..051cece84fe 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieRetryWrapperFileSystem.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieRetryWrapperFileSystem.java
@@ -254,4 +254,9 @@ public class HoodieRetryWrapperFileSystem extends FileSystem {
   public Configuration getConf() {
     return fileSystem.getConf();
   }
+
+  @Override
+  public String getScheme() {
+    return fileSystem.getScheme();
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
index 0b849ebec81..1c5447751f7 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java
@@ -38,6 +38,7 @@ import java.net.URI;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /**
@@ -80,6 +81,14 @@ public class TestFSUtilsWithRetryWrapperEnable extends TestFSUtils {
     folders.forEach(f -> assertThrows(RuntimeException.class, () -> metaClient.getFs().mkdirs(new Path(new Path(basePath), f))));
   }
 
+  @Test
+  public void testGetSchema() {
+    FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(FSUtils.getFs(metaClient.getMetaPath(), metaClient.getHadoopConf()), 100);
+    FileSystem fileSystem = new HoodieRetryWrapperFileSystem(fakeFs, maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, "");
+    HoodieWrapperFileSystem fs = new HoodieWrapperFileSystem(fileSystem, new NoOpConsistencyGuard());
+    assertDoesNotThrow(fs::getScheme, "Method #getSchema does not implement correctly");
+  }
+
   /**
    * Fake remote FileSystem which will throw RuntimeException something like AmazonS3Exception 503.
    */
@@ -206,5 +215,10 @@ public class TestFSUtilsWithRetryWrapperEnable extends TestFSUtils {
     public Configuration getConf() {
       return fs.getConf();
     }
+
+    @Override
+    public String getScheme() {
+      return fs.getScheme();
+    }
   }
 }


[hudi] 20/20: [HUDI-5495] add some property to table config


commit 45bef56db5509cacac927c28eb8d86b6b6bf652d
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Sat Feb 11 10:21:13 2023 +0800

    [HUDI-5495] add some property to table config
---
 .../hudi/common/table/HoodieTableMetaClient.java   | 19 ++++++++
 .../java/org/apache/hudi/util/StreamerUtil.java    | 57 +++++++++++-----------
 2 files changed, 48 insertions(+), 28 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index e0d6b346bd1..4329eba86bd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -497,6 +497,22 @@ public class HoodieTableMetaClient implements Serializable {
     return metaClient;
   }
 
+  /**
+   * Helper method to initialize a given path as a hoodie table with configs passed in as Properties.
+   *
+   * @return Instance of HoodieTableMetaClient
+   */
+  public static HoodieTableMetaClient updateTableAndGetMetaClient(Configuration hadoopConf, String basePath,
+                                                                Properties props) {
+    LOG.info("Update hoodie table with basePath " + basePath);
+
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(basePath)
+        .setConf(hadoopConf).build();
+    HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), props);
+    LOG.info("Finished update Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath);
+    return metaClient;
+  }
+
   public static void initializeBootstrapDirsIfNotExists(Configuration hadoopConf, String basePath, FileSystem fs) throws IOException {
 
     // Create bootstrap index by partition folder if it does not exist
@@ -1183,5 +1199,8 @@ public class HoodieTableMetaClient implements Serializable {
       return HoodieTableMetaClient.initTableAndGetMetaClient(configuration, basePath, build());
     }
 
+    public HoodieTableMetaClient updateTable(Configuration configuration, String basePath) {
+      return HoodieTableMetaClient.updateTableAndGetMetaClient(configuration, basePath, build());
+    }
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 02cb7d02d2d..fbc20799ff8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -301,40 +301,41 @@ public class StreamerUtil {
   public static HoodieTableMetaClient initTableIfNotExists(
       Configuration conf,
       org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
-    final String basePath = conf.getString(FlinkOptions.PATH);
-    if (!tableExists(basePath, hadoopConf)) {
-      if (conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name())) {
-        if (conf.getString(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) {
-          conf.setString(FlinkOptions.INDEX_KEY_FIELD, conf.getString(FlinkOptions.RECORD_KEY_FIELD));
-        }
+    if (conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name())) {
+      if (conf.getString(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) {
+        conf.setString(FlinkOptions.INDEX_KEY_FIELD, conf.getString(FlinkOptions.RECORD_KEY_FIELD));
       }
+    }
 
-      HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder()
-          .setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA))
-          .setTableType(conf.getString(FlinkOptions.TABLE_TYPE))
-          .setTableName(conf.getString(FlinkOptions.TABLE_NAME))
-          .setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null))
-          .setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
-          .setPreCombineField(OptionsResolver.getPreCombineField(conf))
-          .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
-          .setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null))
-          .setKeyGeneratorClassProp(
-              conf.getOptional(FlinkOptions.KEYGEN_CLASS_NAME).orElse(SimpleAvroKeyGenerator.class.getName()))
-          .setHiveStylePartitioningEnable(conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING))
-          .setUrlEncodePartitioning(conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING))
-          .setTimelineLayoutVersion(1)
-          .setAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
-          .setIndexType(conf.getString(FlinkOptions.INDEX_TYPE))
-          .setIndexBucketEngine(conf.getString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE))
-          .setIndexHashField(conf.getString(FlinkOptions.INDEX_KEY_FIELD))
-          .setIndexNumBuckets(conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS))
-          .initTable(hadoopConf, basePath);
+    final String basePath = conf.getString(FlinkOptions.PATH);
+    final HoodieTableMetaClient.PropertyBuilder tablePropertyBuilder = HoodieTableMetaClient.withPropertyBuilder()
+        .setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA))
+        .setTableType(conf.getString(FlinkOptions.TABLE_TYPE))
+        .setTableName(conf.getString(FlinkOptions.TABLE_NAME))
+        .setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null))
+        .setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
+        .setPreCombineField(OptionsResolver.getPreCombineField(conf))
+        .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
+        .setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null))
+        .setKeyGeneratorClassProp(
+            conf.getOptional(FlinkOptions.KEYGEN_CLASS_NAME).orElse(SimpleAvroKeyGenerator.class.getName()))
+        .setHiveStylePartitioningEnable(conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING))
+        .setUrlEncodePartitioning(conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING))
+        .setTimelineLayoutVersion(1)
+        .setAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
+        .setIndexType(conf.getString(FlinkOptions.INDEX_TYPE))
+        .setIndexBucketEngine(conf.getString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE))
+        .setIndexHashField(conf.getString(FlinkOptions.INDEX_KEY_FIELD))
+        .setIndexNumBuckets(conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
+
+    if (!tableExists(basePath, hadoopConf)) {
       LOG.info("Table initialized under base path {}", basePath);
-      return metaClient;
+      return tablePropertyBuilder.initTable(hadoopConf, basePath);
     } else {
       LOG.info("Table [{}/{}] already exists, no need to initialize the table",
           basePath, conf.getString(FlinkOptions.TABLE_NAME));
-      return StreamerUtil.createMetaClient(basePath, hadoopConf);
+      LOG.info("Table update under base path {}", basePath);
+      return tablePropertyBuilder.updateTable(hadoopConf, basePath);
     }
     // Do not close the filesystem in order to use the CACHE,
     // some filesystems release the handles in #close method.
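
Note: the new updateTable/updateTableAndGetMetaClient path writes the builder's configs back into the existing table's hoodie.properties, which is how StreamerUtil now reuses one property builder for both the init and the already-exists branches above. A sketch of calling it directly, assuming an already initialized table; the base path and field values are placeholders.

import org.apache.hadoop.conf.Configuration;

import org.apache.hudi.common.table.HoodieTableMetaClient;

public class UpdateTableConfigExample {
  public static void main(String[] args) throws Exception {
    Configuration hadoopConf = new Configuration();
    String basePath = "/tmp/hudi_tbl";   // must point at an already initialized table

    HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder()
        .setTableType("MERGE_ON_READ")
        .setTableName("hudi_tbl")
        .setRecordKeyFields("id")
        .setPreCombineField("ts")
        .updateTable(hadoopConf, basePath);  // updates the table config under the .hoodie meta path

    System.out.println("table type now: " + metaClient.getTableConfig().getTableType());
  }
}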


[hudi] 01/20: fix metatable hbase-site


commit d8043d48ea0d5914b032779f23ee06fd6b23f2cc
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Wed Feb 1 16:21:53 2023 +0800

    fix metatable hbase-site
---
 .idea/vcs.xml                                 | 1 -
 hudi-common/src/main/resources/hbase-site.xml | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index 6310577ae1b..64c37f96056 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -16,6 +16,5 @@
   </component>
   <component name="VcsDirectoryMappings">
     <mapping directory="$PROJECT_DIR$" vcs="Git" />
-    <mapping directory="$PROJECT_DIR$/src_release/hudi-tmp-clone" vcs="Git" />
   </component>
 </project>
\ No newline at end of file
diff --git a/hudi-common/src/main/resources/hbase-site.xml b/hudi-common/src/main/resources/hbase-site.xml
index 8397e6324af..f33fe1a7f38 100644
--- a/hudi-common/src/main/resources/hbase-site.xml
+++ b/hudi-common/src/main/resources/hbase-site.xml
@@ -1434,7 +1434,7 @@ possible configurations would overwhelm and obscure the important.
   </property>
   <property>
     <name>hbase.defaults.for.version.skip</name>
-    <value>false</value>
+    <value>true</value>
     <description>Set to true to skip the 'hbase.defaults.for.version' check.
       Setting this to true can be useful in contexts other than
       the other side of a maven generation; i.e. running in an