You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this plain-text copy.)
Posted to commits@hudi.apache.org by zh...@apache.org on 2023/03/08 01:48:47 UTC

[hudi] branch master updated: [MINOR] Fix testHoodieFlinkClusteringScheduleAfterArchive (#8110)

This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c6441134ba6 [MINOR] Fix testHoodieFlinkClusteringScheduleAfterArchive (#8110)
c6441134ba6 is described below

commit c6441134ba679177466eb945319d4245a08e14b0
Author: voonhous <vo...@gmail.com>
AuthorDate: Wed Mar 8 09:48:41 2023 +0800

    [MINOR] Fix testHoodieFlinkClusteringScheduleAfterArchive (#8110)
    
    * Fix testHoodieFlinkClusteringScheduleAfterArchive
    
    * Fix checkstyle
---
 .../sink/cluster/ITTestHoodieFlinkClustering.java    | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index 7dcd0cec1c3..1ffde729001 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -18,9 +18,9 @@
 
 package org.apache.hudi.sink.cluster;
 
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -65,6 +65,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -348,12 +349,12 @@ public class ITTestHoodieFlinkClustering {
 
     // judge whether have operation
     // To compute the clustering instant time and do clustering.
-    String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+    String firstClusteringInstant = HoodieActiveTimeline.createNewInstantTime();
 
     HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf);
     HoodieFlinkTable<?> table = writeClient.getHoodieTable();
 
-    boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+    boolean scheduled = writeClient.scheduleClusteringAtInstant(firstClusteringInstant, Option.empty());
 
     assertTrue(scheduled, "The clustering plan should be scheduled");
 
@@ -370,7 +371,7 @@ public class ITTestHoodieFlinkClustering {
     HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
 
     // Mark instant as clustering inflight
-    HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
+    HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(firstClusteringInstant);
     table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
 
     final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
@@ -378,7 +379,7 @@ public class ITTestHoodieFlinkClustering {
     final RowType rowType = (RowType) rowDataType.getLogicalType();
 
     DataStream<ClusteringCommitEvent> dataStream =
-        env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan))
+        env.addSource(new ClusteringPlanSourceFunction(firstClusteringInstant, clusteringPlan))
             .name("clustering_source")
             .uid("uid_clustering_source")
             .rebalance()
@@ -415,10 +416,11 @@ public class ITTestHoodieFlinkClustering {
     timeline = table.getActiveTimeline().filterPendingReplaceTimeline()
         .filter(i -> i.getState() == HoodieInstant.State.REQUESTED);
 
+    HoodieInstant secondClusteringInstant = timeline.lastInstant().get();
+    List<HoodieClusteringGroup> inputFileGroups = ClusteringUtils.getClusteringPlan(table.getMetaClient(), secondClusteringInstant).get().getRight().getInputGroups();
     // clustering plan has no previous file slice generated by previous pending clustering
-    assertFalse(ClusteringUtils.getClusteringPlan(table.getMetaClient(), timeline.lastInstant().get()).get()
-        .getRight().getInputGroups()
-        .stream().anyMatch(g -> g.getSlices()
-            .stream().anyMatch(f -> clusteringInstantTime.equals(FSUtils.getCommitTime(f.getDataFilePath())))));
+    assertFalse(inputFileGroups
+        .stream().anyMatch(fg -> fg.getSlices()
+            .stream().anyMatch(s -> s.getDataFilePath().contains(firstClusteringInstant))));
   }
 }