You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/07/08 01:56:15 UTC

[hudi] branch master updated: [minor] following 4152, refactor the clazz about plan selection strategy (#6060)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a998586396 [minor] following 4152, refactor the clazz about plan selection strategy (#6060)
a998586396 is described below

commit a9985863967052783bad0e0a4381c9f602c31280
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Jul 8 09:56:10 2022 +0800

    [minor] following 4152, refactor the clazz about plan selection strategy (#6060)
---
 .../sink/compact/CompactionPlanSourceFunction.java | 31 ++++----
 .../hudi/sink/compact/FlinkCompactionConfig.java   | 26 +++----
 .../hudi/sink/compact/HoodieFlinkCompactor.java    | 53 ++++---------
 .../compact/MultiCompactionPlanSourceFunction.java | 90 ----------------------
 .../AllPendingCompactionPlanSelectStrategy.java    | 35 ---------
 .../compact/strategy/CompactionPlanStrategies.java | 74 ++++++++++++++++++
 ...ctStrategy.java => CompactionPlanStrategy.java} | 16 ++--
 .../InstantCompactionPlanSelectStrategy.java       | 50 ------------
 .../MultiCompactionPlanSelectStrategy.java         | 42 ----------
 .../SingleCompactionPlanSelectStrategy.java        | 43 -----------
 .../sink/compact/ITTestHoodieFlinkCompactor.java   | 48 +++---------
 ...rategy.java => TestCompactionPlanStrategy.java} | 59 +++++++-------
 12 files changed, 168 insertions(+), 399 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
index fe55089988..883ba8bd11 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
@@ -20,6 +20,7 @@ package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.configuration.Configuration;
@@ -28,8 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-
-import static java.util.stream.Collectors.toList;
+import java.util.stream.Collectors;
 
 /**
  * Flink hudi compaction source function.
@@ -53,18 +53,12 @@ public class CompactionPlanSourceFunction extends AbstractRichFunction implement
   protected static final Logger LOG = LoggerFactory.getLogger(CompactionPlanSourceFunction.class);
 
   /**
-   * Compaction instant time.
-   */
-  private final String compactionInstantTime;
-
-  /**
-   * The compaction plan.
+   * compaction plan instant -> compaction plan
    */
-  private final HoodieCompactionPlan compactionPlan;
+  private final List<Pair<String, HoodieCompactionPlan>> compactionPlans;
 
