You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/16 17:51:30 UTC

kafka git commit: MINOR: Remove unused `ByteBoundedBlockingQueue` class and `zkSessionTimeout` parameter

Repository: kafka
Updated Branches:
  refs/heads/trunk 31203efcb -> b902ef985


MINOR: Remove unused `ByteBoundedBlockingQueue` class and `zkSessionTimeout` parameter

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2136 from ijuma/remove-unused-code


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b902ef98
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b902ef98
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b902ef98

Branch: refs/heads/trunk
Commit: b902ef985a8d4ce304950fcb2c02499054fe6d24
Parents: 31203ef
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed Nov 16 09:38:42 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Nov 16 09:38:42 2016 -0800

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      |   5 +-
 .../kafka/utils/ByteBoundedBlockingQueue.scala  | 230 -------------------
 .../ControlledShutdownLeaderSelectorTest.scala  |   2 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |   2 +-
 .../utils/ByteBoundedBlockingQueueTest.scala    |  98 --------
 5 files changed, 4 insertions(+), 333 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b902ef98/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 730f07c..2a6f61c 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -46,8 +46,7 @@ import java.util.concurrent.locks.ReentrantLock
 import kafka.server._
 import kafka.common.TopicAndPartition
 
-class ControllerContext(val zkUtils: ZkUtils,
-                        val zkSessionTimeout: Int) {
+class ControllerContext(val zkUtils: ZkUtils) {
   var controllerChannelManager: ControllerChannelManager = null
   val controllerLock: ReentrantLock = new ReentrantLock()
   var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
@@ -157,7 +156,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
   private val stateChangeLogger = KafkaController.stateChangeLogger
-  val controllerContext = new ControllerContext(zkUtils, config.zkSessionTimeoutMs)
+  val controllerContext = new ControllerContext(zkUtils)
   val partitionStateMachine = new PartitionStateMachine(this)
   val replicaStateMachine = new ReplicaStateMachine(this)
   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,

http://git-wip-us.apache.org/repos/asf/kafka/blob/b902ef98/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
deleted file mode 100644
index 26149af..0000000
--- a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
-
-/**
- * A blocking queue that have size limits on both number of elements and number of bytes.
- */
-class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueByteCapacity: Int, sizeFunction: Option[(E) => Int])
-    extends Iterable[E] {
-  private val queue = new LinkedBlockingQueue[E] (queueNumMessageCapacity)
-  private var currentByteSize = new AtomicInteger()
-  private val putLock = new Object
-
-  /**
-   * Please refer to [[java.util.concurrent.BlockingQueue#offer]]
-   * An element can be enqueued provided the current size (in number of elements) is within the configured
-   * capacity and the current size in bytes of the queue is within the configured byte capacity. i.e., the
-   * element may be enqueued even if adding it causes the queue's size in bytes to exceed the byte capacity.
-   * @param e the element to put into the queue
-   * @param timeout the amount of time to wait before the expire the operation
-   * @param unit the time unit of timeout parameter, default to millisecond
-   * @return true if the element is put into queue, false if it is not
-   * @throws NullPointerException if element is null
-   * @throws InterruptedException if interrupted during waiting
-   */
-  def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean = {
-    if (e == null) throw new NullPointerException("Putting null element into queue.")
-    val startTime = SystemTime.nanoseconds
-    val expireTime = startTime + unit.toNanos(timeout)
-    putLock synchronized {
-      var timeoutNanos = expireTime - SystemTime.nanoseconds
-      while (currentByteSize.get() >= queueByteCapacity && timeoutNanos > 0) {
-        // ensure that timeoutNanos > 0, otherwise (per javadoc) we have to wait until the next notify
-        putLock.wait(timeoutNanos / 1000000, (timeoutNanos % 1000000).toInt)
-        timeoutNanos = expireTime - SystemTime.nanoseconds
-      }
-      // only proceed if queue has capacity and not timeout
-      timeoutNanos = expireTime - SystemTime.nanoseconds
-      if (currentByteSize.get() < queueByteCapacity && timeoutNanos > 0) {
-        val success = queue.offer(e, timeoutNanos, TimeUnit.NANOSECONDS)
-        // only increase queue byte size if put succeeds
-        if (success)
-          currentByteSize.addAndGet(sizeFunction.get(e))
-        // wake up another thread in case multiple threads are waiting
-        if (currentByteSize.get() < queueByteCapacity)
-          putLock.notify()
-        success
-      } else {
-        false
-      }
-    }
-  }
-
-  /**
-   * Please refer to [[java.util.concurrent.BlockingQueue#offer]].
-   * Put an element to the tail of the queue, return false immediately if queue is full
-   * @param e The element to put into queue
-   * @return true on succeed, false on failure
-   * @throws NullPointerException if element is null
-   * @throws InterruptedException if interrupted during waiting
-   */
-  def offer(e: E): Boolean = {
-    if (e == null) throw new NullPointerException("Putting null element into queue.")
-    putLock synchronized {
-      if (currentByteSize.get() >= queueByteCapacity) {
-        false
-      } else {
-        val success = queue.offer(e)
-        if (success)
-          currentByteSize.addAndGet(sizeFunction.get(e))
-        // wake up another thread in case multiple threads are waiting
-        if (currentByteSize.get() < queueByteCapacity)
-          putLock.notify()
-        success
-      }
-    }
-  }
-
-  /**
-   * Please refer to [[java.util.concurrent.BlockingQueue#put]].
-   * Put an element to the tail of the queue, block if queue is full
-   * @param e The element to put into queue
-   * @return true on succeed, false on failure
-   * @throws NullPointerException if element is null
-   * @throws InterruptedException if interrupted during waiting
-   */
-  def put(e: E): Boolean = {
-    if (e == null) throw new NullPointerException("Putting null element into queue.")
-    putLock synchronized {
-      if (currentByteSize.get() >= queueByteCapacity)
-        putLock.wait()
-      val success = queue.offer(e)
-      if (success)
-        currentByteSize.addAndGet(sizeFunction.get(e))
-      // wake up another thread in case multiple threads are waiting
-      if (currentByteSize.get() < queueByteCapacity)
-        putLock.notify()
-      success
-    }
-  }
-
-  /**
-   * Please refer to [[java.util.concurrent.BlockingQueue#poll]]
-   * Get an element from the head of queue. Wait for some time if the queue is empty.
-   * @param timeout the amount of time to wait if the queue is empty
-   * @param unit the unit type
-   * @return the first element in the queue, null if queue is empty
-   */
-  def poll(timeout: Long, unit: TimeUnit): E = {
-    val e = queue.poll(timeout, unit)
-    // only wake up waiting threads if the queue size drop under queueByteCapacity
-    if (e != null &&
-        currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity &&
-        currentByteSize.get() < queueByteCapacity)
-      putLock.synchronized(putLock.notify())
-    e
-  }
-
-  /**
-   * Please refer to [[java.util.concurrent.BlockingQueue#poll]]
-   * Get an element from the head of queue.
-   * @return the first element in the queue, null if queue is empty
-   */
-  def poll(): E = {
-    val e = queue.poll()
-    // only wake up waiting threads if the queue size drop under queueByteCapacity
-    if (e != null &&
-      currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity &&
-      currentByteSize.get() < queueByteCapacity)
-      putLock.synchronized(putLock.notify())
-    e
-  }
-
-  /**
-   * Please refer to [[java.util.concurrent.BlockingQueue#take]]
-   * Get an element from the head of the queue, block if the queue is empty
-   * @return the first element in the queue, null if queue is empty
-   */
-  def take(): E = {
-    val e = queue.take()
-    // only wake up waiting threads if the queue size drop under queueByteCapacity
-    if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity &&
-        currentByteSize.get() < queueByteCapacity)
-      putLock.synchronized(putLock.notify())
-    e
-  }
-
-  /**
-   * Iterator for the queue
-   * @return Iterator for the queue
-   */
-  override def iterator = new Iterator[E] () {
-    private val iter = queue.iterator()
-    private var curr: E = null.asInstanceOf[E]
-
-    def hasNext: Boolean = iter.hasNext
-
-    def next(): E = {
-      curr = iter.next()
-      curr
-    }
-
-    def remove() {
-      if (curr == null)
-        throw new IllegalStateException("Iterator does not have a current element.")
-      iter.remove()
-      if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteCapacity)
-        putLock.synchronized(putLock.notify())
-    }
-  }
-
-  /**
-   * get the number of elements in the queue
-   * @return number of elements in the queue
-   */
-  override def size() = queue.size()
-
-  /**
-   * get the current byte size in the queue
-   * @return current queue size in bytes
-   */
-  def byteSize() = {
-    val currSize = currentByteSize.get()
-    // There is a potential race where after an element is put into the queue and before the size is added to
-    // currentByteSize, it was taken out of the queue and the size was deducted from the currentByteSize,
-    // in that case, currentByteSize would become negative, in that case, just put the queue size to be 0.
-    if (currSize > 0) currSize else 0
-  }
-
-  /**
-   * get the number of unused slots in the queue
-   * @return the number of unused slots in the queue
-   */
-  def remainingSize = queue.remainingCapacity()
-
-  /**
-   * get the remaining bytes capacity of the queue
-   * @return the remaining bytes capacity of the queue
-   */
-  def remainingByteSize = math.max(0, queueByteCapacity - currentByteSize.get())
-
-  /**
-   * remove all the items in the queue
-   */
-  def clear() {
-    putLock synchronized {
-      queue.clear()
-      currentByteSize.set(0)
-      putLock.notify()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b902ef98/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
index f032eb6..d3dbfe2 100644
--- a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
@@ -39,7 +39,7 @@ class ControlledShutdownLeaderSelectorTest {
     val firstLeader = 1
 
     val zkUtils = EasyMock.mock(classOf[ZkUtils])
-    val controllerContext = new ControllerContext(zkUtils, zkSessionTimeout = 1000)
+    val controllerContext = new ControllerContext(zkUtils)
     controllerContext.liveBrokers = assignment.map(Broker(_, Map.empty, None)).toSet
     controllerContext.shuttingDownBrokerIds = mutable.Set(2, 3)
     controllerContext.partitionReplicaAssignment = mutable.Map(topicPartition -> assignment)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b902ef98/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 5726152..e3f0ad2 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -131,7 +131,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort()))
     val nodes = brokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
 
-    val controllerContext = new ControllerContext(zkUtils, 6000)
+    val controllerContext = new ControllerContext(zkUtils)
     controllerContext.liveBrokers = brokers.toSet
     val metrics = new Metrics
     val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, new SystemTime, metrics)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b902ef98/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala b/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala
