You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/10/11 23:26:53 UTC
[hudi] branch master updated: [HUDI-4963] Extend InProcessLockProvider to support multiple table ingestion (#6847)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a764c2aee3 [HUDI-4963] Extend InProcessLockProvider to support multiple table ingestion (#6847)
a764c2aee3 is described below
commit a764c2aee387acf4bb5ccaa49ace01ffd2ec10e8
Author: Rajesh Mahindra <76...@users.noreply.github.com>
AuthorDate: Tue Oct 11 16:26:44 2022 -0700
[HUDI-4963] Extend InProcessLockProvider to support multiple table ingestion (#6847)
Co-authored-by: rmahindra123 <rm...@Rajeshs-MacBook-Pro.local>
---
.../transaction/lock/InProcessLockProvider.java | 36 +++---
.../transaction/TestInProcessLockProvider.java | 124 ++++++++++++++++++---
2 files changed, 134 insertions(+), 26 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
index c3cd574248..2a60473c82 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
@@ -23,7 +23,8 @@ import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.lock.LockState;
-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieLockException;
import org.apache.hadoop.conf.Configuration;
@@ -32,12 +33,15 @@ import org.apache.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* InProcess level lock. This {@link LockProvider} implementation is to
* guard table from concurrent operations happening in the local JVM process.
+ * A separate lock is maintained per "table basepath".
* <p>
* Note: This Lock provider implementation doesn't allow lock reentrancy.
* Attempting to reacquire the lock from the same thread will throw
@@ -47,11 +51,16 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLock>, Serializable {
private static final Logger LOG = LogManager.getLogger(InProcessLockProvider.class);
- private static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();
+ private static final Map<String, ReentrantReadWriteLock> LOCK_INSTANCE_PER_BASEPATH = new ConcurrentHashMap<>();
+ private final ReentrantReadWriteLock lock;
+ private final String basePath;
private final long maxWaitTimeMillis;
public InProcessLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) {
TypedProperties typedProperties = lockConfiguration.getConfig();
+ basePath = lockConfiguration.getConfig().getProperty(HoodieWriteConfig.BASE_PATH.key());
+ ValidationUtils.checkArgument(basePath != null);
+ lock = LOCK_INSTANCE_PER_BASEPATH.computeIfAbsent(basePath, (ignore) -> new ReentrantReadWriteLock());
maxWaitTimeMillis = typedProperties.getLong(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS);
}
@@ -59,10 +68,10 @@ public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLoc
@Override
public void lock() {
LOG.info(getLogMessage(LockState.ACQUIRING));
- if (LOCK.isWriteLockedByCurrentThread()) {
+ if (lock.isWriteLockedByCurrentThread()) {
throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED));
}
- LOCK.writeLock().lock();
+ lock.writeLock().lock();
LOG.info(getLogMessage(LockState.ACQUIRED));
}
@@ -74,13 +83,13 @@ public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLoc
@Override
public boolean tryLock(long time, @NotNull TimeUnit unit) {
LOG.info(getLogMessage(LockState.ACQUIRING));
- if (LOCK.isWriteLockedByCurrentThread()) {
+ if (lock.isWriteLockedByCurrentThread()) {
throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED));
}
boolean isLockAcquired;
try {
- isLockAcquired = LOCK.writeLock().tryLock(time, unit);
+ isLockAcquired = lock.writeLock().tryLock(time, unit);
} catch (InterruptedException e) {
throw new HoodieLockException(getLogMessage(LockState.FAILED_TO_ACQUIRE));
}
@@ -93,8 +102,8 @@ public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLoc
public void unlock() {
LOG.info(getLogMessage(LockState.RELEASING));
try {
- if (LOCK.isWriteLockedByCurrentThread()) {
- LOCK.writeLock().unlock();
+ if (lock.isWriteLockedByCurrentThread()) {
+ lock.writeLock().unlock();
LOG.info(getLogMessage(LockState.RELEASED));
} else {
LOG.warn("Cannot unlock because the current thread does not hold the lock.");
@@ -106,18 +115,19 @@ public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLoc
@Override
public ReentrantReadWriteLock getLock() {
- return LOCK;
+ return lock;
}
@Override
public void close() {
- if (LOCK.isWriteLockedByCurrentThread()) {
- LOCK.writeLock().unlock();
+ if (lock.isWriteLockedByCurrentThread()) {
+ lock.writeLock().unlock();
}
+ LOG.info(getLogMessage(LockState.ALREADY_RELEASED));
+ LOCK_INSTANCE_PER_BASEPATH.remove(basePath);
}
private String getLogMessage(LockState state) {
- return StringUtils.join("Thread ", String.valueOf(Thread.currentThread().getName()), " ",
- state.name(), " in-process lock.");
+ return String.format("Base Path %s, Lock Instance %s, Thread %s, In-process lock state %s", basePath, getLock().toString(), Thread.currentThread().getName(), state.name());
}
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
index 6d6c526785..99ab0887e7 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieLockException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -39,11 +40,20 @@ public class TestInProcessLockProvider {
private static final Logger LOG = LogManager.getLogger(TestInProcessLockProvider.class);
private final Configuration hadoopConfiguration = new Configuration();
- private final LockConfiguration lockConfiguration = new LockConfiguration(new TypedProperties());
+ private final LockConfiguration lockConfiguration1;
+ private final LockConfiguration lockConfiguration2;
+
+ public TestInProcessLockProvider() {
+ TypedProperties properties = new TypedProperties();
+ properties.put(HoodieWriteConfig.BASE_PATH.key(), "table1");
+ lockConfiguration1 = new LockConfiguration(properties);
+ properties.put(HoodieWriteConfig.BASE_PATH.key(), "table2");
+ lockConfiguration2 = new LockConfiguration(properties);
+ }
@Test
public void testLockAcquisition() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});
@@ -54,7 +64,7 @@ public class TestInProcessLockProvider {
@Test
public void testLockReAcquisitionBySameThread() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});
@@ -66,9 +76,34 @@ public class TestInProcessLockProvider {
});
}
+ @Test
+ public void testLockReAcquisitionBySameThreadWithTwoTables() {
+ InProcessLockProvider inProcessLockProvider1 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider2 = new InProcessLockProvider(lockConfiguration2, hadoopConfiguration);
+
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider1.lock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider2.lock();
+ });
+ assertThrows(HoodieLockException.class, () -> {
+ inProcessLockProvider2.lock();
+ });
+ assertThrows(HoodieLockException.class, () -> {
+ inProcessLockProvider1.lock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider1.unlock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider2.unlock();
+ });
+ }
+
@Test
public void testLockReAcquisitionByDifferentThread() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
final AtomicBoolean writer2Completed = new AtomicBoolean(false);
// Main test thread
@@ -104,9 +139,72 @@ public class TestInProcessLockProvider {
Assertions.assertTrue(writer2Completed.get());
}
+ @Test
+ public void testLockReAcquisitionByDifferentThreadWithTwoTables() {
+ InProcessLockProvider inProcessLockProvider1 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider2 = new InProcessLockProvider(lockConfiguration2, hadoopConfiguration);
+
+ final AtomicBoolean writer2Stream1Completed = new AtomicBoolean(false);
+ final AtomicBoolean writer2Stream2Completed = new AtomicBoolean(false);
+
+ // Main test thread
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider1.lock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider2.lock();
+ });
+
+ // Another writer thread in parallel, should block
+ // and later acquire the lock once it is released
+ Thread writer2Stream1 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider1.lock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider1.unlock();
+ });
+ writer2Stream1Completed.set(true);
+ }
+ });
+ Thread writer2Stream2 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider2.lock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider2.unlock();
+ });
+ writer2Stream2Completed.set(true);
+ }
+ });
+
+ writer2Stream1.start();
+ writer2Stream2.start();
+
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider1.unlock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider2.unlock();
+ });
+
+ try {
+ writer2Stream1.join();
+ writer2Stream2.join();
+ } catch (InterruptedException e) {
+ //
+ }
+ Assertions.assertTrue(writer2Stream1Completed.get());
+ Assertions.assertTrue(writer2Stream2Completed.get());
+ }
+
@Test
public void testTryLockAcquisition() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
Assertions.assertTrue(inProcessLockProvider.tryLock());
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
@@ -115,7 +213,7 @@ public class TestInProcessLockProvider {
@Test
public void testTryLockAcquisitionWithTimeout() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
Assertions.assertTrue(inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS));
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
@@ -124,7 +222,7 @@ public class TestInProcessLockProvider {
@Test
public void testTryLockReAcquisitionBySameThread() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
Assertions.assertTrue(inProcessLockProvider.tryLock());
assertThrows(HoodieLockException.class, () -> {
inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS);
@@ -136,7 +234,7 @@ public class TestInProcessLockProvider {
@Test
public void testTryLockReAcquisitionByDifferentThread() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
final AtomicBoolean writer2Completed = new AtomicBoolean(false);
// Main test thread
@@ -162,7 +260,7 @@ public class TestInProcessLockProvider {
@Test
public void testTryUnLockByDifferentThread() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
final AtomicBoolean writer3Completed = new AtomicBoolean(false);
// Main test thread
@@ -203,7 +301,7 @@ public class TestInProcessLockProvider {
@Test
public void testTryLockAcquisitionBeforeTimeOutFromTwoThreads() {
- final InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ final InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
final int threadCount = 3;
final long awaitMaxTimeoutMs = 2000L;
final CountDownLatch latch = new CountDownLatch(threadCount);
@@ -261,7 +359,7 @@ public class TestInProcessLockProvider {
@Test
public void testLockReleaseByClose() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});
@@ -272,7 +370,7 @@ public class TestInProcessLockProvider {
@Test
public void testRedundantUnlock() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});
@@ -286,7 +384,7 @@ public class TestInProcessLockProvider {
@Test
public void testUnlockWithoutLock() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
});