You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/04/23 08:37:54 UTC

[hudi] branch master updated: [HUDI-6123] Auto adjust lock configs for single writer (#8542)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a9dd3a82d3 [HUDI-6123] Auto adjust lock configs for single writer (#8542)
9a9dd3a82d3 is described below

commit 9a9dd3a82d3e69f1d5eebe46c79c8fd0dc2355db
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sun Apr 23 16:37:48 2023 +0800

    [HUDI-6123] Auto adjust lock configs for single writer (#8542)
    
    Currently, the `hoodie.auto.adjust.lock.configs` opiton is by default false for batch mode ingestion,
    and true for spark streaming sink and delta_streamer, while MDT is by default enabled.
    
    For multiple streaming writers with no explicit lock provider set up, `InProcessLockProvider` should not be used.
    
    Change list:
    1. Restrict the option `hoodie.auto.adjust.lock.configs` to take effect in single writer scope, because for multi-writer, the `InProcessLockProvider` can not work as expected among hosts/processes;
    2. The LockManager #lock and #unlock are invoked from the TransactionManager
    which already does the checks for the requirement of an explicit lock, remove the redundant check in LockManager.
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  2 +-
 .../DirectMarkerTransactionManager.java            |  6 +-
 .../client/transaction/TransactionManager.java     | 18 +++---
 .../hudi/client/transaction/lock/LockManager.java  | 62 ++++++++++-----------
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 54 +++++++++---------
 .../action/commit/BaseCommitActionExecutor.java    |  2 +-
 .../hudi/client/transaction/TestLockManager.java   | 65 +++++++++++++---------
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  2 +-
 .../apache/hudi/configuration/OptionsResolver.java |  2 +-
 .../org/apache/hudi/util/FlinkWriteClients.java    |  2 +-
 10 files changed, 113 insertions(+), 102 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 2f129c0aab6..79a813567b9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -493,7 +493,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
   public void preWrite(String instantTime, WriteOperationType writeOperationType,
       HoodieTableMetaClient metaClient) {
     setOperationType(writeOperationType);
-    this.lastCompletedTxnAndMetadata = txnManager.isNeedsLockGuard()
+    this.lastCompletedTxnAndMetadata = txnManager.isLockRequired()
         ? TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient) : Option.empty();
     this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient);
     this.pendingInflightAndRequestedInstants.remove(instantTime);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
index f27af9a2549..7ed6d51038c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
@@ -42,12 +42,12 @@ public class DirectMarkerTransactionManager extends TransactionManager {
   private final String filePath;
 
   public DirectMarkerTransactionManager(HoodieWriteConfig config, FileSystem fs, String partitionPath, String fileId) {
-    super(new LockManager(config, fs, createUpdatedLockProps(config, partitionPath, fileId)), config.needsLockGuard());
+    super(new LockManager(config, fs, createUpdatedLockProps(config, partitionPath, fileId)), config.isLockRequired());
     this.filePath = partitionPath + "/" + fileId;
   }
 
   public void beginTransaction(String newTxnOwnerInstantTime) {
-    if (needsLockGuard) {
+    if (isLockRequired) {
       LOG.info("Transaction starting for " + newTxnOwnerInstantTime + " and " + filePath);
       lockManager.lock();
 
@@ -57,7 +57,7 @@ public class DirectMarkerTransactionManager extends TransactionManager {
   }
 
   public void endTransaction(String currentTxnOwnerInstantTime) {
-    if (needsLockGuard) {
+    if (isLockRequired) {
       LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstantTime
           + " for " + filePath);
       if (reset(Option.of(getInstant(currentTxnOwnerInstantTime)), Option.empty(), Option.empty())) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
index c257d46f594..b3e9abc7a3a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
@@ -37,22 +37,22 @@ public class TransactionManager implements Serializable {
 
   protected static final Logger LOG = LoggerFactory.getLogger(TransactionManager.class);
   protected final LockManager lockManager;
-  protected final boolean needsLockGuard;
+  protected final boolean isLockRequired;
   protected Option<HoodieInstant> currentTxnOwnerInstant = Option.empty();
   private Option<HoodieInstant> lastCompletedTxnOwnerInstant = Option.empty();
 
   public TransactionManager(HoodieWriteConfig config, FileSystem fs) {
-    this(new LockManager(config, fs), config.needsLockGuard());
+    this(new LockManager(config, fs), config.isLockRequired());
   }
 
-  protected TransactionManager(LockManager lockManager, boolean needsLockGuard) {
+  protected TransactionManager(LockManager lockManager, boolean isLockRequired) {
     this.lockManager = lockManager;
-    this.needsLockGuard = needsLockGuard;
+    this.isLockRequired = isLockRequired;
   }
 
   public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
                                Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
-    if (needsLockGuard) {
+    if (isLockRequired) {
       LOG.info("Transaction starting for " + newTxnOwnerInstant
           + " with latest completed transaction instant " + lastCompletedTxnOwnerInstant);
       lockManager.lock();
@@ -63,7 +63,7 @@ public class TransactionManager implements Serializable {
   }
 
   public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
-    if (needsLockGuard) {
+    if (isLockRequired) {
       LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant);
       if (reset(currentTxnOwnerInstant, Option.empty(), Option.empty())) {
         lockManager.unlock();
@@ -84,7 +84,7 @@ public class TransactionManager implements Serializable {
   }
 
   public void close() {
-    if (needsLockGuard) {
+    if (isLockRequired) {
       lockManager.close();
       LOG.info("Transaction manager closed");
     }
@@ -102,7 +102,7 @@ public class TransactionManager implements Serializable {
     return currentTxnOwnerInstant;
   }
 
-  public boolean isNeedsLockGuard() {
-    return needsLockGuard;
+  public boolean isLockRequired() {
+    return isLockRequired;
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
index d7c7905c793..598f7cd7072 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
@@ -68,39 +68,37 @@ public class LockManager implements Serializable, AutoCloseable {
   }
 
   public void lock() {
-    if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
-      LockProvider lockProvider = getLockProvider();
-      int retryCount = 0;
-      boolean acquired = false;
-      while (retryCount <= maxRetries) {
+    LockProvider lockProvider = getLockProvider();
+    int retryCount = 0;
+    boolean acquired = false;
+    while (retryCount <= maxRetries) {
+      try {
+        metrics.startLockApiTimerContext();
+        acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
+        if (acquired) {
+          metrics.updateLockAcquiredMetric();
+          break;
+        }
+        metrics.updateLockNotAcquiredMetric();
+        LOG.info("Retrying to acquire lock. Current lock owner information : " + lockProvider.getCurrentOwnerLockInfo());
+        Thread.sleep(maxWaitTimeInMs);
+      } catch (HoodieLockException | InterruptedException e) {
+        metrics.updateLockNotAcquiredMetric();
+        if (retryCount >= maxRetries) {
+          throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock(), e);
+        }
         try {
-          metrics.startLockApiTimerContext();
-          acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
-          if (acquired) {
-            metrics.updateLockAcquiredMetric();
-            break;
-          }
-          metrics.updateLockNotAcquiredMetric();
-          LOG.info("Retrying to acquire lock. Current lock owner information : " + lockProvider.getCurrentOwnerLockInfo());
           Thread.sleep(maxWaitTimeInMs);
-        } catch (HoodieLockException | InterruptedException e) {
-          metrics.updateLockNotAcquiredMetric();
-          if (retryCount >= maxRetries) {
-            throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock(), e);
-          }
-          try {
-            Thread.sleep(maxWaitTimeInMs);
-          } catch (InterruptedException ex) {
-            // ignore InterruptedException here
-          }
-        } finally {
-          retryCount++;
+        } catch (InterruptedException ex) {
+          // ignore InterruptedException here
         }
-      }
-      if (!acquired) {
-        throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock());
+      } finally {
+        retryCount++;
       }
     }
+    if (!acquired) {
+      throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock());
+    }
   }
 
   /**
@@ -108,11 +106,9 @@ public class LockManager implements Serializable, AutoCloseable {
    * and tries to call unlock()
    */
   public void unlock() {
-    if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
-      getLockProvider().unlock();
-      metrics.updateLockHeldTimerMetrics();
-      close();
-    }
+    getLockProvider().unlock();
+    metrics.updateLockHeldTimerMetrics();
+    close();
   }
 
   public synchronized LockProvider getLockProvider() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 0475619944c..b5e93c90e1f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2344,7 +2344,7 @@ public class HoodieWriteConfig extends HoodieConfig {
    * File listing metadata configs.
    */
   public boolean isMetadataTableEnabled() {
-    return metadataConfig.enabled();
+    return getBooleanOrDefault(HoodieMetadataConfig.ENABLE);
   }
 
   public int getMetadataInsertParallelism() {
@@ -2483,8 +2483,15 @@ public class HoodieWriteConfig extends HoodieConfig {
   /**
    * Returns whether the explicit guard of lock is required.
    */
-  public boolean needsLockGuard() {
-    return isMetadataTableEnabled() || getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
+  public boolean isLockRequired() {
+    return !isDefaultLockProvider() || getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
+  }
+
+  /**
+   * Returns whether the lock provider is default.
+   */
+  private boolean isDefaultLockProvider() {
+    return HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue().equals(getLockProviderClass());
   }
 
   /**
@@ -3035,30 +3042,27 @@ public class HoodieWriteConfig extends HoodieConfig {
       autoAdjustConfigsForConcurrencyMode(isLockProviderPropertySet);
     }
 
+    private boolean isLockRequiredForSingleWriter() {
+      // When metadata table is enabled, lock provider must be used for
+      // single writer with async table services.
+      // Async table services can update the metadata table and a lock provider is
+      // needed to guard against any concurrent table write operations. If user has
+      // not configured any lock provider, let's use the InProcess lock provider.
+      return writeConfig.isMetadataTableEnabled() && writeConfig.areAnyTableServicesAsync()
+          && !writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
+    }
+
     private void autoAdjustConfigsForConcurrencyMode(boolean isLockProviderPropertySet) {
-      if (writeConfig.isAutoAdjustLockConfigs()) {
+      if (!isLockProviderPropertySet && writeConfig.isAutoAdjustLockConfigs() && isLockRequiredForSingleWriter()) {
         // auto adjustment is required only for deltastreamer and spark streaming where async table services can be executed in the same JVM.
-        boolean isMetadataTableEnabled = writeConfig.getBoolean(HoodieMetadataConfig.ENABLE);
-
-        if (isMetadataTableEnabled) {
-          // When metadata table is enabled, optimistic concurrency control must be used for
-          // single writer with async table services.
-          // Async table services can update the metadata table and a lock provider is
-          // needed to guard against any concurrent table write operations. If user has
-          // not configured any lock provider, let's use the InProcess lock provider.
-          boolean areTableServicesEnabled = writeConfig.areTableServicesEnabled();
-          boolean areAsyncTableServicesEnabled = writeConfig.areAnyTableServicesAsync();
-          if (!isLockProviderPropertySet && areTableServicesEnabled && areAsyncTableServicesEnabled) {
-            // This is targeted at Single writer with async table services
-            // If user does not set the lock provider, likely that the concurrency mode is not set either
-            // Override the configs for metadata table
-            writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
-                InProcessLockProvider.class.getName());
-            LOG.info(String.format("Automatically set %s=%s since user has not set the "
-                    + "lock provider for single writer with async table services",
-                HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName()));
-          }
-        }
+        // This is targeted at Single writer with async table services
+        // If user does not set the lock provider, likely that the concurrency mode is not set either
+        // Override the configs for metadata table
+        writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
+            InProcessLockProvider.class.getName());
+        LOG.info(String.format("Automatically set %s=%s since user has not set the "
+                + "lock provider for single writer with async table services",
+            HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName()));
       }
 
       // We check if "hoodie.cleaner.policy.failed.writes"
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 1a79904ef5c..318f6155235 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -94,7 +94,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, R>
     this.taskContextSupplier = context.getTaskContextSupplier();
     // TODO : Remove this once we refactor and move out autoCommit method from here, since the TxnManager is held in {@link BaseHoodieWriteClient}.
     this.txnManagerOption = config.shouldAutoCommit() ? Option.of(new TransactionManager(config, table.getMetaClient().getFs())) : Option.empty();
-    if (this.txnManagerOption.isPresent() && this.txnManagerOption.get().isNeedsLockGuard()) {
+    if (this.txnManagerOption.isPresent() && this.txnManagerOption.get().isLockRequired()) {
       // these txn metadata are only needed for auto commit when optimistic concurrent control is also enabled
       this.lastCompletedTxn = TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient());
       this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(table.getMetaClient());
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java
index 96d8ae65d98..1b4c08c5329 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java
@@ -31,7 +31,8 @@ import org.apache.curator.test.TestingServer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,11 +46,8 @@ public class TestLockManager extends HoodieCommonTestHarness {
   private static final Logger LOG = LoggerFactory.getLogger(TestLockManager.class);
 
   private static TestingServer server;
-  private static String zk_basePath = "/hudi/test/lock";
-  private static String key = "table1";
-
-  HoodieWriteConfig writeConfig;
-  LockManager lockManager;
+  private static final String ZK_BASE_PATH = "/hudi/test/lock";
+  private static final String KEY = "table1";
 
   @BeforeAll
   public static void setup() {
@@ -70,32 +68,17 @@ public class TestLockManager extends HoodieCommonTestHarness {
     }
   }
 
-  private HoodieWriteConfig getWriteConfig() {
-    return HoodieWriteConfig.newBuilder()
-        .withPath(basePath)
-        .withCleanConfig(HoodieCleanConfig.newBuilder()
-            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
-            .build())
-        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
-        .withLockConfig(HoodieLockConfig.newBuilder()
-            .withLockProvider(ZookeeperBasedLockProvider.class)
-            .withZkBasePath(zk_basePath)
-            .withZkLockKey(key)
-            .withZkQuorum(server.getConnectString())
-            .build())
-        .build();
-  }
-
   @BeforeEach
-  private void init() throws IOException {
+  void init() throws IOException {
     initPath();
     initMetaClient();
-    this.writeConfig = getWriteConfig();
-    this.lockManager = new LockManager(this.writeConfig, this.metaClient.getFs());
   }
 
-  @Test
-  public void testLockAndUnlock() {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testLockAndUnlock(boolean multiWriter) {
+    HoodieWriteConfig writeConfig = multiWriter ? getMultiWriterWriteConfig() : getSingleWriterWriteConfig();
+    LockManager lockManager = new LockManager(writeConfig, this.metaClient.getFs());
     LockManager mockLockManager = Mockito.spy(lockManager);
 
     assertDoesNotThrow(() -> {
@@ -108,4 +91,32 @@ public class TestLockManager extends HoodieCommonTestHarness {
 
     Mockito.verify(mockLockManager).close();
   }
+
+  private HoodieWriteConfig getMultiWriterWriteConfig() {
+    return HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(ZookeeperBasedLockProvider.class)
+            .withZkBasePath(ZK_BASE_PATH)
+            .withZkLockKey(KEY)
+            .withZkQuorum(server.getConnectString())
+            .build())
+        .build();
+  }
+
+  private HoodieWriteConfig getSingleWriterWriteConfig() {
+    return HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(ZookeeperBasedLockProvider.class)
+            .withZkBasePath(ZK_BASE_PATH)
+            .withZkLockKey(KEY)
+            .withZkQuorum(server.getConnectString())
+            .build())
+        .build();
+  }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 0498d190fc8..3eeffc3943e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -280,7 +280,7 @@ public class HoodieFlinkWriteClient<T> extends
    * should be called before the Driver starts a new transaction.
    */
   public void preTxn(HoodieTableMetaClient metaClient) {
-    if (txnManager.isNeedsLockGuard()) {
+    if (txnManager.isLockRequired()) {
       // refresh the meta client which is reused
       metaClient.reloadActiveTimeline();
       this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 6da87e66089..fd1e5b5f32b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -243,7 +243,7 @@ public class OptionsResolver {
   /**
    * Returns whether the writer txn should be guarded by lock.
    */
-  public static boolean needsGuardByLock(Configuration conf) {
+  public static boolean isLockRequired(Configuration conf) {
     return conf.getBoolean(FlinkOptions.METADATA_ENABLED)
         || conf.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
             .equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 34d326f843f..c039cbaacd8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -224,7 +224,7 @@ public class FlinkWriteClients {
             .withProps(flinkConf2TypedProperties(conf))
             .withSchema(getSourceSchema(conf).toString());
 
-    if (OptionsResolver.needsGuardByLock(conf) && !conf.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) {
+    if (OptionsResolver.isLockRequired(conf) && !conf.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) {
       // configure the fs lock provider by default
       builder.withLockConfig(HoodieLockConfig.newBuilder()
           .withConflictResolutionStrategy(OptionsResolver.getConflictResolutionStrategy(conf))