-  public CompactionPlanSourceFunction(HoodieCompactionPlan compactionPlan, String compactionInstantTime) {
-    this.compactionPlan = compactionPlan;
-    this.compactionInstantTime = compactionInstantTime;
+  public CompactionPlanSourceFunction(List<Pair<String, HoodieCompactionPlan>> compactionPlans) {
+    this.compactionPlans = compactionPlans;
   }
 
   @Override
@@ -74,11 +68,14 @@ public class CompactionPlanSourceFunction extends AbstractRichFunction implement
 
   @Override
   public void run(SourceContext sourceContext) throws Exception {
-    List<CompactionOperation> operations = this.compactionPlan.getOperations().stream()
-        .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
-    LOG.info("CompactionPlanFunction compacting " + operations + " files");
-    for (CompactionOperation operation : operations) {
-      sourceContext.collect(new CompactionPlanEvent(compactionInstantTime, operation));
+    for (Pair<String, HoodieCompactionPlan> pair : compactionPlans) {
+      HoodieCompactionPlan compactionPlan = pair.getRight();
+      List<CompactionOperation> operations = compactionPlan.getOperations().stream()
+          .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
+      LOG.info("CompactionPlanFunction compacting " + operations + " files");
+      for (CompactionOperation operation : operations) {
+        sourceContext.collect(new CompactionPlanEvent(pair.getLeft(), operation));
+      }
     }
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
index 02041690f1..449b068461 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
@@ -20,10 +20,10 @@ package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategy;
 
 import com.beust.jcommander.Parameter;
 import org.apache.flink.configuration.Configuration;
-import org.apache.hudi.sink.compact.strategy.SingleCompactionPlanSelectStrategy;
 
 /**
  * Configurations for Hoodie Flink compaction.
@@ -102,7 +102,7 @@ public class FlinkCompactionConfig extends Configuration {
   @Parameter(names = {"--seq"}, description = "Compaction plan execution sequence, two options are supported:\n"
       + "1). FIFO: execute the oldest plan first;\n"
       + "2). LIFO: execute the latest plan first, by default LIFO", required = false)
-  public String compactionSeq = SEQ_LIFO;
+  public String compactionSeq = SEQ_FIFO;
 
   @Parameter(names = {"--service"}, description = "Flink Compaction runs in service mode, disable by default")
   public Boolean serviceMode = false;
@@ -111,21 +111,21 @@ public class FlinkCompactionConfig extends Configuration {
       description = "Min compaction interval of async compaction service, default 10 minutes")
   public Integer minCompactionIntervalSeconds = 600;
 
-  @Parameter(names = {"--select-strategy"}, description = "The strategy define how to select compaction plan to compact.\n"
-      + "1). SingleCompactionPlanSelectStrategy: Select first or last compaction plan."
-      + "2). MultiCompactionPlanSelectStrategy: Select first or last n compaction plan (n is defined by compactionPlanMaxSelect)."
-      + "3). AllPendingCompactionPlanSelectStrategy: Select all pending compaction plan"
-      + "4). InstantCompactionPlanSelectStrategy: Select the compaction plan that instant is specified by compactionPlanInstant")
-  public String compactionPlanSelectStrategy = SingleCompactionPlanSelectStrategy.class.getName();
+  @Parameter(names = {"--plan-select-strategy"}, description = "The strategy define how to select compaction plan to compact.\n"
+      + "1). num_instants: select plans by specific number of instants, it's the default strategy with 1 instant at a time;\n"
+      + "3). all: Select all pending compaction plan;\n"
+      + "4). instants: Select the compaction plan by specific instants")
+  public String compactionPlanSelectStrategy = CompactionPlanStrategy.NUM_INSTANTS;
 
-  @Parameter(names = {"--select-max-number"}, description = "Max number of compaction plan would be selected in compaction."
+  @Parameter(names = {"--max-num-plans"}, description = "Max number of compaction plan would be selected in compaction."
       + "It's only effective for MultiCompactionPlanSelectStrategy.")
-  public Integer compactionPlanMaxSelect = 10;
+  public Integer maxNumCompactionPlans = 1;
 
-  @Parameter(names = {"--select-instant"}, description = "Specify the compaction plan instant to compact and you can specify more than"
-      + "one instant in a time by using comma."
-      + "It's only effective for InstantCompactionPlanSelectStrategy.")
+  @Parameter(names = {"--target-instants"}, description = "Specify the compaction plan instants to compact,\n"
+      + "Multiple instants are supported by comma separated instant time.\n"
+      + "It's only effective for 'instants' plan selection strategy.")
   public String compactionPlanInstant;
+
   @Parameter(names = {"--spillable_map_path"}, description = "Default file path prefix for spillable map.", required = false)
   public String spillableMapPath = HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue();
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index f56b5a2f0f..e2d2972a0d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -18,11 +18,6 @@
 
 package org.apache.hudi.sink.compact;
 
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
 import org.apache.hudi.async.HoodieAsyncTableService;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
@@ -31,11 +26,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy;
+import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategies;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
@@ -228,9 +222,8 @@ public class HoodieFlinkCompactor {
       }
 
       // fetch the instant based on the configured execution sequence
-      HoodieTimeline timeline = table.getActiveTimeline();
-      List<HoodieInstant> requested = ((CompactionPlanSelectStrategy) ReflectionUtils.loadClass(cfg.compactionPlanSelectStrategy))
-          .select(timeline.filterPendingCompactionTimeline(), cfg);
+      HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
+      List<HoodieInstant> requested = CompactionPlanStrategies.getStrategy(cfg).select(pendingCompactionTimeline);
       if (requested.isEmpty()) {
         // do nothing.
         LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option");
@@ -240,7 +233,7 @@ public class HoodieFlinkCompactor {
       List<String> compactionInstantTimes = requested.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
       compactionInstantTimes.forEach(timestamp -> {
         HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(timestamp);
-        if (timeline.containsInstant(inflightInstant)) {
+        if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
           LOG.info("Rollback inflight compaction instant: [" + timestamp + "]");
           table.rollbackInflightCompaction(inflightInstant);
           table.getMetaClient().reloadActiveTimeline();
@@ -254,13 +247,11 @@ public class HoodieFlinkCompactor {
             try {
               return Pair.of(timestamp, CompactionUtils.getCompactionPlan(table.getMetaClient(), timestamp));
             } catch (IOException e) {
-              throw new HoodieException(e);
+              throw new HoodieException("Get compaction plan at instant " + timestamp + " error", e);
             }
           })
           // reject empty compaction plan
-          .filter(pair -> !(pair.getRight() == null
-              || pair.getRight().getOperations() == null
-              || pair.getRight().getOperations().isEmpty()))
+          .filter(pair -> validCompactionPlan(pair.getRight()))
           .collect(Collectors.toList());
 
       if (compactionPlans.isEmpty()) {
@@ -270,7 +261,6 @@ public class HoodieFlinkCompactor {
       }
 
       List<HoodieInstant> instants = compactionInstantTimes.stream().map(HoodieTimeline::getCompactionRequestedInstant).collect(Collectors.toList());
-      HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
       for (HoodieInstant instant : instants) {
         if (!pendingCompactionTimeline.containsInstant(instant)) {
           // this means that the compaction plan was written to auxiliary path(.tmp)
@@ -297,34 +287,19 @@ public class HoodieFlinkCompactor {
       }
       table.getMetaClient().reloadActiveTimeline();
 
-      // use side-output to make operations that is in the same plan to be placed in the same stream
-      // keyby() cannot sure that different operations are in the different stream
-      DataStream<CompactionPlanEvent> source = env.addSource(new MultiCompactionPlanSourceFunction(compactionPlans))
+      env.addSource(new CompactionPlanSourceFunction(compactionPlans))
           .name("compaction_source")
-          .uid("uid_compaction_source");
-
-      SingleOutputStreamOperator<Void> operator = source.rebalance()
+          .uid("uid_compaction_source")
+          .rebalance()
           .transform("compact_task",
               TypeInformation.of(CompactionCommitEvent.class),
               new ProcessOperator<>(new CompactFunction(conf)))
           .setParallelism(compactionParallelism)
-          .process(new ProcessFunction<CompactionCommitEvent, Void>() {
-            @Override
-            public void processElement(CompactionCommitEvent event, ProcessFunction<CompactionCommitEvent, Void>.Context context, Collector<Void> out) {
-              context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event);
-            }
-          })
-          .name("group_by_compaction_plan")
-          .uid("uid_group_by_compaction_plan")
+          .addSink(new CompactionCommitSink(conf))
+          .name("compaction_commit")
+          .uid("uid_compaction_commit")
           .setParallelism(1);
 
-      compactionPlans.forEach(pair ->
-          operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class)))
-              .addSink(new CompactionCommitSink(conf))
-              .name("clean_commits " + pair.getLeft())
-              .uid("uid_clean_commits_" + pair.getLeft())
-              .setParallelism(1));
-
       env.execute("flink_hudi_compaction_" + String.join(",", compactionInstantTimes));
     }
 
@@ -342,4 +317,8 @@ public class HoodieFlinkCompactor {
       shutdownAsyncService(false);
     }
   }
+
+  private static boolean validCompactionPlan(HoodieCompactionPlan plan) {
+    return plan != null && plan.getOperations() != null && plan.getOperations().size() > 0;
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java
deleted file mode 100644
index 8a8c3f6b4e..0000000000
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.compact;
-
-import static java.util.stream.Collectors.toList;
-
-import java.util.List;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.common.model.CompactionOperation;
-import org.apache.hudi.common.util.collection.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Flink hudi compaction source function.
- *
- * <P>This function read the compaction plan as {@link CompactionOperation}s then assign the compaction task
- * event {@link CompactionPlanEvent} to downstream operators.
- *
- * <p>The compaction instant time is specified explicitly with strategies:
- *
- * <ul>
- *   <li>If the timeline has no inflight instants,
- *   use {@link org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()}
- *   as the instant time;</li>
- *   <li>If the timeline has inflight instants,
- *   use the median instant time between [last complete instant time, earliest inflight instant time]
- *   as the instant time.</li>
- * </ul>
- */
-public class MultiCompactionPlanSourceFunction extends AbstractRichFunction implements SourceFunction<CompactionPlanEvent> {
-
-  protected static final Logger LOG = LoggerFactory.getLogger(MultiCompactionPlanSourceFunction.class);
-
-  /**
-   * compaction plan instant -> compaction plan
-   */
-  private final List<Pair<String, HoodieCompactionPlan>> compactionPlans;
-
-  public MultiCompactionPlanSourceFunction(List<Pair<String, HoodieCompactionPlan>> compactionPlans) {
-    this.compactionPlans = compactionPlans;
-  }
-
-  @Override
-  public void open(Configuration parameters) throws Exception {
-    // no operation
-  }
-
-  @Override
-  public void run(SourceContext sourceContext) throws Exception {
-    for (Pair<String, HoodieCompactionPlan> pair : compactionPlans) {
-      HoodieCompactionPlan compactionPlan = pair.getRight();
-      List<CompactionOperation> operations = compactionPlan.getOperations().stream()
-          .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
-      LOG.info("CompactionPlanFunction compacting " + operations + " files");
-      for (CompactionOperation operation : operations) {
-        sourceContext.collect(new CompactionPlanEvent(pair.getLeft(), operation));
-      }
-    }
-  }
-
-  @Override
-  public void close() throws Exception {
-    // no operation
-  }
-
-  @Override
-  public void cancel() {
-    // no operation
-  }
-}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java
deleted file mode 100644
index 23b6708ff3..0000000000
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.compact.strategy;
-
-import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.sink.compact.FlinkCompactionConfig;
-
-/**
- * Select all pending compaction plan to compact
- */
-public class AllPendingCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
-  @Override
-  public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
-    return pendingCompactionTimeline.getInstants().collect(Collectors.toList());
-  }
-}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanStrategies.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanStrategies.java
new file mode 100644
index 0000000000..662dcabda3
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanStrategies.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.strategy;
+
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+import org.apache.hudi.util.CompactionUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+/**
+ * Factory clazz for CompactionPlanStrategy.
+ */
+public class CompactionPlanStrategies {
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionPlanStrategies.class);
+
+  private CompactionPlanStrategies() {
+  }
+
+  public static CompactionPlanStrategy getStrategy(FlinkCompactionConfig config) {
+    switch (config.compactionPlanSelectStrategy.toLowerCase(Locale.ROOT)) {
+      case CompactionPlanStrategy.ALL:
+        return pendingCompactionTimeline -> pendingCompactionTimeline.getInstants().collect(Collectors.toList());
+      case CompactionPlanStrategy.INSTANTS:
+        return pendingCompactionTimeline -> {
+          if (StringUtils.isNullOrEmpty(config.compactionPlanInstant)) {
+            LOG.warn("None instant is selected");
+            return Collections.emptyList();
+          }
+          List<String> instants = Arrays.asList(config.compactionPlanInstant.split(","));
+          return pendingCompactionTimeline.getInstants()
+              .filter(instant -> instants.contains(instant.getTimestamp()))
+              .collect(Collectors.toList());
+        };
+      case CompactionPlanStrategy.NUM_INSTANTS:
+        return pendingCompactionTimeline -> {
+          List<HoodieInstant> pendingCompactionPlanInstants = pendingCompactionTimeline.getInstants().collect(Collectors.toList());
+          if (CompactionUtil.isLIFO(config.compactionSeq)) {
+            Collections.reverse(pendingCompactionPlanInstants);
+          }
+          int range = Math.min(config.maxNumCompactionPlans, pendingCompactionPlanInstants.size());
+          return pendingCompactionPlanInstants.subList(0, range);
+        };
+      default:
+        throw new UnsupportedOperationException("Unknown compaction plan strategy: "
+            + config.compactionPlanSelectStrategy
+            + ", supported strategies:[num_instants,instants,all]");
+    }
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanStrategy.java
similarity index 81%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java
rename to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanStrategy.java
index a41fcef198..e209ff5339 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanStrategy.java
@@ -18,17 +18,21 @@
 
 package org.apache.hudi.sink.compact.strategy;
 
-import java.util.List;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+
+import java.util.List;
 
 /**
- * CompactionRangeStrategy
+ * Compaction plan selection strategy.
  */
-public interface CompactionPlanSelectStrategy {
+public interface CompactionPlanStrategy {
+  String ALL = "all";
+  String INSTANTS = "instants";
+  String NUM_INSTANTS = "num_instants";
+
   /**
-   * Define how to select compaction plan to compact
+   * Define how to select compaction plan to compact.
    */
-  List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config);
+  List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline);
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java
deleted file mode 100644
index 45382b70c4..0000000000
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.compact.strategy;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.sink.compact.FlinkCompactionConfig;
-import org.apache.hudi.sink.compact.HoodieFlinkCompactor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Specify the compaction plan instant to compact
- */
-public class InstantCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
-  protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);
-
-  @Override
-  public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
-    if (StringUtils.isNullOrEmpty(config.compactionPlanInstant)) {
-      LOG.warn("None instant is selected");
-      return Collections.emptyList();
-    }
-    List<String> instants = Arrays.asList(config.compactionPlanInstant.split(","));
-    return pendingCompactionTimeline.getInstants()
-        .filter(instant -> instants.contains(instant.getTimestamp()))
-        .collect(Collectors.toList());
-  }
-}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java
deleted file mode 100644
index ee0e93653f..0000000000
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.compact.strategy;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.sink.compact.FlinkCompactionConfig;
-import org.apache.hudi.util.CompactionUtil;
-
-/**
- * Select multi compaction plan to compact
- */
-public class MultiCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
-  @Override
-  public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
-    List<HoodieInstant> pendingCompactionPlanInstants = pendingCompactionTimeline.getInstants().collect(Collectors.toList());
-    if (CompactionUtil.isLIFO(config.compactionSeq)) {
-      Collections.reverse(pendingCompactionPlanInstants);
-    }
-    int range = Math.min(config.compactionPlanMaxSelect, pendingCompactionPlanInstants.size());
-    return pendingCompactionPlanInstants.subList(0, range);
-  }
-}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java
deleted file mode 100644
index 7ca939866c..0000000000
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.compact.strategy;
-
-import java.util.Collections;
-import java.util.List;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.sink.compact.FlinkCompactionConfig;
-import org.apache.hudi.util.CompactionUtil;
-
-/**
- * Select one compaction plan to compact
- */
-public class SingleCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
-  @Override
-  public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
-    Option<HoodieInstant> compactionPlanInstant = CompactionUtil.isLIFO(config.compactionSeq)
-        ? pendingCompactionTimeline.lastInstant()
-        : pendingCompactionTimeline.firstInstant();
-    if (compactionPlanInstant.isPresent()) {
-      return Collections.singletonList(compactionPlanInstant.get());
-    }
-    return Collections.emptyList();
-  }
-}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index 43e4ed5114..341a157e86 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -18,11 +18,6 @@
 
 package org.apache.hudi.sink.compact;
 
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -54,8 +49,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -138,17 +134,7 @@ public class ITTestHoodieFlinkCompactor {
 
     HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
 
-    boolean scheduled = false;
-    // judge whether have operation
-    // To compute the compaction instant time and do compaction.
-    Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient);
-    if (compactionInstantTimeOption.isPresent()) {
-      scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty());
-    }
-
-    String compactionInstantTime = compactionInstantTimeOption.get();
-
-    assertTrue(scheduled, "The compaction plan should be scheduled");
+    String compactionInstantTime = scheduleCompactionPlan(metaClient, writeClient);
 
     HoodieFlinkTable<?> table = writeClient.getHoodieTable();
     // generate compaction plan
