You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2017/05/12 13:17:51 UTC

bookkeeper git commit: BOOKKEEPER-1066: Introduce GrowableArrayBlockingQueue

Repository: bookkeeper
Updated Branches:
  refs/heads/master eeaddeda6 -> e2894a9e9


BOOKKEEPER-1066: Introduce GrowableArrayBlockingQueue

In multiple places, (eg: journal, ordered executor, etc..), we are using `LinkedBlockingQueue` instances to pass objects between threads.

The `LinkedBlockingQueue` differs from the `ArrayBlockingQueue` in that it doesn't require to define a max queue size, though, being implemented with a linked list, it requires to allocates list nodes each time an item is added.

We can use a `GrowableArrayBlockingQueue` that behaves in the same way as the `LinkedBlockingQueue`, but it's implemented with an array that can be resized when the queue reaches the capacity.

Author: Matteo Merli <mm...@apache.org>

Reviewers: Enrico Olivelli <eo...@apache.org>

Closes #153 from merlimat/growable-blocking-queue


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/e2894a9e
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/e2894a9e
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/e2894a9e

Branch: refs/heads/master
Commit: e2894a9e9946b4cc4647c280094bcb2eb1a521ea
Parents: eeadded
Author: Matteo Merli <mm...@apache.org>
Authored: Fri May 12 15:17:36 2017 +0200
Committer: eolivelli <eo...@apache.org>
Committed: Fri May 12 15:17:36 2017 +0200

----------------------------------------------------------------------
 .../org/apache/bookkeeper/util/MathUtils.java   |   4 +
 .../collections/GrowableArrayBlockingQueue.java | 359 +++++++++++++++++++
 .../GrowableArrayBlockingQueueTest.java         | 206 +++++++++++
 3 files changed, 569 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e2894a9e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
index 6aa9073..1b3044d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/MathUtils.java
@@ -35,6 +35,10 @@ public class MathUtils {
 
     }
 
