You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2019/10/04 03:02:49 UTC
[hadoop] branch trunk updated: HDDS-2223. Support ReadWrite lock in
LockManager. (#1564)
This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9700e20 HDDS-2223. Support ReadWrite lock in LockManager. (#1564)
9700e20 is described below
commit 9700e2003aa1b7e2c4072a2a08d8827acc5aa779
Author: Nanda kumar <na...@apache.org>
AuthorDate: Fri Oct 4 08:32:43 2019 +0530
HDDS-2223. Support ReadWrite lock in LockManager. (#1564)
---
.../org/apache/hadoop/ozone/lock/ActiveLock.java | 63 ++++++--
.../org/apache/hadoop/ozone/lock/LockManager.java | 166 ++++++++++++++++++---
.../apache/hadoop/ozone/lock/TestLockManager.java | 145 +++++++++++++++---
3 files changed, 323 insertions(+), 51 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/ActiveLock.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/ActiveLock.java
index c302084..49efad0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/ActiveLock.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/ActiveLock.java
@@ -18,22 +18,22 @@
package org.apache.hadoop.ozone.lock;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Lock implementation which also maintains counter.
*/
public final class ActiveLock {
- private Lock lock;
+ private ReadWriteLock lock;
private AtomicInteger count;
/**
* Use ActiveLock#newInstance to create instance.
*/
private ActiveLock() {
- this.lock = new ReentrantLock();
+ this.lock = new ReentrantReadWriteLock();
this.count = new AtomicInteger(0);
}
@@ -47,21 +47,58 @@ public final class ActiveLock {
}
/**
- * Acquires the lock.
+ * Acquires read lock.
*
- * <p>If the lock is not available then the current thread becomes
- * disabled for thread scheduling purposes and lies dormant until the
- * lock has been acquired.
+ * <p>Acquires the read lock if the write lock is not held by
+ * another thread and returns immediately.
+ *
+ * <p>If the write lock is held by another thread then
+ * the current thread becomes disabled for thread scheduling
+ * purposes and lies dormant until the read lock has been acquired.
+ */
+ void readLock() {
+ // Delegate to the read half of the underlying ReadWriteLock.
+ lock.readLock().lock();
+ }
+
+ /**
+ * Attempts to release the read lock.
+ *
+ * <p>If the number of readers is now zero then the lock
+ * is made available for write lock attempts.
+ */
+ void readUnlock() {
+ // Delegate to the read half of the underlying ReadWriteLock.
+ lock.readLock().unlock();
+ }
+
+ /**
+ * Acquires write lock.
+ *
+ * <p>Acquires the write lock if neither the read nor write lock
+ * are held by another thread
+ * and returns immediately, setting the write lock hold count to
+ * one.
+ *
+ * <p>If the current thread already holds the write lock then the
+ * hold count is incremented by one and the method returns
+ * immediately.
+ *
+ * <p>If the lock is held by another thread then the current
+ * thread becomes disabled for thread scheduling purposes and
+ * lies dormant until the write lock has been acquired.
*/
- public void lock() {
- lock.lock();
+ void writeLock() {
+ lock.writeLock().lock();
}
/**
- * Releases the lock.
+ * Attempts to release the write lock.
+ *
+ * <p>If the current thread is the holder of this lock then
+ * the hold count is decremented. If the hold count is now
+ * zero then the lock is released.
*/
- public void unlock() {
- lock.unlock();
+ void writeUnlock() {
+ lock.writeLock().unlock();
}
/**
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/LockManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/LockManager.java
index 5f76bd6..670d4d1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/LockManager.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/LockManager.java
@@ -25,42 +25,156 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
/**
* Manages the locks on a given resource. A new lock is created for each
* and every unique resource. Uniqueness of resource depends on the
* {@code equals} implementation of it.
*/
-public class LockManager<T> {
+public class LockManager<R> {
private static final Logger LOG = LoggerFactory.getLogger(LockManager.class);
- private final Map<T, ActiveLock> activeLocks = new ConcurrentHashMap<>();
+ private final Map<R, ActiveLock> activeLocks = new ConcurrentHashMap<>();
private final GenericObjectPool<ActiveLock> lockPool =
new GenericObjectPool<>(new PooledLockFactory());
/**
- * Creates new LockManager instance.
+ * Creates new LockManager instance with the given Configuration.
*
* @param conf Configuration object
*/
- public LockManager(Configuration conf) {
- int maxPoolSize = conf.getInt(HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY,
+ public LockManager(final Configuration conf) {
+ final int maxPoolSize = conf.getInt(
+ HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY,
HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY_DEFAULT);
lockPool.setMaxTotal(maxPoolSize);
}
-
/**
* Acquires the lock on given resource.
*
* <p>If the lock is not available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until the
* lock has been acquired.
+ *
+ * @param resource on which the lock has to be acquired
+ * @deprecated Use {@link LockManager#writeLock} instead
+ */
+ @Deprecated
+ public void lock(final R resource) {
+ // Kept for backward compatibility: the legacy exclusive lock is
+ // equivalent to taking the write lock on the resource.
+ writeLock(resource);
+ }
+
+ /**
+ * Releases the lock on given resource.
+ *
+ * @param resource for which the lock has to be released
+ * @deprecated Use {@link LockManager#writeUnlock} instead
+ */
+ @Deprecated
+ public void unlock(final R resource) {
+ // Kept for backward compatibility: releases the write lock that the
+ // legacy lock(resource) call acquired.
+ writeUnlock(resource);
+ }
+
+ /**
+ * Acquires the read lock on given resource.
+ *
+ * <p>Acquires the read lock on resource if the write lock is not held by
+ * another thread and returns immediately.
+ *
+ * <p>If the write lock on resource is held by another thread then
+ * the current thread becomes disabled for thread scheduling
+ * purposes and lies dormant until the read lock has been acquired.
+ *
+ * @param resource on which the read lock has to be acquired
+ */
+ public void readLock(final R resource) {
+ // acquire() resolves the ActiveLock for the resource and then applies
+ // the given lock function to it; here that is the read lock.
+ acquire(resource, ActiveLock::readLock);
+ }
+
+ /**
+ * Releases the read lock on given resource.
+ *
+ * @param resource for which the read lock has to be released
+ * @throws IllegalMonitorStateException if the current thread does not
+ * hold this lock
+ */
+ public void readUnlock(final R resource) throws IllegalMonitorStateException {
+ // release() resolves the ActiveLock for the resource, applies the given
+ // unlock function, and then decrements the active lock count.
+ release(resource, ActiveLock::readUnlock);
+ }
+
+ /**
+ * Acquires the write lock on given resource.
+ *
+ * <p>Acquires the write lock on resource if neither the read nor write lock
+ * are held by another thread and returns immediately.
+ *
+ * <p>If the current thread already holds the write lock then the
+ * hold count is incremented by one and the method returns
+ * immediately.
+ *
+ * <p>If the lock is held by another thread then the current
+ * thread becomes disabled for thread scheduling purposes and
+ * lies dormant until the write lock has been acquired.
+ *
+ * @param resource on which the lock has to be acquired
*/
- public void lock(T resource) {
- activeLocks.compute(resource, (k, v) -> {
- ActiveLock lock;
+ public void writeLock(final R resource) {
+ // acquire() resolves the ActiveLock for the resource and then applies
+ // the given lock function to it; here that is the write lock.
+ acquire(resource, ActiveLock::writeLock);
+ }
+
+ /**
+ * Releases the write lock on given resource.
+ *
+ * @param resource for which the lock has to be released
+ * @throws IllegalMonitorStateException if the current thread does not
+ * hold this lock
+ */
+ public void writeUnlock(final R resource)
+ throws IllegalMonitorStateException {
+ // release() resolves the ActiveLock for the resource, applies the given
+ // unlock function, and then decrements the active lock count.
+ release(resource, ActiveLock::writeUnlock);
+ }
+
+ /**
+ * Acquires the lock on given resource using the provided lock function.
+ *
+ * @param resource on which the lock has to be acquired
+ * @param lockFn function to acquire the lock
+ */
+ private void acquire(final R resource, final Consumer<ActiveLock> lockFn) {
+ lockFn.accept(getLockForLocking(resource));
+ }
+
+ /**
+ * Releases the lock on given resource using the provided release function.
+ *
+ * @param resource for which the lock has to be released
+ * @param releaseFn function to release the lock
+ */
+ private void release(final R resource, final Consumer<ActiveLock> releaseFn) {
+ // Resolve the lock first; an unknown resource raises
+ // IllegalMonitorStateException before anything is released.
+ releaseFn.accept(getLockForReleasing(resource));
+ // Only after a successful release do we give up this holder's share
+ // of the active count.
+ decrementActiveLockCount(resource);
+ }
+
+ /**
+ * Returns {@link ActiveLock} instance for the given resource,
+ * on which the lock can be acquired.
+ *
+ * @param resource on which the lock has to be acquired
+ * @return {@link ActiveLock} instance
+ */
+ private ActiveLock getLockForLocking(final R resource) {
+ /*
+ * While getting a lock object for locking we should
+ * atomically increment the active count of the lock.
+ *
+ * This is to avoid cases where the selected lock could
+ * be removed from the activeLocks map and returned to
+ * the object pool.
+ */
+ return activeLocks.compute(resource, (k, v) -> {
+ final ActiveLock lock;
try {
if (v == null) {
lock = lockPool.borrowObject();
@@ -73,22 +187,34 @@ public class LockManager<T> {
throw new RuntimeException(ex);
}
return lock;
- }).lock();
+ });
}
/**
- * Releases the lock on given resource.
+ * Returns {@link ActiveLock} instance for the given resource,
+ * for which the lock has to be released.
+ *
+ * @param resource for which the lock has to be released
+ * @return {@link ActiveLock} instance
*/
- public void unlock(T resource) {
- ActiveLock lock = activeLocks.get(resource);
- if (lock == null) {
- // Someone is releasing a lock which was never acquired. Log and return.
- LOG.error("Trying to release the lock on {}, which was never acquired.",
- resource);
- throw new IllegalMonitorStateException("Releasing lock on resource "
- + resource + " without acquiring lock");
+ private ActiveLock getLockForReleasing(final R resource) {
+ if (activeLocks.containsKey(resource)) {
+ return activeLocks.get(resource);
}
- lock.unlock();
+ // Someone is releasing a lock which was never acquired.
+ LOG.error("Trying to release the lock on {}, which was never acquired.",
+ resource);
+ throw new IllegalMonitorStateException("Releasing lock on resource "
+ + resource + " without acquiring lock");
+ }
+
+ /**
+ * Decrements the active lock count and returns the {@link ActiveLock}
+ * object to pool if the active count is 0.
+ *
+ * @param resource resource to which the ActiveLock is associated
+ */
+ private void decrementActiveLockCount(final R resource) {
activeLocks.computeIfPresent(resource, (k, v) -> {
v.decrementActiveCount();
if (v.getActiveLockCount() != 0) {
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/TestLockManager.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/TestLockManager.java
index fa3030d..e88b1bb 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/TestLockManager.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/TestLockManager.java
@@ -29,34 +29,143 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class TestLockManager {
@Test(timeout = 1000)
- public void testWithDifferentResource() {
- LockManager<String> manager = new LockManager<>(new OzoneConfiguration());
- manager.lock("/resourceOne");
+ public void testWriteLockWithDifferentResource() {
+ final LockManager<String> manager =
+ new LockManager<>(new OzoneConfiguration());
+ manager.writeLock("/resourceOne");
// This should work, as they are different resource.
- manager.lock("/resourceTwo");
- manager.unlock("/resourceOne");
- manager.unlock("/resourceTwo");
+ manager.writeLock("/resourceTwo");
+ manager.writeUnlock("/resourceOne");
+ manager.writeUnlock("/resourceTwo");
Assert.assertTrue(true);
}
@Test
- public void testWithSameResource() throws Exception {
- LockManager<String> manager = new LockManager<>(new OzoneConfiguration());
- manager.lock("/resourceOne");
- AtomicBoolean gotLock = new AtomicBoolean(false);
+ public void testWriteLockWithSameResource() throws Exception {
+ final LockManager<String> manager =
+ new LockManager<>(new OzoneConfiguration());
+ final AtomicBoolean gotLock = new AtomicBoolean(false);
+ manager.writeLock("/resourceOne");
new Thread(() -> {
- manager.lock("/resourceOne");
+ manager.writeLock("/resourceOne");
gotLock.set(true);
- manager.unlock("/resourceOne");
+ manager.writeUnlock("/resourceOne");
}).start();
- // Let's give some time for the new thread to run
+ // Let's give some time for the other thread to run
Thread.sleep(100);
- // Since the new thread is trying to get lock on same object, it will wait.
+ // Since the other thread is trying to get write lock on same object,
+ // it will wait.
Assert.assertFalse(gotLock.get());
- manager.unlock("/resourceOne");
- // Since we have released the lock, the new thread should have the lock
- // now
- // Let's give some time for the new thread to run
+ manager.writeUnlock("/resourceOne");
+ // Since we have released the write lock, the other thread should have
+ // the lock now
+ // Let's give some time for the other thread to run
+ Thread.sleep(100);
+ Assert.assertTrue(gotLock.get());
+ }
+
+ @Test(timeout = 1000)
+ public void testReadLockWithDifferentResource() {
+ final String first = "/resourceOne";
+ final String second = "/resourceTwo";
+ final LockManager<String> manager =
+ new LockManager<>(new OzoneConfiguration());
+ // Read locks on two different resources are taken by the same thread;
+ // neither call is expected to block.
+ manager.readLock(first);
+ manager.readLock(second);
+ manager.readUnlock(first);
+ manager.readUnlock(second);
+ // Reaching this line within the timeout is the actual check.
+ Assert.assertTrue(true);
+ }
+
+ @Test
+ public void testReadLockWithSameResource() throws Exception {
+ final LockManager<String> manager =
+ new LockManager<>(new OzoneConfiguration());
+ final AtomicBoolean gotLock = new AtomicBoolean(false);
+ manager.readLock("/resourceOne");
+ final Thread reader = new Thread(() -> {
+ manager.readLock("/resourceOne");
+ gotLock.set(true);
+ manager.readUnlock("/resourceOne");
+ });
+ reader.start();
+ // Read locks are shared, so the other thread should not block on us.
+ // Wait (bounded) for it to finish instead of sleeping a fixed interval,
+ // so a slow scheduler cannot fail the test spuriously.
+ reader.join(1000);
+ Assert.assertTrue(gotLock.get());
+ manager.readUnlock("/resourceOne");
+ }
+
+ @Test
+ public void testWriteReadLockWithSameResource() throws Exception {
+ final LockManager<String> manager =
+ new LockManager<>(new OzoneConfiguration());
+ final AtomicBoolean gotLock = new AtomicBoolean(false);
+ manager.writeLock("/resourceOne");
+ final Thread reader = new Thread(() -> {
+ manager.readLock("/resourceOne");
+ gotLock.set(true);
+ manager.readUnlock("/resourceOne");
+ });
+ reader.start();
+ // Let's give some time for the other thread to run
+ Thread.sleep(100);
+ // Since the other thread is trying to get read lock on same object,
+ // it will wait.
+ Assert.assertFalse(gotLock.get());
+ manager.writeUnlock("/resourceOne");
+ // Since we have released the write lock, the other thread can now take
+ // the read lock. Wait (bounded) for it to finish rather than sleeping a
+ // fixed interval, so a slow scheduler cannot fail the test spuriously.
+ reader.join(1000);
+ Assert.assertTrue(gotLock.get());
+ }
+
+ @Test
+ public void testReadWriteLockWithSameResource() throws Exception {
+ final LockManager<String> manager =
+ new LockManager<>(new OzoneConfiguration());
+ final AtomicBoolean gotLock = new AtomicBoolean(false);
+ manager.readLock("/resourceOne");
+ final Thread writer = new Thread(() -> {
+ manager.writeLock("/resourceOne");
+ gotLock.set(true);
+ manager.writeUnlock("/resourceOne");
+ });
+ writer.start();
+ // Let's give some time for the other thread to run
+ Thread.sleep(100);
+ // Since the other thread is trying to get write lock on same object,
+ // it will wait.
+ Assert.assertFalse(gotLock.get());
+ manager.readUnlock("/resourceOne");
+ // Since we have released the read lock, the other thread can now take
+ // the write lock. Wait (bounded) for it to finish rather than sleeping a
+ // fixed interval, so a slow scheduler cannot fail the test spuriously.
+ writer.join(1000);
+ Assert.assertTrue(gotLock.get());
+ }
+
+ @Test
+ public void testMultiReadWriteLockWithSameResource() throws Exception {
+ // Checks that a writer on another thread stays blocked until both read
+ // holds taken below on the same resource have been released.
+ // NOTE(review): the assertions depend on fixed 100 ms sleeps; this looks
+ // timing-sensitive on a heavily loaded machine - confirm.
+ final LockManager<String> manager =
+ new LockManager<>(new OzoneConfiguration());
+ final AtomicBoolean gotLock = new AtomicBoolean(false);
+ manager.readLock("/resourceOne");
+ manager.readLock("/resourceOne");
+ new Thread(() -> {
+ manager.writeLock("/resourceOne");
+ gotLock.set(true);
+ manager.writeUnlock("/resourceOne");
+ }).start();
+ // Let's give some time for the other thread to run
+ Thread.sleep(100);
+ // Since the other thread is trying to get write lock on same object,
+ // it will wait.
+ Assert.assertFalse(gotLock.get());
+ manager.readUnlock("/resourceOne");
+ // We have only released one read lock; we still hold another read lock,
+ // so the writer must still be waiting.
+ Thread.sleep(100);
+ Assert.assertFalse(gotLock.get());
+ manager.readUnlock("/resourceOne");
+ // Since we have released the last read lock, the other thread should
+ // have the write lock now.
+ // Let's give some time for the other thread to run
Thread.sleep(100);
Assert.assertTrue(gotLock.get());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org