@@ -160,7 +146,7 @@ public class ITTestHoodieFlinkCompactor {
     // Mark instant as compaction inflight
     table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
 
-    env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
+    env.addSource(new CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstantTime, compactionPlan))))
         .name("compaction_source")
         .uid("uid_compaction_source")
         .rebalance()
@@ -280,29 +266,18 @@ public class ITTestHoodieFlinkCompactor {
     }
     table.getMetaClient().reloadActiveTimeline();
 
-    DataStream<CompactionPlanEvent> source = env.addSource(new MultiCompactionPlanSourceFunction(compactionPlans))
+    env.addSource(new CompactionPlanSourceFunction(compactionPlans))
         .name("compaction_source")
-        .uid("uid_compaction_source");
-    SingleOutputStreamOperator<Void> operator = source.rebalance()
+        .uid("uid_compaction_source")
+        .rebalance()
         .transform("compact_task",
             TypeInformation.of(CompactionCommitEvent.class),
             new ProcessOperator<>(new CompactFunction(conf)))
         .setParallelism(1)
-        .process(new ProcessFunction<CompactionCommitEvent, Void>() {
-          @Override
-          public void processElement(CompactionCommitEvent event, ProcessFunction<CompactionCommitEvent, Void>.Context context, Collector<Void> out) {
-            context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event);
-          }
-        })
-        .name("group_by_compaction_plan")
-        .uid("uid_group_by_compaction_plan")
+        .addSink(new CompactionCommitSink(conf))
+        .name("compaction_commit")
+        .uid("uid_compaction_commit")
         .setParallelism(1);
