You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2023/03/06 14:35:20 UTC

[bookkeeper] branch master updated: Added BatchedArrayBlockingQueue (#3838)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 73c5a0e011 Added BatchedArrayBlockingQueue (#3838)
73c5a0e011 is described below

commit 73c5a0e0111754690e5cd74fb030ee64a2f829d9
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Mar 6 06:35:11 2023 -0800

    Added BatchedArrayBlockingQueue (#3838)
---
 .../collections/BatchedArrayBlockingQueue.java     | 409 +++++++++++++++++++++
 .../common/collections/BatchedBlockingQueue.java   |  55 +++
 .../common/collections/BlockingMpscQueue.java      |  41 ++-
 .../collections/BatchedArrayBlockingQueueTest.java | 312 ++++++++++++++++
 .../bookkeeper/common/MpScQueueBenchmark.java      | 134 +++++++
 5 files changed, 950 insertions(+), 1 deletion(-)

diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BatchedArrayBlockingQueue.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BatchedArrayBlockingQueue.java
new file mode 100644
index 0000000000..646391b49e
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BatchedArrayBlockingQueue.java
@@ -0,0 +1,409 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.common.collections;
+
+import java.util.AbstractQueue;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This implements a {@link BlockingQueue} backed by an array with fixed capacity.
+ *
+ * <p>This queue only allows 1 consumer thread to dequeue items and multiple producer threads.
+ */
+public class BatchedArrayBlockingQueue<T>
+        extends AbstractQueue<T>
+        implements BlockingQueue<T>, BatchedBlockingQueue<T> {
+
+    private final ReentrantLock lock = new ReentrantLock();
+
+    private final Condition notEmpty = lock.newCondition();
+    private final Condition notFull = lock.newCondition();
+
+    private final int capacity;
+    private final T[] data;
+
+    private int size;
+
+    private int consumerIdx;
+    private int producerIdx;
+
+    @SuppressWarnings("unchecked")
+    public BatchedArrayBlockingQueue(int capacity) {
+        this.capacity = capacity;
+        this.data = (T[]) new Object[this.capacity];
+    }
+
+    private T dequeueOne() {
+        T item = data[consumerIdx];
+        data[consumerIdx] = null;
+        if (++consumerIdx == capacity) {
+            consumerIdx = 0;
+        }
+
+        if (size-- == capacity) {
+            notFull.signalAll();
+        }
+
+        return item;
+    }
+
+    private void enqueueOne(T item) {
+        data[producerIdx] = item;
+        if (++producerIdx == capacity) {
+            producerIdx = 0;
+        }
+
+        if (size++ == 0) {
+            notEmpty.signalAll();
+        }
+    }
+
+    @Override
+    public T poll() {
+        lock.lock();
+
+        try {
+            if (size == 0) {
+                return null;
+            }
+
+            return dequeueOne();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public T peek() {
+        lock.lock();
+
+        try {
+            if (size == 0) {
+                return null;
+            }
+
+            return data[consumerIdx];
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public boolean offer(T e) {
+        lock.lock();
+
+        try {
+            if (size == capacity) {
+                return false;
+            }
+
+            enqueueOne(e);
+
+            return true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void put(T e) throws InterruptedException {
+        lock.lockInterruptibly();
+
+        try {
+            while (size == capacity) {
+                notFull.await();
+            }
+
+            enqueueOne(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public int putAll(List<T> c) throws InterruptedException {
+        lock.lockInterruptibly();
+
+        try {
+            while (size == capacity) {
+                notFull.await();
+            }
+
+            int availableCapacity = capacity - size;
+
+            int toInsert = Math.min(availableCapacity, c.size());
+
+            int producerIdx = this.producerIdx;
+            for (int i = 0; i < toInsert; i++) {
+                data[producerIdx] = c.get(i);
+                if (++producerIdx == capacity) {
+                    producerIdx = 0;
+                }
+            }
+
+            this.producerIdx = producerIdx;
+
+            if (size == 0) {
+                notEmpty.signalAll();
+            }
+
+            size += toInsert;
+
+            return toInsert;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void putAll(T[] a, int offset, int len) throws InterruptedException {
+        while (len > 0) {
+            int published = internalPutAll(a, offset, len);
+            offset += published;
+            len -= published;
+        }
+    }
+
+    private int internalPutAll(T[] a, int offset, int len) throws InterruptedException {
+        lock.lockInterruptibly();
+
+        try {
+            while (size == capacity) {
+                notFull.await();
+            }
+
+            int availableCapacity = capacity - size;
+            int toInsert = Math.min(availableCapacity, len);
+            int producerIdx = this.producerIdx;
+
+            // First span
+            int firstSpan = Math.min(toInsert, capacity - producerIdx);
+            System.arraycopy(a, offset, data, producerIdx, firstSpan);
+            producerIdx += firstSpan;
+
+            int secondSpan = toInsert - firstSpan;
+            if (secondSpan > 0) {
+                System.arraycopy(a, offset + firstSpan, data, 0, secondSpan);
+                producerIdx = secondSpan;
+            }
+
+            if (producerIdx == capacity) {
+                producerIdx = 0;
+            }
+
+            this.producerIdx = producerIdx;
+
+            if (size == 0) {
+                notEmpty.signalAll();
+            }
+
+            size += toInsert;
+            return toInsert;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException {
+        long remainingTimeNanos = unit.toNanos(timeout);
+
+        lock.lockInterruptibly();
+        try {
+            while (size == capacity) {
+                if (remainingTimeNanos <= 0L) {
+                    return false;
+                }
+
+                remainingTimeNanos = notFull.awaitNanos(remainingTimeNanos);
+            }
+
+            enqueueOne(e);
+            return true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public T take() throws InterruptedException {
+        lock.lockInterruptibly();
+
+        try {
+            while (size == 0) {
+                notEmpty.await();
+            }
+
+            return dequeueOne();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
+        long remainingTimeNanos = unit.toNanos(timeout);
+
+        lock.lockInterruptibly();
+        try {
+            while (size == 0) {
+                if (remainingTimeNanos <= 0L) {
+                    return null;
+                }
+
+                remainingTimeNanos = notEmpty.awaitNanos(remainingTimeNanos);
+            }
+
+            return dequeueOne();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public int remainingCapacity() {
+        return capacity - size;
+    }
+
+    @Override
+    public int drainTo(Collection<? super T> c) {
+        return drainTo(c, capacity);
+    }
+
+    @Override
+    public int drainTo(Collection<? super T> c, int maxElements) {
+        lock.lock();
+        try {
+            int toDrain = Math.min(size, maxElements);
+
+            int consumerIdx = this.consumerIdx;
+            for (int i = 0; i < toDrain; i++) {
+                T item = data[consumerIdx];
+                data[consumerIdx] = null;
+                c.add(item);
+
+                if (++consumerIdx == capacity) {
+                    consumerIdx = 0;
+                }
+            }
+
+            this.consumerIdx = consumerIdx;
+            if (size == capacity) {
+                notFull.signalAll();
+            }
+
+            size -= toDrain;
+            return toDrain;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public int takeAll(T[] array) throws InterruptedException {
+        return internalTakeAll(array, true, 0, TimeUnit.SECONDS);
+    }
+
+    @Override
+    public int pollAll(T[] array, long timeout, TimeUnit unit) throws InterruptedException {
+        return internalTakeAll(array, false, timeout, unit);
+    }
+
+    private int internalTakeAll(T[] array, boolean waitForever, long timeout, TimeUnit unit)
+            throws InterruptedException {
+        lock.lockInterruptibly();
+        try {
+            while (size == 0) {
+                if (waitForever) {
+                    notEmpty.await();
+                } else {
+                    if (!notEmpty.await(timeout, unit)) {
+                        return 0;
+                    }
+                }
+            }
+
+            int toDrain = Math.min(size, array.length);
+
+            int consumerIdx = this.consumerIdx;
+
+            // First span
+            int firstSpan = Math.min(toDrain, capacity - consumerIdx);
+            System.arraycopy(data, consumerIdx, array, 0, firstSpan);
+            Arrays.fill(data, consumerIdx, consumerIdx + firstSpan, null);
+            consumerIdx += firstSpan;
+
+            int secondSpan = toDrain - firstSpan;
+            if (secondSpan > 0) {
+                System.arraycopy(data, 0, array, firstSpan, secondSpan);
+                Arrays.fill(data, 0, secondSpan, null);
+                consumerIdx = secondSpan;
+            }
+
+            if (consumerIdx == capacity) {
+                consumerIdx = 0;
+            }
+            this.consumerIdx = consumerIdx;
+            if (size == capacity) {
+                notFull.signalAll();
+            }
+
+            size -= toDrain;
+            return toDrain;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void clear() {
+        lock.lock();
+        try {
+            while (size > 0) {
+                dequeueOne();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public int size() {
+        lock.lock();
+
+        try {
+            return size;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BatchedBlockingQueue.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BatchedBlockingQueue.java
new file mode 100644
index 0000000000..5a0e0a72ea
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BatchedBlockingQueue.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.common.collections;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public interface BatchedBlockingQueue<T> extends BlockingQueue<T> {
+    void putAll(T[] a, int offset, int len) throws InterruptedException;
+
+    /**
+     * Drain the queue into an array.
+     * Wait if there are no items in the queue.
+     *
+     * @param array
+     * @return
+     * @throws InterruptedException
+     */
+    int takeAll(T[] array) throws InterruptedException;
+
+    /**
+     * Removes multiple items from the queue.
+     *
+     * The method returns when either:
+     *  1. At least one item is available
+     *  2. The timeout expires
+     *
+     *
+     * @param array
+     * @param timeout
+     * @param unit
+     * @return
+     * @throws InterruptedException
+     */
+    int pollAll(T[] array, long timeout, TimeUnit unit) throws InterruptedException;
+
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BlockingMpscQueue.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BlockingMpscQueue.java
index b87c8f638e..56d9627e84 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BlockingMpscQueue.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BlockingMpscQueue.java
@@ -26,7 +26,7 @@ import org.jctools.queues.MpscArrayQueue;
 /**
  * Blocking queue optimized for multiple producers and single consumer.
  */
-public class BlockingMpscQueue<T> extends MpscArrayQueue<T> implements BlockingQueue<T> {
+public class BlockingMpscQueue<T> extends MpscArrayQueue<T> implements BlockingQueue<T>, BatchedBlockingQueue<T> {
 
     public BlockingMpscQueue(int size) {
         super(size);
@@ -123,6 +123,45 @@ public class BlockingMpscQueue<T> extends MpscArrayQueue<T> implements BlockingQ
         return drain(c::add, maxElements);
     }
 
+    @Override
+    public void putAll(T[] a, int offset, int len) throws InterruptedException {
+        for (int i = 0; i < len; i++) {
+            put(a[offset + i]);
+        }
+    }
+
+    @Override
+    public int takeAll(T[] array) throws InterruptedException {
+        int items = 0;
+
+        T t;
+        while (items < array.length && (t = poll()) != null) {
+            array[items++] = t;
+        }
+
+        if (items == 0) {
+            array[items++] = take();
+        }
+
+        return items;
+    }
+
+    @Override
+    public int pollAll(T[] array, long timeout, TimeUnit unit) throws InterruptedException {
+        int items = 0;
+
+        T t;
+        while (items < array.length && (t = poll()) != null) {
+            array[items++] = t;
+        }
+
+        if (items == 0 && (t = poll(timeout, unit)) != null) {
+            array[items++] = t;
+        }
+
+        return items;
+    }
+
     /**
      * Wait strategy combined with exit condition, for draining the queue.
      */
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/BatchedArrayBlockingQueueTest.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/BatchedArrayBlockingQueueTest.java
new file mode 100644
index 0000000000..20e2f3723f
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/BatchedArrayBlockingQueueTest.java
@@ -0,0 +1,312 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.common.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+/**
+ * Test the growable array blocking queue.
+ */
+public class BatchedArrayBlockingQueueTest {
+
+    @Test
+    public void simple() throws Exception {
+        BlockingQueue<Integer> queue = new BatchedArrayBlockingQueue<>(4);
+
+        assertEquals(null, queue.poll());
+
+        assertEquals(4, queue.remainingCapacity());
+
+        try {
+            queue.element();
+            fail("Should have thrown exception");
+        } catch (NoSuchElementException e) {
+            // Expected
+        }
+
+        try {
+            queue.iterator();
+            fail("Should have thrown exception");
+        } catch (UnsupportedOperationException e) {
+            // Expected
+        }
+
+        // Test index rollover
+        for (int i = 0; i < 100; i++) {
+            queue.add(i);
+
+            assertEquals(i, queue.take().intValue());
+        }
+
+        queue.offer(1);
+        queue.offer(2);
+        queue.offer(3);
+        queue.offer(4);
+
+        assertEquals(4, queue.size());
+
+        List<Integer> list = new ArrayList<>();
+        queue.drainTo(list, 3);
+
+        assertEquals(1, queue.size());
+        assertEquals(Lists.newArrayList(1, 2, 3), list);
+        assertEquals(4, queue.peek().intValue());
+
+        assertEquals(4, queue.element().intValue());
+        assertEquals(4, queue.remove().intValue());
+        try {
+            queue.remove();
+            fail("Should have thrown exception");
+        } catch (NoSuchElementException e) {
+            // Expected
+        }
+    }
+
+    @Test
+    public void blockingTake() throws Exception {
+        BlockingQueue<Integer> queue = new GrowableMpScArrayConsumerBlockingQueue<>();
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        new Thread(() -> {
+            try {
+                int expected = 0;
+
+                for (int i = 0; i < 100; i++) {
+                    int n = queue.take();
+
+                    assertEquals(expected++, n);
+                }
+
+                latch.countDown();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }).start();
+
+        int n = 0;
+        for (int i = 0; i < 10; i++) {
+            for (int j = 0; j < 10; j++) {
+                queue.put(n);
+                ++n;
+            }
+
+            // Wait until all the entries are consumed
+            while (!queue.isEmpty()) {
+                Thread.sleep(1);
+            }
+        }
+
+        latch.await();
+    }
+
+    @Test
+    public void blockWhenFull() throws Exception {
+        BlockingQueue<Integer> queue = new BatchedArrayBlockingQueue<>(4);
+
+        assertEquals(null, queue.poll());
+
+        assertTrue(queue.offer(1));
+        assertTrue(queue.offer(2));
+        assertTrue(queue.offer(3));
+        assertTrue(queue.offer(4));
+        assertFalse(queue.offer(5));
+
+        assertEquals(4, queue.size());
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        new Thread(() -> {
+            try {
+                queue.put(5);
+                latch.countDown();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }).start();
+
+        Thread.sleep(100);
+        assertEquals(1, latch.getCount());
+
+        assertEquals(1, (int) queue.poll());
+
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+        assertEquals(4, queue.size());
+
+
+        queue.clear();
+        assertEquals(0, queue.size());
+
+        assertTrue(queue.offer(1, 1, TimeUnit.SECONDS));
+        assertTrue(queue.offer(2, 1, TimeUnit.SECONDS));
+        assertTrue(queue.offer(3, 1, TimeUnit.SECONDS));
+        assertEquals(3, queue.size());
+
+        List<Integer> list = new ArrayList<>();
+        queue.drainTo(list);
+        assertEquals(0, queue.size());
+
+        assertEquals(Lists.newArrayList(1, 2, 3), list);
+    }
+
+    @Test
+    public void pollTimeout() throws Exception {
+        BlockingQueue<Integer> queue = new BatchedArrayBlockingQueue<>(4);
+
+        assertEquals(null, queue.poll(1, TimeUnit.MILLISECONDS));
+
+        queue.put(1);
+        assertEquals(1, queue.poll(1, TimeUnit.MILLISECONDS).intValue());
+
+        // 0 timeout should not block
+        assertEquals(null, queue.poll(0, TimeUnit.HOURS));
+
+        queue.put(2);
+        queue.put(3);
+        assertEquals(2, queue.poll(1, TimeUnit.HOURS).intValue());
+        assertEquals(3, queue.poll(1, TimeUnit.HOURS).intValue());
+    }
+
+    @Test
+    public void pollTimeout2() throws Exception {
+        BlockingQueue<Integer> queue = new BatchedArrayBlockingQueue<>(10);
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        new Thread(() -> {
+            try {
+                queue.poll(1, TimeUnit.HOURS);
+
+                latch.countDown();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }).start();
+
+        // Make sure background thread is waiting on poll
+        Thread.sleep(100);
+        queue.put(1);
+
+        latch.await();
+    }
+
+
+    @Test
+    public void drainToArray() throws Exception {
+        BatchedArrayBlockingQueue<Integer> queue = new BatchedArrayBlockingQueue<>(100);
+
+        for (int i = 0; i < 10; i++) {
+            queue.add(i);
+        }
+
+        Integer[] local = new Integer[5];
+        int items = queue.takeAll(local);
+        assertEquals(5, items);
+        for (int i = 0; i < items; i++) {
+            assertEquals(i, (int) local[i]);
+        }
+
+        assertEquals(5, queue.size());
+
+        items = queue.takeAll(local);
+        assertEquals(5, items);
+        for (int i = 0; i < items; i++) {
+            assertEquals(i + 5, (int) local[i]);
+        }
+
+        assertEquals(0, queue.size());
+
+        /// Block when empty
+        CountDownLatch latch = new CountDownLatch(1);
+
+        new Thread(() -> {
+            try {
+                int c = queue.takeAll(local);
+                assertEquals(1, c);
+                latch.countDown();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }).start();
+
+        Thread.sleep(100);
+        assertEquals(1, latch.getCount());
+
+        assertEquals(0, queue.size());
+
+        // Unblock the drain
+        queue.put(1);
+
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+        assertEquals(0, queue.size());
+    }
+
+    @Test
+    public void putAll() throws Exception {
+        BatchedArrayBlockingQueue<Integer> queue = new BatchedArrayBlockingQueue<>(10);
+
+        Integer[] items = new Integer[100];
+        for (int i = 0; i < 100; i++) {
+            items[i] = i;
+        }
+
+        queue.putAll(items, 0, 5);
+        assertEquals(5, queue.size());
+        queue.putAll(items, 0, 5);
+        assertEquals(10, queue.size());
+
+        queue.clear();
+
+        /// Block when empty
+        CountDownLatch latch = new CountDownLatch(1);
+
+        new Thread(() -> {
+            try {
+                queue.putAll(items, 0, 11);
+                latch.countDown();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }).start();
+
+        Thread.sleep(100);
+        assertEquals(1, latch.getCount());
+        assertEquals(10, queue.size());
+
+        // Unblock the putAll
+        queue.take();
+
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+        assertEquals(10, queue.size());
+    }
+}
diff --git a/microbenchmarks/src/main/java/org/apache/bookkeeper/common/MpScQueueBenchmark.java b/microbenchmarks/src/main/java/org/apache/bookkeeper/common/MpScQueueBenchmark.java
new file mode 100644
index 0000000000..306140c2a7
--- /dev/null
+++ b/microbenchmarks/src/main/java/org/apache/bookkeeper/common/MpScQueueBenchmark.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common;
+
+import java.util.ArrayList;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import lombok.SneakyThrows;
+import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Microbenchmarks for different executors providers.
+ */
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@BenchmarkMode(Mode.Throughput)
+@Threads(16)
+@Fork(1)
+@Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+public class MpScQueueBenchmark {
+
+    private static final int QUEUE_SIZE = 100_000;
+
+    /**
+     * State holder of the test.
+     */
+    @State(Scope.Benchmark)
+    public static class TestState {
+
+        private ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
+
+        private BatchedArrayBlockingQueue batchedArrayBlockingQueue = new BatchedArrayBlockingQueue<>(QUEUE_SIZE);
+
+        private final Integer[] batchArray = new Integer[1000];
+
+        private final ExecutorService executor = Executors.newCachedThreadPool();
+
+        @Setup(Level.Trial)
+        public void setup() {
+            for (int i = 0; i < 1000; i++) {
+                batchArray[i] = i;
+            }
+
+            executor.execute(this::consumeABQ);
+            executor.execute(this::consumeBAABQ);
+        }
+
+        @SneakyThrows
+        private void consumeABQ() {
+            ArrayList<Integer> localList = new ArrayList<>();
+
+            try {
+                while (true) {
+                    arrayBlockingQueue.drainTo(localList);
+                    if (localList.isEmpty()) {
+                        arrayBlockingQueue.take();
+                    }
+                    localList.clear();
+                }
+            } catch (InterruptedException ie) {
+            }
+        }
+
+        @SneakyThrows
+        private void consumeBAABQ() {
+            Integer[] localArray = new Integer[20_000];
+
+            try {
+                while (true) {
+                    batchedArrayBlockingQueue.takeAll(localArray);
+                }
+            } catch (InterruptedException ie) {
+            }
+        }
+
+        @TearDown(Level.Trial)
+        public void teardown() {
+            executor.shutdownNow();
+        }
+
+        @TearDown(Level.Iteration)
+        public void cleanupQueue() throws InterruptedException{
+            Thread.sleep(1_000);
+        }
+    }
+
+    @Benchmark
+    public void arrayBlockingQueue(TestState s) throws Exception {
+        s.arrayBlockingQueue.put(1);
+    }
+
+    @Benchmark
+    public void batchAwareArrayBlockingQueueSingleEnqueue(TestState s) throws Exception {
+        s.batchedArrayBlockingQueue.put(1);
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(1000)
+    public void batchAwareArrayBlockingQueueBatch(TestState s) throws Exception {
+        s.batchedArrayBlockingQueue.putAll(s.batchArray, 0, 1000);
+    }
+}