You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2015/05/06 08:52:04 UTC
hbase git commit: HBASE-13628 Use AtomicLong as size in
BoundedConcurrentLinkedQueue
Repository: hbase
Updated Branches:
refs/heads/master 664b2e4f1 -> 652929c0f
HBASE-13628 Use AtomicLong as size in BoundedConcurrentLinkedQueue
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/652929c0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/652929c0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/652929c0
Branch: refs/heads/master
Commit: 652929c0ff8c8cec1e86ded834f3e770422b2ace
Parents: 664b2e4
Author: zhangduo <zh...@wandoujia.com>
Authored: Wed May 6 14:48:08 2015 +0800
Committer: zhangduo <zh...@wandoujia.com>
Committed: Wed May 6 14:51:22 2015 +0800
----------------------------------------------------------------------
.../util/BoundedConcurrentLinkedQueue.java | 64 ++++++++------
.../util/TestBoundedConcurrentLinkedQueue.java | 89 ++++++++++++++++++--
2 files changed, 118 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/652929c0/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedConcurrentLinkedQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedConcurrentLinkedQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedConcurrentLinkedQueue.java
index 9208238..f66771b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedConcurrentLinkedQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedConcurrentLinkedQueue.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -30,7 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
@InterfaceStability.Stable
public class BoundedConcurrentLinkedQueue<T> extends ConcurrentLinkedQueue<T> {
private static final long serialVersionUID = 1L;
- private volatile long size = 0;
+ private final AtomicLong size = new AtomicLong(0L);
private final long maxSize;
public BoundedConcurrentLinkedQueue() {
@@ -43,40 +44,49 @@ public class BoundedConcurrentLinkedQueue<T> extends ConcurrentLinkedQueue<T> {
}
@Override
- public boolean add(T e) {
- return offer(e);
- }
-
- @Override
public boolean addAll(Collection<? extends T> c) {
- size += c.size(); // Between here and below we might reject offers,
- if (size > maxSize) { // if over maxSize, but that's ok
- size -= c.size(); // We're over, just back out and return.
- return false;
+ for (;;) {
+ long currentSize = size.get();
+ long nextSize = currentSize + c.size();
+ if (nextSize > maxSize) { // already exceeded limit
+ return false;
+ }
+ if (size.compareAndSet(currentSize, nextSize)) {
+ break;
+ }
}
- return super.addAll(c); // Always true for ConcurrentLinkedQueue
+ return super.addAll(c); // Always true for ConcurrentLinkedQueue
}
@Override
public void clear() {
- super.clear();
- size = 0;
+ // override this method to batch update size.
+ long removed = 0L;
+ while (super.poll() != null) {
+ removed++;
+ }
+ size.addAndGet(-removed);
}
@Override
public boolean offer(T e) {
- if (++size > maxSize) {
- --size; // We didn't take it after all
- return false;
+ for (;;) {
+ long currentSize = size.get();
+ if (currentSize >= maxSize) { // already exceeded limit
+ return false;
+ }
+ if (size.compareAndSet(currentSize, currentSize + 1)) {
+ break;
+ }
}
- return super.offer(e); // Always true for ConcurrentLinkedQueue
+ return super.offer(e); // Always true for ConcurrentLinkedQueue
}
@Override
public T poll() {
T result = super.poll();
if (result != null) {
- --size;
+ size.decrementAndGet();
}
return result;
}
@@ -85,30 +95,28 @@ public class BoundedConcurrentLinkedQueue<T> extends ConcurrentLinkedQueue<T> {
public boolean remove(Object o) {
boolean result = super.remove(o);
if (result) {
- --size;
+ size.decrementAndGet();
}
return result;
}
@Override
public int size() {
- return (int) size;
+ return (int) size.get();
}
public void drainTo(Collection<T> list) {
long removed = 0;
- T l;
- while ((l = super.poll()) != null) {
- list.add(l);
+ for (T element; (element = super.poll()) != null;) {
+ list.add(element);
removed++;
}
- // Limit the number of operations on a volatile by only reporting size
- // change after the drain is completed.
- size -= removed;
+ // Limit the number of operations on size by only reporting size change after the drain is
+ // completed.
+ size.addAndGet(-removed);
}
public long remainingCapacity() {
- long remaining = maxSize - size;
- return remaining >= 0 ? remaining : 0;
+ return maxSize - size.get();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/652929c0/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedConcurrentLinkedQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedConcurrentLinkedQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedConcurrentLinkedQueue.java
index 3453f24..f6e6ac5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedConcurrentLinkedQueue.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedConcurrentLinkedQueue.java
@@ -18,20 +18,21 @@
package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category({MiscTests.class, SmallTests.class})
+@Category({ MiscTests.class, SmallTests.class })
public class TestBoundedConcurrentLinkedQueue {
private final static int CAPACITY = 16;
@@ -42,10 +43,6 @@ public class TestBoundedConcurrentLinkedQueue {
this.queue = new BoundedConcurrentLinkedQueue<Long>(CAPACITY);
}
- @After
- public void tearDown() throws Exception {
- }
-
@Test
public void testOfferAndPoll() throws Exception {
// Offer
@@ -83,4 +80,82 @@ public class TestBoundedConcurrentLinkedQueue {
assertEquals(0, queue.size());
assertEquals(CAPACITY, queue.remainingCapacity());
}
+
+ @Test
+ public void testClear() {
+ // Offer
+ for (long i = 1; i <= CAPACITY; ++i) {
+ assertTrue(queue.offer(i));
+ assertEquals(i, queue.size());
+ assertEquals(CAPACITY - i, queue.remainingCapacity());
+ }
+ assertFalse(queue.offer(0L));
+
+ queue.clear();
+ assertEquals(null, queue.poll());
+ assertEquals(0, queue.size());
+ assertEquals(CAPACITY, queue.remainingCapacity());
+ }
+
+ @Test
+ public void testMultiThread() throws InterruptedException {
+ int offerThreadCount = 10;
+ int pollThreadCount = 5;
+ int duration = 5000; // ms
+ final AtomicBoolean stop = new AtomicBoolean(false);
+ Thread[] offerThreads = new Thread[offerThreadCount];
+ for (int i = 0; i < offerThreadCount; i++) {
+ offerThreads[i] = new Thread("offer-thread-" + i) {
+
+ @Override
+ public void run() {
+ Random rand = new Random();
+ while (!stop.get()) {
+ queue.offer(rand.nextLong());
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ };
+ }
+ Thread[] pollThreads = new Thread[pollThreadCount];
+ for (int i = 0; i < pollThreadCount; i++) {
+ pollThreads[i] = new Thread("poll-thread-" + i) {
+
+ @Override
+ public void run() {
+ while (!stop.get()) {
+ queue.poll();
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ };
+ }
+ for (Thread t : offerThreads) {
+ t.start();
+ }
+ for (Thread t : pollThreads) {
+ t.start();
+ }
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < duration) {
+ assertTrue(queue.size() <= CAPACITY);
+ Thread.yield();
+ }
+ stop.set(true);
+ for (Thread t : offerThreads) {
+ t.join();
+ }
+ for (Thread t : pollThreads) {
+ t.join();
+ }
+ assertTrue(queue.size() <= CAPACITY);
+ }
}