Posted to commits@hudi.apache.org by ga...@apache.org on 2022/07/07 06:11:31 UTC

[hudi] branch master updated: [HUDI-4152] Flink offline compaction support compacting multi compaction plan at once (#5677)

This is an automated email from the ASF dual-hosted git repository.

garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e74ad324c3 [HUDI-4152] Flink offline compaction support compacting multi compaction plan at once (#5677)
e74ad324c3 is described below

commit e74ad324c33c3c51d762c4b3349b92a25888ed10
Author: Lanyuanxiaoyao <la...@gmail.com>
AuthorDate: Thu Jul 7 14:11:26 2022 +0800

    [HUDI-4152] Flink offline compaction support compacting multi compaction plan at once (#5677)
    
    * [HUDI-4152] Flink offline compaction allow compact multi compaction plan at once
    
    * [HUDI-4152] Fix exception for duplicated uid when multi compaction plan are compacted
    
    * [HUDI-4152] Provider UT & IT for compact multi compaction plan
    
    * [HUDI-4152] Put multi compaction plans into one compaction plan source
    
    * [HUDI-4152] InstantCompactionPlanSelectStrategy allow multi instant by using comma
    
    * [HUDI-4152] Add IT for InstantCompactionPlanSelectStrategy
---
 .../hudi/sink/compact/FlinkCompactionConfig.java   |  16 +++
 .../hudi/sink/compact/HoodieFlinkCompactor.java    | 120 +++++++++++------
 .../compact/MultiCompactionPlanSourceFunction.java |  90 +++++++++++++
 .../AllPendingCompactionPlanSelectStrategy.java    |  35 +++++
 .../strategy/CompactionPlanSelectStrategy.java     |  34 +++++
 .../InstantCompactionPlanSelectStrategy.java       |  50 +++++++
 .../MultiCompactionPlanSelectStrategy.java         |  42 ++++++
 .../SingleCompactionPlanSelectStrategy.java        |  43 ++++++
 .../sink/compact/ITTestHoodieFlinkCompactor.java   | 117 ++++++++++++++++
 .../compact/TestCompactionPlanSelectStrategy.java  | 149 +++++++++++++++++++++
 10 files changed, 656 insertions(+), 40 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
index 6513859556..02041690f1 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
@@ -23,6 +23,7 @@ import org.apache.hudi.configuration.FlinkOptions;
 
 import com.beust.jcommander.Parameter;
 import org.apache.flink.configuration.Configuration;
+import org.apache.hudi.sink.compact.strategy.SingleCompactionPlanSelectStrategy;
 
 /**
  * Configurations for Hoodie Flink compaction.
@@ -110,6 +111,21 @@ public class FlinkCompactionConfig extends Configuration {
       description = "Min compaction interval of async compaction service, default 10 minutes")
   public Integer minCompactionIntervalSeconds = 600;
 
+  @Parameter(names = {"--select-strategy"}, description = "The strategy define how to select compaction plan to compact.\n"
+      + "1). SingleCompactionPlanSelectStrategy: Select first or last compaction plan."
+      + "2). MultiCompactionPlanSelectStrategy: Select first or last n compaction plan (n is defined by compactionPlanMaxSelect)."
+      + "3). AllPendingCompactionPlanSelectStrategy: Select all pending compaction plan"
+      + "4). InstantCompactionPlanSelectStrategy: Select the compaction plan that instant is specified by compactionPlanInstant")
+  public String compactionPlanSelectStrategy = SingleCompactionPlanSelectStrategy.class.getName();
+
+  @Parameter(names = {"--select-max-number"}, description = "Max number of compaction plan would be selected in compaction."
+      + "It's only effective for MultiCompactionPlanSelectStrategy.")
+  public Integer compactionPlanMaxSelect = 10;
+
+  @Parameter(names = {"--select-instant"}, description = "Specify the compaction plan instant to compact and you can specify more than"
+      + "one instant in a time by using comma."
+      + "It's only effective for InstantCompactionPlanSelectStrategy.")
+  public String compactionPlanInstant;
   @Parameter(names = {"--spillable_map_path"}, description = "Default file path prefix for spillable map.", required = false)
   public String spillableMapPath = HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue();
 
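
For context, the three flags above plug into the compactor's CLI parser, so they can also be passed programmatically. Below is a minimal sketch of a multi-plan run, assuming the usual HoodieFlinkCompactor entry point; the table path is a placeholder:

import org.apache.hudi.sink.compact.HoodieFlinkCompactor;
import org.apache.hudi.sink.compact.strategy.MultiCompactionPlanSelectStrategy;

// Hedged sketch: run the offline compactor with the new flags.
// "--select-max-number" only takes effect with MultiCompactionPlanSelectStrategy.
public class MultiPlanCompactionExample {
  public static void main(String[] args) throws Exception {
    HoodieFlinkCompactor.main(new String[] {
        "--path", "file:///tmp/hudi/t1", // hypothetical table base path
        "--select-strategy", MultiCompactionPlanSelectStrategy.class.getName(),
        "--select-max-number", "5"
    });
  }
}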
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 546136e416..f56b5a2f0f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -18,6 +18,11 @@
 
 package org.apache.hudi.sink.compact;
 
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 import org.apache.hudi.async.HoodieAsyncTableService;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
@@ -26,9 +31,11 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
@@ -42,9 +49,12 @@ import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
 
 /**
  * Flink hudi compaction program that can be executed manually.
@@ -218,74 +228,104 @@ public class HoodieFlinkCompactor {
       }
 
       // fetch the instant based on the configured execution sequence
-      HoodieTimeline timeline = table.getActiveTimeline().filterPendingCompactionTimeline();
-      Option<HoodieInstant> requested = CompactionUtil.isLIFO(cfg.compactionSeq) ? timeline.lastInstant() : timeline.firstInstant();
-      if (!requested.isPresent()) {
+      HoodieTimeline timeline = table.getActiveTimeline();
+      List<HoodieInstant> requested = ((CompactionPlanSelectStrategy) ReflectionUtils.loadClass(cfg.compactionPlanSelectStrategy))
+          .select(timeline.filterPendingCompactionTimeline(), cfg);
+      if (requested.isEmpty()) {
         // do nothing.
         LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option");
         return;
       }
 
-      String compactionInstantTime = requested.get().getTimestamp();
-
-      HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
-      if (timeline.containsInstant(inflightInstant)) {
-        LOG.info("Rollback inflight compaction instant: [" + compactionInstantTime + "]");
-        table.rollbackInflightCompaction(inflightInstant);
-        table.getMetaClient().reloadActiveTimeline();
-      }
+      List<String> compactionInstantTimes = requested.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      compactionInstantTimes.forEach(timestamp -> {
+        HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(timestamp);
+        if (timeline.containsInstant(inflightInstant)) {
+          LOG.info("Rollback inflight compaction instant: [" + timestamp + "]");
+          table.rollbackInflightCompaction(inflightInstant);
+          table.getMetaClient().reloadActiveTimeline();
+        }
+      });
 
-      // generate compaction plan
+      // generate (timestamp, compaction plan) pairs
       // should support configurable commit metadata
-      HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
-          table.getMetaClient(), compactionInstantTime);
-
-      if (compactionPlan == null || (compactionPlan.getOperations() == null)
-          || (compactionPlan.getOperations().isEmpty())) {
+      List<Pair<String, HoodieCompactionPlan>> compactionPlans = compactionInstantTimes.stream()
+          .map(timestamp -> {
+            try {
+              return Pair.of(timestamp, CompactionUtils.getCompactionPlan(table.getMetaClient(), timestamp));
+            } catch (IOException e) {
+              throw new HoodieException(e);
+            }
+          })
+          // reject empty compaction plan
+          .filter(pair -> !(pair.getRight() == null
+              || pair.getRight().getOperations() == null
+              || pair.getRight().getOperations().isEmpty()))
+          .collect(Collectors.toList());
+
+      if (compactionPlans.isEmpty()) {
         // No compaction plan, do nothing and return.
-        LOG.info("No compaction plan for instant " + compactionInstantTime);
+        LOG.info("No compaction plan for instant " + String.join(",", compactionInstantTimes));
         return;
       }
 
-      HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+      List<HoodieInstant> instants = compactionInstantTimes.stream().map(HoodieTimeline::getCompactionRequestedInstant).collect(Collectors.toList());
       HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
-      if (!pendingCompactionTimeline.containsInstant(instant)) {
-        // this means that the compaction plan was written to auxiliary path(.tmp)
-        // but not the meta path(.hoodie), this usually happens when the job crush
-        // exceptionally.
-
-        // clean the compaction plan in auxiliary path and cancels the compaction.
-
-        LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
-            + "Clean the compaction plan in auxiliary path and cancels the compaction");
-        CompactionUtil.cleanInstant(table.getMetaClient(), instant);
-        return;
+      for (HoodieInstant instant : instants) {
+        if (!pendingCompactionTimeline.containsInstant(instant)) {
+          // this means that the compaction plan was written to the auxiliary path(.tmp)
+          // but not the meta path(.hoodie), which usually happens when the job crashes
+          // exceptionally.
+          // clean the compaction plan in the auxiliary path and cancel the compaction.
+          LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
+              + "Cleaning the compaction plan in the auxiliary path and cancelling the compaction");
+          CompactionUtil.cleanInstant(table.getMetaClient(), instant);
+          return;
+        }
       }
 
       // get compactionParallelism.
       int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1
-          ? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS);
+          ? Math.toIntExact(compactionPlans.stream().mapToLong(pair -> pair.getRight().getOperations().size()).sum())
+          : conf.getInteger(FlinkOptions.COMPACTION_TASKS);
 
-      LOG.info("Start to compaction for instant " + compactionInstantTime);
+      LOG.info("Start to compaction for instant " + compactionInstantTimes);
 
       // Mark instant as compaction inflight
-      table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
+      for (HoodieInstant instant : instants) {
+        table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
+      }
       table.getMetaClient().reloadActiveTimeline();
 
-      env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
+      // use side-outputs so that operations belonging to the same plan are placed in the same stream;
+      // keyBy() cannot guarantee that operations of different plans end up in different streams
+      DataStream<CompactionPlanEvent> source = env.addSource(new MultiCompactionPlanSourceFunction(compactionPlans))
           .name("compaction_source")
-          .uid("uid_compaction_source")
-          .rebalance()
+          .uid("uid_compaction_source");
+
+      SingleOutputStreamOperator<Void> operator = source.rebalance()
           .transform("compact_task",
               TypeInformation.of(CompactionCommitEvent.class),
               new ProcessOperator<>(new CompactFunction(conf)))
           .setParallelism(compactionParallelism)
-          .addSink(new CompactionCommitSink(conf))
-          .name("clean_commits")
-          .uid("uid_clean_commits")
+          .process(new ProcessFunction<CompactionCommitEvent, Void>() {
+            @Override
+            public void processElement(CompactionCommitEvent event, ProcessFunction<CompactionCommitEvent, Void>.Context context, Collector<Void> out) {
+              context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event);
+            }
+          })
+          .name("group_by_compaction_plan")
+          .uid("uid_group_by_compaction_plan")
           .setParallelism(1);
 
-      env.execute("flink_hudi_compaction_" + compactionInstantTime);
+      compactionPlans.forEach(pair ->
+          operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class)))
+              .addSink(new CompactionCommitSink(conf))
+              .name("clean_commits " + pair.getLeft())
+              .uid("uid_clean_commits_" + pair.getLeft())
+              .setParallelism(1));
+
+      env.execute("flink_hudi_compaction_" + String.join(",", compactionInstantTimes));
     }
 
     /**
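
The pipeline above tags each CompactionCommitEvent with its instant and routes it to a per-plan side output, then wires one CompactionCommitSink per plan. Stripped of the Hudi types, the demultiplexing pattern is sketched below; the class name and sample events are illustrative, not part of the commit:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Illustrative sketch: demultiplex events into one side output per plan id,
// so each plan gets its own independent sink (as "clean_commits <instant>" above).
public class SideOutputDemuxSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    String[] planIds = {"20220707091100", "20220707091200"}; // placeholder instants

    SingleOutputStreamOperator<Void> router = env
        .fromElements("20220707091100|fileA", "20220707091200|fileB")
        .process(new ProcessFunction<String, Void>() {
          @Override
          public void processElement(String event, Context ctx, Collector<Void> out) {
            // route the event to the side output named after its plan id
            ctx.output(new OutputTag<>(event.split("\\|")[0], Types.STRING), event);
          }
        });

    for (String planId : planIds) {
      // one sink per plan, mirroring the per-instant CompactionCommitSink wiring
      router.getSideOutput(new OutputTag<>(planId, Types.STRING)).print("plan-" + planId);
    }
    env.execute("side_output_demux_sketch");
  }
}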
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java
new file mode 100644
index 0000000000..8a8c3f6b4e
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.util.collection.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink hudi compaction source function.
+ *
+ * <p>This function reads each compaction plan as {@link CompactionOperation}s, then assigns the compaction task
+ * events {@link CompactionPlanEvent} to the downstream operators.
+ *
+ * <p>The compaction instant time is specified explicitly with strategies:
+ *
+ * <ul>
+ *   <li>If the timeline has no inflight instants,
+ *   use {@link org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()}
+ *   as the instant time;</li>
+ *   <li>If the timeline has inflight instants,
+ *   use the median instant time between [last complete instant time, earliest inflight instant time]
+ *   as the instant time.</li>
+ * </ul>
+ */
+public class MultiCompactionPlanSourceFunction extends AbstractRichFunction implements SourceFunction<CompactionPlanEvent> {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MultiCompactionPlanSourceFunction.class);
+
+  /**
+   * compaction plan instant -> compaction plan
+   */
+  private final List<Pair<String, HoodieCompactionPlan>> compactionPlans;
+
+  public MultiCompactionPlanSourceFunction(List<Pair<String, HoodieCompactionPlan>> compactionPlans) {
+    this.compactionPlans = compactionPlans;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    // no operation
+  }
+
+  @Override
+  public void run(SourceContext<CompactionPlanEvent> sourceContext) throws Exception {
+    for (Pair<String, HoodieCompactionPlan> pair : compactionPlans) {
+      HoodieCompactionPlan compactionPlan = pair.getRight();
+      List<CompactionOperation> operations = compactionPlan.getOperations().stream()
+          .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
+      LOG.info("CompactionPlanFunction compacting " + operations + " files");
+      for (CompactionOperation operation : operations) {
+        sourceContext.collect(new CompactionPlanEvent(pair.getLeft(), operation));
+      }
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    // no operation
+  }
+
+  @Override
+  public void cancel() {
+    // no operation
+  }
+}
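
MultiCompactionPlanSourceFunction is essentially a bounded, list-backed source: it emits every operation of every plan once and then returns from run(), so the batch-style compaction job can finish. The generic shape of such a source, with the Hudi types stripped out, is sketched below (illustrative only):

import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Illustrative shape of a bounded, list-backed source: emit everything once,
// then return from run() so the job can terminate. The function (and thus the
// backing list) must be serializable, since it ships to the task managers.
public class ListSourceFunction<T> implements SourceFunction<T> {
  private final List<T> elements;
  private volatile boolean cancelled = false;

  public ListSourceFunction(List<T> elements) {
    this.elements = elements;
  }

  @Override
  public void run(SourceContext<T> ctx) {
    for (T element : elements) {
      if (cancelled) {
        return;
      }
      ctx.collect(element);
    }
  }

  @Override
  public void cancel() {
    cancelled = true;
  }
}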
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java
new file mode 100644
index 0000000000..23b6708ff3
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.strategy;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+
+/**
+ * Select all pending compaction plans to compact.
+ */
+public class AllPendingCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
+  @Override
+  public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
+    return pendingCompactionTimeline.getInstants().collect(Collectors.toList());
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java
new file mode 100644
index 0000000000..a41fcef198
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.strategy;
+
+import java.util.List;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+
+/**
+ * Strategy to select the compaction plans to compact.
+ */
+public interface CompactionPlanSelectStrategy {
+  /**
+   * Defines how to select the compaction plans to compact.
+   */
+  List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config);
+}
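
Because HoodieFlinkCompactor loads the strategy reflectively from --select-strategy (see the ReflectionUtils.loadClass call above), this interface is an open extension point. A hedged sketch of a custom strategy, hypothetical and not part of this commit, that keeps only pending plans older than a hard-coded cutoff instant:

import java.util.List;
import java.util.stream.Collectors;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;

// Hypothetical example strategy, not part of this commit: select pending plans
// whose instant precedes a cutoff. Hudi instants sort lexicographically by
// timestamp, so a plain string comparison is sufficient.
public class BeforeCutoffCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
  private static final String CUTOFF = "20220707000000"; // placeholder cutoff instant

  @Override
  public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
    return pendingCompactionTimeline.getInstants()
        .filter(instant -> instant.getTimestamp().compareTo(CUTOFF) < 0)
        .collect(Collectors.toList());
  }
}

Passing its fully qualified class name via --select-strategy would activate it.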
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java
new file mode 100644
index 0000000000..45382b70c4
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.strategy;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+import org.apache.hudi.sink.compact.HoodieFlinkCompactor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Select the compaction plans whose instants are specified via --select-instant.
+ */
+public class InstantCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
+  protected static final Logger LOG = LoggerFactory.getLogger(InstantCompactionPlanSelectStrategy.class);
+
+  @Override
+  public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
+    if (StringUtils.isNullOrEmpty(config.compactionPlanInstant)) {
+      LOG.warn("None instant is selected");
+      return Collections.emptyList();
+    }
+    List<String> instants = Arrays.asList(config.compactionPlanInstant.split(","));
+    return pendingCompactionTimeline.getInstants()
+        .filter(instant -> instants.contains(instant.getTimestamp()))
+        .collect(Collectors.toList());
+  }
+}
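
The comma handling above reduces to a membership filter over the pending timeline. A tiny stand-alone demo, with plain strings standing in for the pending HoodieInstants:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Demo of the comma-separated instant filter: only pending instants that
// appear in the --select-instant list are kept.
public class InstantFilterDemo {
  public static void main(String[] args) {
    String selectInstant = "002,005";                       // --select-instant value
    List<String> wanted = Arrays.asList(selectInstant.split(","));
    List<String> selected = Stream.of("002", "003", "004")  // pending instants
        .filter(wanted::contains)
        .collect(Collectors.toList());
    System.out.println(selected);                           // prints [002]
  }
}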
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java
new file mode 100644
index 0000000000..ee0e93653f
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.strategy;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+import org.apache.hudi.util.CompactionUtil;
+
+/**
+ * Select multiple compaction plans to compact.
+ */
+public class MultiCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
+  @Override
+  public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
+    List<HoodieInstant> pendingCompactionPlanInstants = pendingCompactionTimeline.getInstants().collect(Collectors.toList());
+    if (CompactionUtil.isLIFO(config.compactionSeq)) {
+      Collections.reverse(pendingCompactionPlanInstants);
+    }
+    int range = Math.min(config.compactionPlanMaxSelect, pendingCompactionPlanInstants.size());
+    return pendingCompactionPlanInstants.subList(0, range);
+  }
+}
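
The reverse-then-subList window is easiest to see on a concrete timeline. A small stand-alone demo, again with plain strings standing in for pending instants:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Demo of the LIFO selection window above: reverse puts the newest plan first,
// then subList takes at most --select-max-number entries.
public class SelectionWindowDemo {
  public static void main(String[] args) {
    List<String> pending = new ArrayList<>(Arrays.asList("001", "002", "003", "004"));
    Collections.reverse(pending);                  // LIFO: newest first
    int range = Math.min(2, pending.size());       // e.g. --select-max-number 2
    System.out.println(pending.subList(0, range)); // prints [004, 003]
  }
}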
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java
new file mode 100644
index 0000000000..7ca939866c
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.strategy;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+import org.apache.hudi.util.CompactionUtil;
+
+/**
+ * Select a single compaction plan to compact.
+ */
+public class SingleCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
+  @Override
+  public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
+    Option<HoodieInstant> compactionPlanInstant = CompactionUtil.isLIFO(config.compactionSeq)
+        ? pendingCompactionTimeline.lastInstant()
+        : pendingCompactionTimeline.firstInstant();
+    if (compactionPlanInstant.isPresent()) {
+      return Collections.singletonList(compactionPlanInstant.get());
+    }
+    return Collections.emptyList();
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index e1e86ce32b..43e4ed5114 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -18,6 +18,11 @@
 
 package org.apache.hudi.sink.compact;
 
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -25,6 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.CompactionUtil;
@@ -49,6 +55,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -67,6 +74,8 @@ public class ITTestHoodieFlinkCompactor {
 
   private static final Map<String, List<String>> EXPECTED2 = new HashMap<>();
 
+  private static final Map<String, List<String>> EXPECTED3 = new HashMap<>();
+
   static {
     EXPECTED1.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
     EXPECTED1.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
@@ -77,6 +86,12 @@ public class ITTestHoodieFlinkCompactor {
     EXPECTED2.put("par2", Arrays.asList("id3,par2,id3,Julian,54,3000,par2", "id4,par2,id4,Fabian,32,4000,par2"));
     EXPECTED2.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3", "id9,par3,id9,Jane,19,6000,par3"));
     EXPECTED2.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4", "id10,par4,id10,Ella,38,7000,par4", "id11,par4,id11,Phoebe,52,8000,par4"));
+
+    EXPECTED3.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
+    EXPECTED3.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
+    EXPECTED3.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3"));
+    EXPECTED3.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4"));
+    EXPECTED3.put("par5", Arrays.asList("id12,par5,id12,Tony,27,9000,par5", "id13,par5,id13,Jenny,72,10000,par5"));
   }
 
   @TempDir
@@ -203,4 +218,106 @@ public class ITTestHoodieFlinkCompactor {
 
     TestData.checkWrittenFullData(tempFile, EXPECTED2);
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangelog) throws Exception {
+    // Create the hoodie table and insert data.
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+    options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + "");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+    tableEnv.executeSql(TestSQL.INSERT_T1).await();
+
+    TimeUnit.SECONDS.sleep(3);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+    conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+    CompactionUtil.setAvroSchema(conf, metaClient);
+    CompactionUtil.inferChangelogMode(conf, metaClient);
+
+    List<String> compactionInstantTimeList = new ArrayList<>(2);
+
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
+
+    compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient));
+
+    // insert new records into a new partition, so that we can generate a new compaction plan
+    String insertT1ForNewPartition = "insert into t1 values\n"
+        + "('id12','Tony',27,TIMESTAMP '1970-01-01 00:00:09','par5'),\n"
+        + "('id13','Jenny',72,TIMESTAMP '1970-01-01 00:00:10','par5')";
+    tableEnv.executeSql(insertT1ForNewPartition).await();
+
+    TimeUnit.SECONDS.sleep(3);
+
+    compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient));
+
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+    List<Pair<String, HoodieCompactionPlan>> compactionPlans = new ArrayList<>(2);
+    for (String compactionInstantTime : compactionInstantTimeList) {
+      HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(table.getMetaClient(), compactionInstantTime);
+      compactionPlans.add(Pair.of(compactionInstantTime, plan));
+    }
+
+    // Mark instant as compaction inflight
+    for (String compactionInstantTime : compactionInstantTimeList) {
+      HoodieInstant hoodieInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+      table.getActiveTimeline().transitionCompactionRequestedToInflight(hoodieInstant);
+    }
+    table.getMetaClient().reloadActiveTimeline();
+
+    DataStream<CompactionPlanEvent> source = env.addSource(new MultiCompactionPlanSourceFunction(compactionPlans))
+        .name("compaction_source")
+        .uid("uid_compaction_source");
+    SingleOutputStreamOperator<Void> operator = source.rebalance()
+        .transform("compact_task",
+            TypeInformation.of(CompactionCommitEvent.class),
+            new ProcessOperator<>(new CompactFunction(conf)))
+        .setParallelism(1)
+        .process(new ProcessFunction<CompactionCommitEvent, Void>() {
+          @Override
+          public void processElement(CompactionCommitEvent event, ProcessFunction<CompactionCommitEvent, Void>.Context context, Collector<Void> out) {
+            context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event);
+          }
+        })
+        .name("group_by_compaction_plan")
+        .uid("uid_group_by_compaction_plan")
+        .setParallelism(1);
+    compactionPlans.forEach(pair ->
+        operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class)))
+            .addSink(new CompactionCommitSink(conf))
+            .name("clean_commits " + pair.getLeft())
+            .uid("uid_clean_commits_" + pair.getLeft())
+            .setParallelism(1));
+
+    env.execute("flink_hudi_compaction");
+    writeClient.close();
+    TestData.checkWrittenFullData(tempFile, EXPECTED3);
+  }
+
+  private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) {
+    boolean scheduled = false;
+    // check whether there are operations to compact:
+    // compute the compaction instant time and schedule the compaction at it.
+    Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient);
+    if (compactionInstantTimeOption.isPresent()) {
+      scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty());
+    }
+    assertTrue(scheduled, "The compaction plan should be scheduled");
+    return compactionInstantTimeOption.get();
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java
new file mode 100644
index 0000000000..3ac9f6c666
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.sink.compact.strategy.AllPendingCompactionPlanSelectStrategy;
+import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy;
+import org.apache.hudi.sink.compact.strategy.InstantCompactionPlanSelectStrategy;
+import org.apache.hudi.sink.compact.strategy.MultiCompactionPlanSelectStrategy;
+import org.apache.hudi.sink.compact.strategy.SingleCompactionPlanSelectStrategy;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test cases for every {@link CompactionPlanSelectStrategy} implementation.
+ */
+public class TestCompactionPlanSelectStrategy {
+  private HoodieTimeline timeline;
+  private HoodieTimeline emptyTimeline;
+  private HoodieTimeline allCompleteTimeline;
+
+  private static final HoodieInstant INSTANT_001 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "001");
+  private static final HoodieInstant INSTANT_002 = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "002");
+  private static final HoodieInstant INSTANT_003 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "003");
+  private static final HoodieInstant INSTANT_004 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "004");
+  private static final HoodieInstant INSTANT_005 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMPACTION_ACTION, "005");
+  private static final HoodieInstant INSTANT_006 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "006");
+
+  @BeforeEach
+  public void beforeEach() {
+    timeline = new MockHoodieActiveTimeline(INSTANT_001, INSTANT_002, INSTANT_003, INSTANT_004, INSTANT_005, INSTANT_006);
+    emptyTimeline = new MockHoodieActiveTimeline();
+    allCompleteTimeline = new MockHoodieActiveTimeline(INSTANT_001, INSTANT_005);
+  }
+
+  @Test
+  void testSingleCompactionPlanSelectStrategy() {
+    HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline();
+    FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
+
+    SingleCompactionPlanSelectStrategy strategy = new SingleCompactionPlanSelectStrategy();
+    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_006}, strategy.select(pendingCompactionTimeline, compactionConfig));
+
+    compactionConfig.compactionSeq = FlinkCompactionConfig.SEQ_FIFO;
+    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002}, strategy.select(pendingCompactionTimeline, compactionConfig));
+
+    HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline();
+    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig));
+
+    HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline();
+    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig));
+  }
+
+  @Test
+  void testMultiCompactionPlanSelectStrategy() {
+    HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline();
+    FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
+    compactionConfig.compactionPlanMaxSelect = 2;
+
+    MultiCompactionPlanSelectStrategy strategy = new MultiCompactionPlanSelectStrategy();
+    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_006, INSTANT_004}, strategy.select(pendingCompactionTimeline, compactionConfig));
+
+    compactionConfig.compactionSeq = FlinkCompactionConfig.SEQ_FIFO;
+    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003}, strategy.select(pendingCompactionTimeline, compactionConfig));
+
+    HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline();
+    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig));
+
+    HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline();
+    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig));
+  }
+
+  @Test
+  void testAllPendingCompactionPlanSelectStrategy() {
+    HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline();
+    FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
+
+    AllPendingCompactionPlanSelectStrategy strategy = new AllPendingCompactionPlanSelectStrategy();
+    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003, INSTANT_004, INSTANT_006},
+        strategy.select(pendingCompactionTimeline, compactionConfig));
+
+    HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline();
+    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig));
+
+    HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline();
+    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig));
+  }
+
+  @Test
+  void testInstantCompactionPlanSelectStrategy() {
+    HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline();
+    FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
+    compactionConfig.compactionPlanInstant = "004";
+
+    InstantCompactionPlanSelectStrategy strategy = new InstantCompactionPlanSelectStrategy();
+    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_004}, strategy.select(pendingCompactionTimeline, compactionConfig));
+
+    compactionConfig.compactionPlanInstant = "002,003";
+    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003}, strategy.select(pendingCompactionTimeline, compactionConfig));
+
+    compactionConfig.compactionPlanInstant = "002,005";
+    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002}, strategy.select(pendingCompactionTimeline, compactionConfig));
+
+    compactionConfig.compactionPlanInstant = "005";
+    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(pendingCompactionTimeline, compactionConfig));
+  }
+
+  private void assertHoodieInstantsEquals(HoodieInstant[] expected, List<HoodieInstant> actual) {
+    assertEquals(expected.length, actual.size());
+    for (int index = 0; index < expected.length; index++) {
+      assertHoodieInstantEquals(expected[index], actual.get(index));
+    }
+  }
+
+  private void assertHoodieInstantEquals(HoodieInstant expected, HoodieInstant actual) {
+    assertEquals(expected.getState(), actual.getState());
+    assertEquals(expected.getAction(), actual.getAction());
+    assertEquals(expected.getTimestamp(), actual.getTimestamp());
+  }
+
+  private static final class MockHoodieActiveTimeline extends HoodieActiveTimeline {
+    public MockHoodieActiveTimeline(HoodieInstant... instants) {
+      super();
+      setInstants(Arrays.asList(instants));
+    }
+  }
+}