You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2022/07/18 23:52:54 UTC

[hudi] branch master updated: [HUDI-4065] Add FileBasedLockProvider (#6071)

This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 382d19e85b [HUDI-4065] Add FileBasedLockProvider (#6071)
382d19e85b is described below

commit 382d19e85b06d0fc7f4ad37bb4e4eae3f5e76b78
Author: 冯健 <fe...@gmail.com>
AuthorDate: Tue Jul 19 07:52:47 2022 +0800

    [HUDI-4065] Add FileBasedLockProvider (#6071)
---
 .../lock/FileSystemBasedLockProvider.java          | 152 +++++++++++++++++++++
 .../org/apache/hudi/config/HoodieLockConfig.java   |   9 +-
 .../hudi/client/TestFileBasedLockProvider.java     | 135 ++++++++++++++++++
 .../hudi/client/TestHoodieClientMultiWriter.java   |  87 +++++++-----
 .../hudi/common/config/LockConfiguration.java      |   2 +
 5 files changed, 349 insertions(+), 36 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
new file mode 100644
index 0000000000..96a42e8409
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
+import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
+
+/**
+ * A FileSystem based lock. This {@link LockProvider} implementation allows to lock table operations
+ * using DFS. Users might need to manually clean the Locker's path if writeClient crash and never run again.
+ * NOTE: This only works for DFS with atomic create/delete operation
+ */
+public class FileSystemBasedLockProvider implements LockProvider<String>, Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(FileSystemBasedLockProvider.class);
+
+  private static final String LOCK_FILE_NAME = "lock";
+
+  private final int lockTimeoutMinutes;
+  private transient FileSystem fs;
+  private transient Path lockFile;
+  protected LockConfiguration lockConfiguration;
+
+  public FileSystemBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration configuration) {
+    checkRequiredProps(lockConfiguration);
+    this.lockConfiguration = lockConfiguration;
+    String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null);
+    if (StringUtils.isNullOrEmpty(lockDirectory)) {
+      lockDirectory = lockConfiguration.getConfig().getString(HoodieWriteConfig.BASE_PATH.key())
+            + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME;
+    }
+    this.lockTimeoutMinutes = lockConfiguration.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY);
+    this.lockFile = new Path(lockDirectory + Path.SEPARATOR + LOCK_FILE_NAME);
+    this.fs = FSUtils.getFs(this.lockFile.toString(), configuration);
+  }
+
+  @Override
+  public void close() {
+    synchronized (LOCK_FILE_NAME) {
+      try {
+        fs.delete(this.lockFile, true);
+      } catch (IOException e) {
+        throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE), e);
+      }
+    }
+  }
+
+  @Override
+  public boolean tryLock(long time, TimeUnit unit) {
+    try {
+      synchronized (LOCK_FILE_NAME) {
+        // Check whether lock is already expired, if so try to delete lock file
+        if (fs.exists(this.lockFile) && checkIfExpired()) {
+          fs.delete(this.lockFile, true);
+        }
+        acquireLock();
+        return fs.exists(this.lockFile);
+      }
+    } catch (IOException | HoodieIOException e) {
+      LOG.info(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
+      return false;
+    }
+  }
+
+  @Override
+  public void unlock() {
+    synchronized (LOCK_FILE_NAME) {
+      try {
+        if (fs.exists(this.lockFile)) {
+          fs.delete(this.lockFile, true);
+        }
+      } catch (IOException io) {
+        throw new HoodieIOException(generateLogStatement(LockState.FAILED_TO_RELEASE), io);
+      }
+    }
+  }
+
+  @Override
+  public String getLock() {
+    return this.lockFile.toString();
+  }
+
+  private boolean checkIfExpired() {
+    if (lockTimeoutMinutes == 0) {
+      return false;
+    }
+    try {
+      long modificationTime = fs.getFileStatus(this.lockFile).getModificationTime();
+      if (System.currentTimeMillis() - modificationTime > lockTimeoutMinutes * 60 * 1000) {
+        return true;
+      }
+    } catch (IOException | HoodieIOException e) {
+      LOG.error(generateLogStatement(LockState.ALREADY_RELEASED) + " failed to get lockFile's modification time", e);
+    }
+    return false;
+  }
+
+  private void acquireLock() {
+    try {
+      fs.create(this.lockFile, false).close();
+    } catch (IOException e) {
+      throw new HoodieIOException(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
+    }
+  }
+
+  protected String generateLogStatement(LockState state) {
+    return StringUtils.join(state.name(), " lock at: ", getLock());
+  }
+
+  private void checkRequiredProps(final LockConfiguration config) {
+    ValidationUtils.checkArgument(config.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null) != null
+          || config.getConfig().getString(HoodieWriteConfig.BASE_PATH.key(), null) != null);
+    ValidationUtils.checkArgument(config.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY) >= 0);
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
index 9ea28fbbd4..7fcc96810b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
@@ -36,6 +36,7 @@ import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUI
 import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
 import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
 import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
 import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
 import static org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY;
 import static org.apache.hudi.common.config.LockConfiguration.HIVE_METASTORE_URI_PROP_KEY;
@@ -106,7 +107,13 @@ public class HoodieLockConfig extends HoodieConfig {
       .key(FILESYSTEM_LOCK_PATH_PROP_KEY)
       .noDefaultValue()
       .sinceVersion("0.8.0")
-      .withDocumentation("For DFS based lock providers, path to store the locks under.");
+      .withDocumentation("For DFS based lock providers, path to store the locks under. use Table's meta path as default");
+
+  public static final ConfigProperty<Integer> FILESYSTEM_LOCK_EXPIRE = ConfigProperty
+        .key(FILESYSTEM_LOCK_EXPIRE_PROP_KEY)
+        .defaultValue(0)
+        .sinceVersion("0.12.0")
+        .withDocumentation("For DFS based lock providers, expire time in minutes, must be a nonnegative number, default means no expire");
 
   public static final ConfigProperty<String> HIVE_DATABASE_NAME = ConfigProperty
       .key(HIVE_DATABASE_NAME_PROP_KEY)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java
new file mode 100644
index 0000000000..208e9cd62e
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
+import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+public class TestFileBasedLockProvider {
+  private static HdfsTestService hdfsTestService;
+  private static MiniDFSCluster dfsCluster;
+  private static LockConfiguration lockConfiguration;
+  private static Configuration hadoopConf;
+
+  @BeforeAll
+  public static void setup() throws IOException {
+    hdfsTestService = new HdfsTestService();
+    dfsCluster = hdfsTestService.start(true);
+    hadoopConf = dfsCluster.getFileSystem().getConf();
+
+    Properties properties = new Properties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/");
+    properties.setProperty(FILESYSTEM_LOCK_EXPIRE_PROP_KEY, "1");
+    properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
+    properties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000");
+    properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
+    lockConfiguration = new LockConfiguration(properties);
+  }
+
+  @AfterAll
+  public static void cleanUpAfterAll() throws IOException {
+    Path workDir = dfsCluster.getFileSystem().getWorkingDirectory();
+    FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf());
+    fs.delete(new Path("/tmp"), true);
+    if (hdfsTestService != null) {
+      hdfsTestService.stop();
+      hdfsTestService = null;
+    }
+  }
+
+  @AfterEach
+  public void cleanUpAfterEach() throws IOException {
+    Path workDir = dfsCluster.getFileSystem().getWorkingDirectory();
+    FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf());
+    fs.delete(new Path("/tmp/lock"), true);
+  }
+
+  @Test
+  public void testAcquireLock() {
+    FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
+    Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
+          .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+    fileBasedLockProvider.unlock();
+  }
+
+  @Test
+  public void testAcquireLockWithDefaultPath() {
+    lockConfiguration.getConfig().remove(FILESYSTEM_LOCK_PATH_PROP_KEY);
+    lockConfiguration.getConfig().setProperty(HoodieWriteConfig.BASE_PATH.key(), "/tmp/");
+    FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
+    Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
+          .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+    fileBasedLockProvider.unlock();
+    lockConfiguration.getConfig().setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/");
+  }
+
+  @Test
+  public void testUnLock() {
+    FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
+    Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
+          .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+    fileBasedLockProvider.unlock();
+    Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
+          .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+  }
+
+  @Test
+  public void testReentrantLock() {
+    FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
+    Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
+          .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+    Assertions.assertFalse(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
+            .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+    fileBasedLockProvider.unlock();
+  }
+
+  @Test
+  public void testUnlockWithoutLock() {
+    try {
+      FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
+      fileBasedLockProvider.unlock();
+    } catch (HoodieLockException e) {
+      Assertions.fail();
+    }
+  }
+
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 268674e78d..6ad8666a0f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -45,9 +46,11 @@ import org.apache.hudi.testutils.HoodieClientTestBase;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -66,7 +69,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
 import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -76,6 +83,21 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
 
+  private Properties lockProperties = null;
+
+  @BeforeEach
+  public void setup() throws IOException {
+    if (lockProperties == null) {
+      lockProperties = new Properties();
+      lockProperties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
+      lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
+      lockProperties.setProperty(FILESYSTEM_LOCK_EXPIRE_PROP_KEY, "1");
+      lockProperties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
+      lockProperties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000");
+      lockProperties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
+    }
+  }
+
   public void setUpMORTestTable() throws IOException {
     cleanupResources();
     initPath();
@@ -92,15 +114,27 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
     cleanupResources();
   }
 
+  private static final List<Class> LOCK_PROVIDER_CLASSES = Arrays.asList(
+        InProcessLockProvider.class,
+        FileSystemBasedLockProvider.class);
+
+  private static Iterable<Object[]> providerClassAndTableType() {
+    List<Object[]> opts = new ArrayList<>();
+    for (Object providerClass : LOCK_PROVIDER_CLASSES) {
+      opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass});
+      opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass});
+    }
+    return opts;
+  }
+
   @ParameterizedTest
