You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2015/10/13 08:14:03 UTC

hbase git commit: HBASE-14268 Improve KeyLocker (Hiroshi Ikeda)

Repository: hbase
Updated Branches:
  refs/heads/master a45cb72ef -> 99e99f3b5


HBASE-14268 Improve KeyLocker (Hiroshi Ikeda)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/99e99f3b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/99e99f3b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/99e99f3b

Branch: refs/heads/master
Commit: 99e99f3b54bb8801565fbe2a2c071da44281868d
Parents: a45cb72
Author: stack <st...@apache.org>
Authored: Mon Oct 12 23:13:56 2015 -0700
Committer: stack <st...@apache.org>
Committed: Mon Oct 12 23:13:56 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/util/KeyLocker.java | 125 +++++++------------
 .../apache/hadoop/hbase/util/TestKeyLocker.java |  16 ++-
 2 files changed, 58 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/99e99f3b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java
index 5398582..dec91aa 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java
@@ -18,49 +18,45 @@
 
 package org.apache.hadoop.hbase.util;
 
-
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
  * A utility class to manage a set of locks. Each lock is identified by a String which serves
- * as a key. Typical usage is: <p>
- * class Example{
- * private final static KeyLocker&lt;String&gt; locker = new Locker&lt;String&gt;();
- * </p>
- * <p>
- * public void foo(String s){
- * Lock lock = locker.acquireLock(s);
- * try {
- * // whatever
- * }finally{
- * lock.unlock();
- * }
- * }
+ * as a key. Typical usage is: <pre>
+ * class Example {
+ *   private final static KeyLocker&lt;String&gt; locker = new KeyLocker&lt;String&gt;();
+ *   public void foo(String s){
+ *     Lock lock = locker.acquireLock(s);
+ *     try {
+ *       // whatever
+ *     }finally{
+ *       lock.unlock();
+ *     }
+ *   }
  * }
- * </p>
+ * </pre>
  */
 @InterfaceAudience.Private
-public class KeyLocker<K extends Comparable<? super K>> {
-  private static final Log LOG = LogFactory.getLog(KeyLocker.class);
-
+public class KeyLocker<K> {
   // The number of lock we want to easily support. It's not a maximum.
   private static final int NB_CONCURRENT_LOCKS = 1000;
 
-  // We need an atomic counter to manage the number of users using the lock and free it when
-  //  it's equal to zero.
-  private final Map<K, Pair<KeyLock<K>, AtomicInteger>> locks =
-    new HashMap<K, Pair<KeyLock<K>, AtomicInteger>>(NB_CONCURRENT_LOCKS);
+  private final WeakObjectPool<K, ReentrantLock> lockPool =
+      new WeakObjectPool<K, ReentrantLock>(
+          new WeakObjectPool.ObjectFactory<K, ReentrantLock>() {
+            @Override
+            public ReentrantLock createObject(K key) {
+              return new ReentrantLock();
+            }
+          },
+          NB_CONCURRENT_LOCKS);
 
   /**
    * Return a lock for the given key. The lock is already locked.
@@ -70,67 +66,36 @@ public class KeyLocker<K extends Comparable<? super K>> {
   public ReentrantLock acquireLock(K key) {
     if (key == null) throw new IllegalArgumentException("key must not be null");
 
-    Pair<KeyLock<K>, AtomicInteger> lock;
-    synchronized (this) {
-      lock = locks.get(key);
-      if (lock == null) {
-        lock = new Pair<KeyLock<K>, AtomicInteger>(
-          new KeyLock<K>(this, key), new AtomicInteger(1));
-        locks.put(key, lock);
-      } else {
-        lock.getSecond().incrementAndGet();
-      }
-    }
-    lock.getFirst().lock();
-    return lock.getFirst();
+    lockPool.purge();
+    ReentrantLock lock = lockPool.get(key);
+
+    lock.lock();
+    return lock;
   }
 
   /**
    * Acquire locks for a set of keys. The keys will be
    * sorted internally to avoid possible deadlock.
+   *
+   * @throws ClassCastException if the given {@code keys}
+   *    contains elements that are not mutually comparable
    */
-  public Map<K, Lock> acquireLocks(final Set<K> keys) {
-    Map<K, Lock> locks = new HashMap<K, Lock>(keys.size());
-    SortedSet<K> sortedKeys = new TreeSet<K>(keys);
-    for (K key : sortedKeys) {
-      locks.put(key, acquireLock(key));
-    }
-    return locks;
-  }
-
-  /**
-   * Free the lock for the given key.
-   */
-  private synchronized void releaseLock(K key) {
-    Pair<KeyLock<K>, AtomicInteger> lock = locks.get(key);
-    if (lock != null) {
-      if (lock.getSecond().decrementAndGet() == 0) {
-        locks.remove(key);
-      }
-    } else {
-      String message = "Can't release the lock for " + key+", this key is not in the key list." +
-        " known keys are: "+ locks.keySet();
-      LOG.error(message);
-      throw new RuntimeException(message);
-    }
-  }
-
-  static class KeyLock<K extends Comparable<? super K>> extends ReentrantLock {
-    private static final long serialVersionUID = -12432857283423584L;
+  public Map<K, Lock> acquireLocks(Set<? extends K> keys) {
+    Object[] keyArray = keys.toArray();
+    Arrays.sort(keyArray);
 
-    private final transient KeyLocker<K> locker;
-    private final K lockId;
-
-    private KeyLock(KeyLocker<K> locker, K lockId) {
-      super();
-      this.locker = locker;
-      this.lockId = lockId;
+    lockPool.purge();
+    Map<K, Lock> locks = new LinkedHashMap<K, Lock>(keyArray.length);
+    for (Object o : keyArray) {
+      @SuppressWarnings("unchecked")
+      K key = (K)o;
+      ReentrantLock lock = lockPool.get(key);
+      locks.put(key, lock);
     }
 
-    @Override
-    public void unlock() {
-      super.unlock();
-      locker.releaseLock(lockId);
+    for (Lock lock : locks.values()) {
+      lock.lock();
     }
+    return locks;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/99e99f3b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestKeyLocker.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestKeyLocker.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestKeyLocker.java
index 9bb8a04..40b918c 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestKeyLocker.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestKeyLocker.java
@@ -30,7 +30,7 @@ import org.junit.experimental.categories.Category;
 public class TestKeyLocker {
   @Test
   public void testLocker(){
-    KeyLocker<String> locker = new KeyLocker();
+    KeyLocker<String> locker = new KeyLocker<String>();
     ReentrantLock lock1 = locker.acquireLock("l1");
     Assert.assertTrue(lock1.isHeldByCurrentThread());
 
@@ -51,9 +51,19 @@ public class TestKeyLocker {
     lock2.unlock();
     Assert.assertFalse(lock20.isHeldByCurrentThread());
 
-    // The lock object was freed once useless, so we're recreating a new one
+    // The lock object becomes eligible for garbage collection
+    // once every reference to it held here has been dropped,
+    // and after it is collected the next acquireLock() returns a new one.
+    int lock2Hash = System.identityHashCode(lock2);
+    lock2 = null;
+    lock20 = null;
+
+    System.gc();
+    System.gc();
+    System.gc();
+
     ReentrantLock lock200 = locker.acquireLock("l2");
-    Assert.assertTrue(lock2 != lock200);
+    Assert.assertNotEquals(lock2Hash, System.identityHashCode(lock200));
     lock200.unlock();
     Assert.assertFalse(lock200.isHeldByCurrentThread());