-    compactionPlans.forEach(pair ->
-        operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class)))
-            .addSink(new CompactionCommitSink(conf))
-            .name("clean_commits " + pair.getLeft())
-            .uid("uid_clean_commits_" + pair.getLeft())
-            .setParallelism(1));
 
     env.execute("flink_hudi_compaction");
     writeClient.close();
@@ -311,8 +286,7 @@ public class ITTestHoodieFlinkCompactor {
 
   private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) {
     boolean scheduled = false;
-    // judge whether have operation
-    // To compute the compaction instant time and do compaction.
+    // judge whether there are any compaction operations.
     Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient);
     if (compactionInstantTimeOption.isPresent()) {
       scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty());
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanStrategy.java
similarity index 74%
rename from hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java
rename to hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanStrategy.java
index 3ac9f6c666..181be26d6d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanStrategy.java
@@ -25,18 +25,16 @@ import java.util.List;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.sink.compact.strategy.AllPendingCompactionPlanSelectStrategy;
-import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy;
-import org.apache.hudi.sink.compact.strategy.InstantCompactionPlanSelectStrategy;
-import org.apache.hudi.sink.compact.strategy.MultiCompactionPlanSelectStrategy;
-import org.apache.hudi.sink.compact.strategy.SingleCompactionPlanSelectStrategy;
+import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategies;
+import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategy;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 /**
- * Test case for every {@link CompactionPlanSelectStrategy} implements
+ * Test case for every {@link CompactionPlanStrategy} implements
  */
