You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/10/25 02:41:52 UTC

[hudi] branch master updated: [HUDI-5042] Fix clustering schedule problem in flink when enable schedule clustering and disable async clustering (#6976)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ce3bac45a2 [HUDI-5042] Fix clustering schedule problem in flink when enable schedule clustering and disable async clustering (#6976)
ce3bac45a2 is described below

commit ce3bac45a2886b0fa6136888a34fbbe4a27012b7
Author: Bingeng Huang <30...@qq.com>
AuthorDate: Tue Oct 25 10:41:45 2022 +0800

    [HUDI-5042] Fix clustering schedule problem in flink when enable schedule clustering and disable async clustering (#6976)
    
    Co-authored-by: hbg <bi...@shopee.com>
---
 .../java/org/apache/hudi/util/StreamerUtil.java    |  2 +-
 .../sink/cluster/ITTestHoodieFlinkClustering.java  | 67 ++++++++++++++++++++++
 2 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index cdacefbf17..86be092ce8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -171,7 +171,7 @@ public class StreamerUtil {
             .withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf))
             .withClusteringConfig(
                 HoodieClusteringConfig.newBuilder()
-                    .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED))
+                    .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
                     .withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS))
                     .withClusteringPlanPartitionFilterMode(
                         ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME)))
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index a0073d8a37..f50f5748be 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -63,6 +63,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -220,4 +221,70 @@ public class ITTestHoodieFlinkClustering {
 
     TestData.checkWrittenData(tempFile, EXPECTED, 4);
   }
+
+  @Test
+  public void testHoodieFlinkClusteringSchedule() throws Exception {
+    // Create the hoodie table in batch mode and insert the first batch of data into it.
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+    // use append mode: INSERT operation with insert clustering switched off
+    options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
+    options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
+
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+    tableEnv.executeSql(TestSQL.INSERT_T1).await();
+
+    // wait for the asynchronous commit of the first insert to finish
+    TimeUnit.SECONDS.sleep(3);
+
+    // Build the clustering configuration; the Avro schema is set on it further below.
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+    // create metaClient
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+    // set the table name
+    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+
+    // set record key field
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
+    // set partition field from the table config (overwritten with "partition" a few lines below)
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
+
+    long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
+    conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 2);
+    conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, false);
+    conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
+
+    // set table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    // Generate a new instant time to use for the first clustering schedule attempt.
+    String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
+
+    boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+
+    assertFalse(scheduled, "1 delta commit, the clustering plan should not be scheduled");
+
+    tableEnv.executeSql(TestSQL.INSERT_T1).await();
+    // wait for the asynchronous commit of the second insert to finish
+    TimeUnit.SECONDS.sleep(3);
+
+    clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+
+    scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+
+    assertTrue(scheduled, "2 delta commits, the clustering plan should be scheduled");
+  }
 }