-  @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
-  public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws Exception {
+  @MethodSource("providerClassAndTableType")
+  public void testHoodieClientBasicMultiWriter(HoodieTableType tableType, Class providerClass) throws Exception {
     if (tableType == HoodieTableType.MERGE_ON_READ) {
       setUpMORTestTable();
     }
-    Properties properties = new Properties();
-    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
-    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
+    lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
+
     HoodieWriteConfig writeConfig = getConfigBuilder()
         .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
@@ -110,8 +144,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         // Timeline-server-based markers are not used for multi-writer tests
         .withMarkersType(MarkerType.DIRECT.name())
-        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
-            .build()).withAutoCommit(false).withProperties(properties).build();
+        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass)
+            .build()).withAutoCommit(false).withProperties(lockProperties).build();
 
     // Create the first commit
     createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 200, true);
@@ -172,16 +206,6 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
     assertTrue(writer1Completed.get() && writer2Completed.get());
   }
 
-  @Test
-  public void testMultiWriterWithAsyncTableServicesWithConflictCOW() throws Exception {
-    testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.COPY_ON_WRITE);
-  }
-
-  @Test
-  public void testMultiWriterWithAsyncTableServicesWithConflictMOR() throws Exception {
-    testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.MERGE_ON_READ);
-  }
-
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
   public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType tableType) throws Exception {
@@ -189,11 +213,9 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
       setUpMORTestTable();
     }
 
