Posted to commits@hudi.apache.org by da...@apache.org on 2023/02/16 08:05:16 UTC
[hudi] branch master updated: [HUDI-5673] Support multi writer for bucket index with guarded lock (#7860)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new da80b8c8d52 [HUDI-5673] Support multi writer for bucket index with guarded lock (#7860)
da80b8c8d52 is described below
commit da80b8c8d529bb8ff549b89d87e841eb3c2462cc
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu Feb 16 16:05:04 2023 +0800
[HUDI-5673] Support multi writer for bucket index with guarded lock (#7860)
---
.../org/apache/hudi/config/HoodieIndexConfig.java | 8 +-
.../storage/HoodieConsistentBucketLayout.java | 1 +
.../table/storage/HoodieSimpleBucketLayout.java | 1 +
.../hudi/client/HoodieFlinkTableServiceClient.java | 8 +
.../apache/hudi/client/HoodieFlinkWriteClient.java | 44 ++---
.../apache/hudi/index/FlinkHoodieIndexFactory.java | 4 +-
.../apache/hudi/configuration/FlinkOptions.java | 8 +
.../apache/hudi/configuration/OptionsResolver.java | 37 ++++
.../hudi/sink/StreamWriteOperatorCoordinator.java | 12 +-
.../hudi/sink/bootstrap/BootstrapOperator.java | 9 +-
.../sink/common/AbstractStreamWriteFunction.java | 2 +-
.../org/apache/hudi/sink/meta/CkpMetadata.java | 21 +-
.../org/apache/hudi/util/FlinkWriteClients.java | 22 ++-
.../java/org/apache/hudi/util/StreamerUtil.java | 15 ++
.../apache/hudi/util/ViewStorageProperties.java | 15 +-
.../apache/hudi/sink/ITTestDataStreamWrite.java | 2 -
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 80 +++++++-
.../org/apache/hudi/sink/TestWriteMergeOnRead.java | 10 +
.../hudi/sink/TestWriteMergeOnReadWithCompact.java | 11 ++
.../org/apache/hudi/sink/meta/TestCkpMetadata.java | 21 +-
.../utils/BucketStreamWriteFunctionWrapper.java | 212 +++++++++++++++++++++
.../org/apache/hudi/sink/utils/TestWriteBase.java | 68 +++++--
.../hudi/utils/TestViewStorageProperties.java | 10 +-
23 files changed, 521 insertions(+), 100 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 207c30509d8..366e6aa4c04 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -49,6 +49,7 @@ import static org.apache.hudi.config.HoodieHBaseIndexConfig.ZKPORT;
import static org.apache.hudi.config.HoodieHBaseIndexConfig.ZKQUORUM;
import static org.apache.hudi.index.HoodieIndex.IndexType.BLOOM;
import static org.apache.hudi.index.HoodieIndex.IndexType.BUCKET;
+import static org.apache.hudi.index.HoodieIndex.IndexType.FLINK_STATE;
import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_BLOOM;
import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_SIMPLE;
import static org.apache.hudi.index.HoodieIndex.IndexType.HBASE;
@@ -73,7 +74,7 @@ public class HoodieIndexConfig extends HoodieConfig {
// Builder#getDefaultIndexType has already set it according to engine type
.noDefaultValue()
.withValidValues(HBASE.name(), INMEMORY.name(), BLOOM.name(), GLOBAL_BLOOM.name(),
- SIMPLE.name(), GLOBAL_SIMPLE.name(), BUCKET.name())
+ SIMPLE.name(), GLOBAL_SIMPLE.name(), BUCKET.name(), FLINK_STATE.name())
.withDocumentation("Type of index to use. Default is SIMPLE on Spark engine, "
+ "and INMEMORY on Flink and Java engines. "
+ "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. "
@@ -651,6 +652,11 @@ public class HoodieIndexConfig extends HoodieConfig {
return this;
}
+ public Builder withRecordKeyField(String keyField) {
+ hoodieIndexConfig.setValue(KeyGeneratorOptions.RECORDKEY_FIELD_NAME, keyField);
+ return this;
+ }
+
public HoodieIndexConfig build() {
hoodieIndexConfig.setDefaultValue(INDEX_TYPE, getDefaultIndexType(engineType));
hoodieIndexConfig.setDefaults(HoodieIndexConfig.class.getName());
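For illustration, a minimal sketch of the builder usage this change enables; only the builder methods visible in the hunks (including the new withRecordKeyField) come from the patch, while the field name "uuid" and the bucket count are placeholder assumptions:

    import org.apache.hudi.common.engine.EngineType;
    import org.apache.hudi.config.HoodieIndexConfig;
    import org.apache.hudi.index.HoodieIndex;

    public class IndexConfigSketch {
      public static void main(String[] args) {
        // "uuid" and "4" are illustrative placeholders, not values from the patch.
        HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
            .withEngineType(EngineType.FLINK)
            .withIndexType(HoodieIndex.IndexType.BUCKET)
            .withBucketNum("4")           // hash buckets per partition
            .withRecordKeyField("uuid")   // new builder method added by this commit
            .withIndexKeyField("uuid")    // hashing field, falls back to the record key
            .build();
        System.out.println(indexConfig.getString(HoodieIndexConfig.INDEX_TYPE));
      }
    }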
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java
index acea61177a5..b6135b4c5d1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java
@@ -37,6 +37,7 @@ public class HoodieConsistentBucketLayout extends HoodieStorageLayout {
WriteOperationType.UPSERT,
WriteOperationType.UPSERT_PREPPED,
WriteOperationType.INSERT_OVERWRITE,
+ WriteOperationType.INSERT_OVERWRITE_TABLE,
WriteOperationType.DELETE,
WriteOperationType.COMPACT,
WriteOperationType.DELETE_PARTITION,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieSimpleBucketLayout.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieSimpleBucketLayout.java
index be048a23b05..53550bc6225 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieSimpleBucketLayout.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieSimpleBucketLayout.java
@@ -37,6 +37,7 @@ public class HoodieSimpleBucketLayout extends HoodieStorageLayout {
WriteOperationType.UPSERT,
WriteOperationType.UPSERT_PREPPED,
WriteOperationType.INSERT_OVERWRITE,
+ WriteOperationType.INSERT_OVERWRITE_TABLE,
WriteOperationType.DELETE,
WriteOperationType.COMPACT,
WriteOperationType.DELETE_PARTITION
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index 4b3eaaa1d42..faf27d2ce82 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -182,6 +182,14 @@ public class HoodieFlinkTableServiceClient<T> extends BaseHoodieTableServiceClie
}
}
+ @Override
+ protected void preCommit(HoodieCommitMetadata metadata) {
+ // Create a Hoodie table after startTxn, so that it encapsulates the commits and files visible to this transaction.
+ // It is important to create the table after acquiring the lock, so that the latest commits show up in the timeline without a reload.
+ HoodieTable table = createTable(config, hadoopConf);
+ resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
+ }
+
/**
* Initialize the table metadata writer, for e.g, bootstrap the metadata table
* from the filesystem if it does not exist.
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 7d0728b95eb..3d4f6a3873b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -19,6 +19,7 @@
package org.apache.hudi.client;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
@@ -32,6 +33,7 @@ import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -44,7 +46,6 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.util.WriteStatMerger;
@@ -274,6 +275,19 @@ public class HoodieFlinkWriteClient<T> extends
// remove the async cleaning
}
+ /**
+ * Refreshes the last transaction metadata; this should be called before the driver starts a new transaction.
+ */
+ public void preTxn(HoodieTableMetaClient metaClient) {
+ if (txnManager.isOptimisticConcurrencyControlEnabled()) {
+ // refresh the meta client which is reused
+ metaClient.reloadActiveTimeline();
+ this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
+ this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient);
+ }
+ }
+
@Override
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
tableServiceClient.writeTableMetadata(table, instantTime, actionType, metadata);
@@ -322,30 +336,12 @@ public class HoodieFlinkWriteClient<T> extends
return result.getWriteStatuses();
}
- /**
- * Post commit is rewrite to be invoked after a successful commit.
- *
- * <p>The Flink write client is designed to write data set as buckets
- * but cleaning action should trigger after all the write actions within a
- * checkpoint finish.
- *
- * @param table Table to commit on
- * @param metadata Commit Metadata corresponding to committed instant
- * @param instantTime Instant Time
- * @param extraMetadata Additional Metadata passed by user
- */
@Override
- protected void postCommit(HoodieTable table,
- HoodieCommitMetadata metadata,
- String instantTime,
- Option<Map<String, String>> extraMetadata) {
- try {
- // Delete the marker directory for the instant.
- WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime)
- .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
- } finally {
- this.heartbeatClient.stop(instantTime);
- }
+ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) {
+ // Create a Hoodie table after startTxn, so that it encapsulates the commits and files visible to this transaction.
+ // It is important to create the table after acquiring the lock, so that the latest commits show up in the timeline without a reload.
+ HoodieTable table = createTable(config, hadoopConf);
+ resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
}
@Override
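To make the new write path concrete, here is a hedged sketch of the driver-side transaction lifecycle that preTxn and preCommit establish; preTxn(metaClient) and startCommit(...) are taken from this patch and the coordinator hunk below, while the wiring around them is assumed:

    import org.apache.hudi.client.HoodieFlinkWriteClient;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    public class MultiWriterLifecycleSketch {
      // Sketch only: client/metaClient creation and the actual data write are elided.
      static String startGuardedInstant(HoodieFlinkWriteClient<?> client,
                                        HoodieTableMetaClient metaClient,
                                        String commitAction) {
        // 1. Refresh the last completed txn and pending instants (no-op unless OCC is enabled).
        client.preTxn(metaClient);
        // 2. Start the new instant; preCommit() later resolves conflicts against the
        //    pending instants captured in step 1 before the commit is finalized.
        return client.startCommit(commitAction, metaClient);
      }
    }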
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
index b10014b9183..13302293d46 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
@@ -46,8 +46,10 @@ public final class FlinkHoodieIndexFactory {
return (HoodieIndex) instance;
}
- // TODO more indexes to be added
switch (config.getIndexType()) {
+ case FLINK_STATE:
+ // The Flink state index stores the index mappings in a state backend,
+ // so an in-memory HoodieIndex component is instantiated as a placeholder.
case INMEMORY:
return new FlinkInMemoryStateIndex(context, config);
case BLOOM:
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 9cdeb963d53..f7203699683 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -197,6 +197,7 @@ public class FlinkOptions extends HoodieConfig {
.key("index.type")
.stringType()
.defaultValue(HoodieIndex.IndexType.FLINK_STATE.name())
+ .withFallbackKeys(HoodieIndexConfig.INDEX_TYPE.key())
.withDescription("Index type of Flink write job, default is using state backed index.");
public static final ConfigOption<Boolean> INDEX_BOOTSTRAP_ENABLED = ConfigOptions
@@ -562,6 +563,13 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(128)
.withDescription("Sort memory in MB, default 128MB");
+ // this is only for internal use
+ public static final ConfigOption<String> WRITE_CLIENT_ID = ConfigOptions
+ .key("write.client.id")
+ .stringType()
+ .defaultValue("")
+ .withDescription("Unique identifier used to distinguish different writer pipelines for concurrent mode");
+
// ------------------------------------------------------------------------
// Compaction Options
// ------------------------------------------------------------------------
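As a usage note, each concurrent pipeline writing to the same table is expected to set a distinct write.client.id so the per-writer auxiliary files do not collide (see the CkpMetadata and ViewStorageProperties hunks below). A minimal sketch mirroring the tests later in this patch:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class WriterIdSketch {
      public static void main(String[] args) {
        Configuration conf1 = new Configuration();          // first writer keeps the default ""
        Configuration conf2 = conf1.clone();
        conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2"); // second writer gets a unique id
      }
    }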
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 42b94b58351..51512244a71 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -18,11 +18,16 @@
package org.apache.hudi.configuration;
+import org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy;
+import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
+import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.format.FilePathUtils;
@@ -227,6 +232,38 @@ public class OptionsResolver {
return conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent() || conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent();
}
+ /**
+ * Returns whether the writer txn should be guarded by a lock.
+ */
+ public static boolean needsGuardByLock(Configuration conf) {
+ return conf.getBoolean(FlinkOptions.METADATA_ENABLED)
+ || conf.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
+ .equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
+ }
+
+ /**
+ * Returns the index type.
+ */
+ public static HoodieIndex.IndexType getIndexType(Configuration conf) {
+ return HoodieIndex.IndexType.valueOf(conf.getString(FlinkOptions.INDEX_TYPE));
+ }
+
+ /**
+ * Returns the index key field.
+ */
+ public static String getIndexKeyField(Configuration conf) {
+ return conf.getString(FlinkOptions.INDEX_KEY_FIELD, conf.getString(FlinkOptions.RECORD_KEY_FIELD));
+ }
+
+ /**
+ * Returns the conflict resolution strategy.
+ */
+ public static ConflictResolutionStrategy getConflictResolutionStrategy(Configuration conf) {
+ return isBucketIndexType(conf)
+ ? new BucketIndexConcurrentFileWritesConflictResolutionStrategy()
+ : new SimpleConcurrentFileWritesConflictResolutionStrategy();
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
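A short sketch of how these resolver helpers combine, assuming a table on the bucket index with optimistic concurrency control enabled:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
    import org.apache.hudi.common.model.WriteConcurrencyMode;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.configuration.OptionsResolver;
    import org.apache.hudi.index.HoodieIndex;

    public class GuardedLockSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
        conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
            WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());

        // true: OCC is on, so the writer txn must be guarded by a lock
        System.out.println(OptionsResolver.needsGuardByLock(conf));
        // bucket index -> BucketIndexConcurrentFileWritesConflictResolutionStrategy
        ConflictResolutionStrategy strategy = OptionsResolver.getConflictResolutionStrategy(conf);
        System.out.println(strategy.getClass().getSimpleName());
      }
    }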
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 738a0089ef9..9f31d6f59a7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -41,7 +41,6 @@ import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
-import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
@@ -180,7 +179,7 @@ public class StreamWriteOperatorCoordinator
this.gateways = new SubtaskGateway[this.parallelism];
// init table, create if not exists.
this.metaClient = initTableIfNotExists(this.conf);
- this.ckpMetadata = initCkpMetadata(this.metaClient);
+ this.ckpMetadata = initCkpMetadata(this.metaClient, this.conf);
// the write client must create after the table creation
this.writeClient = FlinkWriteClients.createWriteClient(conf);
initMetadataTable(this.writeClient);
@@ -342,8 +341,8 @@ public class StreamWriteOperatorCoordinator
writeClient.initMetadataTable();
}
- private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) throws IOException {
- CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient.getFs(), metaClient.getBasePath());
+ private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient, Configuration conf) throws IOException {
+ CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient, conf.getString(FlinkOptions.WRITE_CLIENT_ID));
ckpMetadata.bootstrap();
return ckpMetadata;
}
@@ -372,6 +371,8 @@ public class StreamWriteOperatorCoordinator
}
private void startInstant() {
+ // refresh the last txn metadata
+ this.writeClient.preTxn(this.metaClient);
// put the assignment in front of metadata generation,
// because the instant request from write task is asynchronous.
this.instant = this.writeClient.startCommit(tableState.commitAction, this.metaClient);
@@ -391,8 +392,7 @@ public class StreamWriteOperatorCoordinator
* until it finds a new inflight instant on the timeline.
*/
private void initInstant(String instant) {
- HoodieTimeline completedTimeline =
- StreamerUtil.createMetaClient(conf).getActiveTimeline().filterCompletedInstants();
+ HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants();
executor.execute(() -> {
if (instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT) || completedTimeline.containsInstant(instant)) {
// the last instant committed successfully
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 3eaa47e3b62..42ade8bec66 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -127,7 +127,7 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
this.writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf, true);
this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
- this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient().getFs(), this.writeConfig.getBasePath());
+ this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient(), this.conf.getString(FlinkOptions.WRITE_CLIENT_ID));
this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
preLoadIndexRecords();
@@ -229,17 +229,14 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
.filter(logFile -> isValidFile(logFile.getFileStatus()))
.map(logFile -> logFile.getPath().toString())
.collect(toList());
- HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
- writeConfig, hadoopConf);
- try {
+ try (HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
+ writeConfig, hadoopConf)) {
for (String recordKey : scanner.getRecords().keySet()) {
output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice))));
}
} catch (Exception e) {
throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
- } finally {
- scanner.close();
}
}
}
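Aside from the client-id change, this hunk converts the scanner cleanup to try-with-resources. A generic sketch of the pattern, assuming only that the resource implements AutoCloseable (which HoodieMergedLogRecordScanner does, per this change):

    public class TryWithResourcesSketch {
      // Before: a manual try/finally whose close() could mask the primary exception.
      // After: the resource is closed automatically, and close() failures are
      // attached to the primary exception as suppressed exceptions.
      static void scan(AutoCloseable scanner) throws Exception {
        try (AutoCloseable s = scanner) {
          // ... iterate records ...
        } // s.close() runs here even if the body throws
      }
    }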
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index fa4c3db86ea..23b580b86bc 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -148,7 +148,7 @@ public abstract class AbstractStreamWriteFunction<I>
TypeInformation.of(WriteMetadataEvent.class)
));
- this.ckpMetadata = CkpMetadata.getInstance(this.metaClient.getFs(), this.metaClient.getBasePath());
+ this.ckpMetadata = CkpMetadata.getInstance(this.metaClient, this.config.getString(FlinkOptions.WRITE_CLIENT_ID));
this.currentInstant = lastPendingInstant();
if (context.isRestored()) {
restoreWriteMetadata();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index addabcdc4c8..39ae1a1ef32 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -20,6 +20,7 @@ package org.apache.hudi.sink.meta;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
@@ -75,12 +76,13 @@ public class CkpMetadata implements Serializable {
private List<String> instantCache;
private CkpMetadata(Configuration config) {
- this(FSUtils.getFs(config.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(config)), config.getString(FlinkOptions.PATH));
+ this(FSUtils.getFs(config.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(config)),
+ config.getString(FlinkOptions.PATH), config.getString(FlinkOptions.WRITE_CLIENT_ID));
}
- private CkpMetadata(FileSystem fs, String basePath) {
+ private CkpMetadata(FileSystem fs, String basePath, String uniqueId) {
this.fs = fs;
- this.path = new Path(ckpMetaPath(basePath));
+ this.path = new Path(ckpMetaPath(basePath, uniqueId));
}
public void close() {
@@ -208,12 +210,17 @@ public class CkpMetadata implements Serializable {
return new CkpMetadata(config);
}
- public static CkpMetadata getInstance(FileSystem fs, String basePath) {
- return new CkpMetadata(fs, basePath);
+ public static CkpMetadata getInstance(HoodieTableMetaClient metaClient, String uniqueId) {
+ return new CkpMetadata(metaClient.getFs(), metaClient.getBasePath(), uniqueId);
}
- protected static String ckpMetaPath(String basePath) {
- return basePath + Path.SEPARATOR + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + CKP_META;
+ public static CkpMetadata getInstance(FileSystem fs, String basePath, String uniqueId) {
+ return new CkpMetadata(fs, basePath, uniqueId);
+ }
+
+ protected static String ckpMetaPath(String basePath, String uniqueId) {
+ String metaPath = basePath + Path.SEPARATOR + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + CKP_META;
+ return StringUtils.isNullOrEmpty(uniqueId) ? metaPath : metaPath + "_" + uniqueId;
}
private Path fullPath(String fileName) {
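The net effect of the uniqueId plumbing is a per-writer checkpoint-metadata directory under the auxiliary folder. A standalone sketch of the path derivation; the ".hoodie/.aux" and "ckp_meta" segments are assumptions for illustration, the real values come from HoodieTableMetaClient.AUXILIARYFOLDER_NAME and CKP_META:

    public class CkpMetaPathSketch {
      // Assumed constants for illustration only.
      static final String AUX_FOLDER = ".hoodie/.aux";
      static final String CKP_META = "ckp_meta";

      static String ckpMetaPath(String basePath, String uniqueId) {
        String metaPath = basePath + "/" + AUX_FOLDER + "/" + CKP_META;
        // an empty id (the default) keeps the legacy path; a client id gets its own dir
        return (uniqueId == null || uniqueId.isEmpty()) ? metaPath : metaPath + "_" + uniqueId;
      }

      public static void main(String[] args) {
        System.out.println(ckpMetaPath("/tmp/t1", ""));  // /tmp/t1/.hoodie/.aux/ckp_meta
        System.out.println(ckpMetaPath("/tmp/t1", "2")); // /tmp/t1/.hoodie/.aux/ckp_meta_2
      }
    }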
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 9f61580da0b..fa324952ed8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -216,6 +216,7 @@ public class FlinkWriteClients {
.enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
.withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
.build())
+ .withIndexConfig(StreamerUtil.getIndexConfig(conf))
.withPayloadConfig(getPayloadConfig(conf))
.withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
.withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
@@ -226,15 +227,18 @@ public class FlinkWriteClients {
if (conf.getBoolean(FlinkOptions.METADATA_ENABLED)) {
builder.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL);
- if (!conf.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) {
- builder.withLockConfig(HoodieLockConfig.newBuilder()
- .withLockProvider(FileSystemBasedLockProvider.class)
- .withLockWaitTimeInMillis(2000L) // 2s
- .withFileSystemLockExpire(1) // 1 minute
- .withClientNumRetries(30)
- .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
- .build());
- }
+ }
+
+ if (!conf.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key()) && OptionsResolver.needsGuardByLock(conf)) {
+ // configure the fs lock provider by default
+ builder.withLockConfig(HoodieLockConfig.newBuilder()
+ .withConflictResolutionStrategy(OptionsResolver.getConflictResolutionStrategy(conf))
+ .withLockProvider(FileSystemBasedLockProvider.class)
+ .withLockWaitTimeInMillis(2000L) // 2s
+ .withFileSystemLockExpire(1) // 1 minute
+ .withClientNumRetries(30)
+ .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
+ .build());
}
// do not configure cleaning strategy as LAZY until multi-writers is supported.
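Because the filesystem lock is only applied as a default, users can still plug in their own provider; if the lock provider key is already present in the configuration, the block above is skipped entirely. A hedged sketch (the ZooKeeper provider class name is an assumption, any HoodieLockProvider implementation works):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.config.HoodieLockConfig;

    public class CustomLockProviderSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // With this key set, FlinkWriteClients leaves the lock config alone and the
        // user-supplied provider is used instead of FileSystemBasedLockProvider.
        conf.setString(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
            "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider"); // assumed FQCN
      }
    }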
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 1e5af896928..e8578ad82ba 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -20,6 +20,7 @@ package org.apache.hudi.util;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -32,6 +33,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
@@ -138,6 +140,19 @@ public class StreamerUtil {
.build();
}
+ /**
+ * Returns the index config with given configuration.
+ */
+ public static HoodieIndexConfig getIndexConfig(Configuration conf) {
+ return HoodieIndexConfig.newBuilder()
+ .withIndexType(OptionsResolver.getIndexType(conf))
+ .withBucketNum(String.valueOf(conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS)))
+ .withRecordKeyField(conf.getString(FlinkOptions.RECORD_KEY_FIELD))
+ .withIndexKeyField(OptionsResolver.getIndexKeyField(conf))
+ .withEngineType(EngineType.FLINK)
+ .build();
+ }
+
/**
* Converts the give {@link Configuration} to {@link TypedProperties}.
* The default values are also set up.
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java
index a5e9f311456..0f3de9127b7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java
@@ -20,6 +20,8 @@ package org.apache.hudi.util;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieIOException;
@@ -43,7 +45,9 @@ import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER
public class ViewStorageProperties {
private static final Logger LOG = LoggerFactory.getLogger(ViewStorageProperties.class);
- private static final String FILE_NAME = "view_storage_conf.properties";
+ private static final String FILE_NAME = "view_storage_conf";
+
+ private static final String FILE_SUFFIX = ".properties";
/**
* Initialize the {@link #FILE_NAME} meta file.
@@ -52,7 +56,7 @@ public class ViewStorageProperties {
String basePath,
FileSystemViewStorageConfig config,
Configuration flinkConf) throws IOException {
- Path propertyPath = getPropertiesFilePath(basePath);
+ Path propertyPath = getPropertiesFilePath(basePath, flinkConf.getString(FlinkOptions.WRITE_CLIENT_ID));
FileSystem fs = FSUtils.getFs(basePath, HadoopConfigurations.getHadoopConf(flinkConf));
fs.delete(propertyPath, false);
try (FSDataOutputStream outputStream = fs.create(propertyPath)) {
@@ -65,7 +69,7 @@ public class ViewStorageProperties {
* Read the {@link FileSystemViewStorageConfig} with given table base path.
*/
public static FileSystemViewStorageConfig loadFromProperties(String basePath, Configuration conf) {
- Path propertyPath = getPropertiesFilePath(basePath);
+ Path propertyPath = getPropertiesFilePath(basePath, conf.getString(FlinkOptions.WRITE_CLIENT_ID));
LOG.info("Loading filesystem view storage properties from " + propertyPath);
FileSystem fs = FSUtils.getFs(basePath, HadoopConfigurations.getHadoopConf(conf));
Properties props = new Properties();
@@ -79,8 +83,9 @@ public class ViewStorageProperties {
}
}
- private static Path getPropertiesFilePath(String basePath) {
+ private static Path getPropertiesFilePath(String basePath, String uniqueId) {
String auxPath = basePath + Path.SEPARATOR + AUXILIARYFOLDER_NAME;
- return new Path(auxPath, FILE_NAME);
+ String fileName = StringUtils.isNullOrEmpty(uniqueId) ? FILE_NAME : FILE_NAME + "_" + uniqueId;
+ return new Path(auxPath, fileName);
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index e5d2b315ca7..e9574e61d92 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -113,7 +113,6 @@ public class ITTestDataStreamWrite extends TestLogger {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
conf.setString(FlinkOptions.INDEX_TYPE, indexType);
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
- conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
testWriteToHoodie(conf, "cow_write", 2, EXPECTED);
@@ -159,7 +158,6 @@ public class ITTestDataStreamWrite extends TestLogger {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
conf.setString(FlinkOptions.INDEX_TYPE, indexType);
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
- conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 21dd6fd1d18..f705a63f1ff 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -21,9 +21,13 @@ package org.apache.hudi.sink;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.utils.TestWriteBase;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.utils.TestConfigurations;
@@ -239,7 +243,8 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
@Test
public void testInsertAppendMode() throws Exception {
- prepareInsertPipeline()
+ conf.setString(FlinkOptions.OPERATION, "insert");
+ preparePipeline()
// Each record is 208 bytes. so 4 records expect to trigger a mini-batch write
.consume(TestData.DATA_SET_INSERT_SAME_KEY)
.checkpoint(1)
@@ -294,7 +299,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
- prepareInsertPipeline(conf)
+ preparePipeline()
.consume(TestData.DATA_SET_INSERT_SAME_KEY)
.checkpoint(1)
.handleEvents(1)
@@ -422,6 +427,69 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.end();
}
+ // case 1: txn2's time range is fully contained within txn1's
+ // |----------- txn1 -----------|
+ // | ----- txn2 ----- |
+ @Test
+ public void testWriteMultiWriterInvolved() throws Exception {
+ conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
+ conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+ conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+
+ TestHarness pipeline1 = preparePipeline(conf)
+ .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+ .assertEmptyDataFiles();
+ // now start pipeline2 and commit the txn
+ Configuration conf2 = conf.clone();
+ conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+ preparePipeline(conf2)
+ .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+ .assertEmptyDataFiles()
+ .checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1)
+ .checkWrittenData(EXPECTED3, 1);
+ // step to commit the 2nd txn, should throw exception
+ // for concurrent modification of same fileGroups
+ pipeline1.checkpoint(1)
+ .assertNextEvent()
+ .checkpointCompleteThrows(1, HoodieWriteConflictException.class, "Cannot resolve conflicts");
+ }
+
+ // case 2: txn2's time range partially overlaps with txn1's
+ // |----------- txn1 -----------|
+ // | ----- txn2 ----- |
+ @Test
+ public void testWriteMultiWriterPartialOverlapping() throws Exception {
+ conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
+ conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+ conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+
+ TestHarness pipeline1 = preparePipeline(conf)
+ .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+ .assertEmptyDataFiles();
+ // now start pipeline2 and suspend the txn commit
+ Configuration conf2 = conf.clone();
+ conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+ TestHarness pipeline2 = preparePipeline(conf2)
+ .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+ .assertEmptyDataFiles();
+
+ // step to commit the 1st txn, should succeed
+ pipeline1.checkpoint(1)
+ .assertNextEvent()
+ .checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1)
+ .checkWrittenData(EXPECTED3, 1);
+
+ // step to commit the 2nd txn, should throw exception
+ // for concurrent modification of same fileGroups
+ pipeline2.checkpoint(1)
+ .assertNextEvent()
+ .checkpointCompleteThrows(1, HoodieWriteConflictException.class, "Cannot resolve conflicts");
+ }
+
@Test
public void testReuseEmbeddedServer() throws IOException {
conf.setInteger("hoodie.filesystem.view.remote.timeout.secs", 500);
@@ -449,14 +517,6 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
return TestHarness.instance().preparePipeline(tempFile, conf);
}
- protected TestHarness prepareInsertPipeline() throws Exception {
- return prepareInsertPipeline(conf);
- }
-
- protected TestHarness prepareInsertPipeline(Configuration conf) throws Exception {
- return TestHarness.instance().preparePipeline(tempFile, conf, true);
- }
-
protected HoodieTableType getTableType() {
return HoodieTableType.COPY_ON_WRITE;
}
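For intuition, both new tests reduce to a file-group overlap check: the bucket index pins each record key to a deterministic bucket (file group), so two writers ingesting the same keys always collide. A toy model of that check; the set-based logic is an illustration, not the actual BucketIndexConcurrentFileWritesConflictResolutionStrategy code:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class BucketConflictSketch {
      // Toy model: a txn conflicts with another if they wrote any common file group.
      static boolean hasConflict(Set<String> fileIdsTxn1, Set<String> fileIdsTxn2) {
        Set<String> overlap = new HashSet<>(fileIdsTxn1);
        overlap.retainAll(fileIdsTxn2);
        return !overlap.isEmpty();
      }

      public static void main(String[] args) {
        Set<String> txn1 = new HashSet<>(Arrays.asList("bucket-00000001", "bucket-00000002"));
        Set<String> txn2 = new HashSet<>(Arrays.asList("bucket-00000002"));
        System.out.println(hasConflict(txn1, txn2)); // true -> HoodieWriteConflictException
      }
    }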
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index df01fc9076f..4683955d8e0 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -141,11 +141,21 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
.end();
}
+ @Test
+ public void testInsertAppendMode() {
+ // append mode is only valid for cow table.
+ }
+
@Override
public void testInsertClustering() {
// insert clustering is only valid for cow table.
}
+ @Test
+ public void testInsertAsyncClustering() {
+ // insert async clustering is only valid for cow table.
+ }
+
@Override
protected Map<String, String> getExpectedBeforeCheckpointComplete() {
return EXPECTED1;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 704d94caba3..f6ee44db254 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
@@ -37,11 +38,21 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
}
+ @Test
+ public void testInsertAppendMode() {
+ // append mode is only valid for cow table.
+ }
+
@Override
public void testInsertClustering() {
// insert clustering is only valid for cow table.
}
+ @Test
+ public void testInsertAsyncClustering() {
+ // insert async clustering is only valid for cow table.
+ }
+
@Override
protected Map<String, String> getExpectedBeforeCheckpointComplete() {
return EXPECTED1;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
index fe7ce3f9478..b49441f3b9d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
@@ -26,8 +26,9 @@ import org.apache.hudi.utils.TestConfigurations;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.util.stream.IntStream;
@@ -41,24 +42,20 @@ import static org.hamcrest.MatcherAssert.assertThat;
*/
public class TestCkpMetadata {
- private CkpMetadata metadata;
-
@TempDir
File tempFile;
@BeforeEach
public void beforeEach() throws Exception {
String basePath = tempFile.getAbsolutePath();
- FileSystem fs = FSUtils.getFs(tempFile.getAbsolutePath(), HadoopConfigurations.getHadoopConf(new Configuration()));
-
Configuration conf = TestConfigurations.getDefaultConf(basePath);
StreamerUtil.initTableIfNotExists(conf);
-
- this.metadata = CkpMetadata.getInstance(fs, basePath);
}
- @Test
- void testWriteAndReadMessage() {
+ @ParameterizedTest
+ @ValueSource(strings = {"", "1"})
+ void testWriteAndReadMessage(String uniqueId) {
+ CkpMetadata metadata = getCkpMetadata(uniqueId);
// write and read 5 committed checkpoints
IntStream.range(0, 3).forEach(i -> metadata.startInstant(i + ""));
@@ -74,4 +71,10 @@ public class TestCkpMetadata {
metadata.abortInstant("7");
assertThat(metadata.getMessages().size(), is(5));
}
+
+ private CkpMetadata getCkpMetadata(String uniqueId) {
+ String basePath = tempFile.getAbsolutePath();
+ FileSystem fs = FSUtils.getFs(basePath, HadoopConfigurations.getHadoopConf(new Configuration()));
+ return CkpMetadata.getInstance(fs, basePath, uniqueId);
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
new file mode 100644
index 00000000000..49d6b4b1e2b
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteFunction;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.bucket.BucketStreamWriteFunction;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
+import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
+import org.apache.flink.streaming.util.MockStreamTask;
+import org.apache.flink.streaming.util.MockStreamTaskBuilder;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A wrapper class to manipulate the {@link BucketStreamWriteFunction} instance for testing.
+ *
+ * @param <I> Input type
+ */
+public class BucketStreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
+ private final Configuration conf;
+
+ private final IOManager ioManager;
+ private final StreamingRuntimeContext runtimeContext;
+ private final MockOperatorEventGateway gateway;
+ private final MockOperatorCoordinatorContext coordinatorContext;
+ private final StreamWriteOperatorCoordinator coordinator;
+ private final MockStateInitializationContext stateInitializationContext;
+
+ /**
+ * Function that converts row data to HoodieRecord.
+ */
+ private RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction;
+
+ /**
+ * Stream write function.
+ */
+ private StreamWriteFunction<HoodieRecord<?>> writeFunction;
+
+ private CompactFunctionWrapper compactFunctionWrapper;
+
+ private final MockStreamTask streamTask;
+
+ private final StreamConfig streamConfig;
+
+ private final boolean asyncCompaction;
+
+ public BucketStreamWriteFunctionWrapper(String tablePath) throws Exception {
+ this(tablePath, TestConfigurations.getDefaultConf(tablePath));
+ }
+
+ public BucketStreamWriteFunctionWrapper(String tablePath, Configuration conf) throws Exception {
+ this.ioManager = new IOManagerAsync();
+ MockEnvironment environment = new MockEnvironmentBuilder()
+ .setTaskName("mockTask")
+ .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+ .setIOManager(ioManager)
+ .build();
+ this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
+ this.gateway = new MockOperatorEventGateway();
+ this.conf = conf;
+ // one function
+ this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
+ this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
+ this.stateInitializationContext = new MockStateInitializationContext();
+ this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
+ this.streamConfig = new StreamConfig(conf);
+ streamConfig.setOperatorID(new OperatorID());
+ this.streamTask = new MockStreamTaskBuilder(environment)
+ .setConfig(new StreamConfig(conf))
+ .setExecutionConfig(new ExecutionConfig().enableObjectReuse())
+ .build();
+ this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf, this.streamTask, this.streamConfig);
+ }
+
+ public void openFunction() throws Exception {
+ this.coordinator.start();
+ this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
+ toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf);
+ toHoodieFunction.setRuntimeContext(runtimeContext);
+ toHoodieFunction.open(conf);
+
+ setupWriteFunction();
+
+ if (asyncCompaction) {
+ compactFunctionWrapper.openFunction();
+ }
+ }
+
+ public void invoke(I record) throws Exception {
+ HoodieRecord<?> hoodieRecord = toHoodieFunction.map((RowData) record);
+ writeFunction.processElement(hoodieRecord, null, null);
+ }
+
+ public WriteMetadataEvent[] getEventBuffer() {
+ return this.coordinator.getEventBuffer();
+ }
+
+ public OperatorEvent getNextEvent() {
+ return this.gateway.getNextEvent();
+ }
+
+ public Map<String, List<HoodieRecord>> getDataBuffer() {
+ return this.writeFunction.getDataBuffer();
+ }
+
+ public void checkpointFunction(long checkpointId) throws Exception {
+ // checkpoint the coordinator first
+ this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
+ writeFunction.snapshotState(new MockFunctionSnapshotContext(checkpointId));
+ stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
+ }
+
+ public void endInput() {
+ writeFunction.endInput();
+ }
+
+ public void checkpointComplete(long checkpointId) {
+ stateInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
+ coordinator.notifyCheckpointComplete(checkpointId);
+ if (asyncCompaction) {
+ try {
+ compactFunctionWrapper.compact(checkpointId);
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ }
+ }
+ }
+
+ public void checkpointFails(long checkpointId) {
+ coordinator.notifyCheckpointAborted(checkpointId);
+ }
+
+ public void subTaskFails(int taskID) throws Exception {
+ coordinator.subtaskFailed(taskID, new RuntimeException("Dummy exception"));
+ setupWriteFunction();
+ }
+
+ public void close() throws Exception {
+ coordinator.close();
+ ioManager.close();
+ writeFunction.close();
+ if (compactFunctionWrapper != null) {
+ compactFunctionWrapper.close();
+ }
+ }
+
+ public StreamWriteOperatorCoordinator getCoordinator() {
+ return coordinator;
+ }
+
+ public MockOperatorCoordinatorContext getCoordinatorContext() {
+ return coordinatorContext;
+ }
+
+ public boolean isConfirming() {
+ return this.writeFunction.isConfirming();
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ private void setupWriteFunction() throws Exception {
+ writeFunction = new BucketStreamWriteFunction<>(conf);
+ writeFunction.setRuntimeContext(runtimeContext);
+ writeFunction.setOperatorEventGateway(gateway);
+ writeFunction.initializeState(this.stateInitializationContext);
+ writeFunction.open(conf);
+
+ // handle the bootstrap event
+ coordinator.handleEventFromOperator(0, getNextEvent());
+ }
+}
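For reference, a hedged sketch of how a test drives this wrapper end to end; the call sequence mirrors TestWriteBase below, and the inputs are supplied by the caller:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.data.RowData;
    import org.apache.hudi.sink.utils.BucketStreamWriteFunctionWrapper;

    public class WrapperUsageSketch {
      // One write cycle through the wrapper (assertions and error handling elided).
      static void writeOnce(String tablePath, Configuration conf, RowData row) throws Exception {
        BucketStreamWriteFunctionWrapper<RowData> pipeline =
            new BucketStreamWriteFunctionWrapper<>(tablePath, conf);
        pipeline.openFunction();          // starts the coordinator and the write function
        pipeline.invoke(row);             // converts the row and buffers a HoodieRecord
        pipeline.checkpointFunction(1L);  // snapshots state and emits WriteMetadataEvent
        pipeline.getCoordinator().handleEventFromOperator(0, pipeline.getNextEvent());
        pipeline.checkpointComplete(1L);  // the coordinator commits the instant
        pipeline.close();
      }
    }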
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index b6ae0767d68..a6e7a19952c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
@@ -38,13 +39,16 @@ import org.hamcrest.MatcherAssert;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.stream.Collectors;
+import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -121,23 +125,25 @@ public class TestWriteBase {
private Configuration conf;
private TestFunctionWrapper<RowData> pipeline;
+ private CkpMetadata ckpMetadata;
+
private String lastPending;
private String lastComplete;
public TestHarness preparePipeline(File basePath, Configuration conf) throws Exception {
- preparePipeline(basePath, conf, false);
- return this;
- }
-
- public TestHarness preparePipeline(File basePath, Configuration conf, boolean append) throws Exception {
this.baseFile = basePath;
this.basePath = this.baseFile.getAbsolutePath();
this.conf = conf;
- this.pipeline = append
- ? new InsertFunctionWrapper<>(this.basePath, conf)
- : new StreamWriteFunctionWrapper<>(this.basePath, conf);
+ if (OptionsResolver.isAppendMode(conf)) {
+ this.pipeline = new InsertFunctionWrapper<>(this.basePath, conf);
+ } else if (OptionsResolver.isBucketIndexType(conf)) {
+ this.pipeline = new BucketStreamWriteFunctionWrapper<>(this.basePath, conf);
+ } else {
+ this.pipeline = new StreamWriteFunctionWrapper<>(this.basePath, conf);
+ }
// open the function and ingest data
this.pipeline.openFunction();
+ this.ckpMetadata = CkpMetadata.getInstance(conf);
return this;
}
@@ -258,10 +264,23 @@ public class TestWriteBase {
this.lastPending = lastPendingInstant();
this.pipeline.checkpointComplete(checkpointId);
// started a new instant already
- checkInflightInstant();
+ String newInflight = checkInflightInstant();
checkInstantState(HoodieInstant.State.COMPLETED, lastPending);
this.lastComplete = lastPending;
- this.lastPending = lastPendingInstant(); // refresh last pending instant
+ this.lastPending = newInflight; // refresh last pending instant
+ return this;
+ }
+
+ /**
+ * Asserts that the checkpoint with id {@code checkpointId} throws when it completes.
+ */
+ public TestHarness checkpointCompleteThrows(long checkpointId, Class<?> cause, String msg) {
+ this.pipeline.checkpointComplete(checkpointId);
+ assertTrue(this.pipeline.getCoordinatorContext().isJobFailed(), "Job should have been failed");
+ Throwable throwable = this.pipeline.getCoordinatorContext().getJobFailureReason().getCause();
+ assertThat(throwable, instanceOf(cause));
+ assertThat(throwable.getMessage(), containsString(msg));
return this;
}
@@ -318,12 +337,28 @@ public class TestWriteBase {
* Asserts the data files are empty.
*/
public TestHarness assertEmptyDataFiles() {
- File[] dataFiles = baseFile.listFiles(file -> !file.getName().startsWith("."));
- assertNotNull(dataFiles);
- assertThat(dataFiles.length, is(0));
+ assertFalse(fileExists(), "No data files should have been created");
return this;
}
+ private boolean fileExists() {
+ List<File> dirsToCheck = new ArrayList<>();
+ dirsToCheck.add(baseFile);
+ while (!dirsToCheck.isEmpty()) {
+ File dir = dirsToCheck.remove(0);
+ for (File file : Objects.requireNonNull(dir.listFiles())) {
+ if (!file.getName().startsWith(".")) {
+ if (file.isDirectory()) {
+ dirsToCheck.add(file);
+ } else {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
public TestHarness checkWrittenData(Map<String, String> expected) throws Exception {
checkWrittenData(expected, 4);
return this;
@@ -385,12 +420,13 @@ public class TestWriteBase {
}
private String lastPendingInstant() {
- return TestUtils.getLastPendingInstant(basePath);
+ return this.ckpMetadata.lastPendingInstant();
}
- private void checkInflightInstant() {
- final String instant = TestUtils.getLastPendingInstant(basePath);
+ private String checkInflightInstant() {
+ final String instant = this.ckpMetadata.lastPendingInstant();
assertNotNull(instant);
+ return instant;
}
private void checkInstantState(HoodieInstant.State state, String instantStr) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java
index 084f211e660..a8b06c111cd 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java
@@ -27,6 +27,8 @@ import org.apache.hudi.util.ViewStorageProperties;
import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
@@ -41,19 +43,21 @@ public class TestViewStorageProperties {
@TempDir
File tempFile;
- @Test
- void testReadWriteProperties() throws IOException {
+ @ParameterizedTest
+ @ValueSource(strings = {"", "1"})
+ void testReadWriteProperties(String uniqueId) throws IOException {
String basePath = tempFile.getAbsolutePath();
FileSystemViewStorageConfig config = FileSystemViewStorageConfig.newBuilder()
.withStorageType(FileSystemViewStorageType.SPILLABLE_DISK)
.withRemoteServerHost("host1")
.withRemoteServerPort(1234).build();
Configuration flinkConfig = new Configuration();
+ flinkConfig.setString(FlinkOptions.WRITE_CLIENT_ID, uniqueId);
ViewStorageProperties.createProperties(basePath, config, flinkConfig);
ViewStorageProperties.createProperties(basePath, config, flinkConfig);
ViewStorageProperties.createProperties(basePath, config, flinkConfig);
- FileSystemViewStorageConfig readConfig = ViewStorageProperties.loadFromProperties(basePath, new Configuration());
+ FileSystemViewStorageConfig readConfig = ViewStorageProperties.loadFromProperties(basePath, flinkConfig);
assertThat(readConfig.getStorageType(), is(FileSystemViewStorageType.SPILLABLE_DISK));
assertThat(readConfig.getRemoteViewServerHost(), is("host1"));
assertThat(readConfig.getRemoteViewServerPort(), is(1234));