You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/03/08 06:46:10 UTC
[hudi] branch master updated: [HUDI-5887] Distinguish the single writer enabling metadata table and multi-writer use cases for lock guard (#8111)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a8312a9b8c3 [HUDI-5887] Distinguish the single writer enabling metadata table and multi-writer use cases for lock guard (#8111)
a8312a9b8c3 is described below
commit a8312a9b8c39f4baabf753974fa092c4767abb72
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Mar 8 14:46:01 2023 +0800
[HUDI-5887] Distinguish the single writer enabling metadata table and multi-writer use cases for lock guard (#8111)
* distinguish the "single writer with metadata table enabled" use case from the multi-writer use case for the lock guard,
which gives us the chance for single table optimizations, e.g. some restrictions on metadata table compaction/initialization can be loosened or eliminated
* fix the isLockProviderPropertySet flag within HoodieWriteConfig builder
---
.../org/apache/hudi/client/BaseHoodieClient.java | 3 ++-
.../apache/hudi/client/BaseHoodieWriteClient.java | 22 ++++++----------
.../DirectMarkerTransactionManager.java | 7 +++---
.../client/transaction/TransactionManager.java | 19 +++++++-------
.../org/apache/hudi/config/HoodieWriteConfig.java | 15 ++++++-----
.../action/commit/BaseCommitActionExecutor.java | 2 +-
.../apache/hudi/config/TestHoodieWriteConfig.java | 29 +++++++++++-----------
.../apache/hudi/client/HoodieFlinkWriteClient.java | 2 +-
8 files changed, 47 insertions(+), 52 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index a5faaffe3d7..b8409c7d19b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -170,7 +170,8 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
*
* @param table A hoodie table instance created after transaction starts so that the latest commits and files are captured.
* @param metadata Current committing instant's metadata
- * @param pendingInflightAndRequestedInstants
+ * @param pendingInflightAndRequestedInstants Pending instants on the timeline
+ *
* @see {@link BaseHoodieWriteClient#preCommit}
* @see {@link BaseHoodieTableServiceClient#preCommit}
*/
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index a248e6bd9b4..594d32de939 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -493,7 +493,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
public void preWrite(String instantTime, WriteOperationType writeOperationType,
HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
- this.lastCompletedTxnAndMetadata = txnManager.isOptimisticConcurrencyControlEnabled()
+ this.lastCompletedTxnAndMetadata = txnManager.isNeedsLockGuard()
? TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient) : Option.empty();
this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient);
this.pendingInflightAndRequestedInstants.remove(instantTime);
@@ -1086,7 +1086,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
*
* @param extraMetadata Metadata to pass onto the scheduled service instant
* @param tableServiceType Type of table service to schedule
- * @return
+ *
+ * @return The given instant time option or empty if no table service plan is scheduled
*/
public Option<String> scheduleTableService(Option<Map<String, String>> extraMetadata, TableServiceType tableServiceType) {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
@@ -1098,20 +1099,11 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
*
* @param extraMetadata Metadata to pass onto the scheduled service instant
* @param tableServiceType Type of table service to schedule
- * @return
+ *
+ * @return The given instant time option or empty if no table service plan is scheduled
*/
- public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata,
- TableServiceType tableServiceType) {
- // A lock is required to guard against race conditions between an on-going writer and scheduling a table service.
- final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED,
- tableServiceType.getAction(), instantTime));
- try {
- this.txnManager.beginTransaction(inflightInstant, Option.empty());
- LOG.info("Scheduling table service " + tableServiceType);
- return tableServiceClient.scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
- } finally {
- this.txnManager.endTransaction(inflightInstant);
- }
+ public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata, TableServiceType tableServiceType) {
+ return tableServiceClient.scheduleTableService(instantTime, extraMetadata, tableServiceType);
}
public HoodieMetrics getMetrics() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
index 84fac2db004..f27af9a2549 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
@@ -42,13 +42,12 @@ public class DirectMarkerTransactionManager extends TransactionManager {
private final String filePath;
public DirectMarkerTransactionManager(HoodieWriteConfig config, FileSystem fs, String partitionPath, String fileId) {
- super(new LockManager(config, fs, createUpdatedLockProps(config, partitionPath, fileId)),
- config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl());
+ super(new LockManager(config, fs, createUpdatedLockProps(config, partitionPath, fileId)), config.needsLockGuard());
this.filePath = partitionPath + "/" + fileId;
}
public void beginTransaction(String newTxnOwnerInstantTime) {
- if (isOptimisticConcurrencyControlEnabled) {
+ if (needsLockGuard) {
LOG.info("Transaction starting for " + newTxnOwnerInstantTime + " and " + filePath);
lockManager.lock();
@@ -58,7 +57,7 @@ public class DirectMarkerTransactionManager extends TransactionManager {
}
public void endTransaction(String currentTxnOwnerInstantTime) {
- if (isOptimisticConcurrencyControlEnabled) {
+ if (needsLockGuard) {
LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstantTime
+ " for " + filePath);
if (reset(Option.of(getInstant(currentTxnOwnerInstantTime)), Option.empty(), Option.empty())) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
index 7fddf8a944b..40744a2f5d2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
@@ -37,23 +37,22 @@ public class TransactionManager implements Serializable {
protected static final Logger LOG = LogManager.getLogger(TransactionManager.class);
protected final LockManager lockManager;
- protected final boolean isOptimisticConcurrencyControlEnabled;
+ protected final boolean needsLockGuard;
protected Option<HoodieInstant> currentTxnOwnerInstant = Option.empty();
private Option<HoodieInstant> lastCompletedTxnOwnerInstant = Option.empty();
public TransactionManager(HoodieWriteConfig config, FileSystem fs) {
- this(new LockManager(config, fs),
- config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl());
+ this(new LockManager(config, fs), config.needsLockGuard());
}
- protected TransactionManager(LockManager lockManager, boolean isOptimisticConcurrencyControlEnabled) {
+ protected TransactionManager(LockManager lockManager, boolean needsLockGuard) {
this.lockManager = lockManager;
- this.isOptimisticConcurrencyControlEnabled = isOptimisticConcurrencyControlEnabled;
+ this.needsLockGuard = needsLockGuard;
}
public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
- if (isOptimisticConcurrencyControlEnabled) {
+ if (needsLockGuard) {
LOG.info("Transaction starting for " + newTxnOwnerInstant
+ " with latest completed transaction instant " + lastCompletedTxnOwnerInstant);
lockManager.lock();
@@ -64,7 +63,7 @@ public class TransactionManager implements Serializable {
}
public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
- if (isOptimisticConcurrencyControlEnabled) {
+ if (needsLockGuard) {
LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant);
if (reset(currentTxnOwnerInstant, Option.empty(), Option.empty())) {
lockManager.unlock();
@@ -85,7 +84,7 @@ public class TransactionManager implements Serializable {
}
public void close() {
- if (isOptimisticConcurrencyControlEnabled) {
+ if (needsLockGuard) {
lockManager.close();
LOG.info("Transaction manager closed");
}
@@ -103,7 +102,7 @@ public class TransactionManager implements Serializable {
return currentTxnOwnerInstant;
}
- public boolean isOptimisticConcurrencyControlEnabled() {
- return isOptimisticConcurrencyControlEnabled;
+ public boolean isNeedsLockGuard() {
+ return needsLockGuard;
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 886112cae16..f06f0a9cfc5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2405,6 +2405,13 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBooleanOrDefault(RELEASE_RESOURCE_ENABLE);
}
+ /**
+ * Returns whether the explicit guard of lock is required.
+ */
+ public boolean needsLockGuard() {
+ return isMetadataTableEnabled() || getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
+ }
+
/**
* Layout configs.
*/
@@ -2946,8 +2953,7 @@ public class HoodieWriteConfig extends HoodieConfig {
// isLockProviderPropertySet must be fetched before setting defaults of HoodieLockConfig
final TypedProperties writeConfigProperties = writeConfig.getProps();
- final boolean isLockProviderPropertySet = writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME)
- || writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_PROP);
+ final boolean isLockProviderPropertySet = writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key());
writeConfig.setDefaultOnCondition(!isLockConfigSet,
HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
@@ -2971,13 +2977,10 @@ public class HoodieWriteConfig extends HoodieConfig {
// This is targeted at Single writer with async table services
// If user does not set the lock provider, likely that the concurrency mode is not set either
// Override the configs for metadata table
- writeConfig.setValue(WRITE_CONCURRENCY_MODE.key(),
- WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
InProcessLockProvider.class.getName());
- LOG.info(String.format("Automatically set %s=%s and %s=%s since user has not set the "
+ LOG.info(String.format("Automatically set %s=%s since user has not set the "
+ "lock provider for single writer with async table services",
- WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value(),
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName()));
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 8f3a0244d2e..0ab7efec82d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -92,7 +92,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, R>
this.taskContextSupplier = context.getTaskContextSupplier();
// TODO : Remove this once we refactor and move out autoCommit method from here, since the TxnManager is held in {@link BaseHoodieWriteClient}.
this.txnManagerOption = config.shouldAutoCommit() ? Option.of(new TransactionManager(config, table.getMetaClient().getFs())) : Option.empty();
- if (this.txnManagerOption.isPresent() && this.txnManagerOption.get().isOptimisticConcurrencyControlEnabled()) {
+ if (this.txnManagerOption.isPresent() && this.txnManagerOption.get().isNeedsLockGuard()) {
// these txn metadata are only needed for auto commit when optimistic concurrent control is also enabled
this.lastCompletedTxn = TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient());
this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(table.getMetaClient());
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index e7afa50a59d..1f4d6b18587 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -140,8 +140,10 @@ public class TestHoodieWriteConfig {
put(ASYNC_CLEAN.key(), "false");
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
- }), true, true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
- HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
+ }), true, true, true,
+ WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
+ HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+ inProcessLockProviderClassName);
// 2. Async clean
verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap<String, String>() {
@@ -153,8 +155,10 @@ public class TestHoodieWriteConfig {
put(ASYNC_CLEAN.key(), "true");
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
- }), true, true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
- HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
+ }), true, true, true,
+ WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
+ HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+ inProcessLockProviderClassName);
// 3. Async compaction configured
verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap<String, String>() {
@@ -168,12 +172,8 @@ public class TestHoodieWriteConfig {
}
}), true,
tableType == HoodieTableType.MERGE_ON_READ, true,
- tableType == HoodieTableType.MERGE_ON_READ
- ? WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL
- : WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
- tableType == HoodieTableType.MERGE_ON_READ
- ? HoodieFailedWritesCleaningPolicy.LAZY
- : HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+ WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
+ HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
tableType == HoodieTableType.MERGE_ON_READ
? inProcessLockProviderClassName
: HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
@@ -205,8 +205,9 @@ public class TestHoodieWriteConfig {
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
}
}), true, true, false,
- WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
- HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
+ WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
+ HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+ inProcessLockProviderClassName);
}
@ParameterizedTest
@@ -289,8 +290,8 @@ public class TestHoodieWriteConfig {
if (writeConfig.areAnyTableServicesAsync()) {
verifyConcurrencyControlRelatedConfigs(writeConfig,
true, true, true,
- WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
- HoodieFailedWritesCleaningPolicy.LAZY,
+ WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
+ HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
InProcessLockProvider.class.getName());
} else {
verifyConcurrencyControlRelatedConfigs(writeConfig,
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 3d4f6a3873b..1d4697d709d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -280,7 +280,7 @@ public class HoodieFlinkWriteClient<T> extends
* should be called before the Driver starts a new transaction.
*/
public void preTxn(HoodieTableMetaClient metaClient) {
- if (txnManager.isOptimisticConcurrencyControlEnabled()) {
+ if (txnManager.isNeedsLockGuard()) {
// refresh the meta client which is reused
metaClient.reloadActiveTimeline();
this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);