You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yu...@apache.org on 2022/05/24 16:47:36 UTC

[hudi] branch master updated: [HUDI-2207] Support independent flink hudi clustering function

This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c20db99a7b [HUDI-2207] Support independent flink hudi clustering function
     new 18635b533e Merge pull request #3599 from yuzhaojing/HUDI-2207
c20db99a7b is described below

commit c20db99a7b79b9545e6fae0d762250ceb55a8b79
Author: 喻兆靖 <yu...@bytedance.com>
AuthorDate: Sat May 21 21:25:15 2022 +0800

    [HUDI-2207] Support independent flink hudi clustering function
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |   2 +-
 .../apache/hudi/config/HoodieClusteringConfig.java |   2 +
 .../apache/hudi/table/BulkInsertPartitioner.java   |  16 +-
 .../cluster/strategy/ClusteringPlanStrategy.java   |  11 +
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  68 +++++
 .../FlinkRecentDaysClusteringPlanStrategy.java     |  65 +++++
 ...nkSelectedPartitionsClusteringPlanStrategy.java |  67 +++++
 .../FlinkSizeBasedClusteringPlanStrategy.java      | 129 +++++++++
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |   3 +-
 .../JavaCustomColumnsSortPartitioner.java          |   2 +-
 .../bulkinsert/JavaGlobalSortPartitioner.java      |   2 +-
 .../apache/hudi/configuration/FlinkOptions.java    |  67 +++++
 .../hudi/sink/bulk/sort/SortOperatorGen.java       |   2 +-
 .../sink/clustering/ClusteringCommitEvent.java     |  77 +++++
 .../hudi/sink/clustering/ClusteringCommitSink.java | 174 +++++++++++
 .../hudi/sink/clustering/ClusteringOperator.java   | 318 +++++++++++++++++++++
 .../hudi/sink/clustering/ClusteringPlanEvent.java  |  73 +++++
 .../clustering/ClusteringPlanSourceFunction.java   |  91 ++++++
 .../sink/clustering/FlinkClusteringConfig.java     | 148 ++++++++++
 .../sink/clustering/HoodieFlinkClusteringJob.java  | 191 +++++++++++++
 .../apache/hudi/streamer/FlinkStreamerConfig.java  |   2 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  20 +-
 .../sink/cluster/ITTestHoodieFlinkClustering.java  | 184 ++++++++++++
 23 files changed, 1700 insertions(+), 14 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 270027df18..251ff97799 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1379,7 +1379,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     return scheduleClustering(extraMetadata);
   }
 
