You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/16 17:51:30 UTC
kafka git commit: MINOR: Remove unused `ByteBoundedBlockingQueue` class and `zkSessionTimeout` parameter
Repository: kafka
Updated Branches:
refs/heads/trunk 31203efcb -> b902ef985
MINOR: Remove unused `ByteBoundedBlockingQueue` class and `zkSessionTimeout` parameter
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #2136 from ijuma/remove-unused-code
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b902ef98
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b902ef98
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b902ef98
Branch: refs/heads/trunk
Commit: b902ef985a8d4ce304950fcb2c02499054fe6d24
Parents: 31203ef
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed Nov 16 09:38:42 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Nov 16 09:38:42 2016 -0800
----------------------------------------------------------------------
.../kafka/controller/KafkaController.scala | 5 +-
.../kafka/utils/ByteBoundedBlockingQueue.scala | 230 -------------------
.../ControlledShutdownLeaderSelectorTest.scala | 2 +-
.../unit/kafka/server/LeaderElectionTest.scala | 2 +-
.../utils/ByteBoundedBlockingQueueTest.scala | 98 --------
5 files changed, 4 insertions(+), 333 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b902ef98/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 730f07c..2a6f61c 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -46,8 +46,7 @@ import java.util.concurrent.locks.ReentrantLock
import kafka.server._
import kafka.common.TopicAndPartition
-class ControllerContext(val zkUtils: ZkUtils,
- val zkSessionTimeout: Int) {
+class ControllerContext(val zkUtils: ZkUtils) {
var controllerChannelManager: ControllerChannelManager = null
val controllerLock: ReentrantLock = new ReentrantLock()
var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
@@ -157,7 +156,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
this.logIdent = "[Controller " + config.brokerId + "]: "
private var isRunning = true
private val stateChangeLogger = KafkaController.stateChangeLogger
- val controllerContext = new ControllerContext(zkUtils, config.zkSessionTimeoutMs)
+ val controllerContext = new ControllerContext(zkUtils)
val partitionStateMachine = new PartitionStateMachine(this)
val replicaStateMachine = new ReplicaStateMachine(this)
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
http://git-wip-us.apache.org/repos/asf/kafka/blob/b902ef98/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
deleted file mode 100644
index 26149af..0000000
--- a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
-
-/**
- * A blocking queue that have size limits on both number of elements and number of bytes.
- */
-class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueByteCapacity: Int, sizeFunction: Option[(E) => Int])
- extends Iterable[E] {
- private val queue = new LinkedBlockingQueue[E] (queueNumMessageCapacity)
- private var currentByteSize = new AtomicInteger()
- private val putLock = new Object
-
- /**
- * Please refer to [[java.util.concurrent.BlockingQueue#offer]]
- * An element can be enqueued provided the current size (in number of elements) is within the configured
- * capacity and the current size in bytes of the queue is within the configured byte capacity. i.e., the
- * element may be enqueued even if adding it causes the queue's size in bytes to exceed the byte capacity.
- * @param e the element to put into the queue
- * @param timeout the amount of time to wait before the expire the operation
- * @param unit the time unit of timeout parameter, default to millisecond
- * @return true if the element is put into queue, false if it is not
- * @throws NullPointerException if element is null
- * @throws InterruptedException if interrupted during waiting
- */
- def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean = {
- if (e == null) throw new NullPointerException("Putting null element into queue.")
- val startTime = SystemTime.nanoseconds
- val expireTime = startTime + unit.toNanos(timeout)
- putLock synchronized {
- var timeoutNanos = expireTime - SystemTime.nanoseconds
- while (currentByteSize.get() >= queueByteCapacity && timeoutNanos > 0) {
- // ensure that timeoutNanos > 0, otherwise (per javadoc) we have to wait until the next notify
- putLock.wait(timeoutNanos / 1000000, (timeoutNanos % 1000000).toInt)
- timeoutNanos = expireTime - SystemTime.nanoseconds
- }
- // only proceed if queue has capacity and not timeout
- timeoutNanos = expireTime - SystemTime.nanoseconds
- if (currentByteSize.get() < queueByteCapacity && timeoutNanos > 0) {
- val success = queue.offer(e, timeoutNanos, TimeUnit.NANOSECONDS)
- // only increase queue byte size if put succeeds
- if (success)
- currentByteSize.addAndGet(sizeFunction.get(e))
- // wake up another thread in case multiple threads are waiting
- if (currentByteSize.get() < queueByteCapacity)
- putLock.notify()
- success
- } else {
- false
- }
- }
- }
-
- /**
- * Please refer to [[java.util.concurrent.BlockingQueue#offer]].
- * Put an element to the tail of the queue, return false immediately if queue is full
- * @param e The element to put into queue
- * @return true on succeed, false on failure
- * @throws NullPointerException if element is null
- * @throws InterruptedException if interrupted during waiting
- */
- def offer(e: E): Boolean = {
- if (e == null) throw new NullPointerException("Putting null element into queue.")
- putLock synchronized {
- if (currentByteSize.get() >= queueByteCapacity) {
- false
- } else {
- val success = queue.offer(e)
- if (success)
- currentByteSize.addAndGet(sizeFunction.get(e))
- // wake up another thread in case multiple threads are waiting
- if (currentByteSize.get() < queueByteCapacity)
- putLock.notify()
- success
- }
- }
- }
-
- /**
- * Please refer to [[java.util.concurrent.BlockingQueue#put]].
- * Put an element to the tail of the queue, block if queue is full
- * @param e The element to put into queue
- * @return true on succeed, false on failure
- * @throws NullPointerException if element is null
- * @throws InterruptedException if interrupted during waiting
- */
- def put(e: E): Boolean = {
- if (e == null) throw new NullPointerException("Putting null element into queue.")
- putLock synchronized {
- if (currentByteSize.get() >= queueByteCapacity)
- putLock.wait()
- val success = queue.offer(e)
- if (success)
- currentByteSize.addAndGet(sizeFunction.get(e))
- // wake up another thread in case multiple threads are waiting
- if (currentByteSize.get() < queueByteCapacity)
- putLock.notify()
- success
- }
- }
-
- /**
- * Please refer to [[java.util.concurrent.BlockingQueue#poll]]
- * Get an element from the head of queue. Wait for some time if the queue is empty.
- * @param timeout the amount of time to wait if the queue is empty
- * @param unit the unit type
- * @return the first element in the queue, null if queue is empty
- */
- def poll(timeout: Long, unit: TimeUnit): E = {
- val e = queue.poll(timeout, unit)
- // only wake up waiting threads if the queue size drop under queueByteCapacity
- if (e != null &&
- currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity &&
- currentByteSize.get() < queueByteCapacity)
- putLock.synchronized(putLock.notify())
- e
- }
-
- /**
- * Please refer to [[java.util.concurrent.BlockingQueue#poll]]
- * Get an element from the head of queue.
- * @return the first element in the queue, null if queue is empty
- */
- def poll(): E = {
- val e = queue.poll()
- // only wake up waiting threads if the queue size drop under queueByteCapacity
- if (e != null &&
- currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity &&
- currentByteSize.get() < queueByteCapacity)
- putLock.synchronized(putLock.notify())
- e
- }
-
- /**
- * Please refer to [[java.util.concurrent.BlockingQueue#take]]
- * Get an element from the head of the queue, block if the queue is empty
- * @return the first element in the queue, null if queue is empty
- */
- def take(): E = {
- val e = queue.take()
- // only wake up waiting threads if the queue size drop under queueByteCapacity
- if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity &&
- currentByteSize.get() < queueByteCapacity)
- putLock.synchronized(putLock.notify())
- e
- }
-
- /**
- * Iterator for the queue
- * @return Iterator for the queue
- */
- override def iterator = new Iterator[E] () {
- private val iter = queue.iterator()
- private var curr: E = null.asInstanceOf[E]
-
- def hasNext: Boolean = iter.hasNext
-
- def next(): E = {
- curr = iter.next()
- curr
- }
-
- def remove() {
- if (curr == null)
- throw new IllegalStateException("Iterator does not have a current element.")
- iter.remove()
- if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteCapacity)
- putLock.synchronized(putLock.notify())
- }
- }
-
- /**
- * get the number of elements in the queue
- * @return number of elements in the queue
- */
- override def size() = queue.size()
-
- /**
- * get the current byte size in the queue
- * @return current queue size in bytes
- */
- def byteSize() = {
- val currSize = currentByteSize.get()
- // There is a potential race where after an element is put into the queue and before the size is added to
- // currentByteSize, it was taken out of the queue and the size was deducted from the currentByteSize,
- // in that case, currentByteSize would become negative, in that case, just put the queue size to be 0.
- if (currSize > 0) currSize else 0
- }
-
- /**
- * get the number of unused slots in the queue
- * @return the number of unused slots in the queue
- */
- def remainingSize = queue.remainingCapacity()
-
- /**
- * get the remaining bytes capacity of the queue
- * @return the remaining bytes capacity of the queue
- */
- def remainingByteSize = math.max(0, queueByteCapacity - currentByteSize.get())
-
- /**
- * remove all the items in the queue
- */
- def clear() {
- putLock synchronized {
- queue.clear()
- currentByteSize.set(0)
- putLock.notify()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b902ef98/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
index f032eb6..d3dbfe2 100644
--- a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
@@ -39,7 +39,7 @@ class ControlledShutdownLeaderSelectorTest {
val firstLeader = 1
val zkUtils = EasyMock.mock(classOf[ZkUtils])
- val controllerContext = new ControllerContext(zkUtils, zkSessionTimeout = 1000)
+ val controllerContext = new ControllerContext(zkUtils)
controllerContext.liveBrokers = assignment.map(Broker(_, Map.empty, None)).toSet
controllerContext.shuttingDownBrokerIds = mutable.Set(2, 3)
controllerContext.partitionReplicaAssignment = mutable.Map(topicPartition -> assignment)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b902ef98/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 5726152..e3f0ad2 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -131,7 +131,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort()))
val nodes = brokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
- val controllerContext = new ControllerContext(zkUtils, 6000)
+ val controllerContext = new ControllerContext(zkUtils)
controllerContext.liveBrokers = brokers.toSet
val metrics = new Metrics
val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, new SystemTime, metrics)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b902ef98/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala b/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala
deleted file mode 100644
index 4a070bd..0000000
--- a/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.util.concurrent.TimeUnit
-
-import org.junit.Assert._
-import org.junit.Test
-
-class ByteBoundedBlockingQueueTest {
- val sizeFunction = (a: String) => a.length
- val queue = new ByteBoundedBlockingQueue[String](5, 15, Some(sizeFunction))
-
- @Test
- def testByteBoundedBlockingQueue() {
- assertEquals(5, queue.remainingSize)
- assertEquals(15, queue.remainingByteSize)
-
- //offer a message whose size is smaller than remaining capacity
- val m0 = new String("0123456789")
- assertEquals(true, queue.offer(m0))
- assertEquals(1, queue.size())
- assertEquals(10, queue.byteSize())
- assertEquals(4, queue.remainingSize)
- assertEquals(5, queue.remainingByteSize)
-
- // offer a message where remaining capacity < message size < capacity limit
- val m1 = new String("1234567890")
- assertEquals(true, queue.offer(m1))
- assertEquals(2, queue.size())
- assertEquals(20, queue.byteSize())
- assertEquals(3, queue.remainingSize)
- assertEquals(0, queue.remainingByteSize)
-
- // offer a message using timeout, should fail because no space is left
- val m2 = new String("2345678901")
- assertEquals(false, queue.offer(m2, 10, TimeUnit.MILLISECONDS))
- assertEquals(2, queue.size())
- assertEquals(20, queue.byteSize())
- assertEquals(3, queue.remainingSize)
- assertEquals(0, queue.remainingByteSize)
-
- // take an element out of the queue
- assertEquals("0123456789", queue.take())
- assertEquals(1, queue.size())
- assertEquals(10, queue.byteSize())
- assertEquals(4, queue.remainingSize)
- assertEquals(5, queue.remainingByteSize)
-
- // add 5 small elements into the queue, first 4 should succeed, the 5th one should fail
- // test put()
- assertEquals(true, queue.put("a"))
- assertEquals(true, queue.offer("b"))
- assertEquals(true, queue.offer("c"))
- assertEquals(4, queue.size())
- assertEquals(13, queue.byteSize())
- assertEquals(1, queue.remainingSize)
- assertEquals(2, queue.remainingByteSize)
-
- assertEquals(true, queue.offer("d"))
- assertEquals(5, queue.size())
- assertEquals(14, queue.byteSize())
- assertEquals(0, queue.remainingSize)
- assertEquals(1, queue.remainingByteSize)
-
- assertEquals(false, queue.offer("e"))
- assertEquals(5, queue.size())
- assertEquals(14, queue.byteSize())
- assertEquals(0, queue.remainingSize)
- assertEquals(1, queue.remainingByteSize)
-
- // try take 6 elements out of the queue, the last poll() should fail as there is no element anymore
- // test take()
- assertEquals("1234567890", queue.poll(10, TimeUnit.MILLISECONDS))
- // test poll
- assertEquals("a", queue.poll())
- assertEquals("b", queue.poll())
- assertEquals("c", queue.poll())
- assertEquals("d", queue.poll())
- assertEquals(null, queue.poll(10, TimeUnit.MILLISECONDS))
- }
-
-}