deleted file mode 100644
index 4a070bd..0000000
--- a/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.util.concurrent.TimeUnit
-
-import org.junit.Assert._
-import org.junit.Test
-
-class ByteBoundedBlockingQueueTest {
-  val sizeFunction = (a: String) => a.length
-  val queue = new ByteBoundedBlockingQueue[String](5, 15, Some(sizeFunction))
-
-  @Test
-  def testByteBoundedBlockingQueue() {
-    assertEquals(5, queue.remainingSize)
-    assertEquals(15, queue.remainingByteSize)
-
-    //offer a message whose size is smaller than remaining capacity
-    val m0 = new String("0123456789")
-    assertEquals(true, queue.offer(m0))
-    assertEquals(1, queue.size())
-    assertEquals(10, queue.byteSize())
-    assertEquals(4, queue.remainingSize)
-    assertEquals(5, queue.remainingByteSize)
-
-    // offer a message where remaining capacity < message size < capacity limit
-    val m1 = new String("1234567890")
-    assertEquals(true, queue.offer(m1))
-    assertEquals(2, queue.size())
-    assertEquals(20, queue.byteSize())
-    assertEquals(3, queue.remainingSize)
-    assertEquals(0, queue.remainingByteSize)
-
-    // offer a message using timeout, should fail because no space is left
-    val m2 = new String("2345678901")
-    assertEquals(false, queue.offer(m2, 10, TimeUnit.MILLISECONDS))
-    assertEquals(2, queue.size())
-    assertEquals(20, queue.byteSize())
-    assertEquals(3, queue.remainingSize)
-    assertEquals(0, queue.remainingByteSize)
-
-    // take an element out of the queue
-    assertEquals("0123456789", queue.take())
-    assertEquals(1, queue.size())
-    assertEquals(10, queue.byteSize())
-    assertEquals(4, queue.remainingSize)
-    assertEquals(5, queue.remainingByteSize)
-
-    // add 5 small elements into the queue, first 4 should succeed, the 5th one should fail
-    // test put()
-    assertEquals(true, queue.put("a"))
-    assertEquals(true, queue.offer("b"))
-    assertEquals(true, queue.offer("c"))
-    assertEquals(4, queue.size())
-    assertEquals(13, queue.byteSize())
-    assertEquals(1, queue.remainingSize)
-    assertEquals(2, queue.remainingByteSize)
-
-    assertEquals(true, queue.offer("d"))
-    assertEquals(5, queue.size())
-    assertEquals(14, queue.byteSize())
-    assertEquals(0, queue.remainingSize)
-    assertEquals(1, queue.remainingByteSize)
-
-    assertEquals(false, queue.offer("e"))
-    assertEquals(5, queue.size())
-    assertEquals(14, queue.byteSize())
-    assertEquals(0, queue.remainingSize)
-    assertEquals(1, queue.remainingByteSize)
-
-    // try take 6 elements out of the queue, the last poll() should fail as there is no element anymore
-    // test take()
-    assertEquals("1234567890", queue.poll(10, TimeUnit.MILLISECONDS))
-    // test poll
-    assertEquals("a", queue.poll())
-    assertEquals("b", queue.poll())
-    assertEquals("c", queue.poll())
-    assertEquals("d", queue.poll())
-    assertEquals(null, queue.poll(10, TimeUnit.MILLISECONDS))
-  }
-
-}