-  protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
+  public void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
     Option<HoodiePendingRollbackInfo> pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false);
     String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
     table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index eee6f4f492..1180845a6e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -51,6 +51,8 @@ public class HoodieClusteringConfig extends HoodieConfig {
   public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy.";
   public static final String SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
       "org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
+  public static final String FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
+      "org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy";
   public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
       "org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";
   public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY =
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
index 63b502531a..89360c2474 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
@@ -25,20 +25,20 @@ import org.apache.hudi.io.WriteHandleFactory;
 import java.io.Serializable;
 
 /**
- * Repartition input records into at least expected number of output spark partitions. It should give below guarantees -
- * Output spark partition will have records from only one hoodie partition. - Average records per output spark
- * partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews.
+ * Repartitions input records into at least the expected number of output partitions. It should give the guarantees below:
+ * (1) each output partition has records from only one hoodie partition; (2) the average number of records per output
+ * partition is almost equal to (#inputRecords / #outputPartitions) to avoid possible skews.
  */
 public interface BulkInsertPartitioner<I> extends Serializable {
 
   /**
-   * Repartitions the input records into at least expected number of output spark partitions.
+   * Repartitions the input records into at least expected number of output partitions.
    *
-   * @param records               Input Hoodie records
-   * @param outputSparkPartitions Expected number of output partitions
+   * @param records          Input Hoodie records
+   * @param outputPartitions Expected number of output partitions
    * @return
    */
-  I repartitionRecords(I records, int outputSparkPartitions);
+  I repartitionRecords(I records, int outputPartitions);
 
   /**
    * @return {@code true} if the records within a partition are sorted; {@code false} otherwise.
@@ -48,6 +48,7 @@ public interface BulkInsertPartitioner<I> extends Serializable {
   /**
    * Return file group id prefix for the given data partition.
    * By defauult, return a new file group id prefix, so that incoming records will route to a fresh new file group
+   *
    * @param partitionId data partition
    * @return
    */
@@ -57,6 +58,7 @@ public interface BulkInsertPartitioner<I> extends Serializable {
 
   /**
    * Return write handle factory for the given partition.
+   *
    * @param partitionId data partition
    * @return
    */
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
index 479f63932c..a96ff73947 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
@@ -70,6 +70,9 @@ public abstract class ClusteringPlanStrategy<T extends HoodieRecordPayload,I,K,O
     String sparkSizeBasedClassName = HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
     String sparkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy";
     String sparkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy";
+    String flinkSizeBasedClassName = HoodieClusteringConfig.FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
+    String flinkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkSelectedPartitionsClusteringPlanStrategy";
+    String flinkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
     String javaSelectedPartitionClassName = "org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy";
     String javaSizeBasedClassName = HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
 
@@ -82,6 +85,14 @@ public abstract class ClusteringPlanStrategy<T extends HoodieRecordPayload,I,K,O
       config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
       LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
       return sparkSizeBasedClassName;
+    } else if (flinkRecentDaysClassName.equals(className)) {
+      config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
+      LOG.warn(String.format(logStr, className, flinkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()));
+      return flinkSizeBasedClassName;
+    } else if (flinkSelectedPartitionsClassName.equals(className)) {
+      config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
+      LOG.warn(String.format(logStr, className, flinkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
+      return flinkSizeBasedClassName;
     } else if (javaSelectedPartitionClassName.equals(className)) {
       config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
       LOG.warn(String.format(logStr, className, javaSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 49fa2ec246..ddfbabaf36 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -29,8 +29,10 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -39,6 +41,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.FlinkHoodieIndexFactory;
@@ -68,6 +71,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -399,6 +404,52 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     throw new HoodieNotSupportedException("Clustering is not supported yet");
   }
 
+  /**
+   * Commits a finished clustering: within one transaction it calls finalizeWrite, writes the table metadata and moves the
+   * inflight replacecommit at {@code clusteringCommitTime} to completed; afterwards markers are removed and metrics updated.
+   *
+   * @throws HoodieClusteringException if any write stat reports write errors or the commit hits an {@link IOException}
+   */
+  private void completeClustering(HoodieReplaceCommitMetadata metadata,
+      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, String clusteringCommitTime) {
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect clustering write status and commit clustering");
+    HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
+    List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e -> e.getValue().stream()).collect(Collectors.toList());
+    if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
+      throw new HoodieClusteringException("Clustering failed to write to files:"
+          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
+    }
+
+    try {
+      this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
+      finalizeWrite(table, clusteringCommitTime, writeStats);
+      // The metadata table is written first; the replacecommit on the data table timeline is completed afterwards.
+      // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to the metadata table happen within a
+      // single lock (single writer), because more than one write to the metadata table will result in conflicts since all of them update the same partition.
+      writeTableMetadata(table, clusteringCommitTime, clusteringInstant.getAction(), metadata);
+      LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
+      table.getActiveTimeline().transitionReplaceInflightToComplete(
+          HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
+          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    } catch (IOException e) {
+      throw new HoodieClusteringException("Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e);
+    } finally {
+      this.txnManager.endTransaction(Option.of(clusteringInstant));
+    }
+
+    WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
+        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+    if (clusteringTimer != null) {
+      long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
+      try {
+        metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
+      } catch (ParseException e) {
+        throw new HoodieCommitException("Commit time is not of valid format. Failed to commit clustering " + config.getBasePath() + " at time " + clusteringCommitTime, e);
+      }
+    }
+    LOG.info("Clustering successfully on commit {}", clusteringCommitTime);
+  }
+
   @Override
   protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
     // Create a Hoodie table which encapsulated the commits and files visible
@@ -412,6 +463,23 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     // no need to execute the upgrade/downgrade on each write in streaming.
   }
 
+  /**
+   * Completes a clustering or compaction at {@code commitInstant} ({@code CLUSTER} expects replace-commit metadata); other service types are rejected.
+   */
+  public void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata,
+      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, String commitInstant) {
+    switch (tableServiceType) {
+      case COMPACT:
+        completeCompaction(metadata, table, commitInstant);
+        return;
+      case CLUSTER:
+        completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant);
+        return;
+      default:
+        throw new IllegalArgumentException("This table service is not valid " + tableServiceType);
+    }
+  }
+
   /**
    * Upgrade downgrade the Hoodie table.
    *
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java
new file mode 100644
index 0000000000..0109aaa60f
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Size-based clustering plan strategy restricted to the most recent partitions.
+ * 1) Only looks at latest 'daybased.lookback.partitions' partitions.
+ * 2) Excludes files that are greater than 'small.file.limit' from clustering plan.
+ */
+public class FlinkRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
+    extends FlinkSizeBasedClusteringPlanStrategy<T> {
+  private static final Logger LOG = LogManager.getLogger(FlinkRecentDaysClusteringPlanStrategy.class);
+
+  public FlinkRecentDaysClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable<T> table,
+                                               HoodieFlinkEngineContext engineContext,
+                                               HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  public FlinkRecentDaysClusteringPlanStrategy(HoodieFlinkMergeOnReadTable<T> table,
+                                               HoodieFlinkEngineContext engineContext,
+                                               HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  @Override
+  protected List<String> filterPartitionPaths(List<String> partitionPaths) {
+    int configuredTarget = getWriteConfig().getTargetPartitionsForClustering();
+    int configuredSkip = getWriteConfig().getSkipPartitionsFromLatestForClustering();
+    // a non-positive target means "no cap": fall back to the number of input partitions
+    long maxPartitions = configuredTarget > 0 ? configuredTarget : partitionPaths.size();
+    // reverse natural order puts the greatest paths first; skip and limit are then counted from that end
+    return partitionPaths.stream().sorted(Comparator.reverseOrder())
+        .skip(Math.max(configuredSkip, 0)).limit(maxPartitions).collect(Collectors.toList());
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java
new file mode 100644
index 0000000000..ae5726bb4a
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX;
+
+/**
+ * Clustering Strategy to filter just specified partitions from [begin, end]; both bounds are inclusive, an unset bound is open.
+ */
+public class FlinkSelectedPartitionsClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
+    extends FlinkSizeBasedClusteringPlanStrategy<T> {
+  private static final Logger LOG = LogManager.getLogger(FlinkSelectedPartitionsClusteringPlanStrategy.class);
+
+  public static final String CONF_BEGIN_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition";
+  public static final String CONF_END_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition";
+
+  public FlinkSelectedPartitionsClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable<T> table,
+                                                       HoodieFlinkEngineContext engineContext,
+                                                       HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  public FlinkSelectedPartitionsClusteringPlanStrategy(HoodieFlinkMergeOnReadTable<T> table,
+                                                       HoodieFlinkEngineContext engineContext,
+                                                       HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  @Override
+  protected List<String> filterPartitionPaths(List<String> partitionPaths) {
+    String beginPartition = getWriteConfig().getProps().getProperty(CONF_BEGIN_PARTITION);
+    String endPartition = getWriteConfig().getProps().getProperty(CONF_END_PARTITION);
+    List<String> filteredPartitions = partitionPaths.stream()
+        .filter(path -> (beginPartition == null || path.compareTo(beginPartition) >= 0) && (endPartition == null || path.compareTo(endPartition) <= 0))
+        .collect(Collectors.toList());
+    LOG.info("Filtered to the following partitions: " + filteredPartitions);
+    return filteredPartitions;
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
new file mode 100644
index 0000000000..8347da6014
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;
+import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+
+/**
+ * Clustering Strategy based on following.
+ * 1) Creates clustering groups capped by the clustering target file max bytes; each group is planned as one output file group.
+ * 2) Excludes files that are greater than 'small.file.limit' from clustering plan.
+ */
+public class FlinkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
+    extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+  private static final Logger LOG = LogManager.getLogger(FlinkSizeBasedClusteringPlanStrategy.class);
+
+  public FlinkSizeBasedClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable<T> table,
+                                              HoodieFlinkEngineContext engineContext,
+                                              HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  public FlinkSizeBasedClusteringPlanStrategy(HoodieFlinkMergeOnReadTable<T> table,
+                                              HoodieFlinkEngineContext engineContext,
+                                              HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  /**
+   * Packs the file slices of one partition, in the given order, into clustering groups. A group is closed once its
+   * accumulated size reaches the clustering target file max bytes; every group is planned to produce one output file group.
+   */
+  @Override
+  protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
+    HoodieWriteConfig writeConfig = getWriteConfig();
+    // loop-invariant threshold: for now every clustering group outputs a single file group, so it is capped at one target file
+    long maxGroupBytes = writeConfig.getClusteringTargetFileMaxBytes();
+
+    List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
+    List<FileSlice> currentGroup = new ArrayList<>();
+    long totalSizeSoFar = 0;
+
+    for (FileSlice currentSlice : fileSlices) {
+      // check if max size is reached and create new group, if needed.
+      if (totalSizeSoFar >= maxGroupBytes && !currentGroup.isEmpty()) {
+        LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: " + maxGroupBytes + " num input slices: " + currentGroup.size());
+        fileSliceGroups.add(Pair.of(currentGroup, 1));
+        currentGroup = new ArrayList<>();
+        totalSizeSoFar = 0;
+      }
+
+      // Add to the current file-group
+      currentGroup.add(currentSlice);
+      // a slice without a base file is assumed to be ~= parquet.max.file.size
+      totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
+    }
+
+    if (!currentGroup.isEmpty()) {
+      fileSliceGroups.add(Pair.of(currentGroup, 1));
+    }
+
+    return fileSliceGroups.stream().map(fileSliceGroup ->
+        HoodieClusteringGroup.newBuilder()
+            .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
+            .setNumOutputFileGroups(fileSliceGroup.getRight())
+            .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
+            .build());
+  }
+
+  @Override
+  protected Map<String, String> getStrategyParams() {
+    Map<String, String> params = new HashMap<>();
+    if (!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) {
+      params.put(PLAN_STRATEGY_SORT_COLUMNS.key(), getWriteConfig().getClusteringSortColumns());
+    }
+    return params;
+  }
+
+  @Override
+  protected List<String> filterPartitionPaths(List<String> partitionPaths) {
+    return partitionPaths;
+  }
+
+  @Override
+  protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String partition) {
+    return super.getFileSlicesEligibleForClustering(partition)
+        // Only slices whose base file is smaller than the small file limit are eligible; a missing base file counts as size 0.
+        .filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit());
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 9ab633f9e3..0e5f1c26e3 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -55,6 +55,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
 import org.apache.hudi.table.action.clean.CleanActionExecutor;
 import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
+import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;
@@ -286,7 +287,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
 
   @Override
   public Option<HoodieClusteringPlan> scheduleClustering(final HoodieEngineContext context, final String instantTime, final Option<Map<String, String>> extraMetadata) {
-    throw new HoodieNotSupportedException("Clustering is not supported on a Flink CopyOnWrite table");
+    return new ClusteringPlanActionExecutor<>(context, config,this, instantTime, extraMetadata).execute();
   }
 
   @Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
index eb3d4ef312..b9e466485f 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
@@ -49,7 +49,7 @@ public class JavaCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
 
   @Override
   public List<HoodieRecord<T>> repartitionRecords(
-      List<HoodieRecord<T>> records, int outputSparkPartitions) {
+      List<HoodieRecord<T>> records, int outputPartitions) {
     return records.stream().sorted((o1, o2) -> {
       Object values1 = HoodieAvroUtils.getRecordColumnValues(o1, sortColumnNames, schema, consistentLogicalTimestampEnabled);
       Object values2 = HoodieAvroUtils.getRecordColumnValues(o2, sortColumnNames, schema, consistentLogicalTimestampEnabled);
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java
index fded0ffab5..d272849a19 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java
@@ -37,7 +37,7 @@ public class JavaGlobalSortPartitioner<T extends HoodieRecordPayload>
 
   @Override
   public List<HoodieRecord<T>> repartitionRecords(List<HoodieRecord<T>> records,
-                                                  int outputSparkPartitions) {
+                                                  int outputPartitions) {
     // Now, sort the records and line them up nicely for loading.
     records.sort(new Comparator() {
       @Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 729f0147b5..3de4bd4f75 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.configuration;
 
+import org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy;
 import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.HoodieConfig;
@@ -583,6 +584,72 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(40)// default min 40 commits
       .withDescription("Min number of commits to keep before archiving older commits into a sequential log, default 40");
 
+  // ------------------------------------------------------------------------
+  //  Clustering Options
+  // ------------------------------------------------------------------------
+
+  public static final ConfigOption<Boolean> CLUSTERING_SCHEDULE_ENABLED = ConfigOptions
+      .key("clustering.schedule.enabled")
+      .booleanType()
+      .defaultValue(false) // default false for pipeline
+      .withDescription("Schedule the cluster plan, default false");
+
+  public static final ConfigOption<Integer> CLUSTERING_DELTA_COMMITS = ConfigOptions
+      .key("clustering.delta_commits")
+      .intType()
+      .defaultValue(4)
+      .withDescription("Max delta commits needed to trigger clustering, default 4 commits");
+
+  public static final ConfigOption<Integer> CLUSTERING_TASKS = ConfigOptions
+      .key("clustering.tasks")
+      .intType()
+      .defaultValue(4)
+      .withDescription("Parallelism of tasks that do actual clustering, default is 4");
+
+  public static final ConfigOption<Integer> CLUSTERING_TARGET_PARTITIONS = ConfigOptions
+      .key("clustering.plan.strategy.daybased.lookback.partitions")
+      .intType()
+      .defaultValue(2)
+      .withDescription("Number of partitions to list to create ClusteringPlan, default is 2");
+
+  public static final ConfigOption<String> CLUSTERING_PLAN_STRATEGY_CLASS = ConfigOptions
+      .key("clustering.plan.strategy.class")
+      .stringType()
+      .defaultValue(FlinkRecentDaysClusteringPlanStrategy.class.getName())
+      .withDescription("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan "
+          + "i.e select what file groups are being clustered. Default strategy, looks at the last N (determined by "
+          + CLUSTERING_TARGET_PARTITIONS.key() + ") day based partitions picks the small file slices within those partitions.");
+
+  public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
+      .key("clustering.plan.strategy.target.file.max.bytes")
+      .intType()
+      .defaultValue(1024 * 1024 * 1024) // default 1 GB
+      .withDescription("Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB");
+
+  public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions
+      .key("clustering.plan.strategy.small.file.limit")
+      .intType()
+      .defaultValue(600) // default 600 MB
+      .withDescription("Files smaller than the size specified here are candidates for clustering, default 600 MB");
+
+  public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigOptions
+      .key("clustering.plan.strategy.daybased.skipfromlatest.partitions")
+      .intType()
+      .defaultValue(0)
+      .withDescription("Number of partitions to skip from latest when choosing partitions to create ClusteringPlan");
+
+  public static final ConfigOption<String> CLUSTERING_SORT_COLUMNS = ConfigOptions
+      .key("clustering.plan.strategy.sort.columns")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("Columns to sort the data by when clustering");
+
+  public static final ConfigOption<Integer> CLUSTERING_MAX_NUM_GROUPS = ConfigOptions
+      .key("clustering.plan.strategy.max.num.groups")
+      .intType()
+      .defaultValue(30)
+      .withDescription("Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism, default is 30");
+
   // ------------------------------------------------------------------------
   //  Hive Sync Options
   // ------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java
index 4d3fc08efe..b5599886a9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java
@@ -48,7 +48,7 @@ public class SortOperatorGen {
         codeGen.generateRecordComparator("SortComparator"));
   }
 
-  private SortCodeGenerator createSortCodeGenerator() {
+  public SortCodeGenerator createSortCodeGenerator() {
     SortSpec.SortSpecBuilder builder = SortSpec.builder();
     IntStream.range(0, sortIndices.length).forEach(i -> builder.addField(i, true, true));
     return new SortCodeGenerator(tableConfig, rowType, builder.build());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java
new file mode 100644
index 0000000000..30a8fbed3f
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.client.WriteStatus;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Represents a commit event from the clustering task {@link ClusteringOperator}.
+ *
+ * <p>It carries the clustering instant time, the write statuses and the identifier of the emitting task.
+ */
+public class ClusteringCommitEvent implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The clustering commit instant time.
+   */
+  private String instant;
+  /**
+   * The write statuses.
+   */
+  private List<WriteStatus> writeStatuses;
+  /**
+   * The clustering task identifier.
+   */
+  private int taskID;
+
+  /**
+   * No-argument constructor; the fields are expected to be populated through the setters.
+   */
+  public ClusteringCommitEvent() {
+  }
+
+  /**
+   * Creates a fully populated event.
+   *
+   * @param instant       The clustering commit instant time
+   * @param writeStatuses The write statuses
+   * @param taskID        The clustering task identifier
+   */
+  public ClusteringCommitEvent(String instant, List<WriteStatus> writeStatuses, int taskID) {
+    this.instant = instant;
+    this.writeStatuses = writeStatuses;
+    this.taskID = taskID;
+  }
+
+  public void setInstant(String instant) {
+    this.instant = instant;
+  }
+
+  public void setWriteStatuses(List<WriteStatus> writeStatuses) {
+    this.writeStatuses = writeStatuses;
+  }
+
+  public void setTaskID(int taskID) {
+    this.taskID = taskID;
+  }
+
+  public String getInstant() {
+    return instant;
+  }
+
+  public List<WriteStatus> getWriteStatuses() {
+    return writeStatuses;
+  }
+
+  public int getTaskID() {
+    return taskID;
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
new file mode 100644
index 0000000000..bc87270a49
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.TableServiceType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.sink.CleanFunction;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Function to check and commit the clustering action.
+ *
+ * <p> Each time after receiving a clustering commit event {@link ClusteringCommitEvent},
+ * it loads and checks the clustering plan {@link org.apache.hudi.avro.model.HoodieClusteringPlan},
+ * if all the clustering operations {@link org.apache.hudi.common.model.ClusteringOperation}
+ * of the plan are finished, tries to commit the clustering action.
+ *
+ * <p>It also inherits the {@link CleanFunction} cleaning ability. This is needed because
+ * the SQL API does not allow multiple sinks in one table sink provider.
+ */
+public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
+  private static final Logger LOG = LoggerFactory.getLogger(ClusteringCommitSink.class);
+
+  /**
+   * Config options.
+   */
+  private final Configuration conf;
+
+  private transient HoodieFlinkTable<?> table;
+
+  /**
+   * Buffer to collect the events from each clustering task {@code ClusteringOperator}.
+   * The key is the instant time.
+   */
+  private transient Map<String, List<ClusteringCommitEvent>> commitBuffer;
+
+  /**
+   * Creates the sink with the given configuration, which is also handed to the parent {@link CleanFunction}.
+   *
+   * @param conf The config options
+   */
+  public ClusteringCommitSink(Configuration conf) {
+    super(conf);
+    this.conf = conf;
+  }
+
+  /**
+   * Initializes the runtime state: delegates to the parent {@code open}, makes sure a write client exists,
+   * creates an empty commit buffer and resolves the Hoodie table from the write client.
+   */
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    // create a write client only when none is set after the parent open
+    if (writeClient == null) {
+      this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
+    }
+    this.commitBuffer = new HashMap<>();
+    this.table = writeClient.getHoodieTable();
+  }
+
+  /**
+   * Buffers the incoming commit event under its instant time and then checks
+   * whether the clustering of that instant can be committed.
+   */
+  @Override
+  public void invoke(ClusteringCommitEvent event, Context context) throws Exception {
+    final String instant = event.getInstant();
+    final List<ClusteringCommitEvent> bufferedEvents = commitBuffer.computeIfAbsent(instant, k -> new ArrayList<>());
+    bufferedEvents.add(event);
+    commitIfNecessary(instant, bufferedEvents);
+  }
+
+  /**
+   * Commits the clustering of the given instant once all the expected events have been received.
+   *
+   * <p>Condition to commit: the commit buffer has equal size with the input groups of the clustering plan
+   * and all the clustering commit events {@link ClusteringCommitEvent} have the same clustering instant time.
+   *
+   * @param instant Clustering commit instant time
+   * @param events  Commit events ever received for the instant
+   * @throws HoodieClusteringException if no clustering plan can be loaded for the instant
+   */
+  private void commitIfNecessary(String instant, List<ClusteringCommitEvent> events) {
+    HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(instant);
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
+        StreamerUtil.createMetaClient(this.conf), clusteringInstant);
+    if (!clusteringPlanOption.isPresent()) {
+      // fail with an explicit message instead of an opaque error from Option#get
+      throw new HoodieClusteringException("Unable to read the clustering plan for instant " + instant);
+    }
+    HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
+    boolean isReady = clusteringPlan.getInputGroups().size() == events.size();
+    if (!isReady) {
+      return;
+    }
+    List<WriteStatus> statuses = events.stream()
+        .map(ClusteringCommitEvent::getWriteStatuses)
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
+
+    HoodieWriteMetadata<List<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
+    writeMetadata.setWriteStatuses(statuses);
+    writeMetadata.setWriteStats(statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));
+    writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
+    validateWriteResult(clusteringPlan, instant, writeMetadata);
+    if (!writeMetadata.getCommitMetadata().isPresent()) {
+      HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(
+          writeMetadata.getWriteStats().get(),
+          writeMetadata.getPartitionToReplaceFileIds(),
+          Option.empty(),
+          WriteOperationType.CLUSTER,
+          this.writeClient.getConfig().getSchema(),
+          HoodieTimeline.REPLACE_COMMIT_ACTION);
+      writeMetadata.setCommitMetadata(Option.of(commitMetadata));
+    }
+    // commit the clustering
+    this.table.getMetaClient().reloadActiveTimeline();
+    this.writeClient.completeTableService(
+        TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
+
+    // reset the status
+    reset(instant);
+  }
+
+  /**
+   * Drops the buffered commit events of the given instant.
+   *
+   * @param instant Clustering commit instant time
+   */
+  private void reset(String instant) {
+    this.commitBuffer.remove(instant);
+  }
+
+  /**
+   * Sanity check of the clustering result: fails when no write status was produced at all.
+   * More validations can be added here later, e.g. number of records read = number of records written,
+   * or they can move into BaseCommitActionExecutor to reuse pre-commit hooks for multiple actions.
+   */
+  private static void validateWriteResult(HoodieClusteringPlan clusteringPlan, String instantTime, HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
+    if (!writeMetadata.getWriteStatuses().isEmpty()) {
+      return;
+    }
+    final int numGroups = clusteringPlan.getInputGroups().size();
+    final int expectedStatuses = clusteringPlan.getInputGroups().stream()
+        .mapToInt(HoodieClusteringGroup::getNumOutputFileGroups)
+        .sum();
+    throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
+        + " #groups: " + numGroups + " expected at least "
+        + expectedStatuses
+        + " write statuses");
+  }
+
+  /**
+   * Builds the mapping of partition path to replaced file ids: every file group of the clustering plan
+   * that is not among the newly written file groups is collected under its partition path.
+   */
+  private static Map<String, List<String>> getPartitionToReplacedFileIds(
+      HoodieClusteringPlan clusteringPlan,
+      HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
+    final Set<HoodieFileGroupId> writtenFileGroups = writeMetadata.getWriteStats().get().stream()
+        .map(stat -> new HoodieFileGroupId(stat.getPartitionPath(), stat.getFileId()))
+        .collect(Collectors.toSet());
+    return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
+        .filter(fileGroupId -> !writtenFileGroups.contains(fileGroupId))
+        .collect(Collectors.groupingBy(
+            HoodieFileGroupId::getPartitionPath,
+            Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList())));
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
new file mode 100644
index 0000000000..a415ac9d46
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.common.model.ClusteringGroupInfo;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.HoodieFileSliceReader;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
+import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * Operator to execute the actual clustering task assigned by the clustering plan task.
+ * To scale out the execution, the input should be shuffled by the clustering event {@link ClusteringPlanEvent}.
+ */
+public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEvent> implements
+    OneInputStreamOperator<ClusteringPlanEvent, ClusteringCommitEvent>, BoundedOneInput {
+  private static final Logger LOG = LoggerFactory.getLogger(ClusteringOperator.class);
+
+  private final Configuration conf;
+  private final RowType rowType;
+  private int taskID;
+  private transient HoodieWriteConfig writeConfig;
+  private transient HoodieFlinkTable<?> table;
+  private transient Schema schema;
+  private transient Schema readerSchema;
+  private transient int[] requiredPos;
+  private transient AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
+  private transient HoodieFlinkWriteClient writeClient;
+  private transient BulkInsertWriterHelper writerHelper;
+  private transient String instantTime;
+
+  private transient BinaryExternalSorter sorter;
+  private transient StreamRecordCollector<ClusteringCommitEvent> collector;
+  private transient BinaryRowDataSerializer binarySerializer;
+
+  /**
+   * Creates the operator.
+   *
+   * @param conf    The config options
+   * @param rowType The row type used for the record conversion, the sorting and the writer
+   */
+  public ClusteringOperator(Configuration conf, RowType rowType) {
+    this.conf = conf;
+    this.rowType = rowType;
+  }
+
+  /**
+   * Initializes the operator: creates the write client and table, resolves the table and reader schemas,
+   * sets up the Avro to row data converter, starts the external sorter and registers the sorter metrics.
+   */
+  @Override
+  public void open() throws Exception {
+    super.open();
+
+    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
+    this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
+    this.table = writeClient.getHoodieTable();
+
+    // the reader schema is the table schema with the metadata fields added
+    this.schema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
+    this.readerSchema = HoodieAvroUtils.addMetadataFields(this.schema);
+    this.requiredPos = getRequiredPositions();
+
+    this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(rowType);
+
+    ClassLoader cl = getContainingTask().getUserCodeClassLoader();
+
+    AbstractRowDataSerializer inputSerializer = new BinaryRowDataSerializer(rowType.getFieldCount());
+    this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());
+
+    // build the sort code generator once and reuse it for both the key computer and the comparator
+    SortCodeGenerator sortCodeGenerator = createSortCodeGenerator();
+    NormalizedKeyComputer computer = sortCodeGenerator.generateNormalizedKeyComputer("SortComputer").newInstance(cl);
+    RecordComparator comparator = sortCodeGenerator.generateRecordComparator("SortComparator").newInstance(cl);
+
+    MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager();
+    this.sorter =
+        new BinaryExternalSorter(
+            this.getContainingTask(),
+            memManager,
+            computeMemorySize(),
+            this.getContainingTask().getEnvironment().getIOManager(),
+            inputSerializer,
+            binarySerializer,
+            computer,
+            comparator,
+            getContainingTask().getJobConfiguration());
+    this.sorter.startThreads();
+
+    collector = new StreamRecordCollector<>(output);
+
+    // register the metrics.
+    getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
+    getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
+    getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
+  }
+
+  /**
+   * Executes the clustering of one group: reads the records of the group, pushes them through the sorter
+   * and hands the sorted rows to the writer helper.
+   *
+   * <p>NOTE(review): the same sorter instance is written to again for every plan event after its iterator
+   * has been requested; confirm that the sorter supports this when a subtask receives more than one event.
+   */
+  @Override
+  public void processElement(StreamRecord<ClusteringPlanEvent> element) throws Exception {
+    final ClusteringPlanEvent planEvent = element.getValue();
+    final String instantTime = planEvent.getClusteringInstantTime();
+    final ClusteringGroupInfo groupInfo = planEvent.getClusteringGroupInfo();
+
+    initWriterHelper(instantTime);
+
+    final List<ClusteringOperation> clusteringOps = groupInfo.getOperations();
+    final boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> !op.getDeltaFilePaths().isEmpty());
+
+    // with log files the records are read per file group and the updates are applied,
+    // without log files the base files are read directly.
+    final Iterator<RowData> records = hasLogFiles
+        ? readRecordsForGroupWithLogs(clusteringOps, instantTime)
+        : readRecordsForGroupBaseFiles(clusteringOps);
+
+    final RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType);
+    while (records.hasNext()) {
+      this.sorter.write(rowDataSerializer.toBinaryRow(records.next()).copy());
+    }
+
+    BinaryRowData sortedRow = binarySerializer.createInstance();
+    while ((sortedRow = sorter.getIterator().next(sortedRow)) != null) {
+      this.writerHelper.write(sortedRow);
+    }
+  }
+
+  /**
+   * Releases the resources held by this operator: the write client and the external sorter.
+   */
+  @Override
+  public void close() {
+    try {
+      if (this.writeClient != null) {
+        this.writeClient.cleanHandlesGracefully();
+        this.writeClient.close();
+      }
+    } finally {
+      // the sorter was started with its own threads and the task memory manager in open(),
+      // so release it even if closing the write client fails
+      if (this.sorter != null) {
+        this.sorter.close();
+      }
+    }
+  }
+
+  /**
+   * End input action for batch source: emits the write statuses of this subtask as a commit event.
+   *
+   * <p>Nothing is emitted when this subtask never received a plan event, because the writer helper
+   * and the instant time are only initialized on the first processed event.
+   */
+  @Override
+  public void endInput() {
+    if (this.writerHelper == null) {
+      // no plan event was processed by this subtask, so there are no write statuses to report
+      return;
+    }
+    List<WriteStatus> writeStatuses = this.writerHelper.getWriteStatuses(this.taskID);
+    collector.collect(new ClusteringCommitEvent(instantTime, writeStatuses, this.taskID));
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  /**
+   * Lazily creates the writer helper for the given clustering instant and remembers the instant time.
+   * Calls after the first one are no-ops.
+   */
+  private void initWriterHelper(String clusteringInstantTime) {
+    if (this.writerHelper != null) {
+      return;
+    }
+    final int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+    final int attemptNumber = getRuntimeContext().getAttemptNumber();
+    this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
+        clusteringInstantTime, this.taskID, numSubtasks, attemptNumber, this.rowType);
+    this.instantTime = clusteringInstantTime;
+  }
+
+  /**
+   * Reads the records of the clustering operations that have log files.
+   *
+   * <p>For each operation it opens the base file reader (if the operation has a data file path), scans the delta
+   * log files with a merged log record scanner, and converts every record of the resulting file slice reader
+   * into {@link RowData}. The per-operation iterators are concatenated into one iterator.
+   *
+   * @param clusteringOps The clustering operations of the group
+   * @param instantTime   The instant time passed to the log scanner as the latest instant time
+   * @return Iterator over the converted records of all the operations
+   * @throws HoodieClusteringException if the input of an operation can not be read
+   */
+  @SuppressWarnings("unchecked")
+  private Iterator<RowData> readRecordsForGroupWithLogs(List<ClusteringOperation> clusteringOps, String instantTime) {
+    List<Iterator<RowData>> recordIterators = new ArrayList<>();
+
+    long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new FlinkTaskContextSupplier(null), writeConfig);
+    LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
+
+    for (ClusteringOperation clusteringOp : clusteringOps) {
+      try {
+        // the base file is optional: an operation without a data file path only has log files
+        Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
+            ? Option.empty()
+            : Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
+        HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+            .withFileSystem(table.getMetaClient().getFs())
+            .withBasePath(table.getMetaClient().getBasePath())
+            .withLogFilePaths(clusteringOp.getDeltaFilePaths())
+            .withReaderSchema(readerSchema)
+            .withLatestInstantTime(instantTime)
+            .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
+            .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
+            .withReverseReader(writeConfig.getCompactionReverseLogReadEnabled())
+            .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
+            .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
+            .build();
+
+        HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+        // the record key and partition fields are only passed when the meta fields are not populated
+        HoodieFileSliceReader<? extends IndexedRecord> hoodieFileSliceReader = HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema,
+            tableConfig.getPayloadClass(),
+            tableConfig.getPreCombineField(),
+            tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
+                tableConfig.getPartitionFieldProp())));
+
+        // NOTE(review): the insert value is unwrapped with get() without a presence check - confirm it is always present here
+        recordIterators.add(StreamSupport.stream(Spliterators.spliteratorUnknownSize(hoodieFileSliceReader, Spliterator.NONNULL), false).map(hoodieRecord -> {
+          try {
+            return this.transform((IndexedRecord) hoodieRecord.getData().getInsertValue(readerSchema).get());
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed to read next record", e);
+          }
+        }).iterator());
+      } catch (IOException e) {
+        throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+            + " and " + clusteringOp.getDeltaFilePaths(), e);
+      }
+    }
+
+    return new ConcatenatingIterator<>(recordIterators);
+  }
+
+  /**
+   * Reads the records of the clustering operations directly from their base files.
+   *
+   * <p>For each operation a record iterator over the data file is obtained with the reader schema and every
+   * record is converted into {@link RowData}. The per-operation iterators are concatenated into one iterator.
+   *
+   * @param clusteringOps The clustering operations of the group
+   * @return Iterator over the converted records of all the base files
+   * @throws HoodieClusteringException if a base file can not be read
+   */
+  private Iterator<RowData> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
+    List<Iterator<RowData>> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> {
+      // the file reader is opened inside the Iterable, i.e. when its iterator/spliterator is requested
+      // NOTE(review): the file reader created here is not closed within this method - confirm who releases it
+      Iterable<IndexedRecord> indexedRecords = () -> {
+        try {
+          return HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())).getRecordIterator(readerSchema);
+        } catch (IOException e) {
+          throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+              + " and " + clusteringOp.getDeltaFilePaths(), e);
+        }
+      };
+
+      return StreamSupport.stream(indexedRecords.spliterator(), false).map(this::transform).iterator();
+    }).collect(Collectors.toList());
+
+    return new ConcatenatingIterator<>(iteratorsForPartition);
+  }
+
+  /**
+   * Transform IndexedRecord into RowData.
+   *
+   * <p>The record is first rebuilt as an Avro record of the table schema using the required field positions,
+   * then converted with the Avro to row data converter.
+   */
+  private RowData transform(IndexedRecord indexedRecord) {
+    GenericRecord record = buildAvroRecordBySchema(indexedRecord, schema, requiredPos, new GenericRecordBuilder(schema));
+    return (RowData) avroToRowDataConverter.convert(record);
+  }
+
+  /**
+   * Resolves, for each field of the table schema, the position of the field with the same name
+   * in the reader schema.
+   */
+  private int[] getRequiredPositions() {
+    final List<String> readerFieldNames = readerSchema.getFields().stream()
+        .map(Schema.Field::name)
+        .collect(Collectors.toList());
+    return schema.getFields().stream()
+        .mapToInt(field -> readerFieldNames.indexOf(field.name()))
+        .toArray();
+  }
+
+  /**
+   * Creates the sort code generator for the configured clustering sort columns.
+   *
+   * @return The sort code generator built from the comma separated sort columns
+   * @throws HoodieClusteringException if the sort columns option is not set or empty
+   */
+  private SortCodeGenerator createSortCodeGenerator() {
+    final String sortColumns = conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS);
+    if (StringUtils.isNullOrEmpty(sortColumns)) {
+      // the option has no default value, fail fast with a readable message instead of a NullPointerException
+      throw new HoodieClusteringException("Option '" + FlinkOptions.CLUSTERING_SORT_COLUMNS.key()
+          + "' must be set to the sort columns for clustering");
+    }
+    SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, sortColumns.split(","));
+    return sortOperatorGen.createSortCodeGenerator();
+  }
+
+  /**
+   * Delegates to the default implementation of {@link OneInputStreamOperator}.
+   */
+  @Override
+  public void setKeyContextElement(StreamRecord<ClusteringPlanEvent> record) throws Exception {
+    OneInputStreamOperator.super.setKeyContextElement(record);
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanEvent.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanEvent.java
new file mode 100644
index 0000000000..c82075877b
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanEvent.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.common.model.ClusteringGroupInfo;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * One unit of clustering work (a single clustering group) emitted by the plan task {@link ClusteringPlanSourceFunction}.
+ */
+public class ClusteringPlanEvent implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  /** Instant time of the clustering plan that the group belongs to. */
+  private String clusteringInstantTime;
+
+  /** The clustering group to execute. */
+  private ClusteringGroupInfo clusteringGroupInfo;
+
+  /** Strategy parameters taken from the clustering plan. */
+  private Map<String, String> strategyParams;
+
+  public ClusteringPlanEvent() {
+  }
+
+  public ClusteringPlanEvent(String instantTime, ClusteringGroupInfo clusteringGroupInfo, Map<String, String> strategyParams) {
+    this.clusteringInstantTime = instantTime;
+    this.clusteringGroupInfo = clusteringGroupInfo;
+    this.strategyParams = strategyParams;
+  }
+
+  public String getClusteringInstantTime() {
+    return clusteringInstantTime;
+  }
+
+  public void setClusteringInstantTime(String clusteringInstantTime) {
+    this.clusteringInstantTime = clusteringInstantTime;
+  }
+
+  public ClusteringGroupInfo getClusteringGroupInfo() {
+    return clusteringGroupInfo;
+  }
+
+  public void setClusteringGroupInfo(ClusteringGroupInfo clusteringGroupInfo) {
+    this.clusteringGroupInfo = clusteringGroupInfo;
+  }
+
+  public Map<String, String> getStrategyParams() {
+    return strategyParams;
+  }
+
+  public void setStrategyParams(Map<String, String> strategyParams) {
+    this.strategyParams = strategyParams;
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
new file mode 100644
index 0000000000..a3db2d41c8
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.common.model.ClusteringGroupInfo;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink hudi clustering source function.
+ *
+ * <p>This function reads the clustering plan as {@link ClusteringOperation}s then assigns the clustering task
+ * event {@link ClusteringPlanEvent} to downstream operators.
+ *
+ * <p>The clustering instant and its plan are handed over through the constructor:
+ *
+ * <ul>
+ *   <li>One {@link ClusteringPlanEvent} is emitted for each input group
+ *   ({@link HoodieClusteringGroup}) of the plan;</li>
+ *   <li>Every event carries the timestamp of the given instant
+ *   and the strategy parameters of the plan;</li>
+ *   <li>{@link #run} returns once all the input groups are emitted,
+ *   {@link #open}, {@link #close} and {@link #cancel} are no-ops.</li>
+ * </ul>
+ */
+public class ClusteringPlanSourceFunction extends AbstractRichFunction implements SourceFunction<ClusteringPlanEvent> {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(ClusteringPlanSourceFunction.class);
+
+  /**
+   * The clustering plan whose input groups are emitted as events.
+   */
+  private final HoodieClusteringPlan clusteringPlan;
+
+  /**
+   * The clustering instant, its timestamp is attached to every emitted event.
+   */
+  private final HoodieInstant instant;
+
+  public ClusteringPlanSourceFunction(HoodieInstant instant, HoodieClusteringPlan clusteringPlan) {
+    this.instant = instant;
+    this.clusteringPlan = clusteringPlan;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    // no operation
+  }
+
+  @Override
+  public void run(SourceContext<ClusteringPlanEvent> sourceContext) throws Exception {
+    for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {
+      LOG.info("ClusteringPlanSourceFunction cluster " + clusteringGroup + " files");
+      sourceContext.collect(new ClusteringPlanEvent(this.instant.getTimestamp(), ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams()));
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    // no operation
+  }
+
+  @Override
+  public void cancel() {
+    // no operation
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
new file mode 100644
index 0000000000..e87a7d6752
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.configuration.FlinkOptions;
+
+import com.beust.jcommander.Parameter;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Configurations for Hoodie Flink clustering.
+ */
+public class FlinkClusteringConfig extends Configuration {
+
+  @Parameter(names = {"--help", "-h"}, help = true)
+  public Boolean help = false;
+
+  // ------------------------------------------------------------------------
+  //  Hudi Write Options
+  // ------------------------------------------------------------------------
+
+  @Parameter(names = {"--path"}, description = "Base path for the target hoodie table.", required = true)
+  public String path;
+
+  // ------------------------------------------------------------------------
+  //  Clustering Options
+  // ------------------------------------------------------------------------
+  @Parameter(names = {"--clustering-delta-commits"}, description = "Max delta commits needed to trigger clustering, default 1 commit", required = false)
+  public Integer clusteringDeltaCommits = 1;
+
+  @Parameter(names = {"--clustering-tasks"}, description = "Parallelism of tasks that do actual clustering, default is -1", required = false)
+  public Integer clusteringTasks = -1;
+
+  @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB.", required = false)
+  public Integer compactionMaxMemory = 100;
+
+  @Parameter(names = {"--clean-retain-commits"},
+      description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+          + "This also directly translates into how much you can incrementally pull on this table, default 10",
+      required = false)
+  public Integer cleanRetainCommits = 10;
+
+  @Parameter(names = {"--archive-min-commits"},
+      description = "Min number of commits to keep before archiving older commits into a sequential log, default 20.",
+      required = false)
+  public Integer archiveMinCommits = 20;
+
+  @Parameter(names = {"--archive-max-commits"},
+      description = "Max number of commits to keep before archiving older commits into a sequential log, default 30.",
+      required = false)
+  public Integer archiveMaxCommits = 30;
+
+  @Parameter(names = {"--schedule", "-sc"}, description = "Not recommended. Schedule the clustering plan in this job.\n"
+      + "There is a risk of losing data when scheduling clustering outside the writer job.\n"
+      + "Scheduling clustering in the writer job and only let this job do the clustering execution is recommended.\n"
+      + "Default is true", required = false)
+  public Boolean schedule = true;
+
+  @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, disabled by default", required = false)
+  public Boolean cleanAsyncEnable = false;
+
+  @Parameter(names = {"--plan-strategy-class"}, description = "Config to provide a strategy class to generate clustering plan", required = false)
+  public String planStrategyClass = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
+
+  @Parameter(names = {"--target-file-max-bytes"}, description = "Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB", required = false)
+  public Integer targetFileMaxBytes = 1024 * 1024 * 1024;
+
+  @Parameter(names = {"--small-file-limit"}, description = "Files smaller than the size specified here are candidates for clustering, default 600 MB", required = false)
+  public Integer smallFileLimit = 600;
+
+  @Parameter(names = {"--skip-from-latest-partitions"}, description = "Number of partitions to skip from latest when choosing partitions to create ClusteringPlan, default 0", required = false)
+  public Integer skipFromLatestPartitions = 0;
+
+  @Parameter(names = {"--sort-columns"}, description = "Columns to sort the data by when clustering.", required = false)
+  public String sortColumns = "";
+
+  @Parameter(names = {"--max-num-groups"}, description = "Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism. default 30", required = false)
+  public Integer maxNumGroups = 30;
+
+  @Parameter(names = {"--target-partitions"}, description = "Number of partitions to list to create ClusteringPlan, default 2", required = false)
+  public Integer targetPartitions = 2;
+
+  public static final String SEQ_FIFO = "FIFO";
+  public static final String SEQ_LIFO = "LIFO";
+  @Parameter(names = {"--seq"}, description = "Clustering plan execution sequence, two options are supported:\n"
+      + "1). FIFO: execute the oldest plan first;\n"
+      + "2). LIFO: execute the latest plan first, by default LIFO", required = false)
+  public String clusteringSeq = SEQ_LIFO;
+
+  @Parameter(names = {"--write-partition-url-encode"}, description = "Whether to encode the partition path url, default false")
+  public Boolean writePartitionUrlEncode = false;
+
+  @Parameter(names = {"--hive-style-partitioning"}, description = "Whether to use Hive style partitioning.\n"
+      + "If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.\n"
+      + "By default false (the names of partition folders are only partition values)")
+  public Boolean hiveStylePartitioning = false;
+
+  /**
+   * Transforms a {@code FlinkClusteringConfig.config} into {@code Configuration}.
+   * The latter is more suitable for the table APIs. The parsed cmd line options
+   * are copied under their {@link FlinkOptions} keys into a new {@code Configuration};
+   * the {@code help} flag and {@code clusteringSeq} are not part of the result.
+   */
+  public static Configuration toFlinkConfig(FlinkClusteringConfig config) {
+    Configuration conf = new Configuration();
+
+    conf.setString(FlinkOptions.PATH, config.path);
+    conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits);
+    conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, config.archiveMinCommits);
+    conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits);
+    conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
+    conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, config.clusteringDeltaCommits);
+    conf.setInteger(FlinkOptions.CLUSTERING_TASKS, config.clusteringTasks);
+    conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS, config.planStrategyClass);
+    conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, config.targetFileMaxBytes);
+    conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, config.smallFileLimit);
+    conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, config.skipFromLatestPartitions);
+    conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, config.sortColumns);
+    conf.setInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS, config.maxNumGroups);
+    conf.setInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS, config.targetPartitions);
+    conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable);
+
+    // whether this job schedules the clustering plan itself (--schedule)
+    conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, config.schedule);
+
+    // bulk insert conf
+    conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, config.writePartitionUrlEncode);
+    conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, config.hiveStylePartitioning);
+
+    return conf;
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
new file mode 100644
index 0000000000..f7c361533a
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+
+import com.beust.jcommander.JCommander;
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink hudi clustering program that can be executed manually: it optionally schedules a clustering plan, then executes one requested plan through {@link ClusteringOperator} and {@link ClusteringCommitSink}.
+ */
+public class HoodieFlinkClusteringJob {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkClusteringJob.class);
+
+  public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+
+    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+    // create metaClient
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+    // set table name
+    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+
+    // set table type
+    conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name());
+
+    // set record key field
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
+
+    // set partition field
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
+
+    // set table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+    // when --schedule is enabled, first schedule a new clustering plan
+    // at a freshly generated instant time; stop if nothing was scheduled.
+    if (cfg.schedule) {
+      String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+      boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+      if (!scheduled) {
+        // do nothing.
+        LOG.info("No clustering plan for this job ");
+        return;
+      }
+    }
+
+    table.getMetaClient().reloadActiveTimeline();
+
+    // fetch the instant based on the configured execution sequence
+    HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline()
+        .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
+    Option<HoodieInstant> requested = CompactionUtil.isLIFO(cfg.clusteringSeq) ? timeline.lastInstant() : timeline.firstInstant();
+    if (!requested.isPresent()) {
+      // do nothing.
+      LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
+      return;
+    }
+
+    HoodieInstant clusteringInstant = requested.get();
+
+    HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp());
+    if (timeline.containsInstant(inflightInstant)) {
+      LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]");
+      writeClient.rollbackInflightClustering(inflightInstant, table);
+      table.getMetaClient().reloadActiveTimeline();
+    }
+
+    // read the clustering plan of the chosen instant
+    // should support configurable commit metadata
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
+        table.getMetaClient(), clusteringInstant);
+
+    if (!clusteringPlanOption.isPresent()) {
+      // do nothing.
+      LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
+      return;
+    }
+
+    HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
+
+    if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null)
+        || (clusteringPlan.getInputGroups().isEmpty())) {
+      // No clustering plan, do nothing and return.
+      LOG.info("No clustering plan for instant " + clusteringInstant.getTimestamp());
+      return;
+    }
+
+    HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp());
+    HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
+    if (!pendingClusteringTimeline.containsInstant(instant)) {
+      // this means that the clustering plan was written to auxiliary path(.tmp)
+      // but not the meta path(.hoodie), this usually happens when the job crashes
+      // exceptionally.
+
+      // clean the clustering plan in auxiliary path and cancels the clustering.
+
+      LOG.warn("The clustering plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
+          + "Clean the clustering plan in auxiliary path and cancels the clustering");
+      CompactionUtil.cleanInstant(table.getMetaClient(), instant);
+      return;
+    }
+
+    // clustering parallelism: one task per input group unless --clustering-tasks is set (not -1).
+    int clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == -1
+        ? clusteringPlan.getInputGroups().size() : conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
+
+    // Mark instant as clustering inflight
+    table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
+
+    final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
+    final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
+    final RowType rowType = (RowType) rowDataType.getLogicalType();
+
+    // setup configuration
+    long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+
+    DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstant, clusteringPlan))
+        .name("clustering_source")
+        .uid("uid_clustering_source")
+        .rebalance()
+        .transform("clustering_task",
+            TypeInformation.of(ClusteringCommitEvent.class),
+            new ClusteringOperator(conf, rowType))
+        .setParallelism(clusteringParallelism);
+
+    ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+        conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+
+    dataStream
+        .addSink(new ClusteringCommitSink(conf))
+        .name("clustering_commit")
+        .uid("uid_clustering_commit")
+        .setParallelism(1);
+
+    env.execute("flink_hudi_clustering");
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index f82712bca2..e9574dd52b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -192,7 +192,7 @@ public class FlinkStreamerConfig extends Configuration {
   public Boolean indexGlobalEnabled = true;
 
   @Parameter(names = {"--index-partition-regex"},
-      description = "Whether to load partitions in state if partition path matching, default *")
+      description = "Whether to load partitions in state if partition path matching, default *")
   public String indexPartitionRegex = ".*";
 
   @Parameter(names = {"--source-avro-schema-path"}, description = "Source avro schema file path, the parsed schema is used for deserialization")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index b977dfd7c5..fcffbed54b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -37,6 +38,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