+    public static int findNextPositivePowerOfTwo(final int value) {
+        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
+    }
+
     /**
      * Current time from some arbitrary time base in the past, counting in
      * milliseconds, and not affected by settimeofday or similar system clock

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e2894a9e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueue.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueue.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueue.java
new file mode 100644
index 0000000..8f7dae7
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueue.java
@@ -0,0 +1,359 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.bookkeeper.util.MathUtils;
+
+
+/**
+ * This implements a {@link BlockingQueue} backed by an array with no fixed capacity.
+ *
+ * When the capacity is reached, data will be moved to a bigger array.
+ *
+ */
+public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements BlockingQueue<T> {
+
+    private final ReentrantLock headLock = new ReentrantLock();
+    private final PaddedInt headIndex = new PaddedInt();
+    private final PaddedInt tailIndex = new PaddedInt();
+    private final ReentrantLock tailLock = new ReentrantLock();
+    private final Condition isNotEmpty = headLock.newCondition();
+
+    private T[] data;
+    @SuppressWarnings("rawtypes")
+    private static final AtomicIntegerFieldUpdater<GrowableArrayBlockingQueue> SIZE_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(GrowableArrayBlockingQueue.class, "size");
+    @SuppressWarnings("unused")
+    private volatile int size = 0;
+
+    public GrowableArrayBlockingQueue() {
+        this(64);
+    }
+
+    @SuppressWarnings("unchecked")
+    public GrowableArrayBlockingQueue(int initialCapacity) {
+        headIndex.value = 0;
+        tailIndex.value = 0;
+
+        int capacity = MathUtils.findNextPositivePowerOfTwo(initialCapacity);
+        data = (T[]) new Object[capacity];
+    }
+
+    @Override
+    public T remove() {
+        T item = poll();
+        if (item == null) {
+            throw new NoSuchElementException();
+        }
+
+        return item;
+    }
+
+    @Override
+    public T poll() {
+        headLock.lock();
+        try {
+            if (SIZE_UPDATER.get(this) > 0) {
+                T item = data[headIndex.value];
+                headIndex.value = (headIndex.value + 1) & (data.length - 1);
+                SIZE_UPDATER.decrementAndGet(this);
+                return item;
+            } else {
+                return null;
+            }
+        } finally {
+            headLock.unlock();
+        }
+    }
+
+    @Override
+    public T element() {
+        T item = peek();
+        if (item == null) {
+            throw new NoSuchElementException();
+        }
+
+        return item;
+    }
+
+    @Override
+    public T peek() {
+        headLock.lock();
+        try {
+            if (SIZE_UPDATER.get(this) > 0) {
+                return data[headIndex.value];
+            } else {
+                return null;
+            }
+        } finally {
+            headLock.unlock();
+        }
+    }
+
+    @Override
+    public boolean offer(T e) {
+        // Queue is unbounded and it will never reject new items
+        put(e);
+        return true;
+    }
+
+    @Override
+    public void put(T e) {
+        tailLock.lock();
+
+        boolean wasEmpty = false;
+
+        try {
+            if (SIZE_UPDATER.get(this) == data.length) {
+                expandArray();
+            }
+
+            data[tailIndex.value] = e;
+            tailIndex.value = (tailIndex.value + 1) & (data.length - 1);
+            if (SIZE_UPDATER.getAndIncrement(this) == 0) {
+                wasEmpty = true;
+            }
+        } finally {
+            tailLock.unlock();
+        }
+
+        if (wasEmpty) {
+            headLock.lock();
+            try {
+                isNotEmpty.signal();
+            } finally {
+                headLock.unlock();
+            }
+        }
+    }
+
+    @Override
+    public boolean add(T e) {
+        put(e);
+        return true;
+    }
+
+    @Override
+    public boolean offer(T e, long timeout, TimeUnit unit) {
+        // Queue is unbounded and it will never reject new items
+        put(e);
+        return true;
+    }
+
+    @Override
+    public T take() throws InterruptedException {
+        headLock.lockInterruptibly();
+
+        try {
+            while (SIZE_UPDATER.get(this) == 0) {
+                isNotEmpty.await();
+            }
+
+            T item = data[headIndex.value];
+            data[headIndex.value] = null;
+            headIndex.value = (headIndex.value + 1) & (data.length - 1);
+            if (SIZE_UPDATER.decrementAndGet(this) > 0) {
+                // There are still entries to consume
+                isNotEmpty.signal();
+            }
+            return item;
+        } finally {
+            headLock.unlock();
+        }
+    }
+
+    @Override
+    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
+        headLock.lockInterruptibly();
+
+        try {
+            long timeoutNanos = unit.toNanos(timeout);
+            while (SIZE_UPDATER.get(this) == 0) {
+                if (timeoutNanos <= 0) {
+                    return null;
+                }
+
+                timeoutNanos = isNotEmpty.awaitNanos(timeoutNanos);
+            }
+
+            T item = data[headIndex.value];
+            data[headIndex.value] = null;
+            headIndex.value = (headIndex.value + 1) & (data.length - 1);
+            if (SIZE_UPDATER.decrementAndGet(this) > 0) {
+                // There are still entries to consume
+                isNotEmpty.signal();
+            }
+            return item;
+        } finally {
+            headLock.unlock();
+        }
+    }
+
+    @Override
+    public int remainingCapacity() {
+        return Integer.MAX_VALUE;
+    }
+
+    @Override
+    public int drainTo(Collection<? super T> c) {
+        return drainTo(c, Integer.MAX_VALUE);
+    }
+
+    @Override
+    public int drainTo(Collection<? super T> c, int maxElements) {
+        headLock.lock();
+
+        try {
+            int drainedItems = 0;
+            int size = SIZE_UPDATER.get(this);
+
+            while (size > 0 && drainedItems < maxElements) {
+                T item = data[headIndex.value];
+                data[headIndex.value] = null;
+                c.add(item);
+
+                headIndex.value = (headIndex.value + 1) & (data.length - 1);
+                --size;
+                ++drainedItems;
+            }
+
+            if (SIZE_UPDATER.addAndGet(this, -drainedItems) > 0) {
+                // There are still entries to consume
+                isNotEmpty.signal();
+            }
+
+            return drainedItems;
+        } finally {
+            headLock.unlock();
+        }
+    }
+
+    @Override
+    public void clear() {
+        headLock.lock();
+
+        try {
+            int size = SIZE_UPDATER.get(this);
+
+            for (int i = 0; i < size; i++) {
+                data[headIndex.value] = null;
+                headIndex.value = (headIndex.value + 1) & (data.length - 1);
+            }
+
+            if (SIZE_UPDATER.addAndGet(this, -size) > 0) {
+                // There are still entries to consume
+                isNotEmpty.signal();
+            }
+        } finally {
+            headLock.unlock();
+        }
+    }
+
+    @Override
+    public int size() {
+        return SIZE_UPDATER.get(this);
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+
+        tailLock.lock();
+        headLock.lock();
+
+        try {
+            int headIndex = this.headIndex.value;
+            int size = SIZE_UPDATER.get(this);
+
+            sb.append('[');
+
+            for (int i = 0; i < size; i++) {
+                T item = data[headIndex];
+                if (i > 0) {
+                    sb.append(", ");
+                }
+
+                sb.append(item);
+
+                headIndex = (headIndex + 1) & (data.length - 1);
+            }
+
+            sb.append(']');
+        } finally {
+            headLock.unlock();
+            tailLock.unlock();
+        }
+        return sb.toString();
+    }
+
+    @SuppressWarnings("unchecked")
+    private void expandArray() {
+        // We already hold the tailLock
+        headLock.lock();
+
+        try {
+            int size = SIZE_UPDATER.get(this);
+            int newCapacity = data.length * 2;
+            T[] newData = (T[]) new Object[newCapacity];
+
+            int oldHeadIndex = headIndex.value;
+            int newTailIndex = 0;
+
+            for (int i = 0; i < size; i++) {
+                newData[newTailIndex++] = data[oldHeadIndex];
+                oldHeadIndex = (oldHeadIndex + 1) & (data.length - 1);
+            }
+
+            data = newData;
+            headIndex.value = 0;
+            tailIndex.value = size;
+        } finally {
+            headLock.unlock();
+        }
+    }
+
+    final static class PaddedInt {
+        private int value;
+
+        // Padding to avoid false sharing
+        public volatile int pi1 = 1;
+        public volatile long p1 = 1L, p2 = 2L, p3 = 3L, p4 = 4L, p5 = 5L, p6 = 6L;
+
+        public long exposeToAvoidOptimization() {
+            return pi1 + p1 + p2 + p3 + p4 + p5 + p6;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e2894a9e/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueueTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueueTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueueTest.java
new file mode 100644
index 0000000..9fd7e84
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/GrowableArrayBlockingQueueTest.java
@@ -0,0 +1,206 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class GrowableArrayBlockingQueueTest {
+
+    @Test
+    public void simple() throws Exception {
+        BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>(4);
+
+        assertEquals(null, queue.poll());
+
+        assertEquals(Integer.MAX_VALUE, queue.remainingCapacity());
+        assertEquals("[]", queue.toString());
+
+        try {
+            queue.element();
+            fail("Should have thrown exception");
+        } catch (NoSuchElementException e) {
+            // Expected
+        }
+
+        try {
+            queue.iterator();
+            fail("Should have thrown exception");
+        } catch (UnsupportedOperationException e) {
+            // Expected
+        }
+
+        // Test index rollover
+        for (int i = 0; i < 100; i++) {
+            queue.add(i);
+
+            assertEquals(i, queue.take().intValue());
+        }
+
+        queue.offer(1);
+        assertEquals("[1]", queue.toString());
+        queue.offer(2);
+        assertEquals("[1, 2]", queue.toString());
+        queue.offer(3);
+        assertEquals("[1, 2, 3]", queue.toString());
+        queue.offer(4);
+        assertEquals("[1, 2, 3, 4]", queue.toString());
+
+        assertEquals(4, queue.size());
+
+        List<Integer> list = new ArrayList<>();
+        queue.drainTo(list, 3);
+
+        assertEquals(1, queue.size());
+        assertEquals(Lists.newArrayList(1, 2, 3), list);
+        assertEquals("[4]", queue.toString());
+        assertEquals(4, queue.peek().intValue());
+
+        assertEquals(4, queue.element().intValue());
+        assertEquals(4, queue.remove().intValue());
+        try {
+            queue.remove();
+            fail("Should have thrown exception");
+        } catch (NoSuchElementException e) {
+            // Expected
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void blockingTake() throws Exception {
+        BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>();
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        new Thread(() -> {
+            try {
+                int expected = 0;
+
+                for (int i = 0; i < 100; i++) {
+                    int n = queue.take();
+
+                    assertEquals(expected++, n);
+                }
+
+                latch.countDown();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }).start();
+
+        int n = 0;
+        for (int i = 0; i < 10; i++) {
+            for (int j = 0; j < 10; j++) {
+                queue.put(n);
+                ++n;
+            }
+
+            // Wait until all the entries are consumed
+            while (!queue.isEmpty()) {
+                Thread.sleep(1);
+            }
+        }
+
+        latch.await();
+    }
+
+    @Test
+    public void growArray() throws Exception {
+        BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>(4);
+
+        assertEquals(null, queue.poll());
+
+        assertTrue(queue.offer(1));
+        assertTrue(queue.offer(2));
+        assertTrue(queue.offer(3));
+        assertTrue(queue.offer(4));
+        assertTrue(queue.offer(5));
+
+        assertEquals(5, queue.size());
+
+        queue.clear();
+        assertEquals(0, queue.size());
+
+        assertTrue(queue.offer(1, 1, TimeUnit.SECONDS));
+        assertTrue(queue.offer(2, 1, TimeUnit.SECONDS));
+        assertTrue(queue.offer(3, 1, TimeUnit.SECONDS));
+        assertEquals(3, queue.size());
+
+        List<Integer> list = new ArrayList<>();
+        queue.drainTo(list);
+        assertEquals(0, queue.size());
+
+        assertEquals(Lists.newArrayList(1, 2, 3), list);
+    }
+
+    @Test(timeout = 10000)
+    public void pollTimeout() throws Exception {
+        BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>(4);
+
+        assertEquals(null, queue.poll(1, TimeUnit.MILLISECONDS));
+
+        queue.put(1);
+        assertEquals(1, queue.poll(1, TimeUnit.MILLISECONDS).intValue());
+
+        // 0 timeout should not block
+        assertEquals(null, queue.poll(0, TimeUnit.HOURS));
+
+        queue.put(2);
+        queue.put(3);
+        assertEquals(2, queue.poll(1, TimeUnit.HOURS).intValue());
+        assertEquals(3, queue.poll(1, TimeUnit.HOURS).intValue());
+    }
+
+    @Test(timeout = 10000)
+    public void pollTimeout2() throws Exception {
+        BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>();
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        new Thread(() -> {
+            try {
+                queue.poll(1, TimeUnit.HOURS);
+
+                latch.countDown();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }).start();
+
+        // Make sure background thread is waiting on poll
+        Thread.sleep(100);
+        queue.put(1);
+
+        latch.await();
+    }
+}