You are viewing a plain-text version of this content; the canonical (hyperlinked) version is available at the original mailing-list archive.
Posted to commits@hudi.apache.org by zh...@apache.org on 2023/03/08 01:48:47 UTC
[hudi] branch master updated: [MINOR] Fix testHoodieFlinkClusteringScheduleAfterArchive (#8110)
This is an automated email from the ASF dual-hosted git repository.
zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c6441134ba6 [MINOR] Fix testHoodieFlinkClusteringScheduleAfterArchive (#8110)
c6441134ba6 is described below
commit c6441134ba679177466eb945319d4245a08e14b0
Author: voonhous <vo...@gmail.com>
AuthorDate: Wed Mar 8 09:48:41 2023 +0800
[MINOR] Fix testHoodieFlinkClusteringScheduleAfterArchive (#8110)
* Fix testHoodieFlinkClusteringScheduleAfterArchive
* Fix checkstyle
---
.../sink/cluster/ITTestHoodieFlinkClustering.java | 20 +++++++++++---------
1 file changed, 11 insertions(+), 9 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index 7dcd0cec1c3..1ffde729001 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -18,9 +18,9 @@
package org.apache.hudi.sink.cluster;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -65,6 +65,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -348,12 +349,12 @@ public class ITTestHoodieFlinkClustering {
// judge whether have operation
// To compute the clustering instant time and do clustering.
- String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+ String firstClusteringInstant = HoodieActiveTimeline.createNewInstantTime();
HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf);
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
- boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+ boolean scheduled = writeClient.scheduleClusteringAtInstant(firstClusteringInstant, Option.empty());
assertTrue(scheduled, "The clustering plan should be scheduled");
@@ -370,7 +371,7 @@ public class ITTestHoodieFlinkClustering {
HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
// Mark instant as clustering inflight
- HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
+ HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(firstClusteringInstant);
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
@@ -378,7 +379,7 @@ public class ITTestHoodieFlinkClustering {
final RowType rowType = (RowType) rowDataType.getLogicalType();
DataStream<ClusteringCommitEvent> dataStream =
- env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan))
+ env.addSource(new ClusteringPlanSourceFunction(firstClusteringInstant, clusteringPlan))
.name("clustering_source")
.uid("uid_clustering_source")
.rebalance()
@@ -415,10 +416,11 @@ public class ITTestHoodieFlinkClustering {
timeline = table.getActiveTimeline().filterPendingReplaceTimeline()
.filter(i -> i.getState() == HoodieInstant.State.REQUESTED);
+ HoodieInstant secondClusteringInstant = timeline.lastInstant().get();
+ List<HoodieClusteringGroup> inputFileGroups = ClusteringUtils.getClusteringPlan(table.getMetaClient(), secondClusteringInstant).get().getRight().getInputGroups();
// clustering plan has no previous file slice generated by previous pending clustering
- assertFalse(ClusteringUtils.getClusteringPlan(table.getMetaClient(), timeline.lastInstant().get()).get()
- .getRight().getInputGroups()
- .stream().anyMatch(g -> g.getSlices()
- .stream().anyMatch(f -> clusteringInstantTime.equals(FSUtils.getCommitTime(f.getDataFilePath())))));
+ assertFalse(inputFileGroups
+ .stream().anyMatch(fg -> fg.getSlices()
+ .stream().anyMatch(s -> s.getDataFilePath().contains(firstClusteringInstant))));
}
}