-public class TestCompactionPlanSelectStrategy {
+public class TestCompactionPlanStrategy {
   private HoodieTimeline timeline;
   private HoodieTimeline emptyTimeline;
   private HoodieTimeline allCompleteTimeline;
@@ -59,72 +57,75 @@ public class TestCompactionPlanSelectStrategy {
   void testSingleCompactionPlanSelectStrategy() {
     HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline();
     FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
+    CompactionPlanStrategy strategy = CompactionPlanStrategies.getStrategy(compactionConfig);
 
-    SingleCompactionPlanSelectStrategy strategy = new SingleCompactionPlanSelectStrategy();
-    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_006}, strategy.select(pendingCompactionTimeline, compactionConfig));
+    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002}, strategy.select(pendingCompactionTimeline));
 
-    compactionConfig.compactionSeq = FlinkCompactionConfig.SEQ_FIFO;
-    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002}, strategy.select(pendingCompactionTimeline, compactionConfig));
+    compactionConfig.compactionSeq = FlinkCompactionConfig.SEQ_LIFO;
+    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_006}, strategy.select(pendingCompactionTimeline));
 
     HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline();
-    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig));
+    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline));
 
     HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline();
-    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig));
+    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline));
   }
 
   @Test
   void testMultiCompactionPlanSelectStrategy() {
     HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline();
     FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
-    compactionConfig.compactionPlanMaxSelect = 2;
+    compactionConfig.maxNumCompactionPlans = 2;
 
-    MultiCompactionPlanSelectStrategy strategy = new MultiCompactionPlanSelectStrategy();
-    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_006, INSTANT_004}, strategy.select(pendingCompactionTimeline, compactionConfig));
+    CompactionPlanStrategy strategy = CompactionPlanStrategies.getStrategy(compactionConfig);
+    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003}, strategy.select(pendingCompactionTimeline));
 