@@ -162,6 +164,17 @@ public class StreamerUtil {
             .withPath(conf.getString(FlinkOptions.PATH))
             .combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true)
             .withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf))
+            .withClusteringConfig(
+                HoodieClusteringConfig.newBuilder()
+                    .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
+                    .withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS))
+                    .withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS))
+                    .withClusteringMaxNumGroups(conf.getInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS))
+                    .withClusteringTargetFileMaxBytes(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES))
+                    .withClusteringPlanSmallFileLimit(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT) * 1024 * 1024L)
+                    .withClusteringSkipPartitionsFromLatest(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST))
+                    .withAsyncClusteringMaxCommits(conf.getInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS))
+                    .build())
             .withCompactionConfig(
                 HoodieCompactionConfig.newBuilder()
                     .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
@@ -505,6 +518,11 @@ public class StreamerUtil {
    * Returns the max compaction memory in bytes with given conf.
    */
   public static long getMaxCompactionMemoryInBytes(Configuration conf) {
-    return conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024;
+    return (long) conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024;
+  }
+
+  public static Schema getTableAvroSchema(HoodieTableMetaClient metaClient, boolean includeMetadataFields) throws Exception {
+    // resolve the table avro schema through a TableSchemaResolver built on the given meta client
+    return new TableSchemaResolver(metaClient).getTableAvroSchema(includeMetadataFields);
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
new file mode 100644
index 0000000000..ac2ee0be37
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.cluster;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
+import org.apache.hudi.sink.clustering.ClusteringCommitSink;
+import org.apache.hudi.sink.clustering.ClusteringOperator;
+import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestSQL;
+
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * IT case for the Flink clustering pipeline (see {@link HoodieFlinkClusteringJob}): writes a table, schedules a clustering plan and executes it.
+ */
+public class ITTestHoodieFlinkClustering {
+
+  private static final Map<String, String> EXPECTED = new HashMap<>(); // expected rows, presumably keyed by partition path (par1..par4) - verify against TestData.checkWrittenData
+
+  static {
+    EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]");
+    EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]");
+    EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]");
+    EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]");
+  }
+
+  @TempDir
+  File tempFile;
+
+  @Test
+  public void testHoodieFlinkClustering() throws Exception {
+    // Create the hoodie table with a batch-mode table environment and insert the test data.
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+    // use append mode: INSERT operation with FlinkOptions.INSERT_CLUSTER turned off
+    options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
+    options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
+
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+    tableEnv.executeSql(TestSQL.INSERT_T1).await();
+
+    // wait 3 seconds for the asynchronous commit to finish
+    TimeUnit.SECONDS.sleep(3);
+
+    // Build the Flink configuration for the clustering job from a FlinkClusteringConfig.
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    cfg.targetPartitions = 4;
+    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+    // create the meta client from the configuration
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+    // copy the table name and table type from the table config
+    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+    conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name());
+
+    // set record key field from the table config
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
+    // set partition field from the table config (overwritten with "partition" below)
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
+
+    long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); // reuse the checkpoint timeout as the commit ack timeout
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition"); // overrides the partition field set from the table config above
+
+    // set table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    // Generate a new instant time for the clustering,
+    // then schedule the clustering plan at that instant.
+    String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+    boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+
+    assertTrue(scheduled, "The clustering plan should be scheduled");
+
+    // reload the timeline and keep only the pending replace instants in REQUESTED state
+    table.getMetaClient().reloadActiveTimeline();
+    HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline()
+        .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
+
+    // read the clustering plan of the last requested instant
+    // TODO: should support configurable commit metadata
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
+        table.getMetaClient(), timeline.lastInstant().get());
+
+    HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
+
+    // Transition the replace instant of the clustering from requested to inflight
+    HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
+    table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
+
+    final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false); // false: schema without the metadata fields
+    final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
+    final RowType rowType = (RowType) rowDataType.getLogicalType();
+
+    DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(timeline.lastInstant().get(), clusteringPlan))
+        .name("clustering_source")
+        .uid("uid_clustering_source")
+        .rebalance()
+        .transform("clustering_task",
+            TypeInformation.of(ClusteringCommitEvent.class),
+            new ClusteringOperator(conf, rowType))
+        .setParallelism(clusteringPlan.getInputGroups().size()); // one clustering subtask per input group of the plan
+
+    ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+        conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L); // scaled by 1024 * 1024, presumably MB -> bytes - TODO confirm
+
+    dataStream
+        .addSink(new ClusteringCommitSink(conf))
+        .name("clustering_commit")
+        .uid("uid_clustering_commit")
+        .setParallelism(1); // a single commit sink instance
+
+    env.execute("flink_hudi_clustering");
+    TestData.checkWrittenData(tempFile, EXPECTED, 4); // 4 is presumably the expected partition count - TODO confirm
+  }
+}