You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/04/23 08:37:54 UTC
[hudi] branch master updated: [HUDI-6123] Auto adjust lock configs for single writer (#8542)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9a9dd3a82d3 [HUDI-6123] Auto adjust lock configs for single writer (#8542)
9a9dd3a82d3 is described below
commit 9a9dd3a82d3e69f1d5eebe46c79c8fd0dc2355db
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sun Apr 23 16:37:48 2023 +0800
[HUDI-6123] Auto adjust lock configs for single writer (#8542)
Currently, the `hoodie.auto.adjust.lock.configs` option is by default false for batch mode ingestion,
and true for spark streaming sink and delta_streamer, while MDT is by default enabled.
For multiple streaming writers with no explicit lock provider set up, `InProcessLockProvider` should not be used.
Change list:
1. Restrict the option `hoodie.auto.adjust.lock.configs` to take effect only in single-writer scope, because for multi-writer, the `InProcessLockProvider` cannot work as expected across hosts/processes;
2. LockManager#lock and LockManager#unlock are invoked from the TransactionManager,
which already checks whether an explicit lock is required; remove the redundant check in LockManager.
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 2 +-
.../DirectMarkerTransactionManager.java | 6 +-
.../client/transaction/TransactionManager.java | 18 +++---
.../hudi/client/transaction/lock/LockManager.java | 62 ++++++++++-----------
.../org/apache/hudi/config/HoodieWriteConfig.java | 54 +++++++++---------
.../action/commit/BaseCommitActionExecutor.java | 2 +-
.../hudi/client/transaction/TestLockManager.java | 65 +++++++++++++---------
.../apache/hudi/client/HoodieFlinkWriteClient.java | 2 +-
.../apache/hudi/configuration/OptionsResolver.java | 2 +-
.../org/apache/hudi/util/FlinkWriteClients.java | 2 +-
10 files changed, 113 insertions(+), 102 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 2f129c0aab6..79a813567b9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -493,7 +493,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
public void preWrite(String instantTime, WriteOperationType writeOperationType,
HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
- this.lastCompletedTxnAndMetadata = txnManager.isNeedsLockGuard()
+ this.lastCompletedTxnAndMetadata = txnManager.isLockRequired()
? TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient) : Option.empty();
this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient);
this.pendingInflightAndRequestedInstants.remove(instantTime);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
index f27af9a2549..7ed6d51038c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java
@@ -42,12 +42,12 @@ public class DirectMarkerTransactionManager extends TransactionManager {
private final String filePath;
public DirectMarkerTransactionManager(HoodieWriteConfig config, FileSystem fs, String partitionPath, String fileId) {
- super(new LockManager(config, fs, createUpdatedLockProps(config, partitionPath, fileId)), config.needsLockGuard());
+ super(new LockManager(config, fs, createUpdatedLockProps(config, partitionPath, fileId)), config.isLockRequired());
this.filePath = partitionPath + "/" + fileId;
}
public void beginTransaction(String newTxnOwnerInstantTime) {
- if (needsLockGuard) {
+ if (isLockRequired) {
LOG.info("Transaction starting for " + newTxnOwnerInstantTime + " and " + filePath);
lockManager.lock();
@@ -57,7 +57,7 @@ public class DirectMarkerTransactionManager extends TransactionManager {
}
public void endTransaction(String currentTxnOwnerInstantTime) {
- if (needsLockGuard) {
+ if (isLockRequired) {
LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstantTime
+ " for " + filePath);
if (reset(Option.of(getInstant(currentTxnOwnerInstantTime)), Option.empty(), Option.empty())) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
index c257d46f594..b3e9abc7a3a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
@@ -37,22 +37,22 @@ public class TransactionManager implements Serializable {
protected static final Logger LOG = LoggerFactory.getLogger(TransactionManager.class);
protected final LockManager lockManager;
- protected final boolean needsLockGuard;
+ protected final boolean isLockRequired;
protected Option<HoodieInstant> currentTxnOwnerInstant = Option.empty();
private Option<HoodieInstant> lastCompletedTxnOwnerInstant = Option.empty();
public TransactionManager(HoodieWriteConfig config, FileSystem fs) {
- this(new LockManager(config, fs), config.needsLockGuard());
+ this(new LockManager(config, fs), config.isLockRequired());
}
- protected TransactionManager(LockManager lockManager, boolean needsLockGuard) {
+ protected TransactionManager(LockManager lockManager, boolean isLockRequired) {
this.lockManager = lockManager;
- this.needsLockGuard = needsLockGuard;
+ this.isLockRequired = isLockRequired;
}
public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
- if (needsLockGuard) {
+ if (isLockRequired) {
LOG.info("Transaction starting for " + newTxnOwnerInstant
+ " with latest completed transaction instant " + lastCompletedTxnOwnerInstant);
lockManager.lock();
@@ -63,7 +63,7 @@ public class TransactionManager implements Serializable {
}
public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
- if (needsLockGuard) {
+ if (isLockRequired) {
LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant);
if (reset(currentTxnOwnerInstant, Option.empty(), Option.empty())) {
lockManager.unlock();
@@ -84,7 +84,7 @@ public class TransactionManager implements Serializable {
}
public void close() {
- if (needsLockGuard) {
+ if (isLockRequired) {
lockManager.close();
LOG.info("Transaction manager closed");
}
@@ -102,7 +102,7 @@ public class TransactionManager implements Serializable {
return currentTxnOwnerInstant;
}
- public boolean isNeedsLockGuard() {
- return needsLockGuard;
+ public boolean isLockRequired() {
+ return isLockRequired;
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
index d7c7905c793..598f7cd7072 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
@@ -68,39 +68,37 @@ public class LockManager implements Serializable, AutoCloseable {
}
public void lock() {
- if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
- LockProvider lockProvider = getLockProvider();
- int retryCount = 0;
- boolean acquired = false;
- while (retryCount <= maxRetries) {
+ LockProvider lockProvider = getLockProvider();
+ int retryCount = 0;
+ boolean acquired = false;
+ while (retryCount <= maxRetries) {
+ try {
+ metrics.startLockApiTimerContext();
+ acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
+ if (acquired) {
+ metrics.updateLockAcquiredMetric();
+ break;
+ }
+ metrics.updateLockNotAcquiredMetric();
+ LOG.info("Retrying to acquire lock. Current lock owner information : " + lockProvider.getCurrentOwnerLockInfo());
+ Thread.sleep(maxWaitTimeInMs);
+ } catch (HoodieLockException | InterruptedException e) {
+ metrics.updateLockNotAcquiredMetric();
+ if (retryCount >= maxRetries) {
+ throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock(), e);
+ }
try {
- metrics.startLockApiTimerContext();
- acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
- if (acquired) {
- metrics.updateLockAcquiredMetric();
- break;
- }
- metrics.updateLockNotAcquiredMetric();
- LOG.info("Retrying to acquire lock. Current lock owner information : " + lockProvider.getCurrentOwnerLockInfo());
Thread.sleep(maxWaitTimeInMs);
- } catch (HoodieLockException | InterruptedException e) {
- metrics.updateLockNotAcquiredMetric();
- if (retryCount >= maxRetries) {
- throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock(), e);
- }
- try {
- Thread.sleep(maxWaitTimeInMs);
- } catch (InterruptedException ex) {
- // ignore InterruptedException here
- }
- } finally {
- retryCount++;
+ } catch (InterruptedException ex) {
+ // ignore InterruptedException here
}
- }
- if (!acquired) {
- throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock());
+ } finally {
+ retryCount++;
}
}
+ if (!acquired) {
+ throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock());
+ }
}
/**
@@ -108,11 +106,9 @@ public class LockManager implements Serializable, AutoCloseable {
* and tries to call unlock()
*/
public void unlock() {
- if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
- getLockProvider().unlock();
- metrics.updateLockHeldTimerMetrics();
- close();
- }
+ getLockProvider().unlock();
+ metrics.updateLockHeldTimerMetrics();
+ close();
}
public synchronized LockProvider getLockProvider() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 0475619944c..b5e93c90e1f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2344,7 +2344,7 @@ public class HoodieWriteConfig extends HoodieConfig {
* File listing metadata configs.
*/
public boolean isMetadataTableEnabled() {
- return metadataConfig.enabled();
+ return getBooleanOrDefault(HoodieMetadataConfig.ENABLE);
}
public int getMetadataInsertParallelism() {
@@ -2483,8 +2483,15 @@ public class HoodieWriteConfig extends HoodieConfig {
/**
* Returns whether the explicit guard of lock is required.
*/
- public boolean needsLockGuard() {
- return isMetadataTableEnabled() || getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
+ public boolean isLockRequired() {
+ return !isDefaultLockProvider() || getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
+ }
+
+ /**
+ * Returns whether the lock provider is default.
+ */
+ private boolean isDefaultLockProvider() {
+ return HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue().equals(getLockProviderClass());
}
/**
@@ -3035,30 +3042,27 @@ public class HoodieWriteConfig extends HoodieConfig {
autoAdjustConfigsForConcurrencyMode(isLockProviderPropertySet);
}
+ private boolean isLockRequiredForSingleWriter() {
+ // When metadata table is enabled, lock provider must be used for
+ // single writer with async table services.
+ // Async table services can update the metadata table and a lock provider is
+ // needed to guard against any concurrent table write operations. If user has
+ // not configured any lock provider, let's use the InProcess lock provider.
+ return writeConfig.isMetadataTableEnabled() && writeConfig.areAnyTableServicesAsync()
+ && !writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
+ }
+
private void autoAdjustConfigsForConcurrencyMode(boolean isLockProviderPropertySet) {
- if (writeConfig.isAutoAdjustLockConfigs()) {
+ if (!isLockProviderPropertySet && writeConfig.isAutoAdjustLockConfigs() && isLockRequiredForSingleWriter()) {
// auto adjustment is required only for deltastreamer and spark streaming where async table services can be executed in the same JVM.
- boolean isMetadataTableEnabled = writeConfig.getBoolean(HoodieMetadataConfig.ENABLE);
-
- if (isMetadataTableEnabled) {
- // When metadata table is enabled, optimistic concurrency control must be used for
- // single writer with async table services.
- // Async table services can update the metadata table and a lock provider is
- // needed to guard against any concurrent table write operations. If user has
- // not configured any lock provider, let's use the InProcess lock provider.
- boolean areTableServicesEnabled = writeConfig.areTableServicesEnabled();
- boolean areAsyncTableServicesEnabled = writeConfig.areAnyTableServicesAsync();
- if (!isLockProviderPropertySet && areTableServicesEnabled && areAsyncTableServicesEnabled) {
- // This is targeted at Single writer with async table services
- // If user does not set the lock provider, likely that the concurrency mode is not set either
- // Override the configs for metadata table
- writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
- InProcessLockProvider.class.getName());
- LOG.info(String.format("Automatically set %s=%s since user has not set the "
- + "lock provider for single writer with async table services",
- HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName()));
- }
- }
+ // This is targeted at Single writer with async table services
+ // If user does not set the lock provider, likely that the concurrency mode is not set either
+ // Override the configs for metadata table
+ writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
+ InProcessLockProvider.class.getName());
+ LOG.info(String.format("Automatically set %s=%s since user has not set the "
+ + "lock provider for single writer with async table services",
+ HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName()));
}
// We check if "hoodie.cleaner.policy.failed.writes"
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 1a79904ef5c..318f6155235 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -94,7 +94,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, R>
this.taskContextSupplier = context.getTaskContextSupplier();
// TODO : Remove this once we refactor and move out autoCommit method from here, since the TxnManager is held in {@link BaseHoodieWriteClient}.
this.txnManagerOption = config.shouldAutoCommit() ? Option.of(new TransactionManager(config, table.getMetaClient().getFs())) : Option.empty();
- if (this.txnManagerOption.isPresent() && this.txnManagerOption.get().isNeedsLockGuard()) {
+ if (this.txnManagerOption.isPresent() && this.txnManagerOption.get().isLockRequired()) {
// these txn metadata are only needed for auto commit when optimistic concurrent control is also enabled
this.lastCompletedTxn = TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient());
this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(table.getMetaClient());
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java
index 96d8ae65d98..1b4c08c5329 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java
@@ -31,7 +31,8 @@ import org.apache.curator.test.TestingServer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,11 +46,8 @@ public class TestLockManager extends HoodieCommonTestHarness {
private static final Logger LOG = LoggerFactory.getLogger(TestLockManager.class);
private static TestingServer server;
- private static String zk_basePath = "/hudi/test/lock";
- private static String key = "table1";
-
- HoodieWriteConfig writeConfig;
- LockManager lockManager;
+ private static final String ZK_BASE_PATH = "/hudi/test/lock";
+ private static final String KEY = "table1";
@BeforeAll
public static void setup() {
@@ -70,32 +68,17 @@ public class TestLockManager extends HoodieCommonTestHarness {
}
}
- private HoodieWriteConfig getWriteConfig() {
- return HoodieWriteConfig.newBuilder()
- .withPath(basePath)
- .withCleanConfig(HoodieCleanConfig.newBuilder()
- .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
- .build())
- .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
- .withLockConfig(HoodieLockConfig.newBuilder()
- .withLockProvider(ZookeeperBasedLockProvider.class)
- .withZkBasePath(zk_basePath)
- .withZkLockKey(key)
- .withZkQuorum(server.getConnectString())
- .build())
- .build();
- }
-
@BeforeEach
- private void init() throws IOException {
+ void init() throws IOException {
initPath();
initMetaClient();
- this.writeConfig = getWriteConfig();
- this.lockManager = new LockManager(this.writeConfig, this.metaClient.getFs());
}
- @Test
- public void testLockAndUnlock() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testLockAndUnlock(boolean multiWriter) {
+ HoodieWriteConfig writeConfig = multiWriter ? getMultiWriterWriteConfig() : getSingleWriterWriteConfig();
+ LockManager lockManager = new LockManager(writeConfig, this.metaClient.getFs());
LockManager mockLockManager = Mockito.spy(lockManager);
assertDoesNotThrow(() -> {
@@ -108,4 +91,32 @@ public class TestLockManager extends HoodieCommonTestHarness {
Mockito.verify(mockLockManager).close();
}
+
+ private HoodieWriteConfig getMultiWriterWriteConfig() {
+ return HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .build())
+ .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withLockConfig(HoodieLockConfig.newBuilder()
+ .withLockProvider(ZookeeperBasedLockProvider.class)
+ .withZkBasePath(ZK_BASE_PATH)
+ .withZkLockKey(KEY)
+ .withZkQuorum(server.getConnectString())
+ .build())
+ .build();
+ }
+
+ private HoodieWriteConfig getSingleWriterWriteConfig() {
+ return HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withLockConfig(HoodieLockConfig.newBuilder()
+ .withLockProvider(ZookeeperBasedLockProvider.class)
+ .withZkBasePath(ZK_BASE_PATH)
+ .withZkLockKey(KEY)
+ .withZkQuorum(server.getConnectString())
+ .build())
+ .build();
+ }
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 0498d190fc8..3eeffc3943e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -280,7 +280,7 @@ public class HoodieFlinkWriteClient<T> extends
* should be called before the Driver starts a new transaction.
*/
public void preTxn(HoodieTableMetaClient metaClient) {
- if (txnManager.isNeedsLockGuard()) {
+ if (txnManager.isLockRequired()) {
// refresh the meta client which is reused
metaClient.reloadActiveTimeline();
this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 6da87e66089..fd1e5b5f32b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -243,7 +243,7 @@ public class OptionsResolver {
/**
* Returns whether the writer txn should be guarded by lock.
*/
- public static boolean needsGuardByLock(Configuration conf) {
+ public static boolean isLockRequired(Configuration conf) {
return conf.getBoolean(FlinkOptions.METADATA_ENABLED)
|| conf.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 34d326f843f..c039cbaacd8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -224,7 +224,7 @@ public class FlinkWriteClients {
.withProps(flinkConf2TypedProperties(conf))
.withSchema(getSourceSchema(conf).toString());
- if (OptionsResolver.needsGuardByLock(conf) && !conf.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) {
+ if (OptionsResolver.isLockRequired(conf) && !conf.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) {
// configure the fs lock provider by default
builder.withLockConfig(HoodieLockConfig.newBuilder()
.withConflictResolutionStrategy(OptionsResolver.getConflictResolutionStrategy(conf))