-    compactionConfig.compactionSeq = FlinkCompactionConfig.SEQ_FIFO;
-    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003}, strategy.select(pendingCompactionTimeline, compactionConfig));
+    compactionConfig.compactionSeq = FlinkCompactionConfig.SEQ_LIFO;
+    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_006, INSTANT_004}, strategy.select(pendingCompactionTimeline));
 
     HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline();
-    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig));
+    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline));
 
     HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline();
-    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig));
+    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline));
   }
 
   @Test
   void testAllPendingCompactionPlanSelectStrategy() {
     HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline();
     FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
+    compactionConfig.compactionPlanSelectStrategy = CompactionPlanStrategy.ALL;
+    CompactionPlanStrategy strategy = CompactionPlanStrategies.getStrategy(compactionConfig);
 
-    AllPendingCompactionPlanSelectStrategy strategy = new AllPendingCompactionPlanSelectStrategy();
     assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003, INSTANT_004, INSTANT_006},
-        strategy.select(pendingCompactionTimeline, compactionConfig));
+        strategy.select(pendingCompactionTimeline));
 
     HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline();
-    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig));
+    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline));
 
     HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline();
-    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig));
+    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline));
   }
 
   @Test
   void testInstantCompactionPlanSelectStrategy() {
     HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline();
     FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
+
+    compactionConfig.compactionPlanSelectStrategy = CompactionPlanStrategy.INSTANTS;
+    CompactionPlanStrategy strategy = CompactionPlanStrategies.getStrategy(compactionConfig);
     compactionConfig.compactionPlanInstant = "004";
 
-    InstantCompactionPlanSelectStrategy strategy = new InstantCompactionPlanSelectStrategy();
-    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_004}, strategy.select(pendingCompactionTimeline, compactionConfig));
+    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_004}, strategy.select(pendingCompactionTimeline));
 
     compactionConfig.compactionPlanInstant = "002,003";
-    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003}, strategy.select(pendingCompactionTimeline, compactionConfig));
+    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003}, strategy.select(pendingCompactionTimeline));
 
     compactionConfig.compactionPlanInstant = "002,005";
-    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002}, strategy.select(pendingCompactionTimeline, compactionConfig));
+    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002}, strategy.select(pendingCompactionTimeline));
 
     compactionConfig.compactionPlanInstant = "005";
-    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(pendingCompactionTimeline, compactionConfig));
+    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(pendingCompactionTimeline));
   }
 
   private void assertHoodieInstantsEquals(HoodieInstant[] expected, List<HoodieInstant> actual) {