You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2017/05/12 13:17:51 UTC
bookkeeper git commit: BOOKKEEPER-1066: Introduce
GrowableArrayBlockingQueue
Repository: bookkeeper
Updated Branches:
refs/heads/master eeaddeda6 -> e2894a9e9
BOOKKEEPER-1066: Introduce GrowableArrayBlockingQueue
In multiple places, (eg: journal, ordered executor, etc..), we are using `LinkedBlockingQueue` instances to pass objects between threads.
The `LinkedBlockingQueue` differs from the `ArrayBlockingQueue` in that it doesn't require to define a max queue size, though, being implemented with a linked list, it requires to allocates list nodes each time an item is added.
We can use a `GrowableArrayBlockingQueue` that behaves in the same way as the `LinkedBlockingQueue`, but it's implemented with an array that can be resized when the queue reaches the capacity.
Author: Matteo Merli <mm...@apache.org>
Reviewers: Enrico Olivelli <eo...@apache.org>
Closes #153 from merlimat/growable-blocking-queue
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/e2894a9e
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/e2894a9e
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/e2894a9e
Branch: refs/heads/master
Commit: e2894a9e9946b4cc4647c280094bcb2eb1a521ea
Parents: eeadded
Author: Matteo Merli <mm...@apache.org>
Authored: Fri May 12 15:17:36 2017 +0200
Committer: eolivelli <eo...@apache.org>
Committed: Fri May 12 15:17:36 2017 +0200
----------------------------------------------------------------------
.../org/apache/bookkeeper/util/MathUtils.java | 4 +
.../collections/GrowableArrayBlockingQueue.java | 359 +++++++++++++++++++
.../GrowableArrayBlockingQueueTest.java | 206 +++++++++++
3 files changed, 569 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e2894a9e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
index 6aa9073..1b3044d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
@@ -35,6 +35,10 @@ public class MathUtils {
}
+ public static int findNextPositivePowerOfTwo(final int value) {
+ return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
+ }
+
/**
* Current time from some arbitrary time base in the past, counting in
* milliseconds, and not affected by settimeofday or similar system clock
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e2894a9e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueue.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueue.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueue.java
new file mode 100644
index 0000000..8f7dae7
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueue.java
@@ -0,0 +1,359 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.bookkeeper.util.MathUtils;
+
+
+/**
+ * This implements a {@link BlockingQueue} backed by an array with no fixed capacity.
+ *
+ * When the capacity is reached, data will be moved to a bigger array.
+ *
+ */
+public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements BlockingQueue<T> {
+
+ private final ReentrantLock headLock = new ReentrantLock();
+ private final PaddedInt headIndex = new PaddedInt();
+ private final PaddedInt tailIndex = new PaddedInt();
+ private final ReentrantLock tailLock = new ReentrantLock();
+ private final Condition isNotEmpty = headLock.newCondition();
+
+ private T[] data;
+ @SuppressWarnings("rawtypes")
+ private static final AtomicIntegerFieldUpdater<GrowableArrayBlockingQueue> SIZE_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(GrowableArrayBlockingQueue.class, "size");
+ @SuppressWarnings("unused")
+ private volatile int size = 0;
+
+ public GrowableArrayBlockingQueue() {
+ this(64);
+ }
+
+ @SuppressWarnings("unchecked")
+ public GrowableArrayBlockingQueue(int initialCapacity) {
+ headIndex.value = 0;
+ tailIndex.value = 0;
+
+ int capacity = MathUtils.findNextPositivePowerOfTwo(initialCapacity);
+ data = (T[]) new Object[capacity];
+ }
+
+ @Override
+ public T remove() {
+ T item = poll();
+ if (item == null) {
+ throw new NoSuchElementException();
+ }
+
+ return item;
+ }
+
+ @Override
+ public T poll() {
+ headLock.lock();
+ try {
+ if (SIZE_UPDATER.get(this) > 0) {
+ T item = data[headIndex.value];
+ headIndex.value = (headIndex.value + 1) & (data.length - 1);
+ SIZE_UPDATER.decrementAndGet(this);
+ return item;
+ } else {
+ return null;
+ }
+ } finally {
+ headLock.unlock();
+ }
+ }
+
+ @Override
+ public T element() {
+ T item = peek();
+ if (item == null) {
+ throw new NoSuchElementException();
+ }
+
+ return item;
+ }
+
+ @Override
+ public T peek() {
+ headLock.lock();
+ try {
+ if (SIZE_UPDATER.get(this) > 0) {
+ return data[headIndex.value];
+ } else {
+ return null;
+ }
+ } finally {
+ headLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean offer(T e) {
+ // Queue is unbounded and it will never reject new items
+ put(e);
+ return true;
+ }
+
+ @Override
+ public void put(T e) {
+ tailLock.lock();
+
+ boolean wasEmpty = false;
+
+ try {
+ if (SIZE_UPDATER.get(this) == data.length) {
+ expandArray();
+ }
+
+ data[tailIndex.value] = e;
+ tailIndex.value = (tailIndex.value + 1) & (data.length - 1);
+ if (SIZE_UPDATER.getAndIncrement(this) == 0) {
+ wasEmpty = true;
+ }
+ } finally {
+ tailLock.unlock();
+ }
+
+ if (wasEmpty) {
+ headLock.lock();
+ try {
+ isNotEmpty.signal();
+ } finally {
+ headLock.unlock();
+ }
+ }
+ }
+
+ @Override
+ public boolean add(T e) {
+ put(e);
+ return true;
+ }
+
+ @Override
+ public boolean offer(T e, long timeout, TimeUnit unit) {
+ // Queue is unbounded and it will never reject new items
+ put(e);
+ return true;
+ }
+
+ @Override
+ public T take() throws InterruptedException {
+ headLock.lockInterruptibly();
+
+ try {
+ while (SIZE_UPDATER.get(this) == 0) {
+ isNotEmpty.await();
+ }
+
+ T item = data[headIndex.value];
+ data[headIndex.value] = null;
+ headIndex.value = (headIndex.value + 1) & (data.length - 1);
+ if (SIZE_UPDATER.decrementAndGet(this) > 0) {
+ // There are still entries to consume
+ isNotEmpty.signal();
+ }
+ return item;
+ } finally {
+ headLock.unlock();
+ }
+ }
+
+ @Override
+ public T poll(long timeout, TimeUnit unit) throws InterruptedException {
+ headLock.lockInterruptibly();
+
+ try {
+ long timeoutNanos = unit.toNanos(timeout);
+ while (SIZE_UPDATER.get(this) == 0) {
+ if (timeoutNanos <= 0) {
+ return null;
+ }
+
+ timeoutNanos = isNotEmpty.awaitNanos(timeoutNanos);
+ }
+
+ T item = data[headIndex.value];
+ data[headIndex.value] = null;
+ headIndex.value = (headIndex.value + 1) & (data.length - 1);
+ if (SIZE_UPDATER.decrementAndGet(this) > 0) {
+ // There are still entries to consume
+ isNotEmpty.signal();
+ }
+ return item;
+ } finally {
+ headLock.unlock();
+ }
+ }
+
+ @Override
+ public int remainingCapacity() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public int drainTo(Collection<? super T> c) {
+ return drainTo(c, Integer.MAX_VALUE);
+ }
+
+ @Override
+ public int drainTo(Collection<? super T> c, int maxElements) {
+ headLock.lock();
+
+ try {
+ int drainedItems = 0;
+ int size = SIZE_UPDATER.get(this);
+
+ while (size > 0 && drainedItems < maxElements) {
+ T item = data[headIndex.value];
+ data[headIndex.value] = null;
+ c.add(item);
+
+ headIndex.value = (headIndex.value + 1) & (data.length - 1);
+ --size;
+ ++drainedItems;
+ }
+
+ if (SIZE_UPDATER.addAndGet(this, -drainedItems) > 0) {
+ // There are still entries to consume
+ isNotEmpty.signal();
+ }
+
+ return drainedItems;
+ } finally {
+ headLock.unlock();
+ }
+ }
+
+ @Override
+ public void clear() {
+ headLock.lock();
+
+ try {
+ int size = SIZE_UPDATER.get(this);
+
+ for (int i = 0; i < size; i++) {
+ data[headIndex.value] = null;
+ headIndex.value = (headIndex.value + 1) & (data.length - 1);
+ }
+
+ if (SIZE_UPDATER.addAndGet(this, -size) > 0) {
+ // There are still entries to consume
+ isNotEmpty.signal();
+ }
+ } finally {
+ headLock.unlock();
+ }
+ }
+
+ @Override
+ public int size() {
+ return SIZE_UPDATER.get(this);
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+
+ tailLock.lock();
+ headLock.lock();
+
+ try {
+ int headIndex = this.headIndex.value;
+ int size = SIZE_UPDATER.get(this);
+
+ sb.append('[');
+
+ for (int i = 0; i < size; i++) {
+ T item = data[headIndex];
+ if (i > 0) {
+ sb.append(", ");
+ }
+
+ sb.append(item);
+
+ headIndex = (headIndex + 1) & (data.length - 1);
+ }
+
+ sb.append(']');
+ } finally {
+ headLock.unlock();
+ tailLock.unlock();
+ }
+ return sb.toString();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void expandArray() {
+ // We already hold the tailLock
+ headLock.lock();
+
+ try {
+ int size = SIZE_UPDATER.get(this);
+ int newCapacity = data.length * 2;
+ T[] newData = (T[]) new Object[newCapacity];
+
+ int oldHeadIndex = headIndex.value;
+ int newTailIndex = 0;
+
+ for (int i = 0; i < size; i++) {
+ newData[newTailIndex++] = data[oldHeadIndex];
+ oldHeadIndex = (oldHeadIndex + 1) & (data.length - 1);
+ }
+
+ data = newData;
+ headIndex.value = 0;
+ tailIndex.value = size;
+ } finally {
+ headLock.unlock();
+ }
+ }
+
+ final static class PaddedInt {
+ private int value;
+
+ // Padding to avoid false sharing
+ public volatile int pi1 = 1;
+ public volatile long p1 = 1L, p2 = 2L, p3 = 3L, p4 = 4L, p5 = 5L, p6 = 6L;
+
+ public long exposeToAvoidOptimization() {
+ return pi1 + p1 + p2 + p3 + p4 + p5 + p6;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e2894a9e/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueueTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueueTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueueTest.java
new file mode 100644
index 0000000..9fd7e84
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueueTest.java
@@ -0,0 +1,206 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class GrowableArrayBlockingQueueTest {
+
+ @Test
+ public void simple() throws Exception {
+ BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>(4);
+
+ assertEquals(null, queue.poll());
+
+ assertEquals(Integer.MAX_VALUE, queue.remainingCapacity());
+ assertEquals("[]", queue.toString());
+
+ try {
+ queue.element();
+ fail("Should have thrown exception");
+ } catch (NoSuchElementException e) {
+ // Expected
+ }
+
+ try {
+ queue.iterator();
+ fail("Should have thrown exception");
+ } catch (UnsupportedOperationException e) {
+ // Expected
+ }
+
+ // Test index rollover
+ for (int i = 0; i < 100; i++) {
+ queue.add(i);
+
+ assertEquals(i, queue.take().intValue());
+ }
+
+ queue.offer(1);
+ assertEquals("[1]", queue.toString());
+ queue.offer(2);
+ assertEquals("[1, 2]", queue.toString());
+ queue.offer(3);
+ assertEquals("[1, 2, 3]", queue.toString());
+ queue.offer(4);
+ assertEquals("[1, 2, 3, 4]", queue.toString());
+
+ assertEquals(4, queue.size());
+
+ List<Integer> list = new ArrayList<>();
+ queue.drainTo(list, 3);
+
+ assertEquals(1, queue.size());
+ assertEquals(Lists.newArrayList(1, 2, 3), list);
+ assertEquals("[4]", queue.toString());
+ assertEquals(4, queue.peek().intValue());
+
+ assertEquals(4, queue.element().intValue());
+ assertEquals(4, queue.remove().intValue());
+ try {
+ queue.remove();
+ fail("Should have thrown exception");
+ } catch (NoSuchElementException e) {
+ // Expected
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void blockingTake() throws Exception {
+ BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ new Thread(() -> {
+ try {
+ int expected = 0;
+
+ for (int i = 0; i < 100; i++) {
+ int n = queue.take();
+
+ assertEquals(expected++, n);
+ }
+
+ latch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }).start();
+
+ int n = 0;
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ queue.put(n);
+ ++n;
+ }
+
+ // Wait until all the entries are consumed
+ while (!queue.isEmpty()) {
+ Thread.sleep(1);
+ }
+ }
+
+ latch.await();
+ }
+
+ @Test
+ public void growArray() throws Exception {
+ BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>(4);
+
+ assertEquals(null, queue.poll());
+
+ assertTrue(queue.offer(1));
+ assertTrue(queue.offer(2));
+ assertTrue(queue.offer(3));
+ assertTrue(queue.offer(4));
+ assertTrue(queue.offer(5));
+
+ assertEquals(5, queue.size());
+
+ queue.clear();
+ assertEquals(0, queue.size());
+
+ assertTrue(queue.offer(1, 1, TimeUnit.SECONDS));
+ assertTrue(queue.offer(2, 1, TimeUnit.SECONDS));
+ assertTrue(queue.offer(3, 1, TimeUnit.SECONDS));
+ assertEquals(3, queue.size());
+
+ List<Integer> list = new ArrayList<>();
+ queue.drainTo(list);
+ assertEquals(0, queue.size());
+
+ assertEquals(Lists.newArrayList(1, 2, 3), list);
+ }
+
+ @Test(timeout = 10000)
+ public void pollTimeout() throws Exception {
+ BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>(4);
+
+ assertEquals(null, queue.poll(1, TimeUnit.MILLISECONDS));
+
+ queue.put(1);
+ assertEquals(1, queue.poll(1, TimeUnit.MILLISECONDS).intValue());
+
+ // 0 timeout should not block
+ assertEquals(null, queue.poll(0, TimeUnit.HOURS));
+
+ queue.put(2);
+ queue.put(3);
+ assertEquals(2, queue.poll(1, TimeUnit.HOURS).intValue());
+ assertEquals(3, queue.poll(1, TimeUnit.HOURS).intValue());
+ }
+
+ @Test(timeout = 10000)
+ public void pollTimeout2() throws Exception {
+ BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ new Thread(() -> {
+ try {
+ queue.poll(1, TimeUnit.HOURS);
+
+ latch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }).start();
+
+ // Make sure background thread is waiting on poll
+ Thread.sleep(100);
+ queue.put(1);
+
+ latch.await();
+ }
+}