You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yu...@apache.org on 2022/05/24 16:47:36 UTC
[hudi] branch master updated: [HUDI-2207] Support independent flink hudi clustering function
This is an automated email from the ASF dual-hosted git repository.
yuzhaojing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c20db99a7b [HUDI-2207] Support independent flink hudi clustering function
new 18635b533e Merge pull request #3599 from yuzhaojing/HUDI-2207
c20db99a7b is described below
commit c20db99a7b79b9545e6fae0d762250ceb55a8b79
Author: 喻兆靖 <yu...@bytedance.com>
AuthorDate: Sat May 21 21:25:15 2022 +0800
[HUDI-2207] Support independent flink hudi clustering function
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 2 +-
.../apache/hudi/config/HoodieClusteringConfig.java | 2 +
.../apache/hudi/table/BulkInsertPartitioner.java | 16 +-
.../cluster/strategy/ClusteringPlanStrategy.java | 11 +
.../apache/hudi/client/HoodieFlinkWriteClient.java | 68 +++++
.../FlinkRecentDaysClusteringPlanStrategy.java | 65 +++++
...nkSelectedPartitionsClusteringPlanStrategy.java | 67 +++++
.../FlinkSizeBasedClusteringPlanStrategy.java | 129 +++++++++
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 3 +-
.../JavaCustomColumnsSortPartitioner.java | 2 +-
.../bulkinsert/JavaGlobalSortPartitioner.java | 2 +-
.../apache/hudi/configuration/FlinkOptions.java | 67 +++++
.../hudi/sink/bulk/sort/SortOperatorGen.java | 2 +-
.../sink/clustering/ClusteringCommitEvent.java | 77 +++++
.../hudi/sink/clustering/ClusteringCommitSink.java | 174 +++++++++++
.../hudi/sink/clustering/ClusteringOperator.java | 318 +++++++++++++++++++++
.../hudi/sink/clustering/ClusteringPlanEvent.java | 73 +++++
.../clustering/ClusteringPlanSourceFunction.java | 91 ++++++
.../sink/clustering/FlinkClusteringConfig.java | 148 ++++++++++
.../sink/clustering/HoodieFlinkClusteringJob.java | 191 +++++++++++++
.../apache/hudi/streamer/FlinkStreamerConfig.java | 2 +-
.../java/org/apache/hudi/util/StreamerUtil.java | 20 +-
.../sink/cluster/ITTestHoodieFlinkClustering.java | 184 ++++++++++++
23 files changed, 1700 insertions(+), 14 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 270027df18..251ff97799 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1379,7 +1379,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
return scheduleClustering(extraMetadata);
}
- protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
+ public void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
Option<HoodiePendingRollbackInfo> pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false);
String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index eee6f4f492..1180845a6e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -51,6 +51,8 @@ public class HoodieClusteringConfig extends HoodieConfig {
public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy.";
public static final String SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
+ public static final String FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
+ "org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy";
public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";
public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY =
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
index 63b502531a..89360c2474 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
@@ -25,20 +25,20 @@ import org.apache.hudi.io.WriteHandleFactory;
import java.io.Serializable;
/**
- * Repartition input records into at least expected number of output spark partitions. It should give below guarantees -
- * Output spark partition will have records from only one hoodie partition. - Average records per output spark
- * partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews.
+ * Repartition input records into at least expected number of output partitions. It should give below guarantees -
+ * Output partition will have records from only one hoodie partition. - Average records per output
+ * partitions should be almost equal to (#inputRecords / #outputPartitions) to avoid possible skews.
*/
public interface BulkInsertPartitioner<I> extends Serializable {
/**
- * Repartitions the input records into at least expected number of output spark partitions.
+ * Repartitions the input records into at least expected number of output partitions.
*
- * @param records Input Hoodie records
- * @param outputSparkPartitions Expected number of output partitions
+ * @param records Input Hoodie records
+ * @param outputPartitions Expected number of output partitions
* @return
*/
- I repartitionRecords(I records, int outputSparkPartitions);
+ I repartitionRecords(I records, int outputPartitions);
/**
* @return {@code true} if the records within a partition are sorted; {@code false} otherwise.
@@ -48,6 +48,7 @@ public interface BulkInsertPartitioner<I> extends Serializable {
/**
* Return file group id prefix for the given data partition.
* By defauult, return a new file group id prefix, so that incoming records will route to a fresh new file group
+ *
* @param partitionId data partition
* @return
*/
@@ -57,6 +58,7 @@ public interface BulkInsertPartitioner<I> extends Serializable {
/**
* Return write handle factory for the given partition.
+ *
* @param partitionId data partition
* @return
*/
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
index 479f63932c..a96ff73947 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
@@ -70,6 +70,9 @@ public abstract class ClusteringPlanStrategy<T extends HoodieRecordPayload,I,K,O
String sparkSizeBasedClassName = HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
String sparkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy";
String sparkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy";
+ String flinkSizeBasedClassName = HoodieClusteringConfig.FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
+ String flinkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkSelectedPartitionsClusteringPlanStrategy";
+ String flinkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
String javaSelectedPartitionClassName = "org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy";
String javaSizeBasedClassName = HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
@@ -82,6 +85,14 @@ public abstract class ClusteringPlanStrategy<T extends HoodieRecordPayload,I,K,O
config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
return sparkSizeBasedClassName;
+ } else if (flinkRecentDaysClassName.equals(className)) {
+ config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
+ LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()));
+ return flinkSizeBasedClassName;
+ } else if (flinkSelectedPartitionsClassName.equals(className)) {
+ config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
+ LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
+ return flinkSizeBasedClassName;
} else if (javaSelectedPartitionClassName.equals(className)) {
config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
LOG.warn(String.format(logStr, className, javaSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 49fa2ec246..ddfbabaf36 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -29,8 +29,10 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -39,6 +41,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndexFactory;
@@ -68,6 +71,8 @@ import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Iterator;
@@ -399,6 +404,52 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
throw new HoodieNotSupportedException("Clustering is not supported yet");
}
+ private void completeClustering(
+ HoodieReplaceCommitMetadata metadata,
+ HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+ String clusteringCommitTime) {
+ this.context.setJobStatus(this.getClass().getSimpleName(), "Collect clustering write status and commit clustering");
+ HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
+ List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
+ e.getValue().stream()).collect(Collectors.toList());
+ if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
+ throw new HoodieClusteringException("Clustering failed to write to files:"
+ + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
+ }
+
+ try {
+ this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
+ finalizeWrite(table, clusteringCommitTime, writeStats);
+ // commit to data table after committing to metadata table.
+ // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
+ // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
+ writeTableMetadata(table, clusteringCommitTime, clusteringInstant.getAction(), metadata);
+ LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
+ table.getActiveTimeline().transitionReplaceInflightToComplete(
+ HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
+ Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ } catch (IOException e) {
+ throw new HoodieClusteringException(
+ "Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e);
+ } finally {
+ this.txnManager.endTransaction(Option.of(clusteringInstant));
+ }
+
+ WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
+ .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+ if (clusteringTimer != null) {
+ long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
+ try {
+ metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(),
+ durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
+ } catch (ParseException e) {
+ throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
+ + config.getBasePath() + " at time " + clusteringCommitTime, e);
+ }
+ }
+ LOG.info("Clustering successfully on commit " + clusteringCommitTime);
+ }
+
@Override
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// Create a Hoodie table which encapsulated the commits and files visible
@@ -412,6 +463,23 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
// no need to execute the upgrade/downgrade on each write in streaming.
}
+ public void completeTableService(
+ TableServiceType tableServiceType,
+ HoodieCommitMetadata metadata,
+ HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+ String commitInstant) {
+ switch (tableServiceType) {
+ case CLUSTER:
+ completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant);
+ break;
+ case COMPACT:
+ completeCompaction(metadata, table, commitInstant);
+ break;
+ default:
+ throw new IllegalArgumentException("This table service is not valid " + tableServiceType);
+ }
+ }
+
/**
* Upgrade downgrade the Hoodie table.
*
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java
new file mode 100644
index 0000000000..0109aaa60f
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Clustering Strategy based on following.
+ * 1) Only looks at latest 'daybased.lookback.partitions' partitions.
+ * 2) Excludes files that are greater than 'small.file.limit' from clustering plan.
+ */
+public class FlinkRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
+ extends FlinkSizeBasedClusteringPlanStrategy<T> {
+ private static final Logger LOG = LogManager.getLogger(FlinkRecentDaysClusteringPlanStrategy.class);
+
+ public FlinkRecentDaysClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable<T> table,
+ HoodieFlinkEngineContext engineContext,
+ HoodieWriteConfig writeConfig) {
+ super(table, engineContext, writeConfig);
+ }
+
+ public FlinkRecentDaysClusteringPlanStrategy(HoodieFlinkMergeOnReadTable<T> table,
+ HoodieFlinkEngineContext engineContext,
+ HoodieWriteConfig writeConfig) {
+ super(table, engineContext, writeConfig);
+ }
+
+ @Override
+ protected List<String> filterPartitionPaths(List<String> partitionPaths) {
+ int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering();
+ int skipPartitionsFromLatestForClustering = getWriteConfig().getSkipPartitionsFromLatestForClustering();
+ return partitionPaths.stream()
+ .sorted(Comparator.reverseOrder())
+ .skip(Math.max(skipPartitionsFromLatestForClustering, 0))
+ .limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size())
+ .collect(Collectors.toList());
+ }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java
new file mode 100644
index 0000000000..ae5726bb4a
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX;
+
+/**
+ * Clustering Strategy to filter just specified partitions from [begin, end]. Note both begin and end are inclusive.
+ */
+public class FlinkSelectedPartitionsClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
+ extends FlinkSizeBasedClusteringPlanStrategy<T> {
+ private static final Logger LOG = LogManager.getLogger(FlinkSelectedPartitionsClusteringPlanStrategy.class);
+
+ public static final String CONF_BEGIN_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition";
+ public static final String CONF_END_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition";
+
+ public FlinkSelectedPartitionsClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable<T> table,
+ HoodieFlinkEngineContext engineContext,
+ HoodieWriteConfig writeConfig) {
+ super(table, engineContext, writeConfig);
+ }
+
+ public FlinkSelectedPartitionsClusteringPlanStrategy(HoodieFlinkMergeOnReadTable<T> table,
+ HoodieFlinkEngineContext engineContext,
+ HoodieWriteConfig writeConfig) {
+ super(table, engineContext, writeConfig);
+ }
+
+ @Override
+ protected List<String> filterPartitionPaths(List<String> partitionPaths) {
+ String beginPartition = getWriteConfig().getProps().getProperty(CONF_BEGIN_PARTITION);
+ String endPartition = getWriteConfig().getProps().getProperty(CONF_END_PARTITION);
+ List<String> filteredPartitions = partitionPaths.stream()
+ .filter(path -> path.compareTo(beginPartition) >= 0 && path.compareTo(endPartition) <= 0)
+ .collect(Collectors.toList());
+ LOG.info("Filtered to the following partitions: " + filteredPartitions);
+ return filteredPartitions;
+ }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
new file mode 100644
index 0000000000..8347da6014
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;
+import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+
+/**
+ * Clustering Strategy based on following.
+ * 1) Creates clustering groups based on max size allowed per group.
+ * 2) Excludes files that are greater than 'small.file.limit' from clustering plan.
+ */
+public class FlinkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
+ extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+ private static final Logger LOG = LogManager.getLogger(FlinkSizeBasedClusteringPlanStrategy.class);
+
+ public FlinkSizeBasedClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable<T> table,
+ HoodieFlinkEngineContext engineContext,
+ HoodieWriteConfig writeConfig) {
+ super(table, engineContext, writeConfig);
+ }
+
+ public FlinkSizeBasedClusteringPlanStrategy(HoodieFlinkMergeOnReadTable<T> table,
+ HoodieFlinkEngineContext engineContext,
+ HoodieWriteConfig writeConfig) {
+ super(table, engineContext, writeConfig);
+ }
+
+ @Override
+ protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
+ HoodieWriteConfig writeConfig = getWriteConfig();
+
+ List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
+ List<FileSlice> currentGroup = new ArrayList<>();
+ long totalSizeSoFar = 0;
+
+ for (FileSlice currentSlice : fileSlices) {
+ // check if max size is reached and create new group, if needed.
+ // in now, every clustering group out put is 1 file group.
+ if (totalSizeSoFar >= writeConfig.getClusteringTargetFileMaxBytes() && !currentGroup.isEmpty()) {
+ LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
+ + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size());
+ fileSliceGroups.add(Pair.of(currentGroup, 1));
+ currentGroup = new ArrayList<>();
+ totalSizeSoFar = 0;
+ }
+
+ // Add to the current file-group
+ currentGroup.add(currentSlice);
+ // assume each file group size is ~= parquet.max.file.size
+ totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
+ }
+
+ if (!currentGroup.isEmpty()) {
+ fileSliceGroups.add(Pair.of(currentGroup, 1));
+ }
+
+ return fileSliceGroups.stream().map(fileSliceGroup ->
+ HoodieClusteringGroup.newBuilder()
+ .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
+ .setNumOutputFileGroups(fileSliceGroup.getRight())
+ .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
+ .build());
+ }
+
+ @Override
+ protected Map<String, String> getStrategyParams() {
+ Map<String, String> params = new HashMap<>();
+ if (!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) {
+ params.put(PLAN_STRATEGY_SORT_COLUMNS.key(), getWriteConfig().getClusteringSortColumns());
+ }
+ return params;
+ }
+
+ @Override
+ protected List<String> filterPartitionPaths(List<String> partitionPaths) {
+ return partitionPaths;
+ }
+
+ @Override
+ protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String partition) {
+ return super.getFileSlicesEligibleForClustering(partition)
+ // Only files that have basefile size smaller than small file size are eligible.
+ .filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit());
+ }
+
+ private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) {
+ return (int) Math.ceil(groupSize / (double) targetFileSize);
+ }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 9ab633f9e3..0e5f1c26e3 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -55,6 +55,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
+import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;
@@ -286,7 +287,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
@Override
public Option<HoodieClusteringPlan> scheduleClustering(final HoodieEngineContext context, final String instantTime, final Option<Map<String, String>> extraMetadata) {
- throw new HoodieNotSupportedException("Clustering is not supported on a Flink CopyOnWrite table");
+ return new ClusteringPlanActionExecutor<>(context, config,this, instantTime, extraMetadata).execute();
}
@Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
index eb3d4ef312..b9e466485f 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
@@ -49,7 +49,7 @@ public class JavaCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
@Override
public List<HoodieRecord<T>> repartitionRecords(
- List<HoodieRecord<T>> records, int outputSparkPartitions) {
+ List<HoodieRecord<T>> records, int outputPartitions) {
return records.stream().sorted((o1, o2) -> {
Object values1 = HoodieAvroUtils.getRecordColumnValues(o1, sortColumnNames, schema, consistentLogicalTimestampEnabled);
Object values2 = HoodieAvroUtils.getRecordColumnValues(o2, sortColumnNames, schema, consistentLogicalTimestampEnabled);
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java
index fded0ffab5..d272849a19 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java
@@ -37,7 +37,7 @@ public class JavaGlobalSortPartitioner<T extends HoodieRecordPayload>
@Override
public List<HoodieRecord<T>> repartitionRecords(List<HoodieRecord<T>> records,
- int outputSparkPartitions) {
+ int outputPartitions) {
// Now, sort the records and line them up nicely for loading.
records.sort(new Comparator() {
@Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 729f0147b5..3de4bd4f75 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -18,6 +18,7 @@
package org.apache.hudi.configuration;
+import org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy;
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.HoodieConfig;
@@ -583,6 +584,72 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(40)// default min 40 commits
.withDescription("Min number of commits to keep before archiving older commits into a sequential log, default 40");
+ // ------------------------------------------------------------------------
+ // Clustering Options
+ // ------------------------------------------------------------------------
+
+ public static final ConfigOption<Boolean> CLUSTERING_SCHEDULE_ENABLED = ConfigOptions
+ .key("clustering.schedule.enabled")
+ .booleanType()
+ .defaultValue(false) // default false for pipeline
+ .withDescription("Schedule the cluster plan, default false");
+
+ public static final ConfigOption<Integer> CLUSTERING_DELTA_COMMITS = ConfigOptions
+ .key("clustering.delta_commits")
+ .intType()
+ .defaultValue(4)
+ .withDescription("Max delta commits needed to trigger clustering, default 4 commits");
+
+ public static final ConfigOption<Integer> CLUSTERING_TASKS = ConfigOptions
+ .key("clustering.tasks")
+ .intType()
+ .defaultValue(4)
+ .withDescription("Parallelism of tasks that do actual clustering, default is 4");
+
+ public static final ConfigOption<Integer> CLUSTERING_TARGET_PARTITIONS = ConfigOptions
+ .key("clustering.plan.strategy.daybased.lookback.partitions")
+ .intType()
+ .defaultValue(2)
+ .withDescription("Number of partitions to list to create ClusteringPlan, default is 2");
+
+ public static final ConfigOption<String> CLUSTERING_PLAN_STRATEGY_CLASS = ConfigOptions
+ .key("clustering.plan.strategy.class")
+ .stringType()
+ .defaultValue(FlinkRecentDaysClusteringPlanStrategy.class.getName())
+ .withDescription("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan "
+ + "i.e select what file groups are being clustered. Default strategy, looks at the last N (determined by "
+ + CLUSTERING_TARGET_PARTITIONS.key() + ") day based partitions picks the small file slices within those partitions.");
+
+ public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
+ .key("clustering.plan.strategy.target.file.max.bytes")
+ .intType()
+ .defaultValue(1024 * 1024 * 1024) // default 1 GB
+ .withDescription("Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB");
+
+ public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions
+ .key("clustering.plan.strategy.small.file.limit")
+ .intType()
+ .defaultValue(600) // default 600 MB
+ .withDescription("Files smaller than the size specified here are candidates for clustering, default 600 MB");
+
+ public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigOptions
+ .key("clustering.plan.strategy.daybased.skipfromlatest.partitions")
+ .intType()
+ .defaultValue(0)
+ .withDescription("Number of partitions to skip from latest when choosing partitions to create ClusteringPlan");
+
+ public static final ConfigOption<String> CLUSTERING_SORT_COLUMNS = ConfigOptions
+ .key("clustering.plan.strategy.sort.columns")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Columns to sort the data by when clustering");
+
+ public static final ConfigOption<Integer> CLUSTERING_MAX_NUM_GROUPS = ConfigOptions
+ .key("clustering.plan.strategy.max.num.groups")
+ .intType()
+ .defaultValue(30)
+ .withDescription("Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism, default is 30");
+
// ------------------------------------------------------------------------
// Hive Sync Options
// ------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java
index 4d3fc08efe..b5599886a9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java
@@ -48,7 +48,7 @@ public class SortOperatorGen {
codeGen.generateRecordComparator("SortComparator"));
}
- private SortCodeGenerator createSortCodeGenerator() {
+ public SortCodeGenerator createSortCodeGenerator() {
SortSpec.SortSpecBuilder builder = SortSpec.builder();
IntStream.range(0, sortIndices.length).forEach(i -> builder.addField(i, true, true));
return new SortCodeGenerator(tableConfig, rowType, builder.build());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java
new file mode 100644
index 0000000000..30a8fbed3f
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.client.WriteStatus;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Represents a commit event from the clustering task {@link ClusteringFunction}.
+ */
+public class ClusteringCommitEvent implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * The clustering commit instant time.
+ */
+ private String instant;
+ /**
+ * The write statuses.
+ */
+ private List<WriteStatus> writeStatuses;
+ /**
+ * The clustering task identifier.
+ */
+ private int taskID;
+
+ public ClusteringCommitEvent() {
+ }
+
+ public ClusteringCommitEvent(String instant, List<WriteStatus> writeStatuses, int taskID) {
+ this.instant = instant;
+ this.writeStatuses = writeStatuses;
+ this.taskID = taskID;
+ }
+
+ public void setInstant(String instant) {
+ this.instant = instant;
+ }
+
+ public void setWriteStatuses(List<WriteStatus> writeStatuses) {
+ this.writeStatuses = writeStatuses;
+ }
+
+ public void setTaskID(int taskID) {
+ this.taskID = taskID;
+ }
+
+ public String getInstant() {
+ return instant;
+ }
+
+ public List<WriteStatus> getWriteStatuses() {
+ return writeStatuses;
+ }
+
+ public int getTaskID() {
+ return taskID;
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
new file mode 100644
index 0000000000..bc87270a49
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.TableServiceType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.sink.CleanFunction;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Function to check and commit the clustering action.
+ *
+ * <p> Each time after receiving a clustering commit event {@link ClusteringCommitEvent},
+ * it loads and checks the clustering plan {@link org.apache.hudi.avro.model.HoodieClusteringPlan},
+ * if all the clustering operations {@link org.apache.hudi.common.model.ClusteringOperation}
+ * of the plan are finished, tries to commit the clustering action.
+ *
+ * <p>It also inherits the {@link CleanFunction} cleaning ability. This is needed because
+ * the SQL API does not allow multiple sinks in one table sink provider.
+ */
+public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
+ private static final Logger LOG = LoggerFactory.getLogger(ClusteringCommitSink.class);
+
+ /**
+ * Config options.
+ */
+ private final Configuration conf;
+
+ private transient HoodieFlinkTable<?> table;
+
+ /**
+ * Buffer to collect the event from each clustering task {@code ClusteringFunction}.
+ * The key is the instant time.
+ */
+ private transient Map<String, List<ClusteringCommitEvent>> commitBuffer;
+
+ public ClusteringCommitSink(Configuration conf) {
+ super(conf);
+ this.conf = conf;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ if (writeClient == null) {
+ this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
+ }
+ this.commitBuffer = new HashMap<>();
+ this.table = writeClient.getHoodieTable();
+ }
+
+ @Override
+ public void invoke(ClusteringCommitEvent event, Context context) throws Exception {
+ final String instant = event.getInstant();
+ commitBuffer.computeIfAbsent(instant, k -> new ArrayList<>())
+ .add(event);
+ commitIfNecessary(instant, commitBuffer.get(instant));
+ }
+
+ /**
+ * Condition to commit: the commit buffer has equal size with the clustering plan operations
+ * and all the clustering commit event {@link ClusteringCommitEvent} has the same clustering instant time.
+ *
+ * @param instant Clustering commit instant time
+ * @param events Commit events ever received for the instant
+ */
+ private void commitIfNecessary(String instant, List<ClusteringCommitEvent> events) {
+ HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(instant);
+ Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
+ StreamerUtil.createMetaClient(this.conf), clusteringInstant);
+ HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
+ boolean isReady = clusteringPlan.getInputGroups().size() == events.size();
+ if (!isReady) {
+ return;
+ }
+ List<WriteStatus> statuses = events.stream()
+ .map(ClusteringCommitEvent::getWriteStatuses)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+
+ HoodieWriteMetadata<List<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
+ writeMetadata.setWriteStatuses(statuses);
+ writeMetadata.setWriteStats(statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));
+ writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
+ validateWriteResult(clusteringPlan, instant, writeMetadata);
+ if (!writeMetadata.getCommitMetadata().isPresent()) {
+ HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(
+ writeMetadata.getWriteStats().get(),
+ writeMetadata.getPartitionToReplaceFileIds(),
+ Option.empty(),
+ WriteOperationType.CLUSTER,
+ this.writeClient.getConfig().getSchema(),
+ HoodieTimeline.REPLACE_COMMIT_ACTION);
+ writeMetadata.setCommitMetadata(Option.of(commitMetadata));
+ }
+ // commit the clustering
+ this.table.getMetaClient().reloadActiveTimeline();
+ this.writeClient.completeTableService(
+ TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
+
+ // reset the status
+ reset(instant);
+ }
+
+ private void reset(String instant) {
+ this.commitBuffer.remove(instant);
+ }
+
+ /**
+ * Validate actions taken by clustering. In the first implementation, we validate at least one new file is written.
+ * But we can extend this to add more validation. E.g. number of records read = number of records written etc.
+ * We can also make these validations in BaseCommitActionExecutor to reuse pre-commit hooks for multiple actions.
+ */
+ private static void validateWriteResult(HoodieClusteringPlan clusteringPlan, String instantTime, HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
+ if (writeMetadata.getWriteStatuses().isEmpty()) {
+ throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
+ + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
+ + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+ + " write statuses");
+ }
+ }
+
+ private static Map<String, List<String>> getPartitionToReplacedFileIds(
+ HoodieClusteringPlan clusteringPlan,
+ HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
+ Set<HoodieFileGroupId> newFilesWritten = writeMetadata.getWriteStats().get().stream()
+ .map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
+ return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
+ .filter(fg -> !newFilesWritten.contains(fg))
+ .collect(Collectors.groupingBy(HoodieFileGroupId::getPartitionPath, Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList())));
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
new file mode 100644
index 0000000000..a415ac9d46
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.common.model.ClusteringGroupInfo;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.HoodieFileSliceReader;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
+import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * Operator to execute the actual clustering task assigned by the clustering plan task.
+ * In order to execute scalable, the input should shuffle by the clustering event {@link ClusteringPlanEvent}.
+ */
+public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEvent> implements
+ OneInputStreamOperator<ClusteringPlanEvent, ClusteringCommitEvent>, BoundedOneInput {
+ private static final Logger LOG = LoggerFactory.getLogger(ClusteringOperator.class);
+
+ private final Configuration conf;
+ private final RowType rowType;
+ private int taskID;
+ private transient HoodieWriteConfig writeConfig;
+ private transient HoodieFlinkTable<?> table;
+ private transient Schema schema;
+ private transient Schema readerSchema;
+ private transient int[] requiredPos;
+ private transient AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
+ private transient HoodieFlinkWriteClient writeClient;
+ private transient BulkInsertWriterHelper writerHelper;
+ private transient String instantTime;
+
+ private transient BinaryExternalSorter sorter;
+ private transient StreamRecordCollector<ClusteringCommitEvent> collector;
+ private transient BinaryRowDataSerializer binarySerializer;
+
+ public ClusteringOperator(Configuration conf, RowType rowType) {
+ this.conf = conf;
+ this.rowType = rowType;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+
+ this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+ this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
+ this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
+ this.table = writeClient.getHoodieTable();
+
+ this.schema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
+ this.readerSchema = HoodieAvroUtils.addMetadataFields(this.schema);
+ this.requiredPos = getRequiredPositions();
+
+ this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(rowType);
+
+ ClassLoader cl = getContainingTask().getUserCodeClassLoader();
+
+ AbstractRowDataSerializer inputSerializer = new BinaryRowDataSerializer(rowType.getFieldCount());
+ this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());
+
+ NormalizedKeyComputer computer = createSortCodeGenerator().generateNormalizedKeyComputer("SortComputer").newInstance(cl);
+ RecordComparator comparator = createSortCodeGenerator().generateRecordComparator("SortComparator").newInstance(cl);
+
+ MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager();
+ this.sorter =
+ new BinaryExternalSorter(
+ this.getContainingTask(),
+ memManager,
+ computeMemorySize(),
+ this.getContainingTask().getEnvironment().getIOManager(),
+ inputSerializer,
+ binarySerializer,
+ computer,
+ comparator,
+ getContainingTask().getJobConfiguration());
+ this.sorter.startThreads();
+
+ collector = new StreamRecordCollector<>(output);
+
+ // register the metrics.
+ getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
+ getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
+ getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
+ }
+
+ @Override
+ public void processElement(StreamRecord<ClusteringPlanEvent> element) throws Exception {
+ ClusteringPlanEvent event = element.getValue();
+ final String instantTime = event.getClusteringInstantTime();
+ final ClusteringGroupInfo clusteringGroupInfo = event.getClusteringGroupInfo();
+
+ initWriterHelper(instantTime);
+
+ List<ClusteringOperation> clusteringOps = clusteringGroupInfo.getOperations();
+ boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
+
+ Iterator<RowData> iterator;
+ if (hasLogFiles) {
+ // if there are log files, we read all records into memory for a file group and apply updates.
+ iterator = readRecordsForGroupWithLogs(clusteringOps, instantTime);
+ } else {
+ // We want to optimize reading records for case there are no log files.
+ iterator = readRecordsForGroupBaseFiles(clusteringOps);
+ }
+
+ RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType);
+ while (iterator.hasNext()) {
+ RowData rowData = iterator.next();
+ BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(rowData).copy();
+ this.sorter.write(binaryRowData);
+ }
+
+ BinaryRowData row = binarySerializer.createInstance();
+ while ((row = sorter.getIterator().next(row)) != null) {
+ this.writerHelper.write(row);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (this.writeClient != null) {
+ this.writeClient.cleanHandlesGracefully();
+ this.writeClient.close();
+ }
+ }
+
+ /**
+ * End input action for batch source.
+ */
+ public void endInput() {
+ List<WriteStatus> writeStatuses = this.writerHelper.getWriteStatuses(this.taskID);
+ collector.collect(new ClusteringCommitEvent(instantTime, writeStatuses, this.taskID));
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ private void initWriterHelper(String clusteringInstantTime) {
+ if (this.writerHelper == null) {
+ this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
+ clusteringInstantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
+ this.rowType);
+ this.instantTime = clusteringInstantTime;
+ }
+ }
+
+ /**
+ * Read records from baseFiles, apply updates and convert to Iterator.
+ */
+ @SuppressWarnings("unchecked")
+ private Iterator<RowData> readRecordsForGroupWithLogs(List<ClusteringOperation> clusteringOps, String instantTime) {
+ List<Iterator<RowData>> recordIterators = new ArrayList<>();
+
+ long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new FlinkTaskContextSupplier(null), writeConfig);
+ LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
+
+ for (ClusteringOperation clusteringOp : clusteringOps) {
+ try {
+ Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
+ ? Option.empty()
+ : Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
+ HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(table.getMetaClient().getFs())
+ .withBasePath(table.getMetaClient().getBasePath())
+ .withLogFilePaths(clusteringOp.getDeltaFilePaths())
+ .withReaderSchema(readerSchema)
+ .withLatestInstantTime(instantTime)
+ .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
+ .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
+ .withReverseReader(writeConfig.getCompactionReverseLogReadEnabled())
+ .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
+ .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
+ .build();
+
+ HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+ HoodieFileSliceReader<? extends IndexedRecord> hoodieFileSliceReader = HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema,
+ tableConfig.getPayloadClass(),
+ tableConfig.getPreCombineField(),
+ tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
+ tableConfig.getPartitionFieldProp())));
+
+ recordIterators.add(StreamSupport.stream(Spliterators.spliteratorUnknownSize(hoodieFileSliceReader, Spliterator.NONNULL), false).map(hoodieRecord -> {
+ try {
+ return this.transform((IndexedRecord) hoodieRecord.getData().getInsertValue(readerSchema).get());
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to read next record", e);
+ }
+ }).iterator());
+ } catch (IOException e) {
+ throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ + " and " + clusteringOp.getDeltaFilePaths(), e);
+ }
+ }
+
+ return new ConcatenatingIterator<>(recordIterators);
+ }
+
+ /**
+ * Read records from baseFiles and get iterator.
+ */
+ private Iterator<RowData> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
+ List<Iterator<RowData>> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> {
+ Iterable<IndexedRecord> indexedRecords = () -> {
+ try {
+ return HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())).getRecordIterator(readerSchema);
+ } catch (IOException e) {
+ throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ + " and " + clusteringOp.getDeltaFilePaths(), e);
+ }
+ };
+
+ return StreamSupport.stream(indexedRecords.spliterator(), false).map(this::transform).iterator();
+ }).collect(Collectors.toList());
+
+ return new ConcatenatingIterator<>(iteratorsForPartition);
+ }
+
+ /**
+ * Transform IndexedRecord into HoodieRecord.
+ */
+ private RowData transform(IndexedRecord indexedRecord) {
+ GenericRecord record = buildAvroRecordBySchema(indexedRecord, schema, requiredPos, new GenericRecordBuilder(schema));
+ return (RowData) avroToRowDataConverter.convert(record);
+ }
+
+ private int[] getRequiredPositions() {
+ final List<String> fieldNames = readerSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+ return schema.getFields().stream()
+ .map(field -> fieldNames.indexOf(field.name()))
+ .mapToInt(i -> i)
+ .toArray();
+ }
+
+ private SortCodeGenerator createSortCodeGenerator() {
+ SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType,
+ conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS).split(","));
+ return sortOperatorGen.createSortCodeGenerator();
+ }
+
+ @Override
+ public void setKeyContextElement(StreamRecord<ClusteringPlanEvent> record) throws Exception {
+ OneInputStreamOperator.super.setKeyContextElement(record);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanEvent.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanEvent.java
new file mode 100644
index 0000000000..c82075877b
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanEvent.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.common.model.ClusteringGroupInfo;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Represents a cluster command from the clustering plan task {@link ClusteringPlanSourceFunction}.
+ */
+public class ClusteringPlanEvent implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private String clusteringInstantTime;
+
+ private ClusteringGroupInfo clusteringGroupInfo;
+
+ private Map<String, String> strategyParams;
+
+ public ClusteringPlanEvent() {
+ }
+
+ public ClusteringPlanEvent(
+ String instantTime,
+ ClusteringGroupInfo clusteringGroupInfo,
+ Map<String, String> strategyParams) {
+ this.clusteringInstantTime = instantTime;
+ this.clusteringGroupInfo = clusteringGroupInfo;
+ this.strategyParams = strategyParams;
+ }
+
+ public void setClusteringInstantTime(String clusteringInstantTime) {
+ this.clusteringInstantTime = clusteringInstantTime;
+ }
+
+ public void setClusteringGroupInfo(ClusteringGroupInfo clusteringGroupInfo) {
+ this.clusteringGroupInfo = clusteringGroupInfo;
+ }
+
+ public void setStrategyParams(Map<String, String> strategyParams) {
+ this.strategyParams = strategyParams;
+ }
+
+ public String getClusteringInstantTime() {
+ return clusteringInstantTime;
+ }
+
+ public ClusteringGroupInfo getClusteringGroupInfo() {
+ return clusteringGroupInfo;
+ }
+
+ public Map<String, String> getStrategyParams() {
+ return strategyParams;
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
new file mode 100644
index 0000000000..a3db2d41c8
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.common.model.ClusteringGroupInfo;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink hudi clustering source function.
+ *
+ * <P>This function read the clustering plan as {@link ClusteringOperation}s then assign the clustering task
+ * event {@link ClusteringPlanEvent} to downstream operators.
+ *
+ * <p>The clustering instant time is specified explicitly with strategies:
+ *
+ * <ul>
+ * <li>If the timeline has no inflight instants,
+ * use {@link org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()}
+ * as the instant time;</li>
+ * <li>If the timeline has inflight instants,
+ * use the median instant time between [last complete instant time, earliest inflight instant time]
+ * as the instant time.</li>
+ * </ul>
+ */
+public class ClusteringPlanSourceFunction extends AbstractRichFunction implements SourceFunction<ClusteringPlanEvent> {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(ClusteringPlanSourceFunction.class);
+
+ /**
+ * The clustering plan.
+ */
+ private final HoodieClusteringPlan clusteringPlan;
+
+ /**
+ * Hoodie instant.
+ */
+ private final HoodieInstant instant;
+
+ public ClusteringPlanSourceFunction(HoodieInstant instant, HoodieClusteringPlan clusteringPlan) {
+ this.instant = instant;
+ this.clusteringPlan = clusteringPlan;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ // no operation
+ }
+
+ @Override
+ public void run(SourceContext<ClusteringPlanEvent> sourceContext) throws Exception {
+ for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {
+ LOG.info("ClusteringPlanSourceFunction cluster " + clusteringGroup + " files");
+ sourceContext.collect(new ClusteringPlanEvent(this.instant.getTimestamp(), ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams()));
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ // no operation
+ }
+
+ @Override
+ public void cancel() {
+ // no operation
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
new file mode 100644
index 0000000000..e87a7d6752
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.configuration.FlinkOptions;
+
+import com.beust.jcommander.Parameter;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Configurations for Hoodie Flink clustering.
+ */
+public class FlinkClusteringConfig extends Configuration {
+
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+
+ // ------------------------------------------------------------------------
+ // Hudi Write Options
+ // ------------------------------------------------------------------------
+
+ @Parameter(names = {"--path"}, description = "Base path for the target hoodie table.", required = true)
+ public String path;
+
+ // ------------------------------------------------------------------------
+ // Clustering Options
+ // ------------------------------------------------------------------------
+ @Parameter(names = {"--clustering-delta-commits"}, description = "Max delta commits needed to trigger clustering, default 4 commits", required = false)
+ public Integer clusteringDeltaCommits = 1;
+
+ @Parameter(names = {"--clustering-tasks"}, description = "Parallelism of tasks that do actual clustering, default is -1", required = false)
+ public Integer clusteringTasks = -1;
+
+ @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB.", required = false)
+ public Integer compactionMaxMemory = 100;
+
+ @Parameter(names = {"--clean-retain-commits"},
+ description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+ + "This also directly translates into how much you can incrementally pull on this table, default 10",
+ required = false)
+ public Integer cleanRetainCommits = 10;
+
+ @Parameter(names = {"--archive-min-commits"},
+ description = "Min number of commits to keep before archiving older commits into a sequential log, default 20.",
+ required = false)
+ public Integer archiveMinCommits = 20;
+
+ @Parameter(names = {"--archive-max-commits"},
+ description = "Max number of commits to keep before archiving older commits into a sequential log, default 30.",
+ required = false)
+ public Integer archiveMaxCommits = 30;
+
+ @Parameter(names = {"--schedule", "-sc"}, description = "Not recommended. Schedule the clustering plan in this job.\n"
+ + "There is a risk of losing data when scheduling clustering outside the writer job.\n"
+ + "Scheduling clustering in the writer job and only let this job do the clustering execution is recommended.\n"
+ + "Default is true", required = false)
+ public Boolean schedule = true;
+
+ @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default", required = false)
+ public Boolean cleanAsyncEnable = false;
+
+ @Parameter(names = {"--plan-strategy-class"}, description = "Config to provide a strategy class to generator clustering plan", required = false)
+ public String planStrategyClass = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
+
+ @Parameter(names = {"--target-file-max-bytes"}, description = "Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB", required = false)
+ public Integer targetFileMaxBytes = 1024 * 1024 * 1024;
+
+ @Parameter(names = {"--small-file-limit"}, description = "Files smaller than the size specified here are candidates for clustering, default 600 MB", required = false)
+ public Integer smallFileLimit = 600;
+
+ @Parameter(names = {"--skip-from-latest-partitions"}, description = "Number of partitions to skip from latest when choosing partitions to create ClusteringPlan, default 0", required = false)
+ public Integer skipFromLatestPartitions = 0;
+
+ @Parameter(names = {"--sort-columns"}, description = "Columns to sort the data by when clustering.", required = false)
+ public String sortColumns = "";
+
+ @Parameter(names = {"--max-num-groups"}, description = "Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism. default 30", required = false)
+ public Integer maxNumGroups = 30;
+
+ @Parameter(names = {"--target-partitions"}, description = "Number of partitions to list to create ClusteringPlan, default 2", required = false)
+ public Integer targetPartitions = 2;
+
+ public static final String SEQ_FIFO = "FIFO";
+ public static final String SEQ_LIFO = "LIFO";
+ @Parameter(names = {"--seq"}, description = "Clustering plan execution sequence, two options are supported:\n"
+ + "1). FIFO: execute the oldest plan first;\n"
+ + "2). LIFO: execute the latest plan first, by default LIFO", required = false)
+ public String clusteringSeq = SEQ_LIFO;
+
+ @Parameter(names = {"--write-partition-url-encode"}, description = "Whether to encode the partition path url, default false")
+ public Boolean writePartitionUrlEncode = false;
+
+ @Parameter(names = {"--hive-style-partitioning"}, description = "Whether to use Hive style partitioning.\n"
+ + "If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.\n"
+ + "By default false (the names of partition folders are only partition values)")
+ public Boolean hiveStylePartitioning = false;
+
+ /**
+ * Transforms a {@code FlinkClusteringConfig.config} into {@code Configuration}.
+ * The latter is more suitable for the table APIs. It reads all the properties
+ * in the properties file (set by `--props` option) and cmd line options
+ * (set by `--hoodie-conf` option).
+ */
+ public static Configuration toFlinkConfig(FlinkClusteringConfig config) {
+ Configuration conf = new Configuration();
+
+ conf.setString(FlinkOptions.PATH, config.path);
+ conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits);
+ conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, config.archiveMinCommits);
+ conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits);
+ conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
+ conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, config.clusteringDeltaCommits);
+ conf.setInteger(FlinkOptions.CLUSTERING_TASKS, config.clusteringTasks);
+ conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS, config.planStrategyClass);
+ conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, config.targetFileMaxBytes);
+ conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, config.smallFileLimit);
+ conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, config.skipFromLatestPartitions);
+ conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, config.sortColumns);
+ conf.setInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS, config.maxNumGroups);
+ conf.setInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS, config.targetPartitions);
+ conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable);
+
+ // use synchronous clustering always
+ conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, config.schedule);
+
+ // bulk insert conf
+ conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, config.writePartitionUrlEncode);
+ conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, config.hiveStylePartitioning);
+
+ return conf;
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
new file mode 100644
index 0000000000..f7c361533a
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+
+import com.beust.jcommander.JCommander;
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink hudi clustering program that can be executed manually.
+ */
+public class HoodieFlinkClusteringJob {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkClusteringJob.class);
+
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+ JCommander cmd = new JCommander(cfg, null, args);
+ if (cfg.help || args.length == 0) {
+ cmd.usage();
+ System.exit(1);
+ }
+
+ Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+ // create metaClient
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+ // set table name
+ conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+
+ // set table type
+ conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name());
+
+ // set record key field
+ conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
+
+ // set partition field
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
+
+ // set table schema
+ CompactionUtil.setAvroSchema(conf, metaClient);
+
+ HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
+ HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+ // judge whether have operation
+ // to compute the clustering instant time and do cluster.
+ if (cfg.schedule) {
+ String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+ boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+ if (!scheduled) {
+ // do nothing.
+ LOG.info("No clustering plan for this job ");
+ return;
+ }
+ }
+
+ table.getMetaClient().reloadActiveTimeline();
+
+ // fetch the instant based on the configured execution sequence
+ HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline()
+ .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
+ Option<HoodieInstant> requested = CompactionUtil.isLIFO(cfg.clusteringSeq) ? timeline.lastInstant() : timeline.firstInstant();
+ if (!requested.isPresent()) {
+ // do nothing.
+ LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
+ return;
+ }
+
+ HoodieInstant clusteringInstant = requested.get();
+
+ HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp());
+ if (timeline.containsInstant(inflightInstant)) {
+ LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]");
+ writeClient.rollbackInflightClustering(inflightInstant, table);
+ table.getMetaClient().reloadActiveTimeline();
+ }
+
+ // generate clustering plan
+ // should support configurable commit metadata
+ Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
+ table.getMetaClient(), clusteringInstant);
+
+ if (!clusteringPlanOption.isPresent()) {
+ // do nothing.
+ LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
+ return;
+ }
+
+ HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
+
+ if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null)
+ || (clusteringPlan.getInputGroups().isEmpty())) {
+ // No clustering plan, do nothing and return.
+ LOG.info("No clustering plan for instant " + clusteringInstant.getTimestamp());
+ return;
+ }
+
+ HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp());
+ HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
+ if (!pendingClusteringTimeline.containsInstant(instant)) {
+ // this means that the clustering plan was written to auxiliary path(.tmp)
+ // but not the meta path(.hoodie), this usually happens when the job crush
+ // exceptionally.
+
+ // clean the clustering plan in auxiliary path and cancels the clustering.
+
+ LOG.warn("The clustering plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
+ + "Clean the clustering plan in auxiliary path and cancels the clustering");
+ CompactionUtil.cleanInstant(table.getMetaClient(), instant);
+ return;
+ }
+
+ // get clusteringParallelism.
+ int clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == -1
+ ? clusteringPlan.getInputGroups().size() : conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
+
+ // Mark instant as clustering inflight
+ table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
+
+ final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
+ final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
+ final RowType rowType = (RowType) rowDataType.getLogicalType();
+
+ // setup configuration
+ long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
+ conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+
+ DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(timeline.lastInstant().get(), clusteringPlan))
+ .name("clustering_source")
+ .uid("uid_clustering_source")
+ .rebalance()
+ .transform("clustering_task",
+ TypeInformation.of(ClusteringCommitEvent.class),
+ new ClusteringOperator(conf, rowType))
+ .setParallelism(clusteringPlan.getInputGroups().size());
+
+ ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+ conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+
+ dataStream
+ .addSink(new ClusteringCommitSink(conf))
+ .name("clustering_commit")
+ .uid("uid_clustering_commit")
+ .setParallelism(1);
+
+ env.execute("flink_hudi_clustering");
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index f82712bca2..e9574dd52b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -192,7 +192,7 @@ public class FlinkStreamerConfig extends Configuration {
public Boolean indexGlobalEnabled = true;
@Parameter(names = {"--index-partition-regex"},
- description = "Whether to load partitions in state if partition path matching, default *")
+ description = "Whether to load partitions in state if partition path matching, default *")
public String indexPartitionRegex = ".*";
@Parameter(names = {"--source-avro-schema-path"}, description = "Source avro schema file path, the parsed schema is used for deserialization")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index b977dfd7c5..fcffbed54b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -37,6 +38,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
@@ -162,6 +164,17 @@ public class StreamerUtil {
.withPath(conf.getString(FlinkOptions.PATH))
.combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true)
.withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf))
+ .withClusteringConfig(
+ HoodieClusteringConfig.newBuilder()
+ .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
+ .withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS))
+ .withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS))
+ .withClusteringMaxNumGroups(conf.getInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS))
+ .withClusteringTargetFileMaxBytes(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES))
+ .withClusteringPlanSmallFileLimit(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT) * 1024 * 1024L)
+ .withClusteringSkipPartitionsFromLatest(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST))
+ .withAsyncClusteringMaxCommits(conf.getInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS))
+ .build())
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
@@ -505,6 +518,11 @@ public class StreamerUtil {
* Returns the max compaction memory in bytes with given conf.
*/
public static long getMaxCompactionMemoryInBytes(Configuration conf) {
- return conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024;
+ return (long) conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024;
+ }
+
+ public static Schema getTableAvroSchema(HoodieTableMetaClient metaClient, boolean includeMetadataFields) throws Exception {
+ TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+ return schemaUtil.getTableAvroSchema(includeMetadataFields);
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
new file mode 100644
index 0000000000..ac2ee0be37
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.cluster;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
+import org.apache.hudi.sink.clustering.ClusteringCommitSink;
+import org.apache.hudi.sink.clustering.ClusteringOperator;
+import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestSQL;
+
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * IT cases for {@link HoodieFlinkClusteringJob}.
+ */
+public class ITTestHoodieFlinkClustering {
+
+ private static final Map<String, String> EXPECTED = new HashMap<>();
+
+ static {
+ EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]");
+ EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]");
+ EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]");
+ EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]");
+ }
+
+ @TempDir
+ File tempFile;
+
+ @Test
+ public void testHoodieFlinkClustering() throws Exception {
+ // Create hoodie table and insert into data.
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+ TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+ tableEnv.getConfig().getConfiguration()
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+ // use append mode
+ options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
+ options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
+
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+ tableEnv.executeSql(hoodieTableDDL);
+ tableEnv.executeSql(TestSQL.INSERT_T1).await();
+
+ // wait for the asynchronous commit to finish
+ TimeUnit.SECONDS.sleep(3);
+
+ // Make configuration and setAvroSchema.
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+ cfg.path = tempFile.getAbsolutePath();
+ cfg.targetPartitions = 4;
+ Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+ // create metaClient
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+ // set the table name
+ conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+ conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name());
+
+ // set record key field
+ conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
+ // set partition field
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
+
+ long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
+ conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
+
+ // set table schema
+ CompactionUtil.setAvroSchema(conf, metaClient);
+
+ // judge whether have operation
+ // To compute the clustering instant time and do clustering.
+ String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+
+ HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
+ HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+ boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+
+ assertTrue(scheduled, "The clustering plan should be scheduled");
+
+ // fetch the instant based on the configured execution sequence
+ table.getMetaClient().reloadActiveTimeline();
+ HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline()
+ .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
+
+ // generate clustering plan
+ // should support configurable commit metadata
+ Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
+ table.getMetaClient(), timeline.lastInstant().get());
+
+ HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
+
+ // Mark instant as clustering inflight
+ HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
+ table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
+
+ final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
+ final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
+ final RowType rowType = (RowType) rowDataType.getLogicalType();
+
+ DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(timeline.lastInstant().get(), clusteringPlan))
+ .name("clustering_source")
+ .uid("uid_clustering_source")
+ .rebalance()
+ .transform("clustering_task",
+ TypeInformation.of(ClusteringCommitEvent.class),
+ new ClusteringOperator(conf, rowType))
+ .setParallelism(clusteringPlan.getInputGroups().size());
+
+ ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+ conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+
+ dataStream
+ .addSink(new ClusteringCommitSink(conf))
+ .name("clustering_commit")
+ .uid("uid_clustering_commit")
+ .setParallelism(1);
+
+ env.execute("flink_hudi_clustering");
+ TestData.checkWrittenData(tempFile, EXPECTED, 4);
+ }
+}