You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by ga...@apache.org on 2022/07/07 06:11:31 UTC
[hudi] branch master updated: [HUDI-4152] Flink offline compaction support compacting multi compaction plan at once (#5677)
This is an automated email from the ASF dual-hosted git repository.
garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e74ad324c3 [HUDI-4152] Flink offline compaction support compacting multi compaction plan at once (#5677)
e74ad324c3 is described below
commit e74ad324c33c3c51d762c4b3349b92a25888ed10
Author: Lanyuanxiaoyao <la...@gmail.com>
AuthorDate: Thu Jul 7 14:11:26 2022 +0800
[HUDI-4152] Flink offline compaction support compacting multi compaction plan at once (#5677)
* [HUDI-4152] Flink offline compaction allow compact multi compaction plan at once
* [HUDI-4152] Fix exception for duplicated uid when multi compaction plan are compacted
* [HUDI-4152] Provider UT & IT for compact multi compaction plan
* [HUDI-4152] Put multi compaction plans into one compaction plan source
* [HUDI-4152] InstantCompactionPlanSelectStrategy allow multi instant by using comma
* [HUDI-4152] Add IT for InstantCompactionPlanSelectStrategy
---
.../hudi/sink/compact/FlinkCompactionConfig.java | 16 +++
.../hudi/sink/compact/HoodieFlinkCompactor.java | 120 +++++++++++------
.../compact/MultiCompactionPlanSourceFunction.java | 90 +++++++++++++
.../AllPendingCompactionPlanSelectStrategy.java | 35 +++++
.../strategy/CompactionPlanSelectStrategy.java | 34 +++++
.../InstantCompactionPlanSelectStrategy.java | 50 +++++++
.../MultiCompactionPlanSelectStrategy.java | 42 ++++++
.../SingleCompactionPlanSelectStrategy.java | 43 ++++++
.../sink/compact/ITTestHoodieFlinkCompactor.java | 117 ++++++++++++++++
.../compact/TestCompactionPlanSelectStrategy.java | 149 +++++++++++++++++++++
10 files changed, 656 insertions(+), 40 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
index 6513859556..02041690f1 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
@@ -23,6 +23,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import com.beust.jcommander.Parameter;
import org.apache.flink.configuration.Configuration;
+import org.apache.hudi.sink.compact.strategy.SingleCompactionPlanSelectStrategy;
/**
* Configurations for Hoodie Flink compaction.
@@ -110,6 +111,21 @@ public class FlinkCompactionConfig extends Configuration {
description = "Min compaction interval of async compaction service, default 10 minutes")
public Integer minCompactionIntervalSeconds = 600;
+ @Parameter(names = {"--select-strategy"}, description = "The strategy define how to select compaction plan to compact.\n"
+ + "1). SingleCompactionPlanSelectStrategy: Select first or last compaction plan.\n"
+ + "2). MultiCompactionPlanSelectStrategy: Select first or last n compaction plan (n is defined by compactionPlanMaxSelect).\n"
+ + "3). AllPendingCompactionPlanSelectStrategy: Select all pending compaction plan.\n"
+ + "4). InstantCompactionPlanSelectStrategy: Select the compaction plan that instant is specified by compactionPlanInstant.")
+ public String compactionPlanSelectStrategy = SingleCompactionPlanSelectStrategy.class.getName(); // strategy class name, loaded reflectively by HoodieFlinkCompactor
+
+ @Parameter(names = {"--select-max-number"}, description = "Max number of compaction plan would be selected in compaction. "
+ + "It's only effective for MultiCompactionPlanSelectStrategy.")
+ public Integer compactionPlanMaxSelect = 10; // upper bound of selected plans, read by MultiCompactionPlanSelectStrategy
+
+ @Parameter(names = {"--select-instant"}, description = "Specify the compaction plan instant to compact and you can specify more than "
+ + "one instant in a time by using comma. "
+ + "It's only effective for InstantCompactionPlanSelectStrategy.")
+ public String compactionPlanInstant; // comma separated instant times, read by InstantCompactionPlanSelectStrategy
@Parameter(names = {"--spillable_map_path"}, description = "Default file path prefix for spillable map.", required = false)
public String spillableMapPath = HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 546136e416..f56b5a2f0f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -18,6 +18,11 @@
package org.apache.hudi.sink.compact;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
@@ -26,9 +31,11 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
@@ -42,9 +49,12 @@ import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
/**
* Flink hudi compaction program that can be executed manually.
@@ -218,74 +228,104 @@ public class HoodieFlinkCompactor {
}
// fetch the instant based on the configured execution sequence
- HoodieTimeline timeline = table.getActiveTimeline().filterPendingCompactionTimeline();
- Option<HoodieInstant> requested = CompactionUtil.isLIFO(cfg.compactionSeq) ? timeline.lastInstant() : timeline.firstInstant();
- if (!requested.isPresent()) {
+ HoodieTimeline timeline = table.getActiveTimeline();
+ List<HoodieInstant> requested = ((CompactionPlanSelectStrategy) ReflectionUtils.loadClass(cfg.compactionPlanSelectStrategy))
+ .select(timeline.filterPendingCompactionTimeline(), cfg);
+ if (requested.isEmpty()) {
// do nothing.
LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option");
return;
}
- String compactionInstantTime = requested.get().getTimestamp();
-
- HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
- if (timeline.containsInstant(inflightInstant)) {
- LOG.info("Rollback inflight compaction instant: [" + compactionInstantTime + "]");
- table.rollbackInflightCompaction(inflightInstant);
- table.getMetaClient().reloadActiveTimeline();
- }
+ List<String> compactionInstantTimes = requested.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ compactionInstantTimes.forEach(timestamp -> {
+ HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(timestamp);
+ if (timeline.containsInstant(inflightInstant)) {
+ LOG.info("Rollback inflight compaction instant: [" + timestamp + "]");
+ table.rollbackInflightCompaction(inflightInstant);
+ table.getMetaClient().reloadActiveTimeline();
+ }
+ });
- // generate compaction plan
+ // generate timestamp and compaction plan pair
// should support configurable commit metadata
- HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
- table.getMetaClient(), compactionInstantTime);
-
- if (compactionPlan == null || (compactionPlan.getOperations() == null)
- || (compactionPlan.getOperations().isEmpty())) {
+ List<Pair<String, HoodieCompactionPlan>> compactionPlans = compactionInstantTimes.stream()
+ .map(timestamp -> {
+ try {
+ return Pair.of(timestamp, CompactionUtils.getCompactionPlan(table.getMetaClient(), timestamp));
+ } catch (IOException e) {
+ throw new HoodieException(e);
+ }
+ })
+ // reject empty compaction plan
+ .filter(pair -> !(pair.getRight() == null
+ || pair.getRight().getOperations() == null
+ || pair.getRight().getOperations().isEmpty()))
+ .collect(Collectors.toList());
+
+ if (compactionPlans.isEmpty()) {
// No compaction plan, do nothing and return.
- LOG.info("No compaction plan for instant " + compactionInstantTime);
+ LOG.info("No compaction plan for instant " + String.join(",", compactionInstantTimes));
return;
}
- HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+ List<HoodieInstant> instants = compactionInstantTimes.stream().map(HoodieTimeline::getCompactionRequestedInstant).collect(Collectors.toList());
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
- if (!pendingCompactionTimeline.containsInstant(instant)) {
- // this means that the compaction plan was written to auxiliary path(.tmp)
- // but not the meta path(.hoodie), this usually happens when the job crush
- // exceptionally.
-
- // clean the compaction plan in auxiliary path and cancels the compaction.
-
- LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
- + "Clean the compaction plan in auxiliary path and cancels the compaction");
- CompactionUtil.cleanInstant(table.getMetaClient(), instant);
- return;
+ for (HoodieInstant instant : instants) {
+ if (!pendingCompactionTimeline.containsInstant(instant)) {
+ // this means that the compaction plan was written to auxiliary path(.tmp)
+ // but not the meta path(.hoodie), this usually happens when the job crush
+ // exceptionally.
+ // clean the compaction plan in auxiliary path and cancels the compaction.
+ LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
+ + "Clean the compaction plan in auxiliary path and cancels the compaction");
+ CompactionUtil.cleanInstant(table.getMetaClient(), instant);
+ return;
+ }
}
// get compactionParallelism.
int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1
- ? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS);
+ ? Math.toIntExact(compactionPlans.stream().mapToLong(pair -> pair.getRight().getOperations().size()).sum())
+ : conf.getInteger(FlinkOptions.COMPACTION_TASKS);
- LOG.info("Start to compaction for instant " + compactionInstantTime);
+ LOG.info("Start to compaction for instant " + compactionInstantTimes);
// Mark instant as compaction inflight
- table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
+ for (HoodieInstant instant : instants) {
+ table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
+ }
table.getMetaClient().reloadActiveTimeline();
- env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
+ // use side-output to make operations that is in the same plan to be placed in the same stream
+ // keyby() cannot sure that different operations are in the different stream
+ DataStream<CompactionPlanEvent> source = env.addSource(new MultiCompactionPlanSourceFunction(compactionPlans))
.name("compaction_source")
- .uid("uid_compaction_source")
- .rebalance()
+ .uid("uid_compaction_source");
+
+ SingleOutputStreamOperator<Void> operator = source.rebalance()
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf)))
.setParallelism(compactionParallelism)
- .addSink(new CompactionCommitSink(conf))
- .name("clean_commits")
- .uid("uid_clean_commits")
+ .process(new ProcessFunction<CompactionCommitEvent, Void>() {
+ @Override
+ public void processElement(CompactionCommitEvent event, ProcessFunction<CompactionCommitEvent, Void>.Context context, Collector<Void> out) {
+ context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event);
+ }
+ })
+ .name("group_by_compaction_plan")
+ .uid("uid_group_by_compaction_plan")
.setParallelism(1);
- env.execute("flink_hudi_compaction_" + compactionInstantTime);
+ compactionPlans.forEach(pair ->
+ operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class)))
+ .addSink(new CompactionCommitSink(conf))
+ .name("clean_commits " + pair.getLeft())
+ .uid("uid_clean_commits_" + pair.getLeft())
+ .setParallelism(1));
+
+ env.execute("flink_hudi_compaction_" + String.join(",", compactionInstantTimes));
}
/**
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java
new file mode 100644
index 0000000000..8a8c3f6b4e
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.util.collection.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink hudi compaction source function that handles several compaction plans in one job.
+ *
+ * <p>This function reads every given compaction plan as {@link CompactionOperation}s and then emits
+ * one compaction task event {@link CompactionPlanEvent} per operation to the downstream operators.
+ *
+ * <p>The compaction plans are handed over explicitly through the constructor as
+ * (compaction instant time, compaction plan) pairs; this function neither reads
+ * the timeline nor generates an instant time by itself:
+ *
+ * <ul>
+ * <li>The plans are processed sequentially, in the order of the given list;</li>
+ * <li>Every emitted event carries the instant time of the plan its operation belongs to;</li>
+ * <li>The source is bounded: {@code run} returns
+ * once the operations of the last plan have been emitted.</li>
+ * </ul>
+ */
+public class MultiCompactionPlanSourceFunction extends AbstractRichFunction implements SourceFunction<CompactionPlanEvent> {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(MultiCompactionPlanSourceFunction.class);
+
+ /**
+ * Pairs of (compaction plan instant time, compaction plan), emitted in list order.
+ */
+ private final List<Pair<String, HoodieCompactionPlan>> compactionPlans;
+
+ public MultiCompactionPlanSourceFunction(List<Pair<String, HoodieCompactionPlan>> compactionPlans) {
+ this.compactionPlans = compactionPlans;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ // no operation
+ }
+
+ @Override
+ public void run(SourceContext<CompactionPlanEvent> sourceContext) throws Exception {
+ for (Pair<String, HoodieCompactionPlan> pair : compactionPlans) {
+ // convert the avro operations of this plan, then emit them tagged with the plan's instant time
+ List<CompactionOperation> operations = pair.getRight().getOperations().stream()
+ .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
+ LOG.info("CompactionPlanFunction compacting " + operations + " files");
+ for (CompactionOperation operation : operations) {
+ sourceContext.collect(new CompactionPlanEvent(pair.getLeft(), operation));
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ // no operation
+ }
+
+ @Override
+ public void cancel() {
+ // no operation: run() only iterates the given plans and returns by itself
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java
new file mode 100644
index 0000000000..23b6708ff3
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.strategy;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+
+/**
+ * Selects every instant of the given pending compaction timeline; the {@code config} argument is not consulted.
+ */
+public class AllPendingCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
+ @Override
+ public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
+ return pendingCompactionTimeline.getInstants().collect(Collectors.toList());
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java
new file mode 100644
index 0000000000..a41fcef198
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.strategy;
+
+import java.util.List;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+
+/**
+ * Strategy that decides which pending compaction plans should be compacted.
+ */
+public interface CompactionPlanSelectStrategy {
+ /**
+ * Returns the instants to compact, picked from {@code pendingCompactionTimeline} according to {@code config}; an empty list means nothing is selected.
+ */
+ List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config);
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java
new file mode 100644
index 0000000000..45382b70c4
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.strategy;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+import org.apache.hudi.sink.compact.HoodieFlinkCompactor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Selects the pending compaction plans whose instant times are listed, comma separated, in {@code compactionPlanInstant}; loaded by {@link HoodieFlinkCompactor}.
+ */
+public class InstantCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
+ protected static final Logger LOG = LoggerFactory.getLogger(InstantCompactionPlanSelectStrategy.class);
+
+ @Override
+ public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
+ if (StringUtils.isNullOrEmpty(config.compactionPlanInstant)) {
+ LOG.warn("None instant is selected");
+ return Collections.emptyList();
+ }
+ // trim every entry so that "t1, t2" selects the same plans as "t1,t2"
+ List<String> instants = Arrays.stream(config.compactionPlanInstant.split(",")).map(String::trim).collect(Collectors.toList());
+ return pendingCompactionTimeline.getInstants()
+ .filter(instant -> instants.contains(instant.getTimestamp())).collect(Collectors.toList());
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java
new file mode 100644
index 0000000000..ee0e93653f
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.strategy;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+import org.apache.hudi.util.CompactionUtil;
+
+/**
+ * Selects at most {@code compactionPlanMaxSelect} pending compaction plans, taken from the end of the timeline when {@code compactionSeq} is LIFO and from the start otherwise.
+ */
+public class MultiCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
+ @Override
+ public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
+ List<HoodieInstant> pendingCompactionPlanInstants = pendingCompactionTimeline.getInstants().collect(Collectors.toList());
+ if (CompactionUtil.isLIFO(config.compactionSeq)) {
+ Collections.reverse(pendingCompactionPlanInstants);
+ }
+ int range = Math.min(config.compactionPlanMaxSelect, pendingCompactionPlanInstants.size());
+ return pendingCompactionPlanInstants.subList(0, range);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java
new file mode 100644
index 0000000000..7ca939866c
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.strategy;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+import org.apache.hudi.util.CompactionUtil;
+
+/**
+ * Selects a single pending compaction plan: the last instant of the timeline when {@code compactionSeq} is LIFO, the first one otherwise; an empty list if there is none.
+ */
+public class SingleCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
+ @Override
+ public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
+ Option<HoodieInstant> compactionPlanInstant = CompactionUtil.isLIFO(config.compactionSeq)
+ ? pendingCompactionTimeline.lastInstant()
+ : pendingCompactionTimeline.firstInstant();
+ if (compactionPlanInstant.isPresent()) {
+ return Collections.singletonList(compactionPlanInstant.get());
+ }
+ return Collections.emptyList();
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index e1e86ce32b..43e4ed5114 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -18,6 +18,11 @@
package org.apache.hudi.sink.compact;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -25,6 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
@@ -49,6 +55,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Arrays;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -67,6 +74,8 @@ public class ITTestHoodieFlinkCompactor {
private static final Map<String, List<String>> EXPECTED2 = new HashMap<>();
+ private static final Map<String, List<String>> EXPECTED3 = new HashMap<>();
+
static {
EXPECTED1.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
EXPECTED1.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
@@ -77,6 +86,12 @@ public class ITTestHoodieFlinkCompactor {
EXPECTED2.put("par2", Arrays.asList("id3,par2,id3,Julian,54,3000,par2", "id4,par2,id4,Fabian,32,4000,par2"));
EXPECTED2.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3", "id9,par3,id9,Jane,19,6000,par3"));
EXPECTED2.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4", "id10,par4,id10,Ella,38,7000,par4", "id11,par4,id11,Phoebe,52,8000,par4"));
+
+ EXPECTED3.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
+ EXPECTED3.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
+ EXPECTED3.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3"));
+ EXPECTED3.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4"));
+ EXPECTED3.put("par5", Arrays.asList("id12,par5,id12,Tony,27,9000,par5", "id13,par5,id13,Jenny,72,10000,par5"));
}
@TempDir
@@ -203,4 +218,106 @@ public class ITTestHoodieFlinkCompactor {
TestData.checkWrittenFullData(tempFile, EXPECTED2);
}
+
+  /**
+   * Verifies that several pending compaction plans can be executed by a single Flink job.
+   *
+   * <p>The test schedules one compaction plan after the initial insert and a second one after
+   * inserting two records into the new partition {@code par5}, marks both instants as inflight,
+   * feeds them through one {@code MultiCompactionPlanSourceFunction}, routes the resulting commit
+   * events to a per-instant {@code CompactionCommitSink} via side outputs, and finally checks the
+   * table content against {@code EXPECTED3}.
+   *
+   * @param enableChangelog value written to {@code FlinkOptions.CHANGELOG_ENABLED} for the table
+   * @throws Exception if table creation, plan scheduling or the compaction job fails
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangelog) throws Exception {
+    // Create the hoodie table and insert data into it.
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+    options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + "");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+    tableEnv.executeSql(TestSQL.INSERT_T1).await();
+
+    TimeUnit.SECONDS.sleep(3);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+    conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+    CompactionUtil.setAvroSchema(conf, metaClient);
+    CompactionUtil.inferChangelogMode(conf, metaClient);
+
+    List<String> compactionInstantTimeList = new ArrayList<>(2);
+
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
+    try {
+      // First plan: covers the data written by the initial insert.
+      compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient));
+
+      // Insert new records into a new partition, so that a second compaction plan can be generated.
+      String insertT1ForNewPartition = "insert into t1 values\n"
+          + "('id12','Tony',27,TIMESTAMP '1970-01-01 00:00:09','par5'),\n"
+          + "('id13','Jenny',72,TIMESTAMP '1970-01-01 00:00:10','par5')";
+      tableEnv.executeSql(insertT1ForNewPartition).await();
+
+      TimeUnit.SECONDS.sleep(3);
+
+      compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient));
+
+      HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+      // Load every scheduled plan and pair it with its instant time.
+      List<Pair<String, HoodieCompactionPlan>> compactionPlans = new ArrayList<>(2);
+      for (String compactionInstantTime : compactionInstantTimeList) {
+        HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(table.getMetaClient(), compactionInstantTime);
+        compactionPlans.add(Pair.of(compactionInstantTime, plan));
+      }
+
+      // Mark every instant as compaction inflight.
+      for (String compactionInstantTime : compactionInstantTimeList) {
+        HoodieInstant hoodieInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+        table.getActiveTimeline().transitionCompactionRequestedToInflight(hoodieInstant);
+      }
+      table.getMetaClient().reloadActiveTimeline();
+
+      DataStream<CompactionPlanEvent> source = env.addSource(new MultiCompactionPlanSourceFunction(compactionPlans))
+          .name("compaction_source")
+          .uid("uid_compaction_source");
+      SingleOutputStreamOperator<Void> operator = source.rebalance()
+          .transform("compact_task",
+              TypeInformation.of(CompactionCommitEvent.class),
+              new ProcessOperator<>(new CompactFunction(conf)))
+          .setParallelism(1)
+          .process(new ProcessFunction<CompactionCommitEvent, Void>() {
+            @Override
+            public void processElement(CompactionCommitEvent event, ProcessFunction<CompactionCommitEvent, Void>.Context context, Collector<Void> out) {
+              // Route each commit event to the side output tagged with its compaction instant.
+              context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event);
+            }
+          })
+          .name("group_by_compaction_plan")
+          .uid("uid_group_by_compaction_plan")
+          .setParallelism(1);
+      // One commit sink per plan; the instant time keeps sink names and uids distinct.
+      compactionPlans.forEach(pair ->
+          operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class)))
+              .addSink(new CompactionCommitSink(conf))
+              .name("clean_commits " + pair.getLeft())
+              .uid("uid_clean_commits_" + pair.getLeft())
+              .setParallelism(1));
+
+      env.execute("flink_hudi_compaction");
+    } finally {
+      // Always release the write client, even when scheduling or the compaction job fails.
+      writeClient.close();
+    }
+    TestData.checkWrittenFullData(tempFile, EXPECTED3);
+  }
+
+  /**
+   * Computes a compaction instant time for the table and schedules a compaction plan at it.
+   *
+   * <p>The test fails if no instant time is available or if the write client reports that
+   * nothing was scheduled.
+   *
+   * @param metaClient  meta client used to compute the compaction instant time
+   * @param writeClient write client used to schedule the compaction
+   * @return the instant time at which the compaction plan was scheduled
+   */
+  private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) {
+    Option<String> instantTime = CompactionUtil.getCompactionInstantTime(metaClient);
+    // Short-circuit: scheduling is only attempted when an instant time could be computed.
+    boolean scheduled = instantTime.isPresent()
+        && writeClient.scheduleCompactionAtInstant(instantTime.get(), Option.empty());
+    assertTrue(scheduled, "The compaction plan should be scheduled");
+    return instantTime.get();
+  }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java
new file mode 100644
index 0000000000..3ac9f6c666
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.sink.compact.strategy.AllPendingCompactionPlanSelectStrategy;
+import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy;
+import org.apache.hudi.sink.compact.strategy.InstantCompactionPlanSelectStrategy;
+import org.apache.hudi.sink.compact.strategy.MultiCompactionPlanSelectStrategy;
+import org.apache.hudi.sink.compact.strategy.SingleCompactionPlanSelectStrategy;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test cases for every {@link CompactionPlanSelectStrategy} implementation.
+ */
+public class TestCompactionPlanSelectStrategy {
+  private static final HoodieInstant INSTANT_001 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "001");
+  private static final HoodieInstant INSTANT_002 = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "002");
+  private static final HoodieInstant INSTANT_003 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "003");
+  private static final HoodieInstant INSTANT_004 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "004");
+  private static final HoodieInstant INSTANT_005 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMPACTION_ACTION, "005");
+  private static final HoodieInstant INSTANT_006 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "006");
+
+  // Timeline holding all six instants above.
+  private HoodieTimeline timeline;
+  // Timeline without any instant.
+  private HoodieTimeline emptyTimeline;
+  // Timeline holding only the two completed instants (001 and 005).
+  private HoodieTimeline allCompleteTimeline;
+
+  @BeforeEach
+  public void beforeEach() {
+    emptyTimeline = new MockHoodieActiveTimeline();
+    allCompleteTimeline = new MockHoodieActiveTimeline(INSTANT_001, INSTANT_005);
+    timeline = new MockHoodieActiveTimeline(INSTANT_001, INSTANT_002, INSTANT_003, INSTANT_004, INSTANT_005, INSTANT_006);
+  }
+
+  @Test
+  void testSingleCompactionPlanSelectStrategy() {
+    SingleCompactionPlanSelectStrategy strategy = new SingleCompactionPlanSelectStrategy();
+    FlinkCompactionConfig config = new FlinkCompactionConfig();
+    HoodieTimeline pending = timeline.filterPendingCompactionTimeline();
+
+    // With the default config, the newest pending instant is expected.
+    assertSelected(strategy.select(pending, config), INSTANT_006);
+
+    // With the FIFO sequence, the oldest pending instant is expected.
+    config.compactionSeq = FlinkCompactionConfig.SEQ_FIFO;
+    assertSelected(strategy.select(pending, config), INSTANT_002);
+
+    // Nothing is expected when the timeline has no pending compaction.
+    assertSelected(strategy.select(emptyTimeline.filterPendingCompactionTimeline(), config));
+    assertSelected(strategy.select(allCompleteTimeline.filterPendingCompactionTimeline(), config));
+  }
+
+  @Test
+  void testMultiCompactionPlanSelectStrategy() {
+    MultiCompactionPlanSelectStrategy strategy = new MultiCompactionPlanSelectStrategy();
+    FlinkCompactionConfig config = new FlinkCompactionConfig();
+    config.compactionPlanMaxSelect = 2;
+    HoodieTimeline pending = timeline.filterPendingCompactionTimeline();
+
+    // With the default config, the two newest pending instants are expected, newest first.
+    assertSelected(strategy.select(pending, config), INSTANT_006, INSTANT_004);
+
+    // With the FIFO sequence, the two oldest pending instants are expected, oldest first.
+    config.compactionSeq = FlinkCompactionConfig.SEQ_FIFO;
+    assertSelected(strategy.select(pending, config), INSTANT_002, INSTANT_003);
+
+    // Nothing is expected when the timeline has no pending compaction.
+    assertSelected(strategy.select(emptyTimeline.filterPendingCompactionTimeline(), config));
+    assertSelected(strategy.select(allCompleteTimeline.filterPendingCompactionTimeline(), config));
+  }
+
+  @Test
+  void testAllPendingCompactionPlanSelectStrategy() {
+    AllPendingCompactionPlanSelectStrategy strategy = new AllPendingCompactionPlanSelectStrategy();
+    FlinkCompactionConfig config = new FlinkCompactionConfig();
+    HoodieTimeline pending = timeline.filterPendingCompactionTimeline();
+
+    // Every pending compaction instant is expected, in timeline order.
+    assertSelected(strategy.select(pending, config), INSTANT_002, INSTANT_003, INSTANT_004, INSTANT_006);
+
+    // Nothing is expected when the timeline has no pending compaction.
+    assertSelected(strategy.select(emptyTimeline.filterPendingCompactionTimeline(), config));
+    assertSelected(strategy.select(allCompleteTimeline.filterPendingCompactionTimeline(), config));
+  }
+
+  @Test
+  void testInstantCompactionPlanSelectStrategy() {
+    InstantCompactionPlanSelectStrategy strategy = new InstantCompactionPlanSelectStrategy();
+    FlinkCompactionConfig config = new FlinkCompactionConfig();
+    HoodieTimeline pending = timeline.filterPendingCompactionTimeline();
+
+    // A single instant time.
+    config.compactionPlanInstant = "004";
+    assertSelected(strategy.select(pending, config), INSTANT_004);
+
+    // Several instant times separated by commas.
+    config.compactionPlanInstant = "002,003";
+    assertSelected(strategy.select(pending, config), INSTANT_002, INSTANT_003);
+
+    // 005 is a completed compaction, so only 002 is expected.
+    config.compactionPlanInstant = "002,005";
+    assertSelected(strategy.select(pending, config), INSTANT_002);
+
+    config.compactionPlanInstant = "005";
+    assertSelected(strategy.select(pending, config));
+  }
+
+  /**
+   * Asserts that {@code actual} has exactly as many instants as {@code expected} and that each
+   * pair at the same position agrees on state, action and timestamp.
+   *
+   * @param actual   instants returned by the strategy under test
+   * @param expected instants expected at the same positions; pass none to expect an empty result
+   */
+  private static void assertSelected(List<HoodieInstant> actual, HoodieInstant... expected) {
+    assertEquals(expected.length, actual.size());
+    int position = 0;
+    for (HoodieInstant wanted : expected) {
+      HoodieInstant got = actual.get(position++);
+      assertEquals(wanted.getState(), got.getState());
+      assertEquals(wanted.getAction(), got.getAction());
+      assertEquals(wanted.getTimestamp(), got.getTimestamp());
+    }
+  }
+
+  /**
+   * Active timeline populated directly from the given instants.
+   */
+  private static final class MockHoodieActiveTimeline extends HoodieActiveTimeline {
+    public MockHoodieActiveTimeline(HoodieInstant... instants) {
+      // The no-arg super constructor is invoked implicitly.
+      setInstants(Arrays.asList(instants));
+    }
+  }
+}