You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2019/10/04 03:02:49 UTC

[hadoop] branch trunk updated: HDDS-2223. Support ReadWrite lock in LockManager. (#1564)

This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9700e20  HDDS-2223. Support ReadWrite lock in LockManager. (#1564)
9700e20 is described below

commit 9700e2003aa1b7e2c4072a2a08d8827acc5aa779
Author: Nanda kumar <na...@apache.org>
AuthorDate: Fri Oct 4 08:32:43 2019 +0530

    HDDS-2223. Support ReadWrite lock in LockManager. (#1564)
---
 .../org/apache/hadoop/ozone/lock/ActiveLock.java   |  63 ++++++--
 .../org/apache/hadoop/ozone/lock/LockManager.java  | 166 ++++++++++++++++++---
 .../apache/hadoop/ozone/lock/TestLockManager.java  | 145 +++++++++++++++---
 3 files changed, 323 insertions(+), 51 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/ActiveLock.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/ActiveLock.java
index c302084..49efad0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/ActiveLock.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/ActiveLock.java
@@ -18,22 +18,22 @@
 package org.apache.hadoop.ozone.lock;
 
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * Lock implementation which also maintains counter.
  */
 public final class ActiveLock {
 
-  private Lock lock;
+  private ReadWriteLock lock;
   private AtomicInteger count;
 
   /**
    * Use ActiveLock#newInstance to create instance.
    */
   private ActiveLock() {
-    this.lock = new ReentrantLock();
+    this.lock = new ReentrantReadWriteLock();
     this.count = new AtomicInteger(0);
   }
 
@@ -47,21 +47,58 @@ public final class ActiveLock {
   }
 
   /**
-   * Acquires the lock.
+   * Acquires read lock.
    *
-   * <p>If the lock is not available then the current thread becomes
-   * disabled for thread scheduling purposes and lies dormant until the
-   * lock has been acquired.
+   * <p>Acquires the read lock if the write lock is not held by
+   * another thread and returns immediately.
+   *
+   * <p>If the write lock is held by another thread then
+   * the current thread becomes disabled for thread scheduling
+   * purposes and lies dormant until the read lock has been acquired.
+   */
+  void readLock() {
+    lock.readLock().lock();
+  }
+
+  /**
+   * Attempts to release the read lock.
+   *
+   * <p>If the number of readers is now zero then the lock
+   * is made available for write lock attempts.
+   */
+  void readUnlock() {
+    lock.readLock().unlock();
+  }
+
+  /**
+   * Acquires write lock.
+   *
+   * <p>Acquires the write lock if neither the read nor write lock
+   * is held by another thread
+   * and returns immediately, setting the write lock hold count to
+   * one.
+   *
+   * <p>If the current thread already holds the write lock then the
+   * hold count is incremented by one and the method returns
+   * immediately.
+   *
+   * <p>If the lock is held by another thread then the current
+   * thread becomes disabled for thread scheduling purposes and
+   * lies dormant until the write lock has been acquired.
    */
-  public void lock() {
-    lock.lock();
+  void writeLock() {
+    lock.writeLock().lock();
   }
 
   /**
-   * Releases the lock.
+   * Attempts to release the write lock.
+   *
+   * <p>If the current thread is the holder of this lock then
+   * the hold count is decremented. If the hold count is now
+   * zero then the lock is released.
    */
-  public void unlock() {
-    lock.unlock();
+  void writeUnlock() {
+    lock.writeLock().unlock();
   }
 
   /**
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/LockManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/LockManager.java
index 5f76bd6..670d4d1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/LockManager.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/LockManager.java
@@ -25,42 +25,156 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 
 /**
  * Manages the locks on a given resource. A new lock is created for each
  * and every unique resource. Uniqueness of resource depends on the
  * {@code equals} implementation of it.
  */
-public class LockManager<T> {
+public class LockManager<R> {
 
   private static final Logger LOG = LoggerFactory.getLogger(LockManager.class);
 
-  private final Map<T, ActiveLock> activeLocks = new ConcurrentHashMap<>();
+  private final Map<R, ActiveLock> activeLocks = new ConcurrentHashMap<>();
   private final GenericObjectPool<ActiveLock> lockPool =
       new GenericObjectPool<>(new PooledLockFactory());
 
   /**
-   * Creates new LockManager instance.
+   * Creates new LockManager instance with the given Configuration.
    *
    * @param conf Configuration object
    */
-  public LockManager(Configuration conf) {
-    int maxPoolSize = conf.getInt(HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY,
+  public LockManager(final Configuration conf) {
+    final int maxPoolSize = conf.getInt(
+        HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY,
         HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY_DEFAULT);
     lockPool.setMaxTotal(maxPoolSize);
   }
 
-
   /**
    * Acquires the lock on given resource.
    *
    * <p>If the lock is not available then the current thread becomes
    * disabled for thread scheduling purposes and lies dormant until the
    * lock has been acquired.
+   *
+   * @param resource on which the lock has to be acquired
+   * @deprecated Use {@link LockManager#writeLock} instead
+   */
+  public void lock(final R resource) {
+    writeLock(resource);
+  }
+
+  /**
+   * Releases the lock on given resource.
+   *
+   * @param resource for which the lock has to be released
+   * @deprecated Use {@link LockManager#writeUnlock} instead
+   */
+  public void unlock(final R resource) {
+    writeUnlock(resource);
+  }
+
+  /**
+   * Acquires the read lock on given resource.
+   *
+   * <p>Acquires the read lock on resource if the write lock is not held by
+   * another thread and returns immediately.
+   *
+   * <p>If the write lock on resource is held by another thread then
+   * the current thread becomes disabled for thread scheduling
+   * purposes and lies dormant until the read lock has been acquired.
+   *
+   * @param resource on which the read lock has to be acquired
+   */
+  public void readLock(final R resource) {
+    acquire(resource, ActiveLock::readLock);
+  }
+
+  /**
+   * Releases the read lock on given resource.
+   *
+   * @param resource for which the read lock has to be released
+   * @throws IllegalMonitorStateException if the current thread does not
+   *                                      hold this lock
+   */
+  public void readUnlock(final R resource) throws IllegalMonitorStateException {
+    release(resource, ActiveLock::readUnlock);
+  }
+
+  /**
+   * Acquires the write lock on given resource.
+   *
+   * <p>Acquires the write lock on resource if neither the read nor write lock
+   * is held by another thread and returns immediately.
+   *
+   * <p>If the current thread already holds the write lock then the
+   * hold count is incremented by one and the method returns
+   * immediately.
+   *
+   * <p>If the lock is held by another thread then the current
+   * thread becomes disabled for thread scheduling purposes and
+   * lies dormant until the write lock has been acquired.
+   *
+   * @param resource on which the lock has to be acquired
    */
-  public void lock(T resource) {
-    activeLocks.compute(resource, (k, v) -> {
-      ActiveLock lock;
+  public void writeLock(final R resource) {
+    acquire(resource, ActiveLock::writeLock);
+  }
+
+  /**
+   * Releases the write lock on given resource.
+   *
+   * @param resource for which the lock has to be released
+   * @throws IllegalMonitorStateException if the current thread does not
+   *                                      hold this lock
+   */
+  public void writeUnlock(final R resource)
+      throws IllegalMonitorStateException {
+    release(resource, ActiveLock::writeUnlock);
+  }
+
+  /**
+   * Acquires the lock on given resource using the provided lock function.
+   *
+   * @param resource on which the lock has to be acquired
+   * @param lockFn function to acquire the lock
+   */
+  private void acquire(final R resource, final Consumer<ActiveLock> lockFn) {
+    lockFn.accept(getLockForLocking(resource));
+  }
+
+  /**
+   * Releases the lock on given resource using the provided release function.
+   *
+   * @param resource for which the lock has to be released
+   * @param releaseFn function to release the lock
+   */
+  private void release(final R resource, final Consumer<ActiveLock> releaseFn) {
+    final ActiveLock lock = getLockForReleasing(resource);
+    releaseFn.accept(lock);
+    decrementActiveLockCount(resource);
+  }
+
+  /**
+   * Returns {@link ActiveLock} instance for the given resource,
+   * on which the lock can be acquired.
+   *
+   * @param resource on which the lock has to be acquired
+   * @return {@link ActiveLock} instance
+   */
+  private ActiveLock getLockForLocking(final R resource) {
+    /*
+     * While getting a lock object for locking we should
+     * atomically increment the active count of the lock.
+     *
+     * This is to avoid cases where the selected lock could
+     * be removed from the activeLocks map and returned to
+     * the object pool.
+     */
+    return activeLocks.compute(resource, (k, v) -> {
+      final ActiveLock lock;
       try {
         if (v == null) {
           lock = lockPool.borrowObject();
@@ -73,22 +187,34 @@ public class LockManager<T> {
         throw new RuntimeException(ex);
       }
       return lock;
-    }).lock();
+    });
   }
 
   /**
-   * Releases the lock on given resource.
+   * Returns {@link ActiveLock} instance for the given resource,
+   * for which the lock has to be released.
+   *
+   * @param resource for which the lock has to be released
+   * @return {@link ActiveLock} instance
    */
-  public void unlock(T resource) {
-    ActiveLock lock = activeLocks.get(resource);
-    if (lock == null) {
-      // Someone is releasing a lock which was never acquired. Log and return.
-      LOG.error("Trying to release the lock on {}, which was never acquired.",
-          resource);
-      throw new IllegalMonitorStateException("Releasing lock on resource "
-          + resource + " without acquiring lock");
+  private ActiveLock getLockForReleasing(final R resource) {
+    if (activeLocks.containsKey(resource)) {
+      return activeLocks.get(resource);
     }
-    lock.unlock();
+    // Someone is releasing a lock which was never acquired.
+    LOG.error("Trying to release the lock on {}, which was never acquired.",
+        resource);
+    throw new IllegalMonitorStateException("Releasing lock on resource "
+        + resource + " without acquiring lock");
+  }
+
+  /**
+   * Decrements the active lock count and returns the {@link ActiveLock}
+   * object to pool if the active count is 0.
+   *
+   * @param resource resource to which the ActiveLock is associated
+   */
+  private void decrementActiveLockCount(final R resource) {
     activeLocks.computeIfPresent(resource, (k, v) -> {
       v.decrementActiveCount();
       if (v.getActiveLockCount() != 0) {
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/TestLockManager.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/TestLockManager.java
index fa3030d..e88b1bb 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/TestLockManager.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/TestLockManager.java
@@ -29,34 +29,143 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class TestLockManager {
 
   @Test(timeout = 1000)
-  public void testWithDifferentResource() {
-    LockManager<String> manager = new LockManager<>(new OzoneConfiguration());
-    manager.lock("/resourceOne");
+  public void testWriteLockWithDifferentResource() {
+    final LockManager<String> manager =
+        new LockManager<>(new OzoneConfiguration());
+    manager.writeLock("/resourceOne");
     // This should work, as they are different resource.
-    manager.lock("/resourceTwo");
-    manager.unlock("/resourceOne");
-    manager.unlock("/resourceTwo");
+    manager.writeLock("/resourceTwo");
+    manager.writeUnlock("/resourceOne");
+    manager.writeUnlock("/resourceTwo");
     Assert.assertTrue(true);
   }
 
   @Test
-  public void testWithSameResource() throws Exception {
-    LockManager<String> manager = new LockManager<>(new OzoneConfiguration());
-    manager.lock("/resourceOne");
-    AtomicBoolean gotLock = new AtomicBoolean(false);
+  public void testWriteLockWithSameResource() throws Exception {
+    final LockManager<String> manager =
+        new LockManager<>(new OzoneConfiguration());
+    final AtomicBoolean gotLock = new AtomicBoolean(false);
+    manager.writeLock("/resourceOne");
     new Thread(() -> {
-      manager.lock("/resourceOne");
+      manager.writeLock("/resourceOne");
       gotLock.set(true);
-      manager.unlock("/resourceOne");
+      manager.writeUnlock("/resourceOne");
     }).start();
-    // Let's give some time for the new thread to run
+    // Let's give some time for the other thread to run
     Thread.sleep(100);
-    // Since the new thread is trying to get lock on same object, it will wait.
+    // Since the other thread is trying to get write lock on same object,
+    // it will wait.
     Assert.assertFalse(gotLock.get());
-    manager.unlock("/resourceOne");
-    // Since we have released the lock, the new thread should have the lock
-    // now
-    // Let's give some time for the new thread to run
+    manager.writeUnlock("/resourceOne");
+    // Since we have released the write lock, the other thread should have
+    // the lock now
+    // Let's give some time for the other thread to run
+    Thread.sleep(100);
+    Assert.assertTrue(gotLock.get());
+  }
+
+  @Test(timeout = 1000)
+  public void testReadLockWithDifferentResource() {
+    final LockManager<String> manager =
+        new LockManager<>(new OzoneConfiguration());
+    manager.readLock("/resourceOne");
+    manager.readLock("/resourceTwo");
+    manager.readUnlock("/resourceOne");
+    manager.readUnlock("/resourceTwo");
+    Assert.assertTrue(true);
+  }
+
+  @Test
+  public void testReadLockWithSameResource() throws Exception {
+    final LockManager<String> manager =
+        new LockManager<>(new OzoneConfiguration());
+    final AtomicBoolean gotLock = new AtomicBoolean(false);
+    manager.readLock("/resourceOne");
+    new Thread(() -> {
+      manager.readLock("/resourceOne");
+      gotLock.set(true);
+      manager.readUnlock("/resourceOne");
+    }).start();
+    // Let's give some time for the other thread to run
+    Thread.sleep(100);
+    // Since the other thread only wants a read lock, it should succeed.
+    Assert.assertTrue(gotLock.get());
+    manager.readUnlock("/resourceOne");
+  }
+
+  @Test
+  public void testWriteReadLockWithSameResource() throws Exception {
+    final LockManager<String> manager =
+        new LockManager<>(new OzoneConfiguration());
+    final AtomicBoolean gotLock = new AtomicBoolean(false);
+    manager.writeLock("/resourceOne");
+    new Thread(() -> {
+      manager.readLock("/resourceOne");
+      gotLock.set(true);
+      manager.readUnlock("/resourceOne");
+    }).start();
+    // Let's give some time for the other thread to run
+    Thread.sleep(100);
+    // Since the other thread is trying to get read lock on same object,
+    // it will wait.
+    Assert.assertFalse(gotLock.get());
+    manager.writeUnlock("/resourceOne");
+    // Since we have released the write lock, the other thread should have
+    // the lock now
+    // Let's give some time for the other thread to run
+    Thread.sleep(100);
+    Assert.assertTrue(gotLock.get());
+  }
+
+  @Test
+  public void testReadWriteLockWithSameResource() throws Exception {
+    final LockManager<String> manager =
+        new LockManager<>(new OzoneConfiguration());
+    final AtomicBoolean gotLock = new AtomicBoolean(false);
+    manager.readLock("/resourceOne");
+    new Thread(() -> {
+      manager.writeLock("/resourceOne");
+      gotLock.set(true);
+      manager.writeUnlock("/resourceOne");
+    }).start();
+    // Let's give some time for the other thread to run
+    Thread.sleep(100);
+    // Since the other thread is trying to get write lock on same object,
+    // it will wait.
+    Assert.assertFalse(gotLock.get());
+    manager.readUnlock("/resourceOne");
+    // Since we have released the read lock, the other thread should have
+    // the lock now
+    // Let's give some time for the other thread to run
+    Thread.sleep(100);
+    Assert.assertTrue(gotLock.get());
+  }
+
+  @Test
+  public void testMultiReadWriteLockWithSameResource() throws Exception {
+    final LockManager<String> manager =
+        new LockManager<>(new OzoneConfiguration());
+    final AtomicBoolean gotLock = new AtomicBoolean(false);
+    manager.readLock("/resourceOne");
+    manager.readLock("/resourceOne");
+    new Thread(() -> {
+      manager.writeLock("/resourceOne");
+      gotLock.set(true);
+      manager.writeUnlock("/resourceOne");
+    }).start();
+    // Let's give some time for the other thread to run
+    Thread.sleep(100);
+    // Since the other thread is trying to get write lock on same object,
+    // it will wait.
+    Assert.assertFalse(gotLock.get());
+    manager.readUnlock("/resourceOne");
+    // We have only released one read lock; we still hold another read lock.
+    Thread.sleep(100);
+    Assert.assertFalse(gotLock.get());
+    manager.readUnlock("/resourceOne");
+    // Since we have released the read lock, the other thread should have
+    // the lock now
+    // Let's give some time for the other thread to run
     Thread.sleep(100);
     Assert.assertTrue(gotLock.get());
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org