You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/03/08 06:46:10 UTC

[hudi] branch master updated: [HUDI-5887] Distinguish the single writer enabling metadata table and multi-writer use cases for lock guard (#8111)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a8312a9b8c3 [HUDI-5887] Distinguish the single writer enabling metadata table and multi-writer use cases for lock guard (#8111)
a8312a9b8c3 is described below

commit a8312a9b8c39f4baabf753974fa092c4767abb72
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Mar 8 14:46:01 2023 +0800

    [HUDI-5887] Distinguish the single writer enabling metadata table and multi-writer use cases for lock guard (#8111)
    
    * Distinguish the single-writer-with-metadata-table case from the multi-writer case for the lock guard,
      which gives us the chance to apply single-writer optimizations; e.g. some restrictions on metadata table compaction/initialization can be loosened or eliminated
    * Fix the isLockProviderPropertySet flag in the HoodieWriteConfig builder
---
 .../org/apache/hudi/client/BaseHoodieClient.java   |  3 ++-
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 22 ++++++----------
 .../DirectMarkerTransactionManager.java            |  7 +++---
 .../client/transaction/TransactionManager.java     | 19 +++++++-------
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 15 ++++++-----
 .../action/commit/BaseCommitActionExecutor.java    |  2 +-
 .../apache/hudi/config/TestHoodieWriteConfig.java  | 29 +++++++++++-----------
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  2 +-
 8 files changed, 47 insertions(+), 52 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index a5faaffe3d7..b8409c7d19b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -170,7 +170,8 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
    *
    * @param table A hoodie table instance created after transaction starts so that the latest commits and files are captured.
    * @param metadata Current committing instant's metadata
-   * @param pendingInflightAndRequestedInstants
+   * @param pendingInflightAndRequestedInstants Pending instants on the timeline
+   *
    * @see {@link BaseHoodieWriteClient#preCommit}
    * @see {@link BaseHoodieTableServiceClient#preCommit}
    */
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index a248e6bd9b4..594d32de939 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -493,7 +493,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
   public void preWrite(String instantTime, WriteOperationType writeOperationType,
       HoodieTableMetaClient metaClient) {
     setOperationType(writeOperationType);
-    this.lastCompletedTxnAndMetadata = txnManager.isOptimisticConcurrencyControlEnabled()
+    this.lastCompletedTxnAndMetadata = txnManager.isNeedsLockGuard()
         ? TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient) : Option.empty();
     this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient);
     this.pendingInflightAndRequestedInstants.remove(instantTime);
@@ -1086,7 +1086,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    *
    * @param extraMetadata Metadata to pass onto the scheduled service instant
    * @param tableServiceType Type of table service to schedule
-   * @return
+   *
+   * @return The given instant time option or empty if no table service plan is scheduled
    */
   public Option<String> scheduleTableService(Option<Map<String, String>> extraMetadata, TableServiceType tableServiceType) {
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
@@ -1098,20 +1099,11 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    *
    * @param extraMetadata Metadata to pass onto the scheduled service instant
    * @param tableServiceType Type of table service to schedule
-   * @return
+   *
+   * @return The given instant time option or empty if no table service plan is scheduled
    */
-  public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata,
-                                             TableServiceType tableServiceType) {
-    // A lock is required to guard against race conditions between an on-going writer and scheduling a table service.
-    final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED,
-        tableServiceType.getAction(), instantTime));
-    try {
-      this.txnManager.beginTransaction(inflightInstant, Option.empty());
-      LOG.info("Scheduling table service " + tableServiceType);
-      return tableServiceClient.scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
-    } finally {
-      this.txnManager.endTransaction(inflightInstant);
-    }
+  public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata, TableServiceType tableServiceType) {
+    return tableServiceClient.scheduleTableService(instantTime, extraMetadata, tableServiceType);
   }
 
   public HoodieMetrics getMetrics() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
index 84fac2db004..f27af9a2549 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
@@ -42,13 +42,12 @@ public class DirectMarkerTransactionManager extends TransactionManager {
   private final String filePath;
 
   public DirectMarkerTransactionManager(HoodieWriteConfig config, FileSystem fs, String partitionPath, String fileId) {
-    super(new LockManager(config, fs, createUpdatedLockProps(config, partitionPath, fileId)),
-        config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl());
+    super(new LockManager(config, fs, createUpdatedLockProps(config, partitionPath, fileId)), config.needsLockGuard());
     this.filePath = partitionPath + "/" + fileId;
   }
 
   public void beginTransaction(String newTxnOwnerInstantTime) {
-    if (isOptimisticConcurrencyControlEnabled) {
+    if (needsLockGuard) {
       LOG.info("Transaction starting for " + newTxnOwnerInstantTime + " and " + filePath);
       lockManager.lock();
 
@@ -58,7 +57,7 @@ public class DirectMarkerTransactionManager extends TransactionManager {
   }
 
   public void endTransaction(String currentTxnOwnerInstantTime) {
-    if (isOptimisticConcurrencyControlEnabled) {
+    if (needsLockGuard) {
       LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstantTime
           + " for " + filePath);
       if (reset(Option.of(getInstant(currentTxnOwnerInstantTime)), Option.empty(), Option.empty())) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
index 7fddf8a944b..40744a2f5d2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
@@ -37,23 +37,22 @@ public class TransactionManager implements Serializable {
 
   protected static final Logger LOG = LogManager.getLogger(TransactionManager.class);
   protected final LockManager lockManager;
-  protected final boolean isOptimisticConcurrencyControlEnabled;
+  protected final boolean needsLockGuard;
   protected Option<HoodieInstant> currentTxnOwnerInstant = Option.empty();
   private Option<HoodieInstant> lastCompletedTxnOwnerInstant = Option.empty();
 
   public TransactionManager(HoodieWriteConfig config, FileSystem fs) {
-    this(new LockManager(config, fs),
-        config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl());
+    this(new LockManager(config, fs), config.needsLockGuard());
   }
 
-  protected TransactionManager(LockManager lockManager, boolean isOptimisticConcurrencyControlEnabled) {
+  protected TransactionManager(LockManager lockManager, boolean needsLockGuard) {
     this.lockManager = lockManager;
-    this.isOptimisticConcurrencyControlEnabled = isOptimisticConcurrencyControlEnabled;
+    this.needsLockGuard = needsLockGuard;
   }
 
   public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
                                Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
-    if (isOptimisticConcurrencyControlEnabled) {
+    if (needsLockGuard) {
       LOG.info("Transaction starting for " + newTxnOwnerInstant
           + " with latest completed transaction instant " + lastCompletedTxnOwnerInstant);
       lockManager.lock();
@@ -64,7 +63,7 @@ public class TransactionManager implements Serializable {
   }
 
   public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
-    if (isOptimisticConcurrencyControlEnabled) {
+    if (needsLockGuard) {
       LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant);
       if (reset(currentTxnOwnerInstant, Option.empty(), Option.empty())) {
         lockManager.unlock();
@@ -85,7 +84,7 @@ public class TransactionManager implements Serializable {
   }
 
   public void close() {
-    if (isOptimisticConcurrencyControlEnabled) {
+    if (needsLockGuard) {
       lockManager.close();
       LOG.info("Transaction manager closed");
     }
@@ -103,7 +102,7 @@ public class TransactionManager implements Serializable {
     return currentTxnOwnerInstant;
   }
 
-  public boolean isOptimisticConcurrencyControlEnabled() {
-    return isOptimisticConcurrencyControlEnabled;
+  public boolean isNeedsLockGuard() {
+    return needsLockGuard;
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 886112cae16..f06f0a9cfc5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2405,6 +2405,13 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBooleanOrDefault(RELEASE_RESOURCE_ENABLE);
   }
 
+  /**
+   * Returns whether the explicit guard of lock is required.
+   */
+  public boolean needsLockGuard() {
+    return isMetadataTableEnabled() || getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
+  }
+
   /**
    * Layout configs.
    */
@@ -2946,8 +2953,7 @@ public class HoodieWriteConfig extends HoodieConfig {
 
       // isLockProviderPropertySet must be fetched before setting defaults of HoodieLockConfig
       final TypedProperties writeConfigProperties = writeConfig.getProps();
-      final boolean isLockProviderPropertySet = writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME)
-          || writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_PROP);
+      final boolean isLockProviderPropertySet = writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key());
       writeConfig.setDefaultOnCondition(!isLockConfigSet,
           HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
 
@@ -2971,13 +2977,10 @@ public class HoodieWriteConfig extends HoodieConfig {
             // This is targeted at Single writer with async table services
             // If user does not set the lock provider, likely that the concurrency mode is not set either
             // Override the configs for metadata table
-            writeConfig.setValue(WRITE_CONCURRENCY_MODE.key(),
-                WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
             writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
                 InProcessLockProvider.class.getName());
-            LOG.info(String.format("Automatically set %s=%s and %s=%s since user has not set the "
+            LOG.info(String.format("Automatically set %s=%s since user has not set the "
                     + "lock provider for single writer with async table services",
-                WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value(),
                 HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName()));
           }
         }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 8f3a0244d2e..0ab7efec82d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -92,7 +92,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, R>
     this.taskContextSupplier = context.getTaskContextSupplier();
     // TODO : Remove this once we refactor and move out autoCommit method from here, since the TxnManager is held in {@link BaseHoodieWriteClient}.
     this.txnManagerOption = config.shouldAutoCommit() ? Option.of(new TransactionManager(config, table.getMetaClient().getFs())) : Option.empty();
-    if (this.txnManagerOption.isPresent() && this.txnManagerOption.get().isOptimisticConcurrencyControlEnabled()) {
+    if (this.txnManagerOption.isPresent() && this.txnManagerOption.get().isNeedsLockGuard()) {
       // these txn metadata are only needed for auto commit when optimistic concurrent control is also enabled
       this.lastCompletedTxn = TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient());
       this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(table.getMetaClient());
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index e7afa50a59d..1f4d6b18587 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -140,8 +140,10 @@ public class TestHoodieWriteConfig {
             put(ASYNC_CLEAN.key(), "false");
             put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
           }
-        }), true, true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
-        HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
+        }), true, true, true,
+        WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
+        HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+        inProcessLockProviderClassName);
 
     // 2. Async clean
     verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap<String, String>() {
@@ -153,8 +155,10 @@ public class TestHoodieWriteConfig {
             put(ASYNC_CLEAN.key(), "true");
             put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
           }
-        }), true, true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
-        HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
+        }), true, true, true,
+        WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
+        HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+        inProcessLockProviderClassName);
 
     // 3. Async compaction configured
     verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap<String, String>() {
@@ -168,12 +172,8 @@ public class TestHoodieWriteConfig {
           }
         }), true,
         tableType == HoodieTableType.MERGE_ON_READ, true,
-        tableType == HoodieTableType.MERGE_ON_READ
-            ? WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL
-            : WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
-        tableType == HoodieTableType.MERGE_ON_READ
-            ? HoodieFailedWritesCleaningPolicy.LAZY
-            : HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+        WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
+        HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
         tableType == HoodieTableType.MERGE_ON_READ
             ? inProcessLockProviderClassName
             : HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
@@ -205,8 +205,9 @@ public class TestHoodieWriteConfig {
             put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
           }
         }), true, true, false,
-        WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
-        HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
+        WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
+        HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
+        inProcessLockProviderClassName);
   }
 
   @ParameterizedTest
@@ -289,8 +290,8 @@ public class TestHoodieWriteConfig {
     if (writeConfig.areAnyTableServicesAsync()) {
       verifyConcurrencyControlRelatedConfigs(writeConfig,
           true, true, true,
-          WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
-          HoodieFailedWritesCleaningPolicy.LAZY,
+          WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
+          HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
           InProcessLockProvider.class.getName());
     } else {
       verifyConcurrencyControlRelatedConfigs(writeConfig,
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 3d4f6a3873b..1d4697d709d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -280,7 +280,7 @@ public class HoodieFlinkWriteClient<T> extends
    * should be called before the Driver starts a new transaction.
    */
   public void preTxn(HoodieTableMetaClient metaClient) {
-    if (txnManager.isOptimisticConcurrencyControlEnabled()) {
+    if (txnManager.isNeedsLockGuard()) {
       // refresh the meta client which is reused
       metaClient.reloadActiveTimeline();
       this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);