You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2022/07/18 23:52:54 UTC
[hudi] branch master updated: [HUDI-4065] Add FileBasedLockProvider (#6071)
This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 382d19e85b [HUDI-4065] Add FileBasedLockProvider (#6071)
382d19e85b is described below
commit 382d19e85b06d0fc7f4ad37bb4e4eae3f5e76b78
Author: 冯健 <fe...@gmail.com>
AuthorDate: Tue Jul 19 07:52:47 2022 +0800
[HUDI-4065] Add FileBasedLockProvider (#6071)
---
.../lock/FileSystemBasedLockProvider.java | 152 +++++++++++++++++++++
.../org/apache/hudi/config/HoodieLockConfig.java | 9 +-
.../hudi/client/TestFileBasedLockProvider.java | 135 ++++++++++++++++++
.../hudi/client/TestHoodieClientMultiWriter.java | 87 +++++++-----
.../hudi/common/config/LockConfiguration.java | 2 +
5 files changed, 349 insertions(+), 36 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
new file mode 100644
index 0000000000..96a42e8409
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
+import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
+
+/**
+ * A FileSystem based lock. This {@link LockProvider} implementation allows to lock table operations
+ * using DFS. Users might need to manually clean the Locker's path if writeClient crash and never run again.
+ * NOTE: This only works for DFS with atomic create/delete operation
+ */
+public class FileSystemBasedLockProvider implements LockProvider<String>, Serializable {
+
+ private static final Logger LOG = LogManager.getLogger(FileSystemBasedLockProvider.class);
+
+ private static final String LOCK_FILE_NAME = "lock";
+
+ private final int lockTimeoutMinutes;
+ private transient FileSystem fs;
+ private transient Path lockFile;
+ protected LockConfiguration lockConfiguration;
+
+ public FileSystemBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration configuration) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null);
+ if (StringUtils.isNullOrEmpty(lockDirectory)) {
+ lockDirectory = lockConfiguration.getConfig().getString(HoodieWriteConfig.BASE_PATH.key())
+ + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME;
+ }
+ this.lockTimeoutMinutes = lockConfiguration.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY);
+ this.lockFile = new Path(lockDirectory + Path.SEPARATOR + LOCK_FILE_NAME);
+ this.fs = FSUtils.getFs(this.lockFile.toString(), configuration);
+ }
+
+ @Override
+ public void close() {
+ synchronized (LOCK_FILE_NAME) {
+ try {
+ fs.delete(this.lockFile, true);
+ } catch (IOException e) {
+ throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE), e);
+ }
+ }
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) {
+ try {
+ synchronized (LOCK_FILE_NAME) {
+ // Check whether lock is already expired, if so try to delete lock file
+ if (fs.exists(this.lockFile) && checkIfExpired()) {
+ fs.delete(this.lockFile, true);
+ }
+ acquireLock();
+ return fs.exists(this.lockFile);
+ }
+ } catch (IOException | HoodieIOException e) {
+ LOG.info(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
+ return false;
+ }
+ }
+
+ @Override
+ public void unlock() {
+ synchronized (LOCK_FILE_NAME) {
+ try {
+ if (fs.exists(this.lockFile)) {
+ fs.delete(this.lockFile, true);
+ }
+ } catch (IOException io) {
+ throw new HoodieIOException(generateLogStatement(LockState.FAILED_TO_RELEASE), io);
+ }
+ }
+ }
+
+ @Override
+ public String getLock() {
+ return this.lockFile.toString();
+ }
+
+ private boolean checkIfExpired() {
+ if (lockTimeoutMinutes == 0) {
+ return false;
+ }
+ try {
+ long modificationTime = fs.getFileStatus(this.lockFile).getModificationTime();
+ if (System.currentTimeMillis() - modificationTime > lockTimeoutMinutes * 60 * 1000) {
+ return true;
+ }
+ } catch (IOException | HoodieIOException e) {
+ LOG.error(generateLogStatement(LockState.ALREADY_RELEASED) + " failed to get lockFile's modification time", e);
+ }
+ return false;
+ }
+
+ private void acquireLock() {
+ try {
+ fs.create(this.lockFile, false).close();
+ } catch (IOException e) {
+ throw new HoodieIOException(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
+ }
+ }
+
+ protected String generateLogStatement(LockState state) {
+ return StringUtils.join(state.name(), " lock at: ", getLock());
+ }
+
+ private void checkRequiredProps(final LockConfiguration config) {
+ ValidationUtils.checkArgument(config.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null) != null
+ || config.getConfig().getString(HoodieWriteConfig.BASE_PATH.key(), null) != null);
+ ValidationUtils.checkArgument(config.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY) >= 0);
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
index 9ea28fbbd4..7fcc96810b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
@@ -36,6 +36,7 @@ import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUI
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
+import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.HIVE_METASTORE_URI_PROP_KEY;
@@ -106,7 +107,13 @@ public class HoodieLockConfig extends HoodieConfig {
.key(FILESYSTEM_LOCK_PATH_PROP_KEY)
.noDefaultValue()
.sinceVersion("0.8.0")
- .withDocumentation("For DFS based lock providers, path to store the locks under.");
+ .withDocumentation("For DFS based lock providers, path to store the locks under. use Table's meta path as default");
+
+ public static final ConfigProperty<Integer> FILESYSTEM_LOCK_EXPIRE = ConfigProperty
+ .key(FILESYSTEM_LOCK_EXPIRE_PROP_KEY)
+ .defaultValue(0)
+ .sinceVersion("0.12.0")
+ .withDocumentation("For DFS based lock providers, expire time in minutes, must be a nonnegative number, default means no expire");
public static final ConfigProperty<String> HIVE_DATABASE_NAME = ConfigProperty
.key(HIVE_DATABASE_NAME_PROP_KEY)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java
new file mode 100644
index 0000000000..208e9cd62e
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
+import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+/**
+ * Tests {@link FileSystemBasedLockProvider} against a {@link MiniDFSCluster} started through
+ * {@link HdfsTestService}. All tests share one static {@link LockConfiguration} whose lock path is "/tmp/".
+ */
+public class TestFileBasedLockProvider {
+  private static HdfsTestService hdfsTestService;
+  private static MiniDFSCluster dfsCluster;
+  private static LockConfiguration lockConfiguration;
+  private static Configuration hadoopConf;
+
+  @BeforeAll
+  public static void setup() throws IOException {
+    hdfsTestService = new HdfsTestService();
+    dfsCluster = hdfsTestService.start(true);
+    hadoopConf = dfsCluster.getFileSystem().getConf();
+
+    Properties properties = new Properties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/");
+    properties.setProperty(FILESYSTEM_LOCK_EXPIRE_PROP_KEY, "1");
+    properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
+    properties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000");
+    properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
+    lockConfiguration = new LockConfiguration(properties);
+  }
+
+  @AfterAll
+  public static void cleanUpAfterAll() throws IOException {
+    // Guard before any dereference: nothing to clean up if the service was never created.
+    if (hdfsTestService == null) {
+      return;
+    }
+    try {
+      Path workDir = dfsCluster.getFileSystem().getWorkingDirectory();
+      FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf());
+      fs.delete(new Path("/tmp"), true);
+    } finally {
+      // Stop the service even when the directory cleanup fails.
+      hdfsTestService.stop();
+      hdfsTestService = null;
+    }
+  }
+
+  @AfterEach
+  public void cleanUpAfterEach() throws IOException {
+    Path workDir = dfsCluster.getFileSystem().getWorkingDirectory();
+    FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf());
+    fs.delete(new Path("/tmp/lock"), true);
+  }
+
+  /**
+   * Calls {@code tryLock} on the provider with the wait timeout taken from the shared configuration.
+   */
+  private static boolean tryAcquire(FileSystemBasedLockProvider provider) {
+    return provider.tryLock(lockConfiguration.getConfig()
+        .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS);
+  }
+
+  @Test
+  public void testAcquireLock() {
+    FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
+    Assertions.assertTrue(tryAcquire(fileBasedLockProvider));
+    fileBasedLockProvider.unlock();
+  }
+
+  @Test
+  public void testAcquireLockWithDefaultPath() {
+    lockConfiguration.getConfig().remove(FILESYSTEM_LOCK_PATH_PROP_KEY);
+    lockConfiguration.getConfig().setProperty(HoodieWriteConfig.BASE_PATH.key(), "/tmp/");
+    try {
+      FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
+      Assertions.assertTrue(tryAcquire(fileBasedLockProvider));
+      fileBasedLockProvider.unlock();
+    } finally {
+      // The configuration is static and shared: restore it even when the assertion above fails, so the
+      // other tests keep running against the explicit lock path.
+      lockConfiguration.getConfig().remove(HoodieWriteConfig.BASE_PATH.key());
+      lockConfiguration.getConfig().setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/");
+    }
+  }
+
+  @Test
+  public void testUnLock() {
+    FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
+    Assertions.assertTrue(tryAcquire(fileBasedLockProvider));
+    fileBasedLockProvider.unlock();
+    // After unlock the lock must be acquirable again.
+    Assertions.assertTrue(tryAcquire(fileBasedLockProvider));
+    fileBasedLockProvider.unlock();
+  }
+
+  @Test
+  public void testReentrantLock() {
+    FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
+    Assertions.assertTrue(tryAcquire(fileBasedLockProvider));
+    // The lock is not reentrant: a second attempt while the lock file exists must fail.
+    Assertions.assertFalse(tryAcquire(fileBasedLockProvider));
+    fileBasedLockProvider.unlock();
+  }
+
+  @Test
+  public void testUnlockWithoutLock() {
+    try {
+      FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
+      fileBasedLockProvider.unlock();
+    } catch (HoodieLockException e) {
+      Assertions.fail();
+    }
+  }
+
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 268674e78d..6ad8666a0f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -18,6 +18,7 @@
package org.apache.hudi.client;
+import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -45,9 +46,11 @@ import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.ArrayList;
@@ -66,7 +69,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
+import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -76,6 +83,21 @@ import static org.junit.jupiter.api.Assertions.fail;
public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
+ private Properties lockProperties = null;
+
+  /**
+   * Builds the lock properties used by the tests in this class, once per test instance.
+   * Individual tests overwrite single entries (for example the wait timeout) before building their config.
+   */
+  @BeforeEach
+  public void setup() throws IOException {
+    if (lockProperties == null) {
+      lockProperties = new Properties();
+      lockProperties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
+      lockProperties.setProperty(FILESYSTEM_LOCK_EXPIRE_PROP_KEY, "1");
+      // The wait timeout is set exactly once here; a second write to the same key would only be overwritten.
+      lockProperties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
+      lockProperties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000");
+      lockProperties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
+    }
+  }
+
public void setUpMORTestTable() throws IOException {
cleanupResources();
initPath();
@@ -92,15 +114,27 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
cleanupResources();
}
+ private static final List<Class> LOCK_PROVIDER_CLASSES = Arrays.asList(
+ InProcessLockProvider.class,
+ FileSystemBasedLockProvider.class);
+
+ private static Iterable<Object[]> providerClassAndTableType() {
+ List<Object[]> opts = new ArrayList<>();
+ for (Object providerClass : LOCK_PROVIDER_CLASSES) {
+ opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass});
+ opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass});
+ }
+ return opts;
+ }
+
@ParameterizedTest
- @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
- public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws Exception {
+ @MethodSource("providerClassAndTableType")
+ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType, Class providerClass) throws Exception {
if (tableType == HoodieTableType.MERGE_ON_READ) {
setUpMORTestTable();
}
- Properties properties = new Properties();
- properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
- properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
+ lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
+
HoodieWriteConfig writeConfig = getConfigBuilder()
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
@@ -110,8 +144,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
// Timeline-server-based markers are not used for multi-writer tests
.withMarkersType(MarkerType.DIRECT.name())
- .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
- .build()).withAutoCommit(false).withProperties(properties).build();
+ .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass)
+ .build()).withAutoCommit(false).withProperties(lockProperties).build();
// Create the first commit
createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 200, true);
@@ -172,16 +206,6 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
assertTrue(writer1Completed.get() && writer2Completed.get());
}
- @Test
- public void testMultiWriterWithAsyncTableServicesWithConflictCOW() throws Exception {
- testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.COPY_ON_WRITE);
- }
-
- @Test
- public void testMultiWriterWithAsyncTableServicesWithConflictMOR() throws Exception {
- testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.MERGE_ON_READ);
- }
-
@ParameterizedTest
@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType tableType) throws Exception {
@@ -189,11 +213,9 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
setUpMORTestTable();
}
- Properties properties = new Properties();
- properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
- properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
- properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "3000");
- properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
+ lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
+ lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "3000");
+ lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
HoodieWriteConfig cfg = getConfigBuilder()
.withCleanConfig(HoodieCleanConfig.newBuilder()
@@ -210,7 +232,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
.withAutoCommit(false)
// Timeline-server-based markers are not used for multi-writer tests
.withMarkersType(MarkerType.DIRECT.name())
- .withProperties(properties)
+ .withProperties(lockProperties)
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.REMOTE_FIRST)
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
.build();
@@ -260,15 +282,13 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
}
}
- private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType) throws Exception {
+ @ParameterizedTest
+ @MethodSource("providerClassAndTableType")
+ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType, Class providerClass) throws Exception {
// create inserts X 1
if (tableType == HoodieTableType.MERGE_ON_READ) {
setUpMORTestTable();
}
- Properties properties = new Properties();
- properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
- properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath);
- properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
// Disabling embedded timeline server, it doesn't work with multiwriter
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
.withCleanConfig(HoodieCleanConfig.newBuilder()
@@ -284,8 +304,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
FileSystemViewStorageType.MEMORY).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
- .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
- .build()).withAutoCommit(false).withProperties(properties);
+ .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass)
+ .build()).withAutoCommit(false).withProperties(lockProperties);
Set<String> validInstants = new HashSet<>();
// Create the first commit with inserts
HoodieWriteConfig cfg = writeConfigBuilder.build();
@@ -458,10 +478,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
@Test
public void testHoodieClientMultiWriterAutoCommitForConflict() throws Exception {
- Properties properties = new Properties();
- properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
- properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
- properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100");
+ lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100");
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
@@ -470,7 +487,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
// Timeline-server-based markers are not used for multi-writer tests
.withMarkersType(MarkerType.DIRECT.name())
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
- .build()).withAutoCommit(true).withProperties(properties);
+ .build()).withAutoCommit(true).withProperties(lockProperties);
HoodieWriteConfig cfg = writeConfigBuilder.build();
HoodieWriteConfig cfg2 = writeConfigBuilder.build();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
index 0109f22097..c6ebc54e95 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
@@ -48,6 +48,8 @@ public class LockConfiguration implements Serializable {
public static final String FILESYSTEM_LOCK_PATH_PROP_KEY = FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX + "path";
+ public static final String FILESYSTEM_LOCK_EXPIRE_PROP_KEY = FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX + "expire";
+
// configs for metastore based locks
public static final String HIVE_METASTORE_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "hivemetastore.";