You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2023/03/06 14:35:20 UTC
[bookkeeper] branch master updated: Added BatchedArrayBlockingQueue (#3838)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 73c5a0e011 Added BatchedArrayBlockingQueue (#3838)
73c5a0e011 is described below
commit 73c5a0e0111754690e5cd74fb030ee64a2f829d9
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Mar 6 06:35:11 2023 -0800
Added BatchedArrayBlockingQueue (#3838)
---
.../collections/BatchedArrayBlockingQueue.java | 409 +++++++++++++++++++++
.../common/collections/BatchedBlockingQueue.java | 55 +++
.../common/collections/BlockingMpscQueue.java | 41 ++-
.../collections/BatchedArrayBlockingQueueTest.java | 312 ++++++++++++++++
.../bookkeeper/common/MpScQueueBenchmark.java | 134 +++++++
5 files changed, 950 insertions(+), 1 deletion(-)
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BatchedArrayBlockingQueue.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BatchedArrayBlockingQueue.java
new file mode 100644
index 0000000000..646391b49e
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BatchedArrayBlockingQueue.java
@@ -0,0 +1,409 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.common.collections;
+
+import java.util.AbstractQueue;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This implements a {@link BlockingQueue} backed by an array with fixed capacity.
+ *
+ * <p>This queue only allows 1 consumer thread to dequeue items and multiple producer threads.
+ */
+public class BatchedArrayBlockingQueue<T>
+ extends AbstractQueue<T>
+ implements BlockingQueue<T>, BatchedBlockingQueue<T> {
+
+ private final ReentrantLock lock = new ReentrantLock();
+
+ private final Condition notEmpty = lock.newCondition();
+ private final Condition notFull = lock.newCondition();
+
+ private final int capacity;
+ private final T[] data;
+
+ private int size;
+
+ private int consumerIdx;
+ private int producerIdx;
+
+ @SuppressWarnings("unchecked")
+ public BatchedArrayBlockingQueue(int capacity) {
+ this.capacity = capacity;
+ this.data = (T[]) new Object[this.capacity];
+ }
+
+ private T dequeueOne() {
+ T item = data[consumerIdx];
+ data[consumerIdx] = null;
+ if (++consumerIdx == capacity) {
+ consumerIdx = 0;
+ }
+
+ if (size-- == capacity) {
+ notFull.signalAll();
+ }
+
+ return item;
+ }
+
+ private void enqueueOne(T item) {
+ data[producerIdx] = item;
+ if (++producerIdx == capacity) {
+ producerIdx = 0;
+ }
+
+ if (size++ == 0) {
+ notEmpty.signalAll();
+ }
+ }
+
+ @Override
+ public T poll() {
+ lock.lock();
+
+ try {
+ if (size == 0) {
+ return null;
+ }
+
+ return dequeueOne();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public T peek() {
+ lock.lock();
+
+ try {
+ if (size == 0) {
+ return null;
+ }
+
+ return data[consumerIdx];
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean offer(T e) {
+ lock.lock();
+
+ try {
+ if (size == capacity) {
+ return false;
+ }
+
+ enqueueOne(e);
+
+ return true;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void put(T e) throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ while (size == capacity) {
+ notFull.await();
+ }
+
+ enqueueOne(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int putAll(List<T> c) throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ while (size == capacity) {
+ notFull.await();
+ }
+
+ int availableCapacity = capacity - size;
+
+ int toInsert = Math.min(availableCapacity, c.size());
+
+ int producerIdx = this.producerIdx;
+ for (int i = 0; i < toInsert; i++) {
+ data[producerIdx] = c.get(i);
+ if (++producerIdx == capacity) {
+ producerIdx = 0;
+ }
+ }
+
+ this.producerIdx = producerIdx;
+
+ if (size == 0) {
+ notEmpty.signalAll();
+ }
+
+ size += toInsert;
+
+ return toInsert;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void putAll(T[] a, int offset, int len) throws InterruptedException {
+ while (len > 0) {
+ int published = internalPutAll(a, offset, len);
+ offset += published;
+ len -= published;
+ }
+ }
+
+ private int internalPutAll(T[] a, int offset, int len) throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ while (size == capacity) {
+ notFull.await();
+ }
+
+ int availableCapacity = capacity - size;
+ int toInsert = Math.min(availableCapacity, len);
+ int producerIdx = this.producerIdx;
+
+ // First span
+ int firstSpan = Math.min(toInsert, capacity - producerIdx);
+ System.arraycopy(a, offset, data, producerIdx, firstSpan);
+ producerIdx += firstSpan;
+
+ int secondSpan = toInsert - firstSpan;
+ if (secondSpan > 0) {
+ System.arraycopy(a, offset + firstSpan, data, 0, secondSpan);
+ producerIdx = secondSpan;
+ }
+
+ if (producerIdx == capacity) {
+ producerIdx = 0;
+ }
+
+ this.producerIdx = producerIdx;
+
+ if (size == 0) {
+ notEmpty.signalAll();
+ }
+
+ size += toInsert;
+ return toInsert;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException {
+ long remainingTimeNanos = unit.toNanos(timeout);
+
+ lock.lockInterruptibly();
+ try {
+ while (size == capacity) {
+ if (remainingTimeNanos <= 0L) {
+ return false;
+ }
+
+ remainingTimeNanos = notFull.awaitNanos(remainingTimeNanos);
+ }
+
+ enqueueOne(e);
+ return true;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public T take() throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ while (size == 0) {
+ notEmpty.await();
+ }
+
+ return dequeueOne();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public T poll(long timeout, TimeUnit unit) throws InterruptedException {
+ long remainingTimeNanos = unit.toNanos(timeout);
+
+ lock.lockInterruptibly();
+ try {
+ while (size == 0) {
+ if (remainingTimeNanos <= 0L) {
+ return null;
+ }
+
+ remainingTimeNanos = notEmpty.awaitNanos(remainingTimeNanos);
+ }
+
+ return dequeueOne();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public int remainingCapacity() {
+ return capacity - size;
+ }
+
+ @Override
+ public int drainTo(Collection<? super T> c) {
+ return drainTo(c, capacity);
+ }
+
+ @Override
+ public int drainTo(Collection<? super T> c, int maxElements) {
+ lock.lock();
+ try {
+ int toDrain = Math.min(size, maxElements);
+
+ int consumerIdx = this.consumerIdx;
+ for (int i = 0; i < toDrain; i++) {
+ T item = data[consumerIdx];
+ data[consumerIdx] = null;
+ c.add(item);
+
+ if (++consumerIdx == capacity) {
+ consumerIdx = 0;
+ }
+ }
+
+ this.consumerIdx = consumerIdx;
+ if (size == capacity) {
+ notFull.signalAll();
+ }
+
+ size -= toDrain;
+ return toDrain;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public int takeAll(T[] array) throws InterruptedException {
+ return internalTakeAll(array, true, 0, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public int pollAll(T[] array, long timeout, TimeUnit unit) throws InterruptedException {
+ return internalTakeAll(array, false, timeout, unit);
+ }
+
+ private int internalTakeAll(T[] array, boolean waitForever, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ lock.lockInterruptibly();
+ try {
+ while (size == 0) {
+ if (waitForever) {
+ notEmpty.await();
+ } else {
+ if (!notEmpty.await(timeout, unit)) {
+ return 0;
+ }
+ }
+ }
+
+ int toDrain = Math.min(size, array.length);
+
+ int consumerIdx = this.consumerIdx;
+
+ // First span
+ int firstSpan = Math.min(toDrain, capacity - consumerIdx);
+ System.arraycopy(data, consumerIdx, array, 0, firstSpan);
+ Arrays.fill(data, consumerIdx, consumerIdx + firstSpan, null);
+ consumerIdx += firstSpan;
+
+ int secondSpan = toDrain - firstSpan;
+ if (secondSpan > 0) {
+ System.arraycopy(data, 0, array, firstSpan, secondSpan);
+ Arrays.fill(data, 0, secondSpan, null);
+ consumerIdx = secondSpan;
+ }
+
+ if (consumerIdx == capacity) {
+ consumerIdx = 0;
+ }
+ this.consumerIdx = consumerIdx;
+ if (size == capacity) {
+ notFull.signalAll();
+ }
+
+ size -= toDrain;
+ return toDrain;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void clear() {
+ lock.lock();
+ try {
+ while (size > 0) {
+ dequeueOne();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public int size() {
+ lock.lock();
+
+ try {
+ return size;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BatchedBlockingQueue.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BatchedBlockingQueue.java
new file mode 100644
index 0000000000..5a0e0a72ea
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BatchedBlockingQueue.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.common.collections;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public interface BatchedBlockingQueue<T> extends BlockingQueue<T> {
+ void putAll(T[] a, int offset, int len) throws InterruptedException;
+
+ /**
+ * Drain the queue into an array.
+ * Wait if there are no items in the queue.
+ *
+ * @param array
+ * @return
+ * @throws InterruptedException
+ */
+ int takeAll(T[] array) throws InterruptedException;
+
+ /**
+ * Removes multiple items from the queue.
+ *
+ * The method returns when either:
+ * 1. At least one item is available
+ * 2. The timeout expires
+ *
+ *
+ * @param array
+ * @param timeout
+ * @param unit
+ * @return
+ * @throws InterruptedException
+ */
+ int pollAll(T[] array, long timeout, TimeUnit unit) throws InterruptedException;
+
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BlockingMpscQueue.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BlockingMpscQueue.java
index b87c8f638e..56d9627e84 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BlockingMpscQueue.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BlockingMpscQueue.java
@@ -26,7 +26,7 @@ import org.jctools.queues.MpscArrayQueue;
/**
* Blocking queue optimized for multiple producers and single consumer.
*/
-public class BlockingMpscQueue<T> extends MpscArrayQueue<T> implements BlockingQueue<T> {
+public class BlockingMpscQueue<T> extends MpscArrayQueue<T> implements BlockingQueue<T>, BatchedBlockingQueue<T> {
public BlockingMpscQueue(int size) {
super(size);
@@ -123,6 +123,45 @@ public class BlockingMpscQueue<T> extends MpscArrayQueue<T> implements BlockingQ
return drain(c::add, maxElements);
}
+ @Override
+ public void putAll(T[] a, int offset, int len) throws InterruptedException {
+ for (int i = 0; i < len; i++) {
+ put(a[offset + i]);
+ }
+ }
+
+ @Override
+ public int takeAll(T[] array) throws InterruptedException {
+ int items = 0;
+
+ T t;
+ while (items < array.length && (t = poll()) != null) {
+ array[items++] = t;
+ }
+
+ if (items == 0) {
+ array[items++] = take();
+ }
+
+ return items;
+ }
+
+ @Override
+ public int pollAll(T[] array, long timeout, TimeUnit unit) throws InterruptedException {
+ int items = 0;
+
+ T t;
+ while (items < array.length && (t = poll()) != null) {
+ array[items++] = t;
+ }
+
+ if (items == 0 && (t = poll(timeout, unit)) != null) {
+ array[items++] = t;
+ }
+
+ return items;
+ }
+
/**
* Wait strategy combined with exit condition, for draining the queue.
*/
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/BatchedArrayBlockingQueueTest.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/BatchedArrayBlockingQueueTest.java
new file mode 100644
index 0000000000..20e2f3723f
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/BatchedArrayBlockingQueueTest.java
@@ -0,0 +1,312 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.common.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+/**
+ * Test the growable array blocking queue.
+ */
+public class BatchedArrayBlockingQueueTest {
+
+ @Test
+ public void simple() throws Exception {
+ BlockingQueue<Integer> queue = new BatchedArrayBlockingQueue<>(4);
+
+ assertEquals(null, queue.poll());
+
+ assertEquals(4, queue.remainingCapacity());
+
+ try {
+ queue.element();
+ fail("Should have thrown exception");
+ } catch (NoSuchElementException e) {
+ // Expected
+ }
+
+ try {
+ queue.iterator();
+ fail("Should have thrown exception");
+ } catch (UnsupportedOperationException e) {
+ // Expected
+ }
+
+ // Test index rollover
+ for (int i = 0; i < 100; i++) {
+ queue.add(i);
+
+ assertEquals(i, queue.take().intValue());
+ }
+
+ queue.offer(1);
+ queue.offer(2);
+ queue.offer(3);
+ queue.offer(4);
+
+ assertEquals(4, queue.size());
+
+ List<Integer> list = new ArrayList<>();
+ queue.drainTo(list, 3);
+
+ assertEquals(1, queue.size());
+ assertEquals(Lists.newArrayList(1, 2, 3), list);
+ assertEquals(4, queue.peek().intValue());
+
+ assertEquals(4, queue.element().intValue());
+ assertEquals(4, queue.remove().intValue());
+ try {
+ queue.remove();
+ fail("Should have thrown exception");
+ } catch (NoSuchElementException e) {
+ // Expected
+ }
+ }
+
+ @Test
+ public void blockingTake() throws Exception {
+ BlockingQueue<Integer> queue = new GrowableMpScArrayConsumerBlockingQueue<>();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ new Thread(() -> {
+ try {
+ int expected = 0;
+
+ for (int i = 0; i < 100; i++) {
+ int n = queue.take();
+
+ assertEquals(expected++, n);
+ }
+
+ latch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }).start();
+
+ int n = 0;
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ queue.put(n);
+ ++n;
+ }
+
+ // Wait until all the entries are consumed
+ while (!queue.isEmpty()) {
+ Thread.sleep(1);
+ }
+ }
+
+ latch.await();
+ }
+
+ @Test
+ public void blockWhenFull() throws Exception {
+ BlockingQueue<Integer> queue = new BatchedArrayBlockingQueue<>(4);
+
+ assertEquals(null, queue.poll());
+
+ assertTrue(queue.offer(1));
+ assertTrue(queue.offer(2));
+ assertTrue(queue.offer(3));
+ assertTrue(queue.offer(4));
+ assertFalse(queue.offer(5));
+
+ assertEquals(4, queue.size());
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ new Thread(() -> {
+ try {
+ queue.put(5);
+ latch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }).start();
+
+ Thread.sleep(100);
+ assertEquals(1, latch.getCount());
+
+ assertEquals(1, (int) queue.poll());
+
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+ assertEquals(4, queue.size());
+
+
+ queue.clear();
+ assertEquals(0, queue.size());
+
+ assertTrue(queue.offer(1, 1, TimeUnit.SECONDS));
+ assertTrue(queue.offer(2, 1, TimeUnit.SECONDS));
+ assertTrue(queue.offer(3, 1, TimeUnit.SECONDS));
+ assertEquals(3, queue.size());
+
+ List<Integer> list = new ArrayList<>();
+ queue.drainTo(list);
+ assertEquals(0, queue.size());
+
+ assertEquals(Lists.newArrayList(1, 2, 3), list);
+ }
+
+ @Test
+ public void pollTimeout() throws Exception {
+ BlockingQueue<Integer> queue = new BatchedArrayBlockingQueue<>(4);
+
+ assertEquals(null, queue.poll(1, TimeUnit.MILLISECONDS));
+
+ queue.put(1);
+ assertEquals(1, queue.poll(1, TimeUnit.MILLISECONDS).intValue());
+
+ // 0 timeout should not block
+ assertEquals(null, queue.poll(0, TimeUnit.HOURS));
+
+ queue.put(2);
+ queue.put(3);
+ assertEquals(2, queue.poll(1, TimeUnit.HOURS).intValue());
+ assertEquals(3, queue.poll(1, TimeUnit.HOURS).intValue());
+ }
+
+ @Test
+ public void pollTimeout2() throws Exception {
+ BlockingQueue<Integer> queue = new BatchedArrayBlockingQueue<>(10);
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ new Thread(() -> {
+ try {
+ queue.poll(1, TimeUnit.HOURS);
+
+ latch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }).start();
+
+ // Make sure background thread is waiting on poll
+ Thread.sleep(100);
+ queue.put(1);
+
+ latch.await();
+ }
+
+
+ @Test
+ public void drainToArray() throws Exception {
+ BatchedArrayBlockingQueue<Integer> queue = new BatchedArrayBlockingQueue<>(100);
+
+ for (int i = 0; i < 10; i++) {
+ queue.add(i);
+ }
+
+ Integer[] local = new Integer[5];
+ int items = queue.takeAll(local);
+ assertEquals(5, items);
+ for (int i = 0; i < items; i++) {
+ assertEquals(i, (int) local[i]);
+ }
+
+ assertEquals(5, queue.size());
+
+ items = queue.takeAll(local);
+ assertEquals(5, items);
+ for (int i = 0; i < items; i++) {
+ assertEquals(i + 5, (int) local[i]);
+ }
+
+ assertEquals(0, queue.size());
+
+ /// Block when empty
+ CountDownLatch latch = new CountDownLatch(1);
+
+ new Thread(() -> {
+ try {
+ int c = queue.takeAll(local);
+ assertEquals(1, c);
+ latch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }).start();
+
+ Thread.sleep(100);
+ assertEquals(1, latch.getCount());
+
+ assertEquals(0, queue.size());
+
+ // Unblock the drain
+ queue.put(1);
+
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+ assertEquals(0, queue.size());
+ }
+
+ @Test
+ public void putAll() throws Exception {
+ BatchedArrayBlockingQueue<Integer> queue = new BatchedArrayBlockingQueue<>(10);
+
+ Integer[] items = new Integer[100];
+ for (int i = 0; i < 100; i++) {
+ items[i] = i;
+ }
+
+ queue.putAll(items, 0, 5);
+ assertEquals(5, queue.size());
+ queue.putAll(items, 0, 5);
+ assertEquals(10, queue.size());
+
+ queue.clear();
+
+ /// Block when empty
+ CountDownLatch latch = new CountDownLatch(1);
+
+ new Thread(() -> {
+ try {
+ queue.putAll(items, 0, 11);
+ latch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }).start();
+
+ Thread.sleep(100);
+ assertEquals(1, latch.getCount());
+ assertEquals(10, queue.size());
+
+ // Unblock the putAll
+ queue.take();
+
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+ assertEquals(10, queue.size());
+ }
+}
diff --git a/microbenchmarks/src/main/java/org/apache/bookkeeper/common/MpScQueueBenchmark.java b/microbenchmarks/src/main/java/org/apache/bookkeeper/common/MpScQueueBenchmark.java
new file mode 100644
index 0000000000..306140c2a7
--- /dev/null
+++ b/microbenchmarks/src/main/java/org/apache/bookkeeper/common/MpScQueueBenchmark.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common;
+
+import java.util.ArrayList;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import lombok.SneakyThrows;
+import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Microbenchmarks for different executors providers.
+ */
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@BenchmarkMode(Mode.Throughput)
+@Threads(16)
+@Fork(1)
+@Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+public class MpScQueueBenchmark {
+
+ private static final int QUEUE_SIZE = 100_000;
+
+ /**
+ * State holder of the test.
+ */
+ @State(Scope.Benchmark)
+ public static class TestState {
+
+ private ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
+
+ private BatchedArrayBlockingQueue batchedArrayBlockingQueue = new BatchedArrayBlockingQueue<>(QUEUE_SIZE);
+
+ private final Integer[] batchArray = new Integer[1000];
+
+ private final ExecutorService executor = Executors.newCachedThreadPool();
+
+ @Setup(Level.Trial)
+ public void setup() {
+ for (int i = 0; i < 1000; i++) {
+ batchArray[i] = i;
+ }
+
+ executor.execute(this::consumeABQ);
+ executor.execute(this::consumeBAABQ);
+ }
+
+ @SneakyThrows
+ private void consumeABQ() {
+ ArrayList<Integer> localList = new ArrayList<>();
+
+ try {
+ while (true) {
+ arrayBlockingQueue.drainTo(localList);
+ if (localList.isEmpty()) {
+ arrayBlockingQueue.take();
+ }
+ localList.clear();
+ }
+ } catch (InterruptedException ie) {
+ }
+ }
+
+ @SneakyThrows
+ private void consumeBAABQ() {
+ Integer[] localArray = new Integer[20_000];
+
+ try {
+ while (true) {
+ batchedArrayBlockingQueue.takeAll(localArray);
+ }
+ } catch (InterruptedException ie) {
+ }
+ }
+
+ @TearDown(Level.Trial)
+ public void teardown() {
+ executor.shutdownNow();
+ }
+
+ @TearDown(Level.Iteration)
+ public void cleanupQueue() throws InterruptedException{
+ Thread.sleep(1_000);
+ }
+ }
+
+ @Benchmark
+ public void arrayBlockingQueue(TestState s) throws Exception {
+ s.arrayBlockingQueue.put(1);
+ }
+
+ @Benchmark
+ public void batchAwareArrayBlockingQueueSingleEnqueue(TestState s) throws Exception {
+ s.batchedArrayBlockingQueue.put(1);
+ }
+
+ @Benchmark
+ @OperationsPerInvocation(1000)
+ public void batchAwareArrayBlockingQueueBatch(TestState s) throws Exception {
+ s.batchedArrayBlockingQueue.putAll(s.batchArray, 0, 1000);
+ }
+}