You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/10/11 23:26:53 UTC

[hudi] branch master updated: [HUDI-4963] Extend InProcessLockProvider to support multiple table ingestion (#6847)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a764c2aee3 [HUDI-4963] Extend InProcessLockProvider to support multiple table ingestion (#6847)
a764c2aee3 is described below

commit a764c2aee387acf4bb5ccaa49ace01ffd2ec10e8
Author: Rajesh Mahindra <76...@users.noreply.github.com>
AuthorDate: Tue Oct 11 16:26:44 2022 -0700

    [HUDI-4963] Extend InProcessLockProvider to support multiple table ingestion (#6847)
    
    
    Co-authored-by: rmahindra123 <rm...@Rajeshs-MacBook-Pro.local>
---
 .../transaction/lock/InProcessLockProvider.java    |  36 +++---
 .../transaction/TestInProcessLockProvider.java     | 124 ++++++++++++++++++---
 2 files changed, 134 insertions(+), 26 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
index c3cd574248..2a60473c82 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
@@ -23,7 +23,8 @@ import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.lock.LockProvider;
 import org.apache.hudi.common.lock.LockState;
-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieLockException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -32,12 +33,15 @@ import org.apache.log4j.Logger;
 import org.jetbrains.annotations.NotNull;
 
 import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * InProcess level lock. This {@link LockProvider} implementation is to
  * guard table from concurrent operations happening in the local JVM process.
+ * A separate lock is maintained per table base path, so writers to different tables do not block each other.
  * <p>
  * Note: This Lock provider implementation doesn't allow lock reentrancy.
  * Attempting to reacquire the lock from the same thread will throw
@@ -47,11 +51,16 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLock>, Serializable {
 
   private static final Logger LOG = LogManager.getLogger(InProcessLockProvider.class);
-  private static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();
+  private static final Map<String, ReentrantReadWriteLock> LOCK_INSTANCE_PER_BASEPATH = new ConcurrentHashMap<>();
+  private final ReentrantReadWriteLock lock;
+  private final String basePath;
   private final long maxWaitTimeMillis;
 
   public InProcessLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) {
     TypedProperties typedProperties = lockConfiguration.getConfig();
+    basePath = typedProperties.getProperty(HoodieWriteConfig.BASE_PATH.key());
+    ValidationUtils.checkArgument(basePath != null, "InProcessLockProvider requires " + HoodieWriteConfig.BASE_PATH.key() + " to be set");
+    lock = LOCK_INSTANCE_PER_BASEPATH.computeIfAbsent(basePath, (ignore) -> new ReentrantReadWriteLock());
     maxWaitTimeMillis = typedProperties.getLong(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
         LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS);
   }
@@ -59,10 +68,10 @@ public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLoc
   @Override
   public void lock() {
     LOG.info(getLogMessage(LockState.ACQUIRING));
-    if (LOCK.isWriteLockedByCurrentThread()) {
+    if (lock.isWriteLockedByCurrentThread()) {
       throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED));
     }
-    LOCK.writeLock().lock();
+    lock.writeLock().lock();
     LOG.info(getLogMessage(LockState.ACQUIRED));
   }
 
@@ -74,13 +83,13 @@ public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLoc
   @Override
   public boolean tryLock(long time, @NotNull TimeUnit unit) {
     LOG.info(getLogMessage(LockState.ACQUIRING));
-    if (LOCK.isWriteLockedByCurrentThread()) {
+    if (lock.isWriteLockedByCurrentThread()) {
       throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED));
     }
 
     boolean isLockAcquired;
     try {
-      isLockAcquired = LOCK.writeLock().tryLock(time, unit);
+      isLockAcquired = lock.writeLock().tryLock(time, unit);
     } catch (InterruptedException e) {
       throw new HoodieLockException(getLogMessage(LockState.FAILED_TO_ACQUIRE));
     }
@@ -93,8 +102,8 @@ public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLoc
   public void unlock() {
     LOG.info(getLogMessage(LockState.RELEASING));
     try {
-      if (LOCK.isWriteLockedByCurrentThread()) {
-        LOCK.writeLock().unlock();
+      if (lock.isWriteLockedByCurrentThread()) {
+        lock.writeLock().unlock();
         LOG.info(getLogMessage(LockState.RELEASED));
       } else {
         LOG.warn("Cannot unlock because the current thread does not hold the lock.");
@@ -106,18 +115,19 @@ public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLoc
 
   @Override
   public ReentrantReadWriteLock getLock() {
-    return LOCK;
+    return lock;
   }
 
   @Override
   public void close() {
-    if (LOCK.isWriteLockedByCurrentThread()) {
-      LOCK.writeLock().unlock();
+    if (lock.isWriteLockedByCurrentThread()) {
+      lock.writeLock().unlock();
     }
+    // Keep the shared map entry: other live providers for this base path may still be using this lock.
+    LOG.info(getLogMessage(LockState.ALREADY_RELEASED));
   }
 
   private String getLogMessage(LockState state) {
-    return StringUtils.join("Thread ", String.valueOf(Thread.currentThread().getName()), " ",
-        state.name(), " in-process lock.");
+    return String.format("Base Path %s, Lock Instance %s, Thread %s, In-process lock state %s", basePath, getLock().toString(), Thread.currentThread().getName(), state.name());
   }
 }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
index 6d6c526785..99ab0887e7 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieLockException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -39,11 +40,20 @@ public class TestInProcessLockProvider {
 
   private static final Logger LOG = LogManager.getLogger(TestInProcessLockProvider.class);
   private final Configuration hadoopConfiguration = new Configuration();
-  private final LockConfiguration lockConfiguration = new LockConfiguration(new TypedProperties());
+  private final LockConfiguration lockConfiguration1;
+  private final LockConfiguration lockConfiguration2;
+
+  public TestInProcessLockProvider() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(HoodieWriteConfig.BASE_PATH.key(), "table1");
+    lockConfiguration1 = new LockConfiguration(properties);
+    properties.put(HoodieWriteConfig.BASE_PATH.key(), "table2");
+    lockConfiguration2 = new LockConfiguration(properties);
+  }
 
   @Test
   public void testLockAcquisition() {
-    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
     assertDoesNotThrow(() -> {
       inProcessLockProvider.lock();
     });
@@ -54,7 +64,7 @@ public class TestInProcessLockProvider {
 
   @Test
   public void testLockReAcquisitionBySameThread() {
-    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
     assertDoesNotThrow(() -> {
       inProcessLockProvider.lock();
     });
@@ -66,9 +76,34 @@ public class TestInProcessLockProvider {
     });
   }
 
+  @Test
+  public void testLockReAcquisitionBySameThreadWithTwoTables() {
+    InProcessLockProvider inProcessLockProvider1 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
+    InProcessLockProvider inProcessLockProvider2 = new InProcessLockProvider(lockConfiguration2, hadoopConfiguration);
+
+    assertDoesNotThrow(() -> {
+      inProcessLockProvider1.lock();
+    });
+    assertDoesNotThrow(() -> {
+      inProcessLockProvider2.lock();
+    });
+    assertThrows(HoodieLockException.class, () -> {
+      inProcessLockProvider2.lock();
+    });
+    assertThrows(HoodieLockException.class, () -> {
+      inProcessLockProvider1.lock();
+    });
+    assertDoesNotThrow(() -> {
+      inProcessLockProvider1.unlock();
+    });
+    assertDoesNotThrow(() -> {
+      inProcessLockProvider2.unlock();
+    });
+  }
+
   @Test
   public void testLockReAcquisitionByDifferentThread() {
-    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
     final AtomicBoolean writer2Completed = new AtomicBoolean(false);
 
     // Main test thread
@@ -104,9 +139,72 @@ public class TestInProcessLockProvider {
     Assertions.assertTrue(writer2Completed.get());
   }
 
+  @Test
+  public void testLockReAcquisitionByDifferentThreadWithTwoTables() {
+    InProcessLockProvider inProcessLockProvider1 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
+    InProcessLockProvider inProcessLockProvider2 = new InProcessLockProvider(lockConfiguration2, hadoopConfiguration);
+
+    final AtomicBoolean writer2Stream1Completed = new AtomicBoolean(false);
+    final AtomicBoolean writer2Stream2Completed = new AtomicBoolean(false);
+
+    // Main test thread
+    assertDoesNotThrow(() -> {
+      inProcessLockProvider1.lock();
+    });
+    assertDoesNotThrow(() -> {
+      inProcessLockProvider2.lock();
+    });
+
+    // Another writer thread in parallel, should block
+    // and later acquire the lock once it is released
+    Thread writer2Stream1 = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        assertDoesNotThrow(() -> {
+          inProcessLockProvider1.lock();
+        });
+        assertDoesNotThrow(() -> {
+          inProcessLockProvider1.unlock();
+        });
+        writer2Stream1Completed.set(true);
+      }
+    });
+    Thread writer2Stream2 = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        assertDoesNotThrow(() -> {
+          inProcessLockProvider2.lock();
+        });
+        assertDoesNotThrow(() -> {
+          inProcessLockProvider2.unlock();
+        });
+        writer2Stream2Completed.set(true);
+      }
+    });
+
+    writer2Stream1.start();
+    writer2Stream2.start();
+
+    assertDoesNotThrow(() -> {
+      inProcessLockProvider1.unlock();
+    });
+    assertDoesNotThrow(() -> {
+      inProcessLockProvider2.unlock();
+    });
+
+    try {
+      writer2Stream1.join();
+      writer2Stream2.join();
+    } catch (InterruptedException e) {
+      //
+    }
+    Assertions.assertTrue(writer2Stream1Completed.get());
+    Assertions.assertTrue(writer2Stream2Completed.get());
+  }
+
   @Test
   public void testTryLockAcquisition() {
-    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
     Assertions.assertTrue(inProcessLockProvider.tryLock());
     assertDoesNotThrow(() -> {
       inProcessLockProvider.unlock();
@@ -115,7 +213,7 @@ public class TestInProcessLockProvider {
 
   @Test
   public void testTryLockAcquisitionWithTimeout() {
-    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
     Assertions.assertTrue(inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS));
     assertDoesNotThrow(() -> {
       inProcessLockProvider.unlock();
@@ -124,7 +222,7 @@ public class TestInProcessLockProvider {
 
   @Test
   public void testTryLockReAcquisitionBySameThread() {
-    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
     Assertions.assertTrue(inProcessLockProvider.tryLock());
     assertThrows(HoodieLockException.class, () -> {
       inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS);
@@ -136,7 +234,7 @@ public class TestInProcessLockProvider {
 
   @Test
   public void testTryLockReAcquisitionByDifferentThread() {
-    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
     final AtomicBoolean writer2Completed = new AtomicBoolean(false);
 
     // Main test thread
@@ -162,7 +260,7 @@ public class TestInProcessLockProvider {
 
   @Test
   public void testTryUnLockByDifferentThread() {
-    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
     final AtomicBoolean writer3Completed = new AtomicBoolean(false);
 
     // Main test thread
@@ -203,7 +301,7 @@ public class TestInProcessLockProvider {
 
   @Test
   public void testTryLockAcquisitionBeforeTimeOutFromTwoThreads() {
-    final InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+    final InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
     final int threadCount = 3;
     final long awaitMaxTimeoutMs = 2000L;
     final CountDownLatch latch = new CountDownLatch(threadCount);
@@ -261,7 +359,7 @@ public class TestInProcessLockProvider {
 
   @Test
   public void testLockReleaseByClose() {
-    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
     assertDoesNotThrow(() -> {
       inProcessLockProvider.lock();
     });
@@ -272,7 +370,7 @@ public class TestInProcessLockProvider {
 
   @Test
   public void testRedundantUnlock() {
-    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
     assertDoesNotThrow(() -> {
       inProcessLockProvider.lock();
     });
@@ -286,7 +384,7 @@ public class TestInProcessLockProvider {
 
   @Test
   public void testUnlockWithoutLock() {
-    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+    InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
     assertDoesNotThrow(() -> {
       inProcessLockProvider.unlock();
     });