You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2015/05/06 08:52:04 UTC

hbase git commit: HBASE-13628 Use AtomicLong as size in BoundedConcurrentLinkedQueue

Repository: hbase
Updated Branches:
  refs/heads/master 664b2e4f1 -> 652929c0f


HBASE-13628 Use AtomicLong as size in BoundedConcurrentLinkedQueue


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/652929c0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/652929c0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/652929c0

Branch: refs/heads/master
Commit: 652929c0ff8c8cec1e86ded834f3e770422b2ace
Parents: 664b2e4
Author: zhangduo <zh...@wandoujia.com>
Authored: Wed May 6 14:48:08 2015 +0800
Committer: zhangduo <zh...@wandoujia.com>
Committed: Wed May 6 14:51:22 2015 +0800

----------------------------------------------------------------------
 .../util/BoundedConcurrentLinkedQueue.java      | 64 ++++++++------
 .../util/TestBoundedConcurrentLinkedQueue.java  | 89 ++++++++++++++++++--
 2 files changed, 118 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/652929c0/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedConcurrentLinkedQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedConcurrentLinkedQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedConcurrentLinkedQueue.java
index 9208238..f66771b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedConcurrentLinkedQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedConcurrentLinkedQueue.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util;
 
 import java.util.Collection;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -30,7 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 @InterfaceStability.Stable
 public class BoundedConcurrentLinkedQueue<T> extends ConcurrentLinkedQueue<T> {
   private static final long serialVersionUID = 1L;
-  private volatile long size = 0;
+  private final AtomicLong size = new AtomicLong(0L);
   private final long maxSize;
 
   public BoundedConcurrentLinkedQueue() {
@@ -43,40 +44,49 @@ public class BoundedConcurrentLinkedQueue<T> extends ConcurrentLinkedQueue<T> {
   }
 
   @Override
-  public boolean add(T e) {
-    return offer(e);
-  }
-
-  @Override
   public boolean addAll(Collection<? extends T> c) {
-    size += c.size();        // Between here and below we might reject offers,
-    if (size > maxSize) {    // if over maxSize, but that's ok
-      size -= c.size();      // We're over, just back out and return.
-      return false;
+    for (;;) {
+      long currentSize = size.get();
+      long nextSize = currentSize + c.size();
+      if (nextSize > maxSize) { // adding c would exceed the limit
+        return false;
+      }
+      if (size.compareAndSet(currentSize, nextSize)) {
+        break;
+      }
     }
-    return super.addAll(c);  // Always true for ConcurrentLinkedQueue
+    return super.addAll(c); // true unless c is empty (ConcurrentLinkedQueue.addAll)
   }
 
   @Override
   public void clear() {
-    super.clear();
-    size = 0;
+    // override this method to batch update size.
+    long removed = 0L;
+    while (super.poll() != null) {
+      removed++;
+    }
+    size.addAndGet(-removed);
   }
 
   @Override
   public boolean offer(T e) {
-    if (++size > maxSize) {
-      --size;                // We didn't take it after all
-      return false;
+    for (;;) {
+      long currentSize = size.get();
+      if (currentSize >= maxSize) { // queue is already full
+        return false;
+      }
+      if (size.compareAndSet(currentSize, currentSize + 1)) {
+        break;
+      }
     }
-    return super.offer(e);   // Always true for ConcurrentLinkedQueue
+    return super.offer(e); // Always true for ConcurrentLinkedQueue
   }
 
   @Override
   public T poll() {
     T result = super.poll();
     if (result != null) {
-      --size;
+      size.decrementAndGet();
     }
     return result;
   }
@@ -85,30 +95,28 @@ public class BoundedConcurrentLinkedQueue<T> extends ConcurrentLinkedQueue<T> {
   public boolean remove(Object o) {
     boolean result = super.remove(o);
     if (result) {
-      --size;
+      size.decrementAndGet();
     }
     return result;
   }
 
   @Override
   public int size() {
-    return (int) size;
+    return (int) size.get();
   }
 
   public void drainTo(Collection<T> list) {
     long removed = 0;
-    T l;
-    while ((l = super.poll()) != null) {
-      list.add(l);
+    for (T element; (element = super.poll()) != null;) {
+      list.add(element);
       removed++;
     }
-    // Limit the number of operations on a volatile by only reporting size
-    // change after the drain is completed.
-    size -= removed;
+    // Limit the number of operations on size by only reporting size change after the drain is
+    // completed.
+    size.addAndGet(-removed);
   }
 
   public long remainingCapacity() {
-    long remaining = maxSize - size;
-    return remaining >= 0 ? remaining : 0;
+    return maxSize - size.get();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/652929c0/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedConcurrentLinkedQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedConcurrentLinkedQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedConcurrentLinkedQueue.java
index 3453f24..f6e6ac5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedConcurrentLinkedQueue.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedConcurrentLinkedQueue.java
@@ -18,20 +18,21 @@
 package org.apache.hadoop.hbase.util;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category({MiscTests.class, SmallTests.class})
+@Category({ MiscTests.class, SmallTests.class })
 public class TestBoundedConcurrentLinkedQueue {
   private final static int CAPACITY = 16;
 
@@ -42,10 +43,6 @@ public class TestBoundedConcurrentLinkedQueue {
     this.queue = new BoundedConcurrentLinkedQueue<Long>(CAPACITY);
   }
 
-  @After
-  public void tearDown() throws Exception {
-  }
-
   @Test
   public void testOfferAndPoll() throws Exception {
     // Offer
@@ -83,4 +80,82 @@ public class TestBoundedConcurrentLinkedQueue {
     assertEquals(0, queue.size());
     assertEquals(CAPACITY, queue.remainingCapacity());
   }
+
+  @Test
+  public void testClear() {
+    // Offer
+    for (long i = 1; i <= CAPACITY; ++i) {
+      assertTrue(queue.offer(i));
+      assertEquals(i, queue.size());
+      assertEquals(CAPACITY - i, queue.remainingCapacity());
+    }
+    assertFalse(queue.offer(0L));
+
+    queue.clear();
+    assertEquals(null, queue.poll());
+    assertEquals(0, queue.size());
+    assertEquals(CAPACITY, queue.remainingCapacity());
+  }
+
+  @Test
+  public void testMultiThread() throws InterruptedException {
+    int offerThreadCount = 10;
+    int pollThreadCount = 5;
+    int duration = 5000; // ms
+    final AtomicBoolean stop = new AtomicBoolean(false);
+    Thread[] offerThreads = new Thread[offerThreadCount];
+    for (int i = 0; i < offerThreadCount; i++) {
+      offerThreads[i] = new Thread("offer-thread-" + i) {
+
+        @Override
+        public void run() {
+          Random rand = new Random();
+          while (!stop.get()) {
+            queue.offer(rand.nextLong());
+            try {
+              Thread.sleep(1);
+            } catch (InterruptedException e) {
+            }
+          }
+        }
+
+      };
+    }
+    Thread[] pollThreads = new Thread[pollThreadCount];
+    for (int i = 0; i < pollThreadCount; i++) {
+      pollThreads[i] = new Thread("poll-thread-" + i) {
+
+        @Override
+        public void run() {
+          while (!stop.get()) {
+            queue.poll();
+            try {
+              Thread.sleep(1);
+            } catch (InterruptedException e) {
+            }
+          }
+        }
+
+      };
+    }
+    for (Thread t : offerThreads) {
+      t.start();
+    }
+    for (Thread t : pollThreads) {
+      t.start();
+    }
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < duration) {
+      assertTrue(queue.size() <= CAPACITY);
+      Thread.yield();
+    }
+    stop.set(true);
+    for (Thread t : offerThreads) {
+      t.join();
+    }
+    for (Thread t : pollThreads) {
+      t.join();
+    }
+    assertTrue(queue.size() <= CAPACITY);
+  }
 }