-    Properties properties = new Properties();
-    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
-    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
-    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "3000");
-    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
+    lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
+    lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "3000");
+    lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
 
     HoodieWriteConfig cfg = getConfigBuilder()
         .withCleanConfig(HoodieCleanConfig.newBuilder()
@@ -210,7 +232,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
         .withAutoCommit(false)
         // Timeline-server-based markers are not used for multi-writer tests
         .withMarkersType(MarkerType.DIRECT.name())
-        .withProperties(properties)
+        .withProperties(lockProperties)
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.REMOTE_FIRST)
             .withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
         .build();
@@ -260,15 +282,13 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
     }
   }
 
-  private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType) throws Exception {
+  @ParameterizedTest
+  @MethodSource("providerClassAndTableType")
+  public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType, Class providerClass) throws Exception {
     // create inserts X 1
     if (tableType == HoodieTableType.MERGE_ON_READ) {
       setUpMORTestTable();
     }
-    Properties properties = new Properties();
-    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
-    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath);
-    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
     // Disabling embedded timeline server, it doesn't work with multiwriter
     HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
         .withCleanConfig(HoodieCleanConfig.newBuilder()
@@ -284,8 +304,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
             FileSystemViewStorageType.MEMORY).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
-        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
-            .build()).withAutoCommit(false).withProperties(properties);
+        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass)
+            .build()).withAutoCommit(false).withProperties(lockProperties);
     Set<String> validInstants = new HashSet<>();
     // Create the first commit with inserts
     HoodieWriteConfig cfg = writeConfigBuilder.build();
@@ -458,10 +478,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
 
   @Test
   public void testHoodieClientMultiWriterAutoCommitForConflict() throws Exception {
-    Properties properties = new Properties();
-    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
-    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
-    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100");
+    lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100");
     HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
         .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
@@ -470,7 +487,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
         // Timeline-server-based markers are not used for multi-writer tests
         .withMarkersType(MarkerType.DIRECT.name())
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
-            .build()).withAutoCommit(true).withProperties(properties);
+            .build()).withAutoCommit(true).withProperties(lockProperties);
     HoodieWriteConfig cfg = writeConfigBuilder.build();
     HoodieWriteConfig cfg2 = writeConfigBuilder.build();
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
index 0109f22097..c6ebc54e95 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
@@ -48,6 +48,8 @@ public class LockConfiguration implements Serializable {
 
   public static final String FILESYSTEM_LOCK_PATH_PROP_KEY = FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX + "path";
 
+  public static final String FILESYSTEM_LOCK_EXPIRE_PROP_KEY = FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX + "expire";
+
   // configs for metastore based locks
   public static final String HIVE_METASTORE_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "hivemetastore.";