You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/11/04 19:09:42 UTC

git commit: KAFKA-1706; Add a byte bounded blocking queue utility; reviewed by Joel Koshy

Repository: kafka
Updated Branches:
  refs/heads/trunk 58e3f99e2 -> 4bb020212


KAFKA-1706; Add a byte bounded blocking queue utility; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4bb02021
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4bb02021
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4bb02021

Branch: refs/heads/trunk
Commit: 4bb020212aa0260a750b69dc18856cf25c1e7011
Parents: 58e3f99
Author: Jiangjie Qin <be...@gmail.com>
Authored: Tue Nov 4 10:09:22 2014 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue Nov 4 10:09:22 2014 -0800

----------------------------------------------------------------------
 .../kafka/utils/ByteBoundedBlockingQueue.scala  | 219 +++++++++++++++++++
 .../utils/ByteBoundedBlockingQueueTest.scala    |  99 +++++++++
 2 files changed, 318 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4bb02021/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
new file mode 100644
index 0000000..6a85d7e
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
+
+/**
+ * A blocking queue that have size limits on both number of elements and number of bytes.
+ */
+class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueByteCapacity: Int, sizeFunction: Option[(E) => Int])
+    extends Iterable[E] {
+  private val queue = new LinkedBlockingQueue[E] (queueNumMessageCapacity)
+  private var currentByteSize = new AtomicInteger()
+  private val putLock = new Object
+
+  /**
+   * Please refer to [[java.util.concurrent.BlockingQueue#offer]]
+   * An element can be enqueued provided the current size (in number of elements) is within the configured
+   * capacity and the current size in bytes of the queue is within the configured byte capacity. i.e., the
+   * element may be enqueued even if adding it causes the queue's size in bytes to exceed the byte capacity.
+   * @param e the element to put into the queue
+   * @param timeout the amount of time to wait before the expire the operation
+   * @param unit the time unit of timeout parameter, default to millisecond
+   * @return true if the element is put into queue, false if it is not
+   * @throws NullPointerException if element is null
+   * @throws InterruptedException if interrupted during waiting
+   */
+  def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean = {
+    if (e == null) throw new NullPointerException("Putting null element into queue.")
+    val startTime = SystemTime.nanoseconds
+    val expireTime = startTime + unit.toNanos(timeout)
+    putLock synchronized {
+      var timeoutNanos = expireTime - SystemTime.nanoseconds
+      while (currentByteSize.get() >= queueByteCapacity && timeoutNanos > 0) {
+        // ensure that timeoutNanos > 0, otherwise (per javadoc) we have to wait until the next notify
+        putLock.wait(timeoutNanos / 1000000, (timeoutNanos % 1000000).toInt)
+        timeoutNanos = expireTime - SystemTime.nanoseconds
+      }
+      // only proceed if queue has capacity and not timeout
+      timeoutNanos = expireTime - SystemTime.nanoseconds
+      if (currentByteSize.get() < queueByteCapacity && timeoutNanos > 0) {
+        val success = queue.offer(e, timeoutNanos, TimeUnit.NANOSECONDS)
+        // only increase queue byte size if put succeeds
+        if (success)
+          currentByteSize.addAndGet(sizeFunction.get(e))
+        // wake up another thread in case multiple threads are waiting
+        if (currentByteSize.get() < queueByteCapacity)
+          putLock.notify()
+        success
+      } else {
+        false
+      }
+    }
+  }
+
+  /**
+   * Please refer to [[java.util.concurrent.BlockingQueue#offer]].
+   * Put an element to the tail of the queue, return false immediately if queue is full
+   * @param e The element to put into queue
+   * @return true on succeed, false on failure
+   * @throws NullPointerException if element is null
+   * @throws InterruptedException if interrupted during waiting
+   */
+  def offer(e: E): Boolean = {
+    if (e == null) throw new NullPointerException("Putting null element into queue.")
+    putLock synchronized {
+      if (currentByteSize.get() >= queueByteCapacity) {
+        false
+      } else {
+        val success = queue.offer(e)
+        if (success)
+          currentByteSize.addAndGet(sizeFunction.get(e))
+        // wake up another thread in case multiple threads are waiting
+        if (currentByteSize.get() < queueByteCapacity)
+          putLock.notify()
+        success
+      }
+    }
+  }
+
+  /**
+   * Please refer to [[java.util.concurrent.BlockingQueue#put]].
+   * Put an element to the tail of the queue, block if queue is full
+   * @param e The element to put into queue
+   * @return true on succeed, false on failure
+   * @throws NullPointerException if element is null
+   * @throws InterruptedException if interrupted during waiting
+   */
+  def put(e: E): Boolean = {
+    if (e == null) throw new NullPointerException("Putting null element into queue.")
+    putLock synchronized {
+      if (currentByteSize.get() >= queueByteCapacity)
+        putLock.wait()
+      val success = queue.offer(e)
+      if (success)
+        currentByteSize.addAndGet(sizeFunction.get(e))
+      // wake up another thread in case multiple threads are waiting
+      if (currentByteSize.get() < queueByteCapacity)
+        putLock.notify()
+      success
+    }
+  }
+
+  /**
+   * Please refer to [[java.util.concurrent.BlockingQueue#poll]]
+   * Get an element from the head of queue. Wait for some time if the queue is empty.
+   * @param timeout the amount of time to wait if the queue is empty
+   * @param unit the unit type
+   * @return the first element in the queue, null if queue is empty
+   */
+  def poll(timeout: Long, unit: TimeUnit): E = {
+    val e = queue.poll(timeout, unit)
+    // only wake up waiting threads if the queue size drop under queueByteCapacity
+    if (e != null &&
+        currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity &&
+        currentByteSize.get() < queueByteCapacity)
+      putLock.synchronized(putLock.notify())
+    e
+  }
+
+  /**
+   * Please refer to [[java.util.concurrent.BlockingQueue#poll]]
+   * Get an element from the head of queue.
+   * @return the first element in the queue, null if queue is empty
+   */
+  def poll(): E = {
+    val e = queue.poll()
+    // only wake up waiting threads if the queue size drop under queueByteCapacity
+    if (e != null &&
+      currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity &&
+      currentByteSize.get() < queueByteCapacity)
+      putLock.synchronized(putLock.notify())
+    e
+  }
+
+  /**
+   * Please refer to [[java.util.concurrent.BlockingQueue#take]]
+   * Get an element from the head of the queue, block if the queue is empty
+   * @return the first element in the queue, null if queue is empty
+   */
+  def take(): E = {
+    val e = queue.take()
+    // only wake up waiting threads if the queue size drop under queueByteCapacity
+    if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity &&
+        currentByteSize.get() < queueByteCapacity)
+      putLock.synchronized(putLock.notify())
+    e
+  }
+
+  /**
+   * Iterator for the queue
+   * @return Iterator for the queue
+   */
+  override def iterator = new Iterator[E] () {
+    private val iter = queue.iterator()
+    private var curr: E = null.asInstanceOf[E]
+
+    def hasNext: Boolean = iter.hasNext
+
+    def next(): E = {
+      curr = iter.next()
+      curr
+    }
+
+    def remove() {
+      if (curr == null)
+        throw new IllegalStateException("Iterator does not have a current element.")
+      iter.remove()
+      if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteCapacity)
+        putLock.synchronized(putLock.notify())
+    }
+  }
+
+  /**
+   * get the number of elements in the queue
+   * @return number of elements in the queue
+   */
+  override def size() = queue.size()
+
+  /**
+   * get the current byte size in the queue
+   * @return current queue size in bytes
+   */
+  def byteSize() = {
+    val currSize = currentByteSize.get()
+    // There is a potential race where after an element is put into the queue and before the size is added to
+    // currentByteSize, it was taken out of the queue and the size was deducted from the currentByteSize,
+    // in that case, currentByteSize would become negative, in that case, just put the queue size to be 0.
+    if (currSize > 0) currSize else 0
+  }
+
+  /**
+   * get the number of unused slots in the queue
+   * @return the number of unused slots in the queue
+   */
+  def remainingSize = queue.remainingCapacity()
+
+  /**
+   * get the remaining bytes capacity of the queue
+   * @return the remaining bytes capacity of the queue
+   */
+  def remainingByteSize = math.max(0, queueByteCapacity - currentByteSize.get())
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4bb02021/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala b/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala
new file mode 100644
index 0000000..fe8d2ae
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.utils
+
+import java.util.concurrent.TimeUnit
+
+import junit.framework.Assert._
+import org.junit.{Test}
+import kafka.utils.ByteBoundedBlockingQueue
+
+class ByteBoundedBlockingQueueTest {
+  val sizeFunction = (a: String) => a.length
+  val queue = new ByteBoundedBlockingQueue[String](5, 15, Some(sizeFunction))
+
+  @Test
+  def testByteBoundedBlockingQueue() {
+    assertEquals(5, queue.remainingSize)
+    assertEquals(15, queue.remainingByteSize)
+
+    //offer a message whose size is smaller than remaining capacity
+    val m0 = new String("0123456789")
+    assertEquals(true, queue.offer(m0))
+    assertEquals(1, queue.size())
+    assertEquals(10, queue.byteSize())
+    assertEquals(4, queue.remainingSize)
+    assertEquals(5, queue.remainingByteSize)
+
+    // offer a message where remaining capacity < message size < capacity limit
+    val m1 = new String("1234567890")
+    assertEquals(true, queue.offer(m1))
+    assertEquals(2, queue.size())
+    assertEquals(20, queue.byteSize())
+    assertEquals(3, queue.remainingSize)
+    assertEquals(0, queue.remainingByteSize)
+
+    // offer a message using timeout, should fail because no space is left
+    val m2 = new String("2345678901")
+    assertEquals(false, queue.offer(m2, 10, TimeUnit.MILLISECONDS))
+    assertEquals(2, queue.size())
+    assertEquals(20, queue.byteSize())
+    assertEquals(3, queue.remainingSize)
+    assertEquals(0, queue.remainingByteSize)
+
+    // take an element out of the queue
+    assertEquals("0123456789", queue.take())
+    assertEquals(1, queue.size())
+    assertEquals(10, queue.byteSize())
+    assertEquals(4, queue.remainingSize)
+    assertEquals(5, queue.remainingByteSize)
+
+    // add 5 small elements into the queue, first 4 should succeed, the 5th one should fail
+    // test put()
+    assertEquals(true, queue.put("a"))
+    assertEquals(true, queue.offer("b"))
+    assertEquals(true, queue.offer("c"))
+    assertEquals(4, queue.size())
+    assertEquals(13, queue.byteSize())
+    assertEquals(1, queue.remainingSize)
+    assertEquals(2, queue.remainingByteSize)
+
+    assertEquals(true, queue.offer("d"))
+    assertEquals(5, queue.size())
+    assertEquals(14, queue.byteSize())
+    assertEquals(0, queue.remainingSize)
+    assertEquals(1, queue.remainingByteSize)
+
+    assertEquals(false, queue.offer("e"))
+    assertEquals(5, queue.size())
+    assertEquals(14, queue.byteSize())
+    assertEquals(0, queue.remainingSize)
+    assertEquals(1, queue.remainingByteSize)
+
+    // try take 6 elements out of the queue, the last poll() should fail as there is no element anymore
+    // test take()
+    assertEquals("1234567890", queue.poll(10, TimeUnit.MILLISECONDS))
+    // test poll
+    assertEquals("a", queue.poll())
+    assertEquals("b", queue.poll())
+    assertEquals("c", queue.poll())
+    assertEquals("d", queue.poll())
+    assertEquals(null, queue.poll(10, TimeUnit.MILLISECONDS))
+  }
+
+}