You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/10/25 02:17:44 UTC
[1/4] kafka git commit: MINOR: A bunch of clean-ups related to usage
of unused variables
Repository: kafka
Updated Branches:
refs/heads/trunk 1fc450fdb -> d09267383
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
index 7ef4550..d8f0de4 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
@@ -40,8 +40,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
zkUtils.createPersistentPath(path)
fail("Failed to throw ConfigException for missing zookeeper root node")
} catch {
- case configException: ConfigException =>
- case exception: Throwable => fail("Should have thrown ConfigException")
+ case _: ConfigException =>
}
zkUtils.close()
}
@@ -50,13 +49,8 @@ class ZKPathTest extends ZooKeeperTestHarness {
def testCreatePersistentPath {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
- try {
- ZkPath.resetNamespaceCheckedState
- zkUtils.createPersistentPath(path)
- } catch {
- case exception: Throwable => fail("Failed to create persistent path")
- }
-
+ ZkPath.resetNamespaceCheckedState
+ zkUtils.createPersistentPath(path)
assertTrue("Failed to create persistent path", zkUtils.pathExists(path))
zkUtils.close()
}
@@ -70,8 +64,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
zkUtils.makeSurePersistentPathExists(path)
fail("Failed to throw ConfigException for missing zookeeper root node")
} catch {
- case configException: ConfigException =>
- case exception: Throwable => fail("Should have thrown ConfigException")
+ case _: ConfigException =>
}
zkUtils.close()
}
@@ -80,13 +73,8 @@ class ZKPathTest extends ZooKeeperTestHarness {
def testMakeSurePersistsPathExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
- try {
- ZkPath.resetNamespaceCheckedState
- zkUtils.makeSurePersistentPathExists(path)
- } catch {
- case exception: Throwable => fail("Failed to create persistent path")
- }
-
+ ZkPath.resetNamespaceCheckedState
+ zkUtils.makeSurePersistentPathExists(path)
assertTrue("Failed to create persistent path", zkUtils.pathExists(path))
zkUtils.close()
}
@@ -100,8 +88,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
zkUtils.createEphemeralPathExpectConflict(path, "somedata")
fail("Failed to throw ConfigException for missing zookeeper root node")
} catch {
- case configException: ConfigException =>
- case exception: Throwable => fail("Should have thrown ConfigException")
+ case _: ConfigException =>
}
zkUtils.close()
}
@@ -110,13 +97,8 @@ class ZKPathTest extends ZooKeeperTestHarness {
def testCreateEphemeralPathExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
- try {
- ZkPath.resetNamespaceCheckedState
- zkUtils.createEphemeralPathExpectConflict(path, "somedata")
- } catch {
- case exception: Throwable => fail("Failed to create ephemeral path")
- }
-
+ ZkPath.resetNamespaceCheckedState
+ zkUtils.createEphemeralPathExpectConflict(path, "somedata")
assertTrue("Failed to create ephemeral path", zkUtils.pathExists(path))
zkUtils.close()
}
@@ -131,8 +113,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
zkUtils.createSequentialPersistentPath(path)
fail("Failed to throw ConfigException for missing zookeeper root node")
} catch {
- case configException: ConfigException =>
- case exception: Throwable => fail("Should have thrown ConfigException")
+ case _: ConfigException =>
}
zkUtils.close()
}
@@ -141,15 +122,8 @@ class ZKPathTest extends ZooKeeperTestHarness {
def testCreatePersistentSequentialExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
-
- var actualPath: String = ""
- try {
- ZkPath.resetNamespaceCheckedState
- actualPath = zkUtils.createSequentialPersistentPath(path)
- } catch {
- case exception: Throwable => fail("Failed to create persistent path")
- }
-
+ ZkPath.resetNamespaceCheckedState
+ val actualPath = zkUtils.createSequentialPersistentPath(path)
assertTrue("Failed to create persistent path", zkUtils.pathExists(actualPath))
zkUtils.close()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala b/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
index 6eaee70..bd0b257 100644
--- a/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
@@ -39,7 +39,7 @@ object ZkFourLetterWords {
outStream.write("stat".getBytes)
outStream.flush()
} catch {
- case e: SocketTimeoutException => throw new IOException("Exception while sending 4lw")
+ case e: SocketTimeoutException => throw new IOException("Exception while sending 4lw", e)
} finally {
sock.close
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 25e0620..7dcd63d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -52,7 +52,7 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
private <K, V> KeyValueStore<K, V> createStore(final ProcessorContext context, final Class<K> keyClass, final Class<V> valueClass, final boolean useContextSerdes, final boolean enableCaching) {
- Stores.PersistentKeyValueFactory<?, ?> factory = null;
+ Stores.PersistentKeyValueFactory<?, ?> factory;
if (useContextSerdes) {
factory = Stores
.create("my-store")
[2/4] kafka git commit: MINOR: A bunch of clean-ups related to usage
of unused variables
Posted by ij...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 653b40c..b7fc657 100755
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -52,7 +52,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
@Before
override def setUp() {
super.setUp()
- topicInfos = configs.map(c => new PartitionTopicInfo(topic,
+ topicInfos = configs.map(_ => new PartitionTopicInfo(topic,
0,
queue,
new AtomicLong(consumedOffset),
@@ -77,7 +77,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
new StringDecoder(),
new StringDecoder(),
clientId = "")
- val receivedMessages = (0 until 5).map(i => iter.next.message).toList
+ val receivedMessages = (0 until 5).map(_ => iter.next.message)
assertFalse(iter.hasNext)
assertEquals(0, queue.size) // Shutdown command has been consumed.
@@ -101,17 +101,14 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
new FailDecoder(),
clientId = "")
- val receivedMessages = (0 until 5).map{ i =>
+ (0 until 5).foreach { i =>
assertTrue(iter.hasNext)
val message = iter.next
assertEquals(message.offset, i + consumedOffset)
- try {
- message.message // should fail
- }
+ try message.message // should fail
catch {
- case e: UnsupportedOperationException => // this is ok
- case e2: Throwable => fail("Unexpected exception when iterating the message set. " + e2.getMessage)
+ case _: UnsupportedOperationException => // this is ok
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
index 818229c..9e568f8 100644
--- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -35,49 +35,49 @@ class PartitionAssignorTest extends Logging {
val assignor = new RoundRobinAssignor
/** various scenarios with only wildcard consumers */
- (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+ (1 to PartitionAssignorTest.TestCaseCount).foreach { _ =>
val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1))
val topicPartitionCounts = Map((1 to topicCount).map(topic => {
("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
- }).toSeq:_*)
+ }):_*)
- val subscriptions = Map((1 to consumerCount).map(consumer => {
+ val subscriptions = Map((1 to consumerCount).map { consumer =>
val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1))
("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true))
- }).toSeq:_*)
+ }:_*)
val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
EasyMock.replay(zkUtils.zkClient)
PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils, verifyAssignmentIsUniform = true)
- })
+ }
}
@Test
def testRangePartitionAssignor() {
val assignor = new RangeAssignor
- (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+ (1 to PartitionAssignorTest.TestCaseCount).foreach { _ =>
val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1))
val topicPartitionCounts = Map((1 to topicCount).map(topic => {
("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
- }).toSeq:_*)
+ }):_*)
- val subscriptions = Map((1 to consumerCount).map(consumer => {
+ val subscriptions = Map((1 to consumerCount).map { consumer =>
val streamCounts = Map((1 to topicCount).map(topic => {
val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1))
("topic-" + topic, streamCount)
- }).toSeq:_*)
+ }):_*)
("g1c" + consumer, StaticSubscriptionInfo(streamCounts))
- }).toSeq:_*)
+ }:_*)
val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
EasyMock.replay(zkUtils.zkClient)
PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils)
- })
+ }
}
}
@@ -163,7 +163,7 @@ private object PartitionAssignorTest extends Logging {
private def assignAndVerify(scenario: Scenario, assignor: PartitionAssignor, zkUtils: ZkUtils,
verifyAssignmentIsUniform: Boolean = false) {
- val assignments = scenario.subscriptions.map{ case(consumer, subscription) =>
+ val assignments = scenario.subscriptions.map { case (consumer, _) =>
val ctx = new AssignmentContext("g1", consumer, excludeInternalTopics = true, zkUtils)
assignor.assign(ctx).get(consumer)
}
@@ -200,7 +200,7 @@ private object PartitionAssignorTest extends Logging {
/** For each consumer stream, count the number of partitions that it owns. */
private def partitionCountPerStream(assignment: collection.Map[TopicAndPartition, ConsumerThreadId]) = {
val ownedCounts = collection.mutable.Map[ConsumerThreadId, Int]()
- assignment.foreach { case (topicPartition, owner) =>
+ assignment.foreach { case (_, owner) =>
val updatedCount = ownedCounts.getOrElse(owner, 0) + 1
ownedCounts.put(owner, updatedCount)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index b054794..df80d1d 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -81,13 +81,12 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
// no messages to consume, we should hit timeout;
// also the iterator should support re-entrant, so loop it twice
- for (i <- 0 until 2) {
+ for (_ <- 0 until 2) {
try {
getMessages(topicMessageStreams0, nMessages * 2)
fail("should get an exception")
} catch {
- case e: ConsumerTimeoutException => // this is ok
- case e: Throwable => throw e
+ case _: ConsumerTimeoutException => // this is ok
}
}
@@ -147,7 +146,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val consumerConfig3 = new ConsumerConfig(
TestUtils.createConsumerProperties(zkConnect, group, consumer3))
val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
- val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
+ zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
// send some messages to each broker
val sentMessages3 = sendMessages(servers, topic, nMessages, 0) ++
sendMessages(servers, topic, nMessages, 1)
@@ -164,10 +163,10 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
// call createMesssageStreams twice should throw MessageStreamsExistException
try {
- val topicMessageStreams4 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
+ zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
fail("Should fail with MessageStreamsExistException")
} catch {
- case e: MessageStreamsExistException => // expected
+ case _: MessageStreamsExistException => // expected
}
zkConsumerConnector1.shutdown
@@ -235,7 +234,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val consumerConfig3 = new ConsumerConfig(
TestUtils.createConsumerProperties(zkConnect, group, consumer3))
val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
- val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
+ zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
// send some messages to each broker
val sentMessages3 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
@@ -304,10 +303,10 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
zkConsumerConnector.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
var receivedMessages: List[String] = Nil
- for ((topic, messageStreams) <- topicMessageStreams) {
+ for (messageStreams <- topicMessageStreams.values) {
for (messageStream <- messageStreams) {
val iterator = messageStream.iterator
- for (i <- 0 until nMessages * 2) {
+ for (_ <- 0 until nMessages * 2) {
assertTrue(iterator.hasNext())
val message = iterator.next().message
receivedMessages ::= message
@@ -390,7 +389,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
// Register consumer rebalance listener
val rebalanceListener2 = new TestConsumerRebalanceListener()
zkConsumerConnector2.setConsumerRebalanceListener(rebalanceListener2)
- val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
+ zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// Consume messages from consumer 1 to make sure it has finished rebalance
getMessages(topicMessageStreams1, nMessages)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 699715b..6430b33 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -100,9 +100,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition))
log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0)))
} catch {
- case e : Exception => {
- log.info("Thread interrupted")
- }
+ case _: Exception => log.info("Thread interrupted")
}
}
})
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 0a1032f..a52fe48 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -156,7 +156,7 @@ class GroupMetadataManagerTest {
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", Array[Byte]())))
- member.awaitingJoinCallback = (joinGroupResult: JoinGroupResult) => {}
+ member.awaitingJoinCallback = _ => ()
group.add(memberId, member)
group.transitionTo(PreparingRebalance)
group.initNextGeneration()
@@ -389,7 +389,7 @@ class GroupMetadataManagerTest {
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", Array[Byte]())))
- member.awaitingJoinCallback = (joinGroupResult: JoinGroupResult) => {}
+ member.awaitingJoinCallback = _ => ()
group.add(memberId, member)
group.transitionTo(PreparingRebalance)
group.initNextGeneration()
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
index 8539340..bf695bf 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
@@ -252,7 +252,7 @@ class GroupMetadataTest extends JUnitSuite {
protocolType, List(("roundrobin", Array.empty[Byte])))
group.transitionTo(PreparingRebalance)
- member.awaitingJoinCallback = (result) => {}
+ member.awaitingJoinCallback = _ => ()
group.add(memberId, member)
assertEquals(0, group.generationId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 4515b94..2221d90 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -83,7 +83,7 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
TestUtils.getBrokerListStrFromServers(servers),
keyEncoder = classOf[StringEncoder].getName)
- for(i <- 0 until numMessages)
+ for(_ <- 0 until numMessages)
producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
// update offset in zookeeper for consumer to jump "forward" in time
@@ -103,12 +103,12 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
var received = 0
val iter = messageStream.iterator
try {
- for (i <- 0 until numMessages) {
+ for (_ <- 0 until numMessages) {
iter.next // will throw a timeout exception if the message isn't there
received += 1
}
} catch {
- case e: ConsumerTimeoutException =>
+ case _: ConsumerTimeoutException =>
info("consumer timed out after receiving " + received + " messages.")
} finally {
producer.close()
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 881837f..003c04c 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -48,7 +48,7 @@ class FetcherTest extends KafkaServerTestHarness {
fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkUtils)
fetcher.stopConnections()
- val topicInfos = configs.map(c =>
+ val topicInfos = configs.map(_ =>
new PartitionTopicInfo(topic,
0,
queue,
@@ -81,13 +81,10 @@ class FetcherTest extends KafkaServerTestHarness {
def fetch(expected: Int) {
var count = 0
- while(true) {
+ while (count < expected) {
val chunk = queue.poll(2L, TimeUnit.SECONDS)
assertNotNull("Timed out waiting for data chunk " + (count + 1), chunk)
- for(message <- chunk.messages)
- count += 1
- if(count == expected)
- return
+ count += chunk.messages.size
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 3998a21..201fa87 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -152,14 +152,14 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
response.data.foreach(pdata => ErrorMapping.maybeThrowException(pdata._2.error))
fail("Expected exception when fetching message with invalid offset")
} catch {
- case e: OffsetOutOfRangeException => // This is good.
+ case _: OffsetOutOfRangeException => // This is good.
}
}
{
// send some invalid partitions
val builder = new FetchRequestBuilder()
- for((topic, partition) <- topics)
+ for((topic, _) <- topics)
builder.addFetch(topic, -1, 0, 10000)
try {
@@ -168,7 +168,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
response.data.foreach(pdata => ErrorMapping.maybeThrowException(pdata._2.error))
fail("Expected exception when fetching message with invalid partition")
} catch {
- case e: UnknownTopicOrPartitionException => // This is good.
+ case _: UnknownTopicOrPartitionException => // This is good.
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 83cce77..7d8e0c2 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -72,10 +72,10 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
// call createMesssageStreams twice should throw MessageStreamsExistException
try {
- val topicMessageStreams2 = zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
+ zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
fail("Should fail with MessageStreamsExistException")
} catch {
- case e: MessageStreamsExistException => // expected
+ case _: MessageStreamsExistException => // expected
}
zkConsumerConnector1.shutdown
info("all consumer connectors stopped")
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index 8702474..a7f0446 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -46,7 +46,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
@Test
def testFileSize() {
assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
- for(i <- 0 until 20) {
+ for (_ <- 0 until 20) {
messageSet.append(singleMessageSet("abcd".getBytes))
assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
}
@@ -66,9 +66,8 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
def testPartialWrite(size: Int, messageSet: FileMessageSet) {
val buffer = ByteBuffer.allocate(size)
- val originalPosition = messageSet.channel.position
- for(i <- 0 until size)
- buffer.put(0.asInstanceOf[Byte])
+ for (_ <- 0 until size)
+ buffer.put(0: Byte)
buffer.rewind()
messageSet.channel.write(buffer)
// appending those bytes should not change the contents
@@ -195,7 +194,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
msgSet.truncateTo(43)
fail("Should throw KafkaException")
} catch {
- case e: KafkaException => // expected
+ case _: KafkaException => // expected
}
EasyMock.verify(channelMock)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 14d24f7..36c61d6 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -120,9 +120,9 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
}
private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec, timestamp: Long): Seq[(Int, Int)] = {
- for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
+ for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
val count = counter
- val info = log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true)
+ log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true)
counter += 1
(key, count)
}
@@ -185,4 +185,4 @@ object LogCleanerLagIntegrationTest {
list.add(Array(codec.name))
list
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 4fa73dc..6e5806f 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -173,15 +173,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
cleanerManager
}
- private def appendMessagesAndExpireSegments(set: ByteBufferMessageSet, log: Log): Unit = {
- // append some messages to create some segments
- for (i <- 0 until 100)
- log.append(set)
-
- // expire all segments
- log.logSegments.foreach(_.lastModified = time.milliseconds - 1000)
- }
-
private def createLog(segmentSize: Int, cleanupPolicy: String = "delete"): Log = {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 24a6366..d80fba1 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -254,7 +254,7 @@ class LogCleanerTest extends JUnitSuite {
// create 6 segments with only one message in each segment
val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
- for (i <- 0 until 6)
+ for (_ <- 0 until 6)
log.append(messageSet, assignOffsets = true)
val logToClean = LogToClean(TopicAndPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)
@@ -272,7 +272,7 @@ class LogCleanerTest extends JUnitSuite {
// create 6 segments with only one message in each segment
val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
- for (i <- 0 until 6)
+ for (_ <- 0 until 6)
log.append(messageSet, assignOffsets = true)
// segments [0,1] are clean; segments [2, 3] are cleanable; segments [4,5] are uncleanable
@@ -281,7 +281,6 @@ class LogCleanerTest extends JUnitSuite {
val expectedCleanSize = segs.take(2).map(_.size).sum
val expectedCleanableSize = segs.slice(2, 4).map(_.size).sum
- val expectedUncleanableSize = segs.drop(4).map(_.size).sum
assertEquals("Uncleanable bytes of LogToClean should equal size of all segments prior the one containing first dirty",
logToClean.cleanBytes, expectedCleanSize)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 6d34c78..569264a 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -60,7 +60,7 @@ class LogConfigTest {
case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2")
case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1")
case LogConfig.MessageFormatVersionProp => assertPropertyInvalid(name, "")
- case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1")
+ case _ => assertPropertyInvalid(name, "not_a_number", "-1")
})
}
@@ -105,8 +105,4 @@ class LogConfigTest {
})
}
- private def randFrom[T](choices: T*): T = {
- import scala.util.Random
- choices(Random.nextInt(choices.size))
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 95a1cb5..5421da9 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -88,8 +88,8 @@ class LogManagerTest {
def testCleanupExpiredSegments() {
val log = logManager.createLog(TopicAndPartition(name, 0), logConfig)
var offset = 0L
- for(i <- 0 until 200) {
- var set = TestUtils.singleMessageSet("test".getBytes())
+ for(_ <- 0 until 200) {
+ val set = TestUtils.singleMessageSet("test".getBytes())
val info = log.append(set)
offset = info.lastOffset
}
@@ -107,7 +107,7 @@ class LogManagerTest {
log.read(0, 1024)
fail("Should get exception from fetching earlier.")
} catch {
- case e: OffsetOutOfRangeException => // This is good.
+ case _: OffsetOutOfRangeException => // This is good.
}
// log should still be appendable
log.append(TestUtils.singleMessageSet("test".getBytes()))
@@ -134,7 +134,7 @@ class LogManagerTest {
// add a bunch of messages that should be larger than the retentionSize
val numMessages = 200
- for(i <- 0 until numMessages) {
+ for (_ <- 0 until numMessages) {
val set = TestUtils.singleMessageSet("test".getBytes())
val info = log.append(set)
offset = info.firstOffset
@@ -152,7 +152,7 @@ class LogManagerTest {
log.read(0, 1024)
fail("Should get exception from fetching earlier.")
} catch {
- case e: OffsetOutOfRangeException => // This is good.
+ case _: OffsetOutOfRangeException => // This is good.
}
// log should still be appendable
log.append(TestUtils.singleMessageSet("test".getBytes()))
@@ -168,8 +168,8 @@ class LogManagerTest {
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
val log = logManager.createLog(TopicAndPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps))
var offset = 0L
- for(i <- 0 until 200) {
- var set = TestUtils.singleMessageSet("test".getBytes(), key="test".getBytes())
+ for (_ <- 0 until 200) {
+ val set = TestUtils.singleMessageSet("test".getBytes(), key="test".getBytes())
val info = log.append(set)
offset = info.lastOffset
}
@@ -197,8 +197,8 @@ class LogManagerTest {
logManager.startup
val log = logManager.createLog(TopicAndPartition(name, 0), config)
val lastFlush = log.lastFlushTime
- for(i <- 0 until 200) {
- var set = TestUtils.singleMessageSet("test".getBytes())
+ for (_ <- 0 until 200) {
+ val set = TestUtils.singleMessageSet("test".getBytes())
log.append(set)
}
time.sleep(logManager.InitialTaskDelayMs)
@@ -215,7 +215,7 @@ class LogManagerTest {
TestUtils.tempDir(),
TestUtils.tempDir())
logManager.shutdown()
- logManager = createLogManager()
+ logManager = createLogManager(dirs)
// verify that logs are always assigned to the least loaded partition
for(partition <- 0 until 20) {
@@ -235,7 +235,7 @@ class LogManagerTest {
createLogManager()
fail("Should not be able to create a second log manager instance with the same data directory")
} catch {
- case e: KafkaException => // this is good
+ case _: KafkaException => // this is good
}
}
@@ -279,7 +279,7 @@ class LogManagerTest {
logManager: LogManager) {
val logs = topicAndPartitions.map(this.logManager.createLog(_, logConfig))
logs.foreach(log => {
- for(i <- 0 until 50)
+ for (_ <- 0 until 50)
log.append(TestUtils.singleMessageSet("test".getBytes()))
log.flush()
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 04d46de..7f78148 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -138,7 +138,7 @@ class LogSegmentTest {
def testTruncate() {
val seg = createSegment(40)
var offset = 40
- for(i <- 0 until 30) {
+ for (_ <- 0 until 30) {
val ms1 = messages(offset, "hello")
seg.append(offset, Message.NoTimestamp, -1L, ms1)
val ms2 = messages(offset + 1, "hello")
@@ -160,7 +160,7 @@ class LogSegmentTest {
val numMessages = 30
val seg = createSegment(40, 2 * messages(0, "hello").sizeInBytes - 1)
var offset = 40
- for (i <- 0 until numMessages) {
+ for (_ <- 0 until numMessages) {
seg.append(offset, offset, offset, messages(offset, "hello"))
offset += 1
}
@@ -279,7 +279,7 @@ class LogSegmentTest {
@Test
def testRecoveryWithCorruptMessage() {
val messagesAppended = 20
- for(iteration <- 0 until 10) {
+ for (_ <- 0 until 10) {
val seg = createSegment(0)
for(i <- 0 until messagesAppended)
seg.append(i, Message.NoTimestamp, -1L, messages(i, i.toString))
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 9ecb651..d18719a 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -163,7 +163,7 @@ class LogTest extends JUnitSuite {
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
// segments expire in size
- for (i<- 1 to (msgPerSeg + 1)) {
+ for (_ <- 1 to (msgPerSeg + 1)) {
log.append(set)
}
assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
@@ -326,14 +326,14 @@ class LogTest extends JUnitSuite {
log.read(0, 1000)
fail("Reading below the log start offset should throw OffsetOutOfRangeException")
} catch {
- case e: OffsetOutOfRangeException => // This is good.
+ case _: OffsetOutOfRangeException => // This is good.
}
try {
log.read(1026, 1000)
fail("Reading at beyond the log end offset should throw OffsetOutOfRangeException")
} catch {
- case e: OffsetOutOfRangeException => // This is good.
+ case _: OffsetOutOfRangeException => // This is good.
}
assertEquals("Reading from below the specified maxOffset should produce 0 byte read.", 0, log.read(1025, 1000, Some(1024)).messageSet.sizeInBytes)
@@ -448,7 +448,7 @@ class LogTest extends JUnitSuite {
log.append(messageSet)
fail("message set should throw RecordBatchTooLargeException.")
} catch {
- case e: RecordBatchTooLargeException => // this is good
+ case _: RecordBatchTooLargeException => // this is good
}
}
@@ -475,19 +475,19 @@ class LogTest extends JUnitSuite {
log.append(messageSetWithUnkeyedMessage)
fail("Compacted topics cannot accept a message without a key.")
} catch {
- case e: CorruptRecordException => // this is good
+ case _: CorruptRecordException => // this is good
}
try {
log.append(messageSetWithOneUnkeyedMessage)
fail("Compacted topics cannot accept a message without a key.")
} catch {
- case e: CorruptRecordException => // this is good
+ case _: CorruptRecordException => // this is good
}
try {
log.append(messageSetWithCompressedUnkeyedMessage)
fail("Compacted topics cannot accept a message without a key.")
} catch {
- case e: CorruptRecordException => // this is good
+ case _: CorruptRecordException => // this is good
}
// the following should succeed without any InvalidMessageException
@@ -518,7 +518,7 @@ class LogTest extends JUnitSuite {
log.append(second)
fail("Second message set should throw MessageSizeTooLargeException.")
} catch {
- case e: RecordTooLargeException => // this is good
+ case _: RecordTooLargeException => // this is good
}
}
/**
@@ -725,7 +725,7 @@ class LogTest extends JUnitSuite {
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
- for (i<- 1 to msgPerSeg)
+ for (_ <- 1 to msgPerSeg)
log.append(set)
assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
@@ -746,7 +746,7 @@ class LogTest extends JUnitSuite {
assertEquals("Should change offset", 0, log.logEndOffset)
assertEquals("Should change log size", 0, log.size)
- for (i<- 1 to msgPerSeg)
+ for (_ <- 1 to msgPerSeg)
log.append(set)
assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
@@ -755,7 +755,7 @@ class LogTest extends JUnitSuite {
assertEquals("Should change offset", log.logEndOffset, lastOffset - (msgPerSeg - 1))
assertEquals("Should change log size", log.size, 0)
- for (i<- 1 to msgPerSeg)
+ for (_ <- 1 to msgPerSeg)
log.append(set)
assertTrue("Should be ahead of to original offset", log.logEndOffset > msgPerSeg)
@@ -831,7 +831,7 @@ class LogTest extends JUnitSuite {
assertFalse("The second time index file should have been deleted.", bogusTimeIndex2.exists)
// check that we can append to the log
- for(i <- 0 until 10)
+ for (_ <- 0 until 10)
log.append(set)
log.delete()
@@ -857,7 +857,7 @@ class LogTest extends JUnitSuite {
time)
// add enough messages to roll over several segments then close and re-open and attempt to truncate
- for(i <- 0 until 100)
+ for (_ <- 0 until 100)
log.append(set)
log.close()
log = new Log(logDir,
@@ -892,7 +892,7 @@ class LogTest extends JUnitSuite {
time)
// append some messages to create some segments
- for(i <- 0 until 100)
+ for (_ <- 0 until 100)
log.append(set)
// files should be renamed
@@ -934,7 +934,7 @@ class LogTest extends JUnitSuite {
time)
// append some messages to create some segments
- for(i <- 0 until 100)
+ for (_ <- 0 until 100)
log.append(set)
// expire all segments
@@ -986,7 +986,7 @@ class LogTest extends JUnitSuite {
val config = LogConfig(logProps)
val set = TestUtils.singleMessageSet("test".getBytes)
val recoveryPoint = 50L
- for(iteration <- 0 until 50) {
+ for (_ <- 0 until 50) {
// create a log and write some messages to it
logDir.mkdirs()
var log = new Log(logDir,
@@ -995,7 +995,7 @@ class LogTest extends JUnitSuite {
time.scheduler,
time)
val numMessages = 50 + TestUtils.random.nextInt(50)
- for(i <- 0 until numMessages)
+ for (_ <- 0 until numMessages)
log.append(set)
val messages = log.logSegments.flatMap(_.log.iterator.toList)
log.close()
@@ -1033,7 +1033,7 @@ class LogTest extends JUnitSuite {
recoveryPoint = 0L,
time.scheduler,
time)
- for(i <- 0 until 100)
+ for (_ <- 0 until 100)
log.append(set)
log.close()
@@ -1062,7 +1062,7 @@ class LogTest extends JUnitSuite {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
- case e: Exception => // its GOOD!
+ case _: Exception => // its GOOD!
}
}
@@ -1073,7 +1073,7 @@ class LogTest extends JUnitSuite {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir)
} catch {
- case e: Exception => // its GOOD!
+ case _: Exception => // its GOOD!
}
}
@@ -1086,7 +1086,7 @@ class LogTest extends JUnitSuite {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
- case e: Exception => // its GOOD!
+ case _: Exception => // its GOOD!
}
}
@@ -1099,7 +1099,7 @@ class LogTest extends JUnitSuite {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
- case e: Exception => // its GOOD!
+ case _: Exception => // its GOOD!
}
}
@@ -1112,7 +1112,7 @@ class LogTest extends JUnitSuite {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
- case e: Exception => // its GOOD!
+ case _: Exception => // its GOOD!
}
}
@@ -1134,7 +1134,7 @@ class LogTest extends JUnitSuite {
time)
// append some messages to create some segments
- for (i <- 0 until 100)
+ for (_ <- 0 until 100)
log.append(set)
// expire all segments
@@ -1143,7 +1143,7 @@ class LogTest extends JUnitSuite {
assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
// append some messages to create some segments
- for (i <- 0 until 100)
+ for (_ <- 0 until 100)
log.append(set)
log.delete()
@@ -1158,7 +1158,7 @@ class LogTest extends JUnitSuite {
val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 10)
// append some messages to create some segments
- for (i <- 0 until 15)
+ for (_ <- 0 until 15)
log.append(set)
log.deleteOldSegments
@@ -1171,7 +1171,7 @@ class LogTest extends JUnitSuite {
val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 15)
// append some messages to create some segments
- for (i <- 0 until 15)
+ for (_ <- 0 until 15)
log.append(set)
log.deleteOldSegments
@@ -1184,7 +1184,7 @@ class LogTest extends JUnitSuite {
val log = createLog(set.sizeInBytes, retentionMs = 10000)
// append some messages to create some segments
- for (i <- 0 until 15)
+ for (_ <- 0 until 15)
log.append(set)
log.deleteOldSegments()
@@ -1197,7 +1197,7 @@ class LogTest extends JUnitSuite {
val log = createLog(set.sizeInBytes, retentionMs = 10000000)
// append some messages to create some segments
- for (i <- 0 until 15)
+ for (_ <- 0 until 15)
log.append(set)
log.deleteOldSegments()
@@ -1212,7 +1212,7 @@ class LogTest extends JUnitSuite {
cleanupPolicy = "compact")
// append some messages to create some segments
- for (i <- 0 until 15)
+ for (_ <- 0 until 15)
log.append(set)
// mark oldest segment as older the retention.ms
@@ -1231,7 +1231,7 @@ class LogTest extends JUnitSuite {
cleanupPolicy = "compact,delete")
// append some messages to create some segments
- for (i <- 0 until 15)
+ for (_ <- 0 until 15)
log.append(set)
log.deleteOldSegments()
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 869e618..7618cf7 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -157,7 +157,7 @@ class OffsetIndexTest extends JUnitSuite {
val rand = new Random(1L)
val vals = new mutable.ArrayBuffer[Int](len)
var last = base
- for (i <- 0 until len) {
+ for (_ <- 0 until len) {
last += rand.nextInt(15) + 1
vals += last
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
index a5bec17..b079f25 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
@@ -62,7 +62,6 @@ class OffsetMapTest extends JUnitSuite {
val map = new SkimpyOffsetMap((items/loadFactor * 24).toInt)
for(i <- 0 until items)
map.put(key(i), i)
- var misses = 0
for(i <- 0 until items)
assertEquals(map.get(key(i)), i.toLong)
map
@@ -85,4 +84,4 @@ object OffsetMapTest {
println(map.size + " entries in map of size " + map.slots + " in " + ellapsedMs + " ms")
println("Collision rate: %.1f%%".format(100*map.collisionRate))
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 18a023c..e2cfb87 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -293,7 +293,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
messageTimestampDiffMaxMs = 1000L)
fail("Should throw InvalidMessageException.")
} catch {
- case e: InvalidTimestampException =>
+ case _: InvalidTimestampException =>
}
try {
@@ -306,7 +306,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
messageTimestampDiffMaxMs = 1000L)
fail("Should throw InvalidMessageException.")
} catch {
- case e: InvalidTimestampException =>
+ case _: InvalidTimestampException =>
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index 1438523..e8abfe1 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -94,8 +94,7 @@ class MessageCompressionTest extends JUnitSuite {
new org.xerial.snappy.SnappyOutputStream(new ByteArrayOutputStream())
true
} catch {
- case e: UnsatisfiedLinkError => false
- case e: org.xerial.snappy.SnappyError => false
+ case _: UnsatisfiedLinkError | _: org.xerial.snappy.SnappyError => false
}
}
@@ -104,7 +103,7 @@ class MessageCompressionTest extends JUnitSuite {
new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream())
true
} catch {
- case e: UnsatisfiedLinkError => false
+ case _: UnsatisfiedLinkError => false
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index e60f350..da6f260 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -159,7 +159,7 @@ class SocketServerTest extends JUnitSuite {
outgoing.flush()
receiveResponse(socket)
} catch {
- case e: IOException => // thats fine
+ case _: IOException => // thats fine
}
}
@@ -186,14 +186,14 @@ class SocketServerTest extends JUnitSuite {
sendRequest(plainSocket, largeChunkOfBytes, Some(0))
fail("expected exception when writing to closed plain socket")
} catch {
- case e: IOException => // expected
+ case _: IOException => // expected
}
try {
sendRequest(traceSocket, largeChunkOfBytes, Some(0))
fail("expected exception when writing to closed trace socket")
} catch {
- case e: IOException => // expected
+ case _: IOException => // expected
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 3088199..3093b93 100755
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -76,7 +76,7 @@ class AsyncProducerTest {
fail("Queue should be full")
}
catch {
- case e: QueueFullException => //expected
+ case _: QueueFullException => //expected
}finally {
producer.close()
}
@@ -96,7 +96,7 @@ class AsyncProducerTest {
fail("should complain that producer is already closed")
}
catch {
- case e: ProducerClosedException => //expected
+ case _: ProducerClosedException => //expected
}
}
@@ -266,7 +266,7 @@ class AsyncProducerTest {
}
catch {
// should not throw any exception
- case e: Throwable => fail("Should not throw any exception")
+ case _: Throwable => fail("Should not throw any exception")
}
}
@@ -298,7 +298,7 @@ class AsyncProducerTest {
fail("Should fail with FailedToSendMessageException")
}
catch {
- case e: FailedToSendMessageException => // we retry on any exception now
+ case _: FailedToSendMessageException => // we retry on any exception now
}
}
@@ -317,8 +317,8 @@ class AsyncProducerTest {
producer.send(getProduceData(1): _*)
fail("Should fail with ClassCastException due to incompatible Encoder")
} catch {
- case e: ClassCastException =>
- }finally {
+ case _: ClassCastException =>
+ } finally {
producer.close()
}
}
@@ -352,9 +352,9 @@ class AsyncProducerTest {
val partitionedDataOpt = handler.partitionAndCollate(producerDataList)
partitionedDataOpt match {
case Some(partitionedData) =>
- for ((brokerId, dataPerBroker) <- partitionedData) {
- for ( (TopicAndPartition(topic, partitionId), dataPerTopic) <- dataPerBroker)
- assertTrue(partitionId == 0)
+ for (dataPerBroker <- partitionedData.values) {
+ for (tp <- dataPerBroker.keys)
+ assertTrue(tp.partition == 0)
}
case None =>
fail("Failed to collate requests by topic, partition")
@@ -461,7 +461,7 @@ class AsyncProducerTest {
fail("should complain about wrong config")
}
catch {
- case e: IllegalArgumentException => //expected
+ case _: IllegalArgumentException => //expected
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index dc73db3..f4a339e 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -115,15 +115,12 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
keyEncoder = classOf[StringEncoder].getName,
producerProps = props)
- try{
+ try {
producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
fail("Test should fail because the broker list provided are not valid")
} catch {
- case e: FailedToSendMessageException => // this is expected
- case oe: Throwable => fail("fails with exception", oe)
- } finally {
- producer1.close()
- }
+ case _: FailedToSendMessageException => // this is expected
+ } finally producer1.close()
val producer2 = TestUtils.createProducer[String, String](
brokerList = "localhost:80," + TestUtils.getBrokerListStrFromServers(Seq(server1)),
@@ -216,9 +213,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
fail("we don't support request.required.acks greater than 1")
}
catch {
- case iae: IllegalArgumentException => // this is expected
- case e: Throwable => fail("Not expected", e)
-
+ case _: IllegalArgumentException => // this is expected
}
}
@@ -261,7 +256,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
fail("Should fail since no leader exists for the partition.")
} catch {
case e : TestFailedException => throw e // catch and re-throw the failure message
- case e2: Throwable => // otherwise success
+ case _: Throwable => // otherwise success
}
// restart server 1
@@ -290,6 +285,10 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
@Test
def testAsyncSendCanCorrectlyFailWithTimeout() {
+ val topic = "new-topic"
+ // create topics in ZK
+ TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
+
val timeoutMs = 500
val props = new Properties()
props.put("request.timeout.ms", String.valueOf(timeoutMs))
@@ -303,10 +302,6 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
partitioner = classOf[StaticPartitioner].getName,
producerProps = props)
- val topic = "new-topic"
- // create topics in ZK
- TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
-
// do a simple test to make sure plumbing is okay
try {
// this message should be assigned to partition 0 whose leader is on broker 0
@@ -315,30 +310,25 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
val messageSet1 = response1.messageSet("new-topic", 0).iterator
assertTrue("Message set should have 1 message", messageSet1.hasNext)
- assertEquals(new Message("test".getBytes), messageSet1.next.message)
- } catch {
- case e: Throwable => case e: Exception => producer.close; fail("Not expected", e)
- }
-
- // stop IO threads and request handling, but leave networking operational
- // any requests should be accepted and queue up, but not handled
- server1.requestHandlerPool.shutdown()
-
- val t1 = SystemTime.milliseconds
- try {
- // this message should be assigned to partition 0 whose leader is on broker 0, but
- // broker 0 will not response within timeoutMs millis.
- producer.send(new KeyedMessage[String, String](topic, "test", "test"))
- } catch {
- case e: FailedToSendMessageException => /* success */
- case e: Exception => fail("Not expected", e)
- } finally {
- producer.close()
- }
- val t2 = SystemTime.milliseconds
-
- // make sure we don't wait fewer than timeoutMs
- assertTrue((t2-t1) >= timeoutMs)
+ assertEquals(ByteBuffer.wrap("test".getBytes), messageSet1.next.message.payload)
+
+ // stop IO threads and request handling, but leave networking operational
+ // any requests should be accepted and queue up, but not handled
+ server1.requestHandlerPool.shutdown()
+
+ val t1 = SystemTime.milliseconds
+ try {
+ // this message should be assigned to partition 0 whose leader is on broker 0, but
+ // broker 0 will not response within timeoutMs millis.
+ producer.send(new KeyedMessage[String, String](topic, "test", "test"))
+ } catch {
+ case _: FailedToSendMessageException => /* success */
+ }
+ val t2 = SystemTime.milliseconds
+ // make sure we don't wait fewer than timeoutMs
+ assertTrue((t2-t1) >= timeoutMs)
+
+ } finally producer.close()
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 968ea4b..7e72eec 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -151,8 +151,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
producer.send(produceRequest("test", 0,
new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))), acks = 0))
} catch {
- case e : java.io.IOException => // success
- case e2: Throwable => throw e2
+ case _ : java.io.IOException => // success
}
}
@@ -222,8 +221,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
producer.send(request)
fail("Should have received timeout exception since request handling is stopped.")
} catch {
- case e: SocketTimeoutException => /* success */
- case e: Throwable => fail("Unexpected exception when expecting timeout: " + e)
+ case _: SocketTimeoutException => /* success */
}
val t2 = SystemTime.milliseconds
// make sure we don't wait fewer than timeoutMs for a response
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
index 0a28e8f..1df34ea 100644
--- a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
@@ -31,7 +31,7 @@ class OperationTest extends JUnitSuite {
Operation.fromString("badName")
fail("Expected exception on invalid operation name.")
} catch {
- case e: KafkaException => // expected
+ case _: KafkaException => // expected
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
index 6cd1ad7..1b1c864 100644
--- a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
@@ -31,7 +31,7 @@ class PermissionTypeTest extends JUnitSuite {
PermissionType.fromString("badName")
fail("Expected exception on invalid PermissionType name.")
} catch {
- case e: KafkaException => // expected
+ case _: KafkaException => // expected
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
index 3653a7d..546c92e 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
@@ -31,7 +31,7 @@ class ResourceTypeTest extends JUnitSuite {
ResourceType.fromString("badName")
fail("Expected exception on invalid ResourceType name.")
} catch {
- case e: KafkaException => // expected
+ case _: KafkaException => // expected
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 1f52af4..0765992 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -347,7 +347,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
val acl = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username), Allow, WildCardHost, All)
// Alternate authorizer to keep adding and removing zookeeper path
- val concurrentFuctions = (0 to 50).map { i =>
+ val concurrentFuctions = (0 to 50).map { _ =>
() => {
simpleAclAuthorizer.addAcls(Set(acl), resource)
simpleAclAuthorizer2.removeAcls(Set(acl), resource)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 9203130..8cec0c7 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -65,7 +65,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
JaasUtils.isZkSecurityEnabled()
fail("Should have thrown an exception")
} catch {
- case e: KafkaException => // Expected
+ case _: KafkaException => // Expected
}
}
@@ -286,7 +286,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
// Fail the test if able to delete
result match {
- case Success(v) => // All done
+ case Success(_) => // All done
case Failure(e) => fail(e.getMessage)
}
}
@@ -302,7 +302,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
case "/" => deleteRecursive(zkUtils, s"/$child")
case path => deleteRecursive(zkUtils, s"$path/$child")
}) match {
- case Success(v) => result
+ case Success(_) => result
case Failure(e) => Failure(e)
}
path match {
@@ -314,7 +314,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
zkUtils.deletePath(path)
Failure(new Exception(s"Have been able to delete $path"))
} catch {
- case e: Exception => result
+ case _: Exception => result
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 291e822..b581341 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -229,7 +229,7 @@ class ClientQuotaManagerTest {
/* We have 10 second windows. Make sure that there is no quota violation
* if we produce under the quota
*/
- for (i <- 0 until 10) {
+ for (_ <- 0 until 10) {
clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 400, callback)
time.sleep(1000)
}
@@ -256,7 +256,7 @@ class ClientQuotaManagerTest {
assertEquals(11, numCallbacks)
// Could continue to see delays until the bursty sample disappears
- for (i <- 0 until 10) {
+ for (_ <- 0 until 10) {
clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 400, callback)
time.sleep(1000)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index faa23f0..2e50d30 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -168,7 +168,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
AdminUtils.changeTopicConfig(zkUtils, topic, logProps)
fail("Should fail with AdminOperationException for topic doesn't exist")
} catch {
- case e: AdminOperationException => // expected
+ case _: AdminOperationException => // expected
}
}
@@ -198,7 +198,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
}
catch {
- case t: Throwable =>
+ case _: Throwable =>
}
// Version is provided. EntityType is incorrect
try {
@@ -207,7 +207,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
}
catch {
- case t: Throwable =>
+ case _: Throwable =>
}
// EntityName isn't provided
@@ -217,7 +217,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
}
catch {
- case t: Throwable =>
+ case _: Throwable =>
}
// Everything is provided
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 540a665..0051247 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.message.MessageSet
import org.apache.kafka.common.utils.{MockTime => JMockTime}
-
class IsrExpirationTest {
var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
@@ -52,7 +51,8 @@ class IsrExpirationTest {
@Before
def setUp() {
- replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, null, null, null, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower)
+ replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, null, null, null, new AtomicBoolean(false),
+ QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower)
}
@After
@@ -66,7 +66,7 @@ class IsrExpirationTest {
*/
@Test
def testIsrExpirationForStuckFollowers() {
- val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L
+ val log = logMock
// create one partition and all replicas
val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
@@ -75,8 +75,7 @@ class IsrExpirationTest {
// let the follower catch up to the Leader logEndOffset (15)
(partition0.assignedReplicas() - leaderReplica).foreach(
- r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L),
- MessageSet.Empty),
+ r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MessageSet.Empty),
-1L,
-1,
true)))
@@ -97,7 +96,7 @@ class IsrExpirationTest {
*/
@Test
def testIsrExpirationIfNoFetchRequestMade() {
- val log = getLogWithLogEndOffset(15L, 1) // set logEndOffset for leader to 15L
+ val log = logMock
// create one partition and all replicas
val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
@@ -119,7 +118,7 @@ class IsrExpirationTest {
@Test
def testIsrExpirationForSlowFollowers() {
// create leader replica
- val log = getLogWithLogEndOffset(15L, 4)
+ val log = logMock
// add one partition
val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
@@ -158,7 +157,7 @@ class IsrExpirationTest {
private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
localLog: Log): Partition = {
- val leaderId=config.brokerId
+ val leaderId = config.brokerId
val partition = replicaManager.getOrCreatePartition(topic, partitionId)
val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
@@ -171,12 +170,10 @@ class IsrExpirationTest {
partition
}
- private def getLogWithLogEndOffset(logEndOffset: Long, expectedCalls: Int): Log = {
- val log1 = EasyMock.createMock(classOf[kafka.log.Log])
- EasyMock.expect(log1.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(logEndOffset)).times(expectedCalls)
- EasyMock.replay(log1)
-
- log1
+ private def logMock: Log = {
+ val log = EasyMock.createMock(classOf[kafka.log.Log])
+ EasyMock.replay(log)
+ log
}
private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 042dabf..93d0413 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -112,7 +112,7 @@ class KafkaConfigTest {
props5.put("log.retention.ms", "0")
intercept[IllegalArgumentException] {
- val cfg5 = KafkaConfig.fromProps(props5)
+ KafkaConfig.fromProps(props5)
}
}
@@ -127,13 +127,13 @@ class KafkaConfigTest {
props3.put("log.retention.hours", "0")
intercept[IllegalArgumentException] {
- val cfg1 = KafkaConfig.fromProps(props1)
+ KafkaConfig.fromProps(props1)
}
intercept[IllegalArgumentException] {
- val cfg2 = KafkaConfig.fromProps(props2)
+ KafkaConfig.fromProps(props2)
}
intercept[IllegalArgumentException] {
- val cfg3 = KafkaConfig.fromProps(props3)
+ KafkaConfig.fromProps(props3)
}
}
@@ -303,7 +303,7 @@ class KafkaConfigTest {
KafkaConfig.fromProps(props)
true
} catch {
- case e: IllegalArgumentException => false
+ case _: IllegalArgumentException => false
}
}
@@ -561,8 +561,7 @@ class KafkaConfigTest {
case KafkaConfig.SaslKerberosTicketRenewJitterProp =>
case KafkaConfig.SaslKerberosMinTimeBeforeReloginProp =>
case KafkaConfig.SaslKerberosPrincipalToLocalRulesProp => // ignore string
-
- case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
+ case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
}
})
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 0885709..0f3ee63 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -90,7 +90,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val log = logManager.getLog(TopicAndPartition(topic, part)).get
val message = new Message(Integer.toString(42).getBytes())
- for(i <- 0 until 20)
+ for (_ <- 0 until 20)
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
log.flush()
@@ -125,7 +125,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
var offsetChanged = false
- for(i <- 1 to 14) {
+ for (_ <- 1 to 14) {
val topicAndPartition = TopicAndPartition(topic, 0)
val offsetRequest =
OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
@@ -151,7 +151,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val logManager = server.getLogManager
val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
val message = new Message(Integer.toString(42).getBytes())
- for(i <- 0 until 20)
+ for (_ <- 0 until 20)
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
log.flush()
@@ -180,7 +180,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val logManager = server.getLogManager
val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
val message = new Message(Integer.toString(42).getBytes())
- for(i <- 0 until 20)
+ for (_ <- 0 until 20)
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
log.flush()
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index b34c93d..138c36d 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -267,7 +267,7 @@ class MetadataCacheTest {
fail(s"Exception should be thrown by `getTopicMetadata` with non-supported SecurityProtocol, $result was returned instead")
}
catch {
- case e: BrokerEndPointNotAvailableException => //expected
+ case _: BrokerEndPointNotAvailableException => //expected
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 29eaf2d..64c67d6 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -296,7 +296,6 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
createTopic(zkUtils, topic2, servers = Seq(server), numPartitions = 1)
// Commit an offset
- val expectedReplicaAssignment = Map(0 -> List(1))
val commitRequest = OffsetCommitRequest(group, immutable.Map(
TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L),
TopicAndPartition(topic2, 0) -> OffsetAndMetadata(offset=42L)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 825b2b4..3a09737 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -120,7 +120,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
//Add data equally to each partition
producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
- (0 until msgCount).foreach { x =>
+ (0 until msgCount).foreach { _ =>
(0 to 7).foreach { partition =>
producer.send(new ProducerRecord(topic, partition, null, msg))
}
@@ -215,7 +215,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
def addData(msgCount: Int, msg: Array[Byte]): Boolean = {
producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 0)
- (0 until msgCount).foreach { x => producer.send(new ProducerRecord(topic, msg)).get }
+ (0 until msgCount).map(_ => producer.send(new ProducerRecord(topic, msg))).foreach(_.get)
waitForOffsetsToMatch(msgCount, 0, 100)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 4e4fb95..9e19e39 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -56,7 +56,7 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest, 0)
fail("Versions Request during Sasl handshake did not fail")
} catch {
- case ioe: IOException => // expected exception
+ case _: IOException => // expected exception
}
} finally {
plaintextSocket.close()
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index e0b6db4..f21f2de 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -130,7 +130,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
try {
server1.startup()
} catch {
- case e: kafka.common.InconsistentBrokerIdException => //success
+ case _: kafka.common.InconsistentBrokerIdException => //success
}
server1.shutdown()
CoreUtils.delete(server1.config.logDirs)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index bc71edd..fd0a460 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -129,16 +129,14 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
val server = new KafkaServer(newConfig, threadNamePrefix = Option(this.getClass.getName))
try {
server.startup()
- fail("Expected KafkaServer setup to fail, throw exception")
+ fail("Expected KafkaServer setup to fail and throw exception")
}
catch {
// Try to clean up carefully without hanging even if the test fails. This means trying to accurately
// identify the correct exception, making sure the server was shutdown, and cleaning up if anything
// goes wrong so that awaitShutdown doesn't hang
- case e: org.I0Itec.zkclient.exception.ZkException =>
+ case _: org.I0Itec.zkclient.exception.ZkException =>
assertEquals(NotRunning.state, server.brokerState.currentState)
- case e: Throwable =>
- fail("Expected ZkException during Kafka server starting up but caught a different exception %s".format(e.toString))
}
finally {
if (server.brokerState.currentState != NotRunning.state)
@@ -170,7 +168,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
assertTrue(true)
}
catch{
- case ex: Throwable => fail()
+ case _: Throwable => fail()
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index b5560c3..86b167e 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -58,7 +58,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
TestUtils.createServer(KafkaConfig.fromProps(props2))
fail("Registering a broker with a conflicting id should fail")
} catch {
- case e : RuntimeException =>
+ case _: RuntimeException =>
// this is expected
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
index 741eec9..da80c0d 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
@@ -62,7 +62,7 @@ class ConsoleProducerTest {
new ConsoleProducer.ProducerConfig(invalidArgs)
Assert.fail("Should have thrown an UnrecognizedOptionException")
} catch {
- case e: joptsimple.OptionException => // expected exception
+ case _: joptsimple.OptionException => // expected exception
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 3796e48..83fd6b8 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -822,14 +822,14 @@ object TestUtils extends Logging {
def writeNonsenseToFile(fileName: File, position: Long, size: Int) {
val file = new RandomAccessFile(fileName, "rw")
file.seek(position)
- for(i <- 0 until size)
+ for (_ <- 0 until size)
file.writeByte(random.nextInt(255))
file.close()
}
def appendNonsenseToFile(fileName: File, size: Int) {
val file = new FileOutputStream(fileName, true)
- for(i <- 0 until size)
+ for (_ <- 0 until size)
file.write(random.nextInt(255))
file.close()
}
@@ -984,7 +984,7 @@ object TestUtils extends Logging {
var messages: List[String] = Nil
val shouldGetAllMessages = nMessagesPerThread < 0
- for ((topic, messageStreams) <- topicMessageStreams) {
+ for (messageStreams <- topicMessageStreams.values) {
for (messageStream <- messageStreams) {
val iterator = messageStream.iterator()
try {
@@ -1117,7 +1117,7 @@ object TestUtils extends Logging {
}
}
} catch {
- case ie: InterruptedException => failWithTimeout()
+ case _: InterruptedException => failWithTimeout()
case e: Throwable => exceptions += e
} finally {
threadPool.shutdownNow()
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
index 29c9067..64129e9 100644
--- a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
@@ -35,8 +35,6 @@ class TimerTaskListTest {
@Test
def testAll() {
val sharedCounter = new AtomicInteger(0)
- val runCounter = new AtomicInteger(0)
- val execCounter = new AtomicInteger(0)
val list1 = new TimerTaskList(sharedCounter)
val list2 = new TimerTaskList(sharedCounter)
val list3 = new TimerTaskList(sharedCounter)
@@ -46,7 +44,7 @@ class TimerTaskListTest {
list1.add(new TimerTaskEntry(task, 10L))
assertEquals(i, sharedCounter.get)
task
- }.toSeq
+ }
assertEquals(tasks.size, sharedCounter.get)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index c2c25ed..4d57ed9 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -80,7 +80,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
try {
zkUtils.createEphemeralPathExpectConflict("/tmp/zktest", "node created")
} catch {
- case e: Exception =>
+ case _: Exception =>
}
var testData: String = null
@@ -147,7 +147,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
zwe.create()
false
} catch {
- case e: ZkNodeExistsException => true
+ case _: ZkNodeExistsException => true
}
Assert.assertTrue(gotException)
zkClient2.close()
@@ -155,7 +155,6 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
/**
* Tests if succeeds with znode from the same session
- *
*/
@Test
def testSameSession = {
@@ -171,7 +170,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
zwe.create()
false
} catch {
- case e: ZkNodeExistsException => true
+ case _: ZkNodeExistsException => true
}
Assert.assertFalse(gotException)
}
[4/4] kafka git commit: MINOR: A bunch of clean-ups related to usage
of unused variables
Posted by ij...@apache.org.
MINOR: A bunch of clean-ups related to usage of unused variables
There should be only one case where these clean-ups have a functional impact: replaced repeated identical logs with a single log for the stale controller epoch case.
The rest should just make the code easier to read and make it a bit less wasteful. I did this exercise because unused variables sometimes mask bugs.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #1985 from ijuma/remove-unused
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0926738
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0926738
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0926738
Branch: refs/heads/trunk
Commit: d092673838173d9dedbf5acf3f4e2cd8c736294f
Parents: 1fc450f
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Oct 25 02:55:55 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Oct 25 02:55:55 2016 +0100
----------------------------------------------------------------------
.../consumer/internals/AbstractCoordinator.java | 2 -
.../consumer/internals/ConsumerCoordinator.java | 2 -
.../consumer/internals/SubscriptionState.java | 7 --
.../kafka/clients/producer/KafkaProducer.java | 1 -
.../clients/producer/internals/Sender.java | 10 +--
.../apache/kafka/common/network/MultiSend.java | 6 +-
.../common/network/PlaintextChannelBuilder.java | 4 +-
.../kafka/common/network/SslChannelBuilder.java | 4 +-
.../kafka/common/network/SslTransportLayer.java | 5 +-
.../authenticator/SaslServerAuthenticator.java | 5 +-
.../apache/kafka/clients/NetworkClientTest.java | 3 +-
.../producer/internals/BufferPoolTest.java | 2 +-
.../clients/producer/internals/SenderTest.java | 3 -
.../apache/kafka/connect/data/Timestamp.java | 4 --
.../runtime/distributed/WorkerCoordinator.java | 5 +-
.../runtime/rest/entities/ConnectorInfo.java | 9 ---
.../src/main/scala/kafka/admin/AclCommand.scala | 1 -
.../src/main/scala/kafka/admin/AdminUtils.scala | 6 +-
.../main/scala/kafka/admin/ConfigCommand.scala | 10 +--
.../kafka/admin/ConsumerGroupCommand.scala | 2 +-
.../PreferredReplicaLeaderElectionCommand.scala | 2 +-
.../kafka/admin/ReassignPartitionsCommand.scala | 4 +-
.../main/scala/kafka/admin/TopicCommand.scala | 4 +-
.../scala/kafka/admin/ZkSecurityMigrator.scala | 1 -
.../kafka/api/ControlledShutdownResponse.scala | 2 +-
.../src/main/scala/kafka/api/FetchRequest.scala | 7 +-
.../src/main/scala/kafka/api/LeaderAndIsr.scala | 4 +-
.../scala/kafka/api/OffsetCommitResponse.scala | 2 +-
.../main/scala/kafka/api/OffsetRequest.scala | 5 +-
.../main/scala/kafka/api/ProducerRequest.scala | 5 +-
.../main/scala/kafka/client/ClientUtils.scala | 6 +-
.../scala/kafka/cluster/BrokerEndPoint.scala | 2 +-
.../main/scala/kafka/cluster/Partition.scala | 3 +-
core/src/main/scala/kafka/cluster/Replica.scala | 7 +-
.../ZkNodeChangeNotificationListener.scala | 2 +-
.../kafka/consumer/PartitionAssignor.scala | 4 +-
.../main/scala/kafka/consumer/TopicCount.scala | 4 +-
.../main/scala/kafka/consumer/TopicFilter.scala | 2 +-
.../consumer/ZookeeperConsumerConnector.scala | 24 +++----
.../kafka/controller/KafkaController.scala | 74 +++++++++-----------
.../controller/PartitionStateMachine.scala | 12 ++--
.../kafka/controller/ReplicaStateMachine.scala | 2 +-
.../kafka/controller/TopicDeletionManager.scala | 4 +-
.../coordinator/GroupMetadataManager.scala | 2 +-
core/src/main/scala/kafka/log/Log.scala | 4 +-
core/src/main/scala/kafka/log/LogCleaner.scala | 2 +-
.../scala/kafka/log/LogCleanerManager.scala | 10 ++-
.../kafka/message/ByteBufferMessageSet.scala | 2 +-
.../scala/kafka/metrics/KafkaMetricsGroup.scala | 19 ++---
.../scala/kafka/network/BlockingChannel.scala | 2 +-
.../kafka/producer/DefaultPartitioner.scala | 2 -
.../main/scala/kafka/producer/Producer.scala | 2 +-
.../scala/kafka/security/auth/Resource.scala | 2 +-
.../security/auth/SimpleAclAuthorizer.scala | 4 +-
.../kafka/server/AbstractFetcherManager.scala | 2 +-
.../main/scala/kafka/server/AdminManager.scala | 2 +-
.../kafka/server/BrokerMetadataCheckpoint.scala | 2 +-
.../scala/kafka/server/ClientQuotaManager.scala | 8 +--
.../main/scala/kafka/server/DelayedFetch.scala | 4 +-
.../kafka/server/DynamicConfigManager.scala | 3 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../scala/kafka/server/KafkaHealthcheck.scala | 1 -
.../scala/kafka/server/OffsetCheckpoint.scala | 2 +-
.../scala/kafka/server/ReplicaManager.scala | 21 +++---
.../kafka/server/ZookeeperLeaderElector.scala | 2 +-
.../scala/kafka/tools/ConsoleConsumer.scala | 6 +-
.../scala/kafka/tools/ConsumerPerformance.scala | 2 +-
.../scala/kafka/tools/DumpLogSegments.scala | 10 ++-
.../scala/kafka/tools/EndToEndLatency.scala | 3 +-
.../main/scala/kafka/tools/MirrorMaker.scala | 10 +--
.../scala/kafka/tools/ReplayLogProducer.scala | 5 +-
.../kafka/tools/ReplicaVerificationTool.scala | 21 +++---
core/src/main/scala/kafka/utils/CoreUtils.scala | 4 +-
core/src/main/scala/kafka/utils/FileLock.scala | 4 +-
.../src/main/scala/kafka/utils/Mx4jLoader.scala | 6 +-
.../scala/kafka/utils/ReplicationUtils.scala | 4 +-
.../kafka/utils/VerifiableProperties.scala | 4 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 30 ++++----
.../integration/kafka/api/AdminClientTest.scala | 1 -
.../kafka/api/AuthorizerIntegrationTest.scala | 15 ++--
.../kafka/api/BaseConsumerTest.scala | 2 +-
.../kafka/api/BaseProducerSendTest.scala | 21 +++---
.../kafka/api/EndToEndAuthorizationTest.scala | 2 +-
.../kafka/api/FixedPortTestUtils.scala | 2 +-
.../kafka/api/IntegrationTestHarness.scala | 4 +-
.../kafka/api/PlaintextConsumerTest.scala | 13 ++--
.../kafka/api/PlaintextProducerSendTest.scala | 4 +-
.../kafka/api/ProducerBounceTest.scala | 4 +-
.../scala/kafka/security/minikdc/MiniKdc.scala | 5 +-
.../scala/kafka/tools/TestLogCleaning.scala | 2 +-
.../test/scala/other/kafka/StressTestLog.scala | 3 +-
.../scala/other/kafka/TestCrcPerformance.scala | 14 ++--
.../scala/other/kafka/TestKafkaAppender.scala | 9 +--
.../other/kafka/TestLinearWriteSpeed.scala | 3 +-
.../scala/other/kafka/TestOffsetManager.scala | 6 +-
.../other/kafka/TestPurgatoryPerformance.scala | 2 +-
.../unit/kafka/admin/AddPartitionsTest.scala | 6 +-
.../unit/kafka/admin/ConfigCommandTest.scala | 4 +-
.../unit/kafka/admin/DeleteTopicTest.scala | 7 +-
.../kafka/admin/DescribeConsumerGroupTest.scala | 4 +-
.../admin/ReassignPartitionsCommandTest.scala | 2 +-
.../scala/unit/kafka/api/ApiUtilsTest.scala | 12 ++--
.../api/RequestResponseSerializationTest.scala | 36 ----------
.../scala/unit/kafka/common/ConfigTest.scala | 8 +--
.../scala/unit/kafka/common/TopicTest.scala | 6 +-
.../kafka/consumer/ConsumerIteratorTest.scala | 13 ++--
.../kafka/consumer/PartitionAssignorTest.scala | 26 +++----
.../ZookeeperConsumerConnectorTest.scala | 19 +++--
.../controller/ControllerFailoverTest.scala | 4 +-
.../coordinator/GroupMetadataManagerTest.scala | 4 +-
.../kafka/coordinator/GroupMetadataTest.scala | 2 +-
.../kafka/integration/AutoOffsetResetTest.scala | 6 +-
.../unit/kafka/integration/FetcherTest.scala | 9 +--
.../kafka/integration/PrimitiveApiTest.scala | 6 +-
.../ZookeeperConsumerConnectorTest.scala | 4 +-
.../unit/kafka/log/FileMessageSetTest.scala | 9 ++-
.../log/LogCleanerLagIntegrationTest.scala | 6 +-
.../unit/kafka/log/LogCleanerManagerTest.scala | 9 ---
.../scala/unit/kafka/log/LogCleanerTest.scala | 5 +-
.../scala/unit/kafka/log/LogConfigTest.scala | 6 +-
.../scala/unit/kafka/log/LogManagerTest.scala | 24 +++----
.../scala/unit/kafka/log/LogSegmentTest.scala | 6 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 62 ++++++++--------
.../scala/unit/kafka/log/OffsetIndexTest.scala | 2 +-
.../scala/unit/kafka/log/OffsetMapTest.scala | 3 +-
.../message/ByteBufferMessageSetTest.scala | 4 +-
.../kafka/message/MessageCompressionTest.scala | 5 +-
.../unit/kafka/network/SocketServerTest.scala | 6 +-
.../unit/kafka/producer/AsyncProducerTest.scala | 20 +++---
.../unit/kafka/producer/ProducerTest.scala | 66 ++++++++---------
.../unit/kafka/producer/SyncProducerTest.scala | 6 +-
.../kafka/security/auth/OperationTest.scala | 2 +-
.../security/auth/PermissionTypeTest.scala | 2 +-
.../kafka/security/auth/ResourceTypeTest.scala | 2 +-
.../security/auth/SimpleAclAuthorizerTest.scala | 2 +-
.../security/auth/ZkAuthorizationTest.scala | 8 +--
.../kafka/server/ClientQuotaManagerTest.scala | 4 +-
.../kafka/server/DynamicConfigChangeTest.scala | 8 +--
.../unit/kafka/server/ISRExpirationTest.scala | 25 +++----
.../unit/kafka/server/KafkaConfigTest.scala | 13 ++--
.../scala/unit/kafka/server/LogOffsetTest.scala | 8 +--
.../unit/kafka/server/MetadataCacheTest.scala | 2 +-
.../unit/kafka/server/OffsetCommitTest.scala | 1 -
.../kafka/server/ReplicationQuotasTest.scala | 4 +-
.../server/SaslApiVersionsRequestTest.scala | 2 +-
.../server/ServerGenerateBrokerIdTest.scala | 2 +-
.../unit/kafka/server/ServerShutdownTest.scala | 8 +--
.../unit/kafka/server/ServerStartupTest.scala | 2 +-
.../unit/kafka/tools/ConsoleProducerTest.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 8 +--
.../kafka/utils/timer/TimerTaskListTest.scala | 4 +-
.../scala/unit/kafka/zk/ZKEphemeralTest.scala | 7 +-
.../test/scala/unit/kafka/zk/ZKPathTest.scala | 50 ++++---------
.../scala/unit/kafka/zk/ZkFourLetterWords.scala | 2 +-
.../internals/RocksDBKeyValueStoreTest.java | 2 +-
155 files changed, 477 insertions(+), 704 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 73543ad..59319ef 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -749,7 +749,6 @@ public abstract class AbstractCoordinator implements Closeable {
}
private class GroupCoordinatorMetrics {
- public final Metrics metrics;
public final String metricGrpName;
public final Sensor heartbeatLatency;
@@ -757,7 +756,6 @@ public abstract class AbstractCoordinator implements Closeable {
public final Sensor syncLatency;
public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
- this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
this.heartbeatLatency = metrics.sensor("heartbeat-latency");
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index bd95409..a8d94fa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -757,13 +757,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
private class ConsumerCoordinatorMetrics {
- public final Metrics metrics;
public final String metricGrpName;
public final Sensor commitLatency;
public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
- this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
this.commitLatency = metrics.sensor("commit-latency");
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6dc2060..003d1a1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -65,9 +65,6 @@ public class SubscriptionState {
/* the list of topics the user has requested */
private Set<String> subscription;
- /* the list of partitions the user has requested */
- private Set<TopicPartition> userAssignment;
-
/* the list of topics the group has subscribed to (set only for the leader on join group completion) */
private final Set<String> groupSubscription;
@@ -86,7 +83,6 @@ public class SubscriptionState {
public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
this.defaultResetStrategy = defaultResetStrategy;
this.subscription = Collections.emptySet();
- this.userAssignment = Collections.emptySet();
this.assignment = new PartitionStates<>();
this.groupSubscription = new HashSet<>();
this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
@@ -160,8 +156,6 @@ public class SubscriptionState {
setSubscriptionType(SubscriptionType.USER_ASSIGNED);
if (!this.assignment.partitionSet().equals(partitions)) {
- this.userAssignment = partitions;
-
Map<TopicPartition, TopicPartitionState> partitionToState = new HashMap<>();
for (TopicPartition partition : partitions) {
TopicPartitionState state = assignment.stateValue(partition);
@@ -218,7 +212,6 @@ public class SubscriptionState {
public void unsubscribe() {
this.subscription = Collections.emptySet();
- this.userAssignment = Collections.emptySet();
this.assignment.clear();
this.subscribedPattern = null;
this.subscriptionType = SubscriptionType.NONE;
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3632384..489c762 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -318,7 +318,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
config.getInt(ProducerConfig.RETRIES_CONFIG),
this.metrics,
new SystemTime(),
- clientId,
this.requestTimeoutMs);
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8fc7f2c..c71bb67 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -92,9 +92,6 @@ public class Sender implements Runnable {
/* metrics */
private final SenderMetrics sensors;
- /* param clientId of the client */
- private String clientId;
-
/* the max time to wait for the server to respond to the request*/
private final int requestTimeout;
@@ -107,7 +104,6 @@ public class Sender implements Runnable {
int retries,
Metrics metrics,
Time time,
- String clientId,
int requestTimeout) {
this.client = client;
this.accumulator = accumulator;
@@ -118,7 +114,6 @@ public class Sender implements Runnable {
this.acks = acks;
this.retries = retries;
this.time = time;
- this.clientId = clientId;
this.sensors = new SenderMetrics(metrics);
this.requestTimeout = requestTimeout;
}
@@ -281,8 +276,7 @@ public class Sender implements Runnable {
completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now);
}
this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
- this.sensors.recordThrottleTime(response.request().request().destination(),
- produceResponse.getThrottleTime());
+ this.sensors.recordThrottleTime(produceResponse.getThrottleTime());
} else {
// this is the acks = 0 case, just complete all requests
for (RecordBatch batch : batches.values())
@@ -564,7 +558,7 @@ public class Sender implements Runnable {
}
}
- public void recordThrottleTime(String node, long throttleTimeMs) {
+ public void recordThrottleTime(long throttleTimeMs) {
this.produceThrottleTimeSensor.record(throttleTimeMs, time.milliseconds());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java b/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
index 0e14a39..11f5e07 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
@@ -34,7 +34,6 @@ public class MultiSend implements Send {
private static final Logger log = LoggerFactory.getLogger(MultiSend.class);
private String dest;
private long totalWritten = 0;
- private List<Send> sends;
private Iterator<Send> sendsIterator;
private Send current;
private boolean doneSends = false;
@@ -42,7 +41,6 @@ public class MultiSend implements Send {
public MultiSend(String dest, List<Send> sends) {
this.dest = dest;
- this.sends = sends;
this.sendsIterator = sends.iterator();
nextSendOrDone();
for (Send send: sends)
@@ -76,7 +74,7 @@ public class MultiSend implements Send {
throw new KafkaException("This operation cannot be completed on a complete request.");
int totalWrittenPerCall = 0;
- boolean sendComplete = false;
+ boolean sendComplete;
do {
long written = current.writeTo(channel);
totalWritten += written;
@@ -97,4 +95,4 @@ public class MultiSend implements Send {
else
doneSends = true;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index f0af935..c573672 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -36,17 +36,15 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
}
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
- KafkaChannel channel = null;
try {
PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
Authenticator authenticator = new DefaultAuthenticator();
authenticator.configure(transportLayer, this.principalBuilder, this.configs);
- channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+ return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
} catch (Exception e) {
log.warn("Failed to create channel due to ", e);
throw new KafkaException(e);
}
- return channel;
}
public void close() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index b546174..1d612bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -46,17 +46,15 @@ public class SslChannelBuilder implements ChannelBuilder {
}
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
- KafkaChannel channel = null;
try {
SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key);
Authenticator authenticator = new DefaultAuthenticator();
authenticator.configure(transportLayer, this.principalBuilder, this.configs);
- channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+ return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
} catch (Exception e) {
log.info("Failed to create channel due to ", e);
throw new KafkaException(e);
}
- return channel;
}
public void close() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index f8926f3..7ce59f3 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -397,12 +397,11 @@ public class SslTransportLayer implements TransportLayer {
private SSLEngineResult handshakeUnwrap(boolean doRead) throws IOException {
log.trace("SSLHandshake handshakeUnwrap {}", channelId);
SSLEngineResult result;
- boolean cont = false;
- int read = 0;
if (doRead) {
- read = socketChannel.read(netReadBuffer);
+ int read = socketChannel.read(netReadBuffer);
if (read == -1) throw new EOFException("EOF during handshake.");
}
+ boolean cont;
do {
//prepare the buffer with the incoming data
netReadBuffer.flip();
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index e1074a1..206fe39 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -63,7 +63,6 @@ import org.apache.kafka.common.protocol.Protocol;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractRequestResponse;
-import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;
@@ -310,7 +309,7 @@ public class SaslServerAuthenticator implements Authenticator {
LOG.debug("Handle Kafka request {}", apiKey);
switch (apiKey) {
case API_VERSIONS:
- handleApiVersionsRequest(requestHeader, (ApiVersionsRequest) request);
+ handleApiVersionsRequest(requestHeader);
break;
case SASL_HANDSHAKE:
clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest) request);
@@ -361,7 +360,7 @@ public class SaslServerAuthenticator implements Authenticator {
}
}
- private void handleApiVersionsRequest(RequestHeader requestHeader, ApiVersionsRequest versionRequest) throws IOException, UnsupportedSaslMechanismException {
+ private void handleApiVersionsRequest(RequestHeader requestHeader) throws IOException, UnsupportedSaslMechanismException {
sendKafkaResponse(requestHeader, ApiVersionsResponse.apiVersionsResponse());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index b556240..d305e8e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -153,14 +153,13 @@ public class NetworkClientTest {
@Test
public void testLeastLoadedNode() {
- Node leastNode = null;
client.ready(node, time.milliseconds());
awaitReady(client, node);
client.poll(1, time.milliseconds());
assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
// leastloadednode should be our single node
- leastNode = client.leastLoadedNode(time.milliseconds());
+ Node leastNode = client.leastLoadedNode(time.milliseconds());
assertEquals("There should be one leastloadednode", leastNode.id(), node.id());
// sleep for longer than reconnect backoff
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 48682b1..3756d8a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -88,7 +88,7 @@ public class BufferPoolTest {
ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs);
assertEquals(1024, buffer.limit());
pool.deallocate(buffer);
- buffer = pool.allocate(1025, maxBlockTimeMs);
+ pool.allocate(1025, maxBlockTimeMs);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index b7f9e74..b7645dd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -83,7 +83,6 @@ public class SenderTest {
MAX_RETRIES,
metrics,
time,
- CLIENT_ID,
REQUEST_TIMEOUT);
metadata.update(cluster, time.milliseconds());
@@ -143,7 +142,6 @@ public class SenderTest {
maxRetries,
m,
time,
- "clientId",
REQUEST_TIMEOUT);
// do a successful retry
Future<RecordMetadata> future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
@@ -196,7 +194,6 @@ public class SenderTest {
maxRetries,
m,
time,
- "clientId",
REQUEST_TIMEOUT);
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
index c447f6d..cd7ed4a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
@@ -19,8 +19,6 @@ package org.apache.kafka.connect.data;
import org.apache.kafka.connect.errors.DataException;
-import java.util.TimeZone;
-
/**
* <p>
* A timestamp representing an absolute time, without timezone information. The corresponding Java type is a
@@ -30,8 +28,6 @@ import java.util.TimeZone;
public class Timestamp {
public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Timestamp";
- private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
-
/**
* Returns a SchemaBuilder for a Timestamp. By returning a SchemaBuilder you can override additional schema settings such
* as required/optional, default value, and documentation.
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 8a065f1..88a0a8d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -52,7 +52,6 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
private final String restUrl;
private final ConfigBackingStore configStorage;
private ConnectProtocol.Assignment assignmentSnapshot;
- private final WorkerCoordinatorMetrics sensors;
private ClusterConfigState configSnapshot;
private final WorkerRebalanceListener listener;
private LeaderState leaderState;
@@ -86,7 +85,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
this.restUrl = restUrl;
this.configStorage = configStorage;
this.assignmentSnapshot = null;
- this.sensors = new WorkerCoordinatorMetrics(metrics, metricGrpPrefix);
+ new WorkerCoordinatorMetrics(metrics, metricGrpPrefix);
this.listener = listener;
this.rejoinRequested = false;
}
@@ -306,11 +305,9 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
}
private class WorkerCoordinatorMetrics {
- public final Metrics metrics;
public final String metricGrpName;
public WorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
- this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
Measurable numConnectors = new Measurable() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
index 9567ef9..3faff65 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
@@ -22,8 +22,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.connect.util.ConnectorTaskId;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -74,11 +72,4 @@ public class ConnectorInfo {
return Objects.hash(name, config, tasks);
}
-
- private static List<ConnectorTaskId> jsonTasks(Collection<org.apache.kafka.connect.util.ConnectorTaskId> tasks) {
- List<ConnectorTaskId> jsonTasks = new ArrayList<>();
- for (ConnectorTaskId task : tasks)
- jsonTasks.add(task);
- return jsonTasks;
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index a098535..58c966d 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -84,7 +84,6 @@ object AclCommand {
CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.")
for ((resource, acls) <- resourceToAcl) {
- val acls = resourceToAcl(resource)
println(s"Adding ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
authorizer.addAcls(acls, resource)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index aa38f69..d3ce217 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -320,7 +320,7 @@ object AdminUtils extends Logging with AdminUtilities {
try {
zkUtils.createPersistentPath(getDeleteTopicPath(topic))
} catch {
- case e1: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
+ case _: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
"topic %s is already marked for deletion".format(topic))
case e2: Throwable => throw new AdminOperationException(e2)
}
@@ -471,7 +471,7 @@ object AdminUtils extends Logging with AdminUtilities {
}
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
} catch {
- case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
+ case _: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
case e2: Throwable => throw new AdminOperationException(e2.toString)
}
}
@@ -593,7 +593,7 @@ object AdminUtils extends Logging with AdminUtilities {
case _ => throw new IllegalArgumentException(s"Invalid ${entityConfigPath} config: ${str}")
}
- case o => throw new IllegalArgumentException(s"Unexpected value in config:(${str}), entity_config_path: ${entityConfigPath}")
+ case _ => throw new IllegalArgumentException(s"Unexpected value in config:(${str}), entity_config_path: ${entityConfigPath}")
}
}
props
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 20048ec..34df6b0 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -99,7 +99,7 @@ object ConfigCommand extends Config {
private def parseBroker(broker: String): Int = {
try broker.toInt
catch {
- case e: NumberFormatException =>
+ case _: NumberFormatException =>
throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
}
}
@@ -190,20 +190,20 @@ object ConfigCommand extends Config {
val rootEntities = zkUtils.getAllEntitiesWithConfig(root.entityType)
.map(name => ConfigEntity(Entity(root.entityType, Some(name)), child))
child match {
- case Some (s) =>
+ case Some(s) =>
rootEntities.flatMap(rootEntity =>
ConfigEntity(rootEntity.root, Some(Entity(s.entityType, None))).getAllEntities(zkUtils))
case None => rootEntities
}
- case (rootName, Some(childEntity)) =>
+ case (_, Some(childEntity)) =>
childEntity.sanitizedName match {
- case Some(subName) => Seq(this)
+ case Some(_) => Seq(this)
case None =>
zkUtils.getAllEntitiesWithConfig(root.entityPath + "/" + childEntity.entityType)
.map(name => ConfigEntity(root, Some(Entity(childEntity.entityType, Some(name)))))
}
- case (rootName, None) =>
+ case (_, None) =>
Seq(this)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 354e6a2..6300d76 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -253,7 +253,7 @@ object ConsumerGroupCommand extends Logging {
}
assignmentRows ++= groupConsumerIds.sortBy(- consumerTopicPartitions.get(_).size).flatMap { consumerId =>
- topicsByConsumerId(consumerId).flatMap { topic =>
+ topicsByConsumerId(consumerId).flatMap { _ =>
// since consumers with no topic partitions are processed here, we pass empty for topic partitions and offsets
// since consumer id is repeated in client id, leave host and client id empty
collectConsumerAssignment(group, None, Array[TopicAndPartition](), Map[TopicAndPartition, Option[Long]](), Some(consumerId), None, None)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index d194eca..81014b1 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -109,7 +109,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
zkUtils.createPersistentPath(zkPath, jsonData)
info("Created preferred replica election path with %s".format(jsonData))
} catch {
- case nee: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
val partitionsUndergoingPreferredReplicaElection =
PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(zkUtils.readData(zkPath)._1)
throw new AdminOperationException("Preferred replica leader election currently in progress for " +
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index a037fd4..709b365 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -218,7 +218,7 @@ object ReassignPartitionsCommand extends Logging {
partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
val newReplicas = partitionsToBeReassigned(topicAndPartition)
partitionsBeingReassigned.get(topicAndPartition) match {
- case Some(partition) => ReassignmentInProgress
+ case Some(_) => ReassignmentInProgress
case None =>
// check if the current replica assignment matches the expected one after reassignment
val assignedReplicas = zkUtils.getReplicasForPartition(topicAndPartition.topic, topicAndPartition.partition)
@@ -394,7 +394,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
true
}
} catch {
- case ze: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
throw new AdminCommandFailedException("Partition reassignment currently in " +
"progress for %s. Aborting operation".format(partitionsBeingReassigned))
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index a35a989..2fcc2ce 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -176,11 +176,11 @@ object TopicCommand extends Logging {
println("Note: This will have no impact if delete.topic.enable is not set to true.")
}
} catch {
- case e: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
println("Topic %s is already marked for deletion.".format(topic))
case e: AdminOperationException =>
throw e
- case e: Throwable =>
+ case _: Throwable =>
throw new AdminOperationException("Error while deleting topic %s".format(topic))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index a87e5b7..9ffee86 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -123,7 +123,6 @@ object ZkSecurityMigrator extends Logging {
}
class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
- private val workQueue = new LinkedBlockingQueue[Runnable]
private val futures = new Queue[Future[String]]
private def setAcl(path: String, setPromise: Promise[String]) = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index 02eeae1..1ba5cfa 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -30,7 +30,7 @@ object ControlledShutdownResponse {
val numEntries = buffer.getInt
var partitionsRemaining = Set[TopicAndPartition]()
- for (i<- 0 until numEntries){
+ for (_ <- 0 until numEntries){
val topic = readShortString(buffer)
val partition = buffer.getInt
partitionsRemaining += new TopicAndPartition(topic, partition)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 3c380c9..57e99c1 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -69,7 +69,7 @@ object FetchRequest {
val groupedByTopic = requestInfo.groupBy { case (tp, _) => tp.topic }.map { case (topic, values) =>
topic -> random.shuffle(values)
}
- random.shuffle(groupedByTopic.toSeq).flatMap { case (topic, partitions) =>
+ random.shuffle(groupedByTopic.toSeq).flatMap { case (_, partitions) =>
partitions.map { case (tp, fetchInfo) => tp -> fetchInfo }
}
}
@@ -196,9 +196,8 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- val fetchResponsePartitionData = requestInfo.map {
- case (topicAndPartition, data) =>
- (topicAndPartition, FetchResponsePartitionData(Errors.forException(e).code, -1, MessageSet.Empty))
+ val fetchResponsePartitionData = requestInfo.map { case (topicAndPartition, _) =>
+ (topicAndPartition, FetchResponsePartitionData(Errors.forException(e).code, -1, MessageSet.Empty))
}
val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData, fetchRequest.versionId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/LeaderAndIsr.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
index e5813a5..9123788 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
@@ -46,10 +46,10 @@ object PartitionStateInfo {
val leader = buffer.getInt
val leaderEpoch = buffer.getInt
val isrSize = buffer.getInt
- val isr = for(i <- 0 until isrSize) yield buffer.getInt
+ val isr = for (_ <- 0 until isrSize) yield buffer.getInt
val zkVersion = buffer.getInt
val replicationFactor = buffer.getInt
- val replicas = for(i <- 0 until replicationFactor) yield buffer.getInt
+ val replicas = for (_ <- 0 until replicationFactor) yield buffer.getInt
PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr.toList, zkVersion), controllerEpoch),
replicas.toSet)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index d4f6158..94223c7 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -48,7 +48,7 @@ case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)
- def hasError = commitStatus.exists{ case (topicAndPartition, errorCode) => errorCode != Errors.NONE.code }
+ def hasError = commitStatus.values.exists(_ != Errors.NONE.code)
def writeTo(buffer: ByteBuffer) {
buffer.putInt(correlationId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index b15cf5a..416dd73 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -114,8 +114,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- val partitionOffsetResponseMap = requestInfo.map {
- case (topicAndPartition, partitionOffsetRequest) =>
+ val partitionOffsetResponseMap = requestInfo.map { case (topicAndPartition, _) =>
(topicAndPartition, PartitionOffsetsResponse(Errors.forException(e).code, Nil))
}
val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
@@ -133,4 +132,4 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
offsetRequest.append("; RequestInfo: " + requestInfo.mkString(","))
offsetRequest.toString()
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index aad2fa5..3ca7bd7 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -133,9 +133,8 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
requestChannel.closeConnection(request.processor, request)
}
else {
- val producerResponseStatus = data.map {
- case (topicAndPartition, data) =>
- (topicAndPartition, ProducerResponseStatus(Errors.forException(e).code, -1l, Message.NoTimestamp))
+ val producerResponseStatus = data.map { case (topicAndPartition, _) =>
+ (topicAndPartition, ProducerResponseStatus(Errors.forException(e).code, -1l, Message.NoTimestamp))
}
val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index fda8b0b..8893697 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -121,7 +121,7 @@ object ClientUtils extends Logging{
debug("Created channel to broker %s:%d.".format(channel.host, channel.port))
true
} catch {
- case e: Exception =>
+ case _: Exception =>
if (channel != null) channel.disconnect()
channel = null
info("Error while creating channel to %s:%d.".format(broker.host, broker.port))
@@ -164,7 +164,7 @@ object ClientUtils extends Logging{
}
}
catch {
- case ioe: IOException =>
+ case _: IOException =>
info("Failed to fetch consumer metadata from %s:%d.".format(queryChannel.host, queryChannel.port))
queryChannel.disconnect()
}
@@ -187,7 +187,7 @@ object ClientUtils extends Logging{
queryChannel.disconnect()
}
catch {
- case ioe: IOException => // offsets manager may have moved
+ case _: IOException => // offsets manager may have moved
info("Error while connecting to %s.".format(connectString))
if (offsetManagerChannel != null) offsetManagerChannel.disconnect()
Thread.sleep(retryBackOffMs)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
index 91823f0..847e959 100644
--- a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
@@ -32,7 +32,7 @@ object BrokerEndPoint {
*/
def parseHostPort(connectionString: String): Option[(String, Int)] = {
connectionString match {
- case uriParseExp(host, port) => try Some(host, port.toInt) catch { case e: NumberFormatException => None }
+ case uriParseExp(host, port) => try Some(host, port.toInt) catch { case _: NumberFormatException => None }
case _ => None
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 28d3c8d..4d3fb56 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -415,11 +415,10 @@ class Partition(val topic: String,
* is violated, that replica is considered to be out of sync
*
**/
- val leaderLogEndOffset = leaderReplica.logEndOffset
val candidateReplicas = inSyncReplicas - leaderReplica
val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
- if(laggingReplicas.nonEmpty)
+ if (laggingReplicas.nonEmpty)
debug("Lagging replicas for partition %s are %s".format(TopicAndPartition(topic, partitionId), laggingReplicas.map(_.brokerId).mkString(",")))
laggingReplicas
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index dfb203a..13c1921 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -38,12 +38,7 @@ class Replica(val brokerId: Int,
val topic = partition.topic
val partitionId = partition.partitionId
- def isLocal: Boolean = {
- log match {
- case Some(l) => true
- case None => false
- }
- }
+ def isLocal: Boolean = log.isDefined
private[this] val lastCaughtUpTimeMsUnderlying = new AtomicLong(time.milliseconds)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 580ae33..ef8190c 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -91,7 +91,7 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
val changeId = changeNumber(notification)
if (changeId > lastExecutedChange) {
val changeZnode = seqNodeRoot + "/" + notification
- val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
+ val (data, _) = zkUtils.readDataMaybeNull(changeZnode)
data.map(notificationHandler.processNotification(_)).getOrElse {
logger.warn(s"read null data from $changeZnode when processing notification $notification")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index 96fe690..900a4b6 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -72,7 +72,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
def assign(ctx: AssignmentContext) = {
- val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
+ val valueFactory = (_: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
val partitionAssignment =
new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
@@ -131,7 +131,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
class RangeAssignor() extends PartitionAssignor with Logging {
def assign(ctx: AssignmentContext) = {
- val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
+ val valueFactory = (_: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
val partitionAssignment =
new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
for (topic <- ctx.myTopicThreadIds.keySet) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index 5706d3c..eb035f2 100755
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -140,8 +140,8 @@ private[kafka] class WildcardTopicCount(zkUtils: ZkUtils,
def pattern: String = {
topicFilter match {
- case wl: Whitelist => TopicCount.whiteListPattern
- case bl: Blacklist => TopicCount.blackListPattern
+ case _: Whitelist => TopicCount.whiteListPattern
+ case _: Blacklist => TopicCount.blackListPattern
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/consumer/TopicFilter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala
index 1ab4e5c..914e9b9 100644
--- a/core/src/main/scala/kafka/consumer/TopicFilter.scala
+++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -34,7 +34,7 @@ sealed abstract class TopicFilter(rawRegex: String) extends Logging {
Pattern.compile(regex)
}
catch {
- case e: PatternSyntaxException =>
+ case _: PatternSyntaxException =>
throw new RuntimeException(regex + " is an invalid regex.")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index f776578..81b6264 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -322,8 +322,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def commitOffsets(isAutoCommit: Boolean) {
val offsetsToCommit =
- immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
- partitionTopicInfos.map { case (partition, info) =>
+ immutable.Map(topicRegistry.values.flatMap { partitionTopicInfos =>
+ partitionTopicInfos.values.map { info =>
TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset())
}
}.toSeq: _*)
@@ -442,7 +442,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
trace("Offset fetch response: %s.".format(offsetFetchResponse))
val (leaderChanged, loadInProgress) =
- offsetFetchResponse.requestInfo.foldLeft(false, false) { case(folded, (topicPartition, offsetMetadataAndError)) =>
+ offsetFetchResponse.requestInfo.values.foldLeft(false, false) { case (folded, offsetMetadataAndError) =>
(folded._1 || (offsetMetadataAndError.error == Errors.NOT_COORDINATOR_FOR_GROUP.code),
folded._2 || (offsetMetadataAndError.error == Errors.GROUP_LOAD_IN_PROGRESS.code))
}
@@ -706,7 +706,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val globalPartitionAssignment = partitionAssignor.assign(assignmentContext)
val partitionAssignment = globalPartitionAssignment.get(assignmentContext.consumerId)
val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
- valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))
+ valueFactory = Some((_: String) => new Pool[Int, PartitionTopicInfo]))
// fetch current offsets for all topic-partitions
val topicPartitions = partitionAssignment.keySet.toSeq
@@ -731,7 +731,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
if(reflectPartitionOwnershipDecision(partitionAssignment)) {
allTopicsOwnedPartitionsCount = partitionAssignment.size
- partitionAssignment.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
+ partitionAssignment.view.groupBy { case (topicPartition, _) => topicPartition.topic }
.foreach { case (topic, partitionThreadPairs) =>
newGauge("OwnedPartitionsCount",
new Gauge[Int] {
@@ -851,11 +851,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
successfullyOwnedPartitions ::= (topic, partition)
true
} catch {
- case e: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
// The node hasn't been deleted by the original owner. So wait a bit and retry.
info("waiting for the partition ownership to be deleted: " + partition)
false
- case e2: Throwable => throw e2
}
}
val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
@@ -918,19 +917,18 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
topicCount.getConsumerThreadIdsPerTopic
val allQueuesAndStreams = topicCount match {
- case wildTopicCount: WildcardTopicCount =>
+ case _: WildcardTopicCount =>
/*
* Wild-card consumption streams share the same queues, so we need to
* duplicate the list for the subsequent zip operation.
*/
(1 to consumerThreadIdsPerTopic.keySet.size).flatMap(_ => queuesAndStreams).toList
- case statTopicCount: StaticTopicCount =>
+ case _: StaticTopicCount =>
queuesAndStreams
}
- val topicThreadIds = consumerThreadIdsPerTopic.map {
- case(topic, threadIds) =>
- threadIds.map((topic, _))
+ val topicThreadIds = consumerThreadIdsPerTopic.map { case (topic, threadIds) =>
+ threadIds.map((topic, _))
}.flatten
require(topicThreadIds.size == allQueuesAndStreams.size,
@@ -988,7 +986,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
"message streams by filter at most once.")
private val wildcardQueuesAndStreams = (1 to numStreams)
- .map(e => {
+ .map(_ => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
val stream = new KafkaStream[K,V](queue,
config.consumerTimeoutMs,
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 063ea6f..b7b4d71 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -77,25 +77,23 @@ class ControllerContext(val zkUtils: ZkUtils,
def liveOrShuttingDownBrokers = liveBrokersUnderlying
def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] = {
- partitionReplicaAssignment
- .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
- .map { case(topicAndPartition, replicas) => topicAndPartition }
- .toSet
+ partitionReplicaAssignment.collect {
+ case (topicAndPartition, replicas) if replicas.contains(brokerId) => topicAndPartition
+ }.toSet
}
def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
brokerIds.flatMap { brokerId =>
- partitionReplicaAssignment
- .filter { case (topicAndPartition, replicas) => replicas.contains(brokerId) }
- .map { case (topicAndPartition, replicas) =>
+ partitionReplicaAssignment.collect {
+ case (topicAndPartition, replicas) if replicas.contains(brokerId) =>
new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId)
- }
+ }
}.toSet
}
def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
partitionReplicaAssignment
- .filter { case (topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }
+ .filter { case (topicAndPartition, _) => topicAndPartition.topic == topic }
.flatMap { case (topicAndPartition, replicas) =>
replicas.map { r =>
new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
@@ -103,10 +101,8 @@ class ControllerContext(val zkUtils: ZkUtils,
}.toSet
}
- def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] = {
- partitionReplicaAssignment
- .filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }.keySet
- }
+ def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] =
+ partitionReplicaAssignment.keySet.filter(topicAndPartition => topicAndPartition.topic == topic)
def allLiveReplicas(): Set[PartitionAndReplica] = {
replicasOnBrokers(liveBrokerIds)
@@ -144,7 +140,7 @@ object KafkaController extends Logging {
case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
}
} catch {
- case t: Throwable =>
+ case _: Throwable =>
// It may be due to an incompatible controller register version
warn("Failed to parse the controller info as json. "
+ "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
@@ -433,7 +429,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
partitionStateMachine.triggerOnlinePartitionStateChange()
// check if reassignment of some partitions need to be restarted
val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
- case (topicAndPartition, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
+ case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
}
partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
// check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
@@ -622,12 +618,11 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
val newReplicas = reassignedPartitionContext.newReplicas
val topic = topicAndPartition.topic
val partition = topicAndPartition.partition
- val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
try {
val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
assignedReplicasOpt match {
case Some(assignedReplicas) =>
- if(assignedReplicas == newReplicas) {
+ if (assignedReplicas == newReplicas) {
throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
" %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
} else {
@@ -706,7 +701,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
controllerContext.epoch = newControllerEpoch
}
} catch {
- case nne: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
// if path doesn't exist, this is the first controller whose epoch should be 1
// the following call can still fail if another controller gets elected between checking if the path exists and
// trying to create the controller epoch path
@@ -715,7 +710,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
controllerContext.epoch = KafkaController.InitialControllerEpoch
controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion
} catch {
- case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
+ case _: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
"Aborting controller startup procedure")
case oe: Throwable => error("Error while incrementing controller epoch", oe)
}
@@ -788,7 +783,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
private def initializeTopicDeletion() {
val topicsQueuedForDeletion = zkUtils.getChildrenParentMayNotExist(ZkUtils.DeleteTopicsPath).toSet
- val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case(partition, replicas) =>
+ val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case (_, replicas) =>
replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)) }.keySet.map(_.topic)
val topicsForWhichPreferredReplicaElectionIsInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic)
val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
@@ -989,7 +984,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
zkUtils.updatePersistentPath(zkPath, jsonPartitionMap)
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
} catch {
- case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
+ case _: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
case e2: Throwable => throw new KafkaException(e2.toString)
}
}
@@ -1182,7 +1177,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
inLock(controllerContext.controllerLock) {
preferredReplicasForTopicsByBrokers =
controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
- case(topicAndPartition, assignedReplicas) => assignedReplicas.head
+ case (_, assignedReplicas) => assignedReplicas.head
}
}
debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
@@ -1192,13 +1187,10 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
var imbalanceRatio: Double = 0
var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
inLock(controllerContext.controllerLock) {
- topicsNotInPreferredReplica =
- topicAndPartitionsForBroker.filter {
- case(topicPartition, replicas) => {
- controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
- controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
- }
- }
+ topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case (topicPartition, _) =>
+ controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
+ controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
+ }
debug("topics not in preferred replica " + topicsNotInPreferredReplica)
val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
@@ -1208,18 +1200,16 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
// check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
// that need to be on this broker
if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
- topicsNotInPreferredReplica.foreach {
- case(topicPartition, replicas) => {
- inLock(controllerContext.controllerLock) {
- // do this check only if the broker is live and there are no partitions being reassigned currently
- // and preferred replica election is not in progress
- if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
- controllerContext.partitionsBeingReassigned.isEmpty &&
- controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
- !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
- controllerContext.allTopics.contains(topicPartition.topic)) {
- onPreferredReplicaElection(Set(topicPartition), true)
- }
+ topicsNotInPreferredReplica.keys.foreach { topicPartition =>
+ inLock(controllerContext.controllerLock) {
+ // do this check only if the broker is live and there are no partitions being reassigned currently
+ // and preferred replica election is not in progress
+ if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+ controllerContext.partitionsBeingReassigned.isEmpty &&
+ controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
+ !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
+ controllerContext.allTopics.contains(topicPartition.topic)) {
+ onPreferredReplicaElection(Set(topicPartition), true)
}
}
}
@@ -1373,7 +1363,7 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
private def getTopicAndPartition(child: String): Set[TopicAndPartition] = {
val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child
- val (jsonOpt, stat) = controller.controllerContext.zkUtils.readDataMaybeNull(changeZnode)
+ val (jsonOpt, _) = controller.controllerContext.zkUtils.readDataMaybeNull(changeZnode)
if (jsonOpt.isDefined) {
val json = Json.parseFull(jsonOpt.get)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 32bf4da..ee94b46 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -238,7 +238,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
* zookeeper
*/
private def initializePartitionState() {
- for((topicPartition, replicaAssignment) <- controllerContext.partitionReplicaAssignment) {
+ for (topicPartition <- controllerContext.partitionReplicaAssignment.keys) {
// check if leader and isr path exists for partition. If not, then it is in NEW state
controllerContext.partitionLeadershipInfo.get(topicPartition) match {
case Some(currentLeaderIsrAndEpoch) =>
@@ -297,7 +297,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
} catch {
- case e: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
// read the controller epoch
val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic,
topicAndPartition.partition).get
@@ -357,7 +357,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
newLeaderIsrAndControllerEpoch, replicas)
} catch {
- case lenne: LeaderElectionNotNeededException => // swallow
+ case _: LeaderElectionNotNeededException => // swallow
case nroe: NoReplicaOnlineException => throw nroe
case sce: Throwable =>
val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
@@ -430,8 +430,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
deletedTopics, addedPartitionReplicaAssignment))
- if(newTopics.nonEmpty)
- controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
+ if (newTopics.nonEmpty)
+ controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
} catch {
case e: Throwable => error("Error while handling new topic", e )
}
@@ -522,7 +522,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
if (partitionsToBeAdded.nonEmpty) {
info("New partitions to be added %s".format(partitionsToBeAdded))
controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
- controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
+ controller.onNewPartitionCreation(partitionsToBeAdded.keySet)
}
}
} catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index d4e9bb4..03887ae 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -246,7 +246,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
// As an optimization, the controller removes dead replicas from the ISR
val leaderAndIsrIsEmpty: Boolean =
controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
- case Some(currLeaderIsrAndControllerEpoch) =>
+ case Some(_) =>
controller.removeReplicaFromIsr(topic, partition, replicaId) match {
case Some(updatedLeaderIsrAndControllerEpoch) =>
// send the shrunk ISR state change request to all the remaining alive replicas of the partition.
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 98057dd..8e5f3a1 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -342,8 +342,8 @@ class TopicDeletionManager(controller: KafkaController,
*@param replicasForTopicsToBeDeleted
*/
private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
- replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case(topic, replicas) =>
- var aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
+ replicasForTopicsToBeDeleted.groupBy(_.topic).keys.foreach { topic =>
+ val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic == topic)
val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 79d4411..e0c8e65 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -265,7 +265,7 @@ class GroupMetadataManager(val brokerId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Short] => Unit): DelayedStore = {
// first filter out partitions with offset metadata size exceeding limit
- val filteredOffsetMetadata = offsetMetadata.filter { case (topicPartition, offsetAndMetadata) =>
+ val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
validateOffsetMetadataLength(offsetAndMetadata.metadata)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f29cde7..6b57696 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -290,7 +290,7 @@ class Log(val dir: File,
try {
curr.recover(config.maxMessageSize)
} catch {
- case e: InvalidOffsetException =>
+ case _: InvalidOffsetException =>
val startOffset = curr.baseOffset
warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " +
"creating an empty one with starting offset " + startOffset)
@@ -627,7 +627,7 @@ class Log(val dir: File,
val fetchDataInfo = read(offset, 1)
fetchDataInfo.fetchOffsetMetadata
} catch {
- case e: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
+ case _: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index bb8a89a..34b0dbf 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -240,7 +240,7 @@ class LogCleaner(val config: CleanerConfig,
recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
endOffset = nextDirtyOffset
} catch {
- case pe: LogCleaningAbortedException => // task can be aborted, let it go.
+ case _: LogCleaningAbortedException => // task can be aborted, let it go.
} finally {
cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index b3e6e72..b808348 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -112,12 +112,10 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
*/
def deletableLogs(): Iterable[(TopicAndPartition, Log)] = {
inLock(lock) {
- val toClean = logs.filterNot {
- case (topicAndPartition, log) => inProgress.contains(topicAndPartition)
- }.filter {
- case (topicAndPartition, log) => isCompactAndDelete(log)
+ val toClean = logs.filter { case (topicAndPartition, log) =>
+ !inProgress.contains(topicAndPartition) && isCompactAndDelete(log)
}
- toClean.foreach{x => inProgress.put(x._1, LogCleaningInProgress)}
+ toClean.foreach { case (tp, _) => inProgress.put(tp, LogCleaningInProgress) }
toClean
}
@@ -317,4 +315,4 @@ private[log] object LogCleanerManager extends Logging {
(firstDirtyOffset, firstUncleanableDirtyOffset)
}
-}
\ No newline at end of file
+}
[3/4] kafka git commit: MINOR: A bunch of clean-ups related to usage
of unused variables
Posted by ij...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 1ef91b9..850b0e0 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -110,7 +110,7 @@ object ByteBufferMessageSet {
while (true)
innerMessageAndOffsets.add(readMessageFromStream(compressed))
} catch {
- case eofe: EOFException =>
+ case _: EOFException =>
// we don't do anything at all here, because the finally
// will close the compressed input stream, and we simply
// want to return the innerMessageAndOffsets
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 7daab67..13b57e3 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -158,23 +158,16 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
)
private def toMBeanName(tags: collection.Map[String, String]): Option[String] = {
- val filteredTags = tags
- .filter { case (tagKey, tagValue) => tagValue != ""}
+ val filteredTags = tags.filter { case (_, tagValue) => tagValue != "" }
if (filteredTags.nonEmpty) {
- val tagsString = filteredTags
- .map { case (key, value) => "%s=%s".format(key, value)}
- .mkString(",")
-
+ val tagsString = filteredTags.map { case (key, value) => "%s=%s".format(key, value) }.mkString(",")
Some(tagsString)
}
- else {
- None
- }
+ else None
}
private def toScope(tags: collection.Map[String, String]): Option[String] = {
- val filteredTags = tags
- .filter { case (tagKey, tagValue) => tagValue != ""}
+ val filteredTags = tags.filter { case (_, tagValue) => tagValue != ""}
if (filteredTags.nonEmpty) {
// convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
val tagsString = filteredTags
@@ -184,9 +177,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
Some(tagsString)
}
- else {
- None
- }
+ else None
}
def removeAllConsumerMetrics(clientId: String) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index 5408e0d..0f10577 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -82,7 +82,7 @@ class BlockingChannel( val host: String,
connectTimeoutMs))
} catch {
- case e: Throwable => disconnect()
+ case _: Throwable => disconnect()
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
index 6b10e51..f793811 100755
--- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
@@ -24,8 +24,6 @@ import org.apache.kafka.common.utils.Utils
@deprecated("This class has been deprecated and will be removed in a future release. " +
"It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0")
class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
- private val random = new java.util.Random
-
def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index c2f95ea..2d2bfdb 100755
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -104,7 +104,7 @@ class Producer[K,V](val config: ProducerConfig,
}
}
catch {
- case e: InterruptedException =>
+ case _: InterruptedException =>
false
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/security/auth/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
index 797c77b..17d09ce 100644
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -25,7 +25,7 @@ object Resource {
def fromString(str: String): Resource = {
str.split(Separator, 2) match {
case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name)
- case s => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
+ case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 72f79d5..5cfdcd6 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -316,13 +316,13 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
try {
zkUtils.conditionalUpdatePersistentPathIfExists(path, data, expectedVersion)
} catch {
- case e: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
try {
debug(s"Node $path does not exist, attempting to create it.")
zkUtils.createPersistentPath(path, data)
(true, 0)
} catch {
- case e: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
debug(s"Failed to create node for $path because it already exists.")
(false, 0)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index d87a8cf..5e584ab 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -97,7 +97,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
def removeFetcherForPartitions(partitions: Set[TopicPartition]) {
mapLock synchronized {
- for ((key, fetcher) <- fetcherThreadMap)
+ for (fetcher <- fetcherThreadMap.values)
fetcher.removePartitions(partitions)
}
info("Removed fetcher for partitions %s".format(partitions.mkString(",")))
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 8cb2270..325c7af 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -127,7 +127,7 @@ class AdminManager(val config: KafkaConfig,
AdminUtils.deleteTopic(zkUtils, topic)
DeleteTopicMetadata(topic, Errors.NONE)
} catch {
- case e: TopicAlreadyMarkedForDeletionException =>
+ case _: TopicAlreadyMarkedForDeletionException =>
// swallow the exception, and still track deletion allowing multiple calls to wait for deletion
DeleteTopicMetadata(topic, Errors.NONE)
case e: Throwable =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index 00e5d0c..cc2c4cd 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -65,7 +65,7 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
throw new IOException("Unrecognized version of the server meta.properties file: " + version)
}
} catch {
- case e: FileNotFoundException =>
+ case _: FileNotFoundException =>
warn("No meta.properties file under dir %s".format(file.getAbsolutePath()))
None
case e1: Exception =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index c4472c6..0c7c26b 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -183,7 +183,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
// trigger the callback immediately if quota is not violated
callback(0)
} catch {
- case qve: QuotaViolationException =>
+ case _: QuotaViolationException =>
// Compute the delay
val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota))
@@ -412,9 +412,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
logger.info(s"Changing ${apiKey} quota for ${userInfo}${clientIdInfo} to ${newQuota.bound}")
overriddenQuota.put(quotaId, newQuota)
(sanitizedUser, clientId) match {
- case (Some(u), Some(c)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
- case (Some(u), None) => quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
- case (None, Some(c)) => quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
+ case (Some(_), Some(_)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
+ case (Some(_), None) => quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
+ case (None, Some(_)) => quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
case (None, None) =>
}
case None =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 2feeae8..4bf04e6 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -112,10 +112,10 @@ class DelayedFetch(delayMs: Long,
}
}
} catch {
- case utpe: UnknownTopicOrPartitionException => // Case B
+ case _: UnknownTopicOrPartitionException => // Case B
debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
return forceComplete()
- case nle: NotLeaderForPartitionException => // Case A
+ case _: NotLeaderForPartitionException => // Case A
debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
return forceComplete()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index b31d838..2e9e714 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -88,7 +88,6 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
private val configHandlers: Map[String, ConfigHandler],
private val changeExpirationMs: Long = 15*60*1000,
private val time: Time = SystemTime) extends Logging {
- private var lastExecutedChange = -1L
object ConfigChangedNotificationHandler extends NotificationHandler {
override def processNotification(json: String) = {
@@ -106,7 +105,7 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
"Supported versions are 1 and 2.")
}
- case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
+ case _ => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
"{\"version\" : 1, \"entity_type\":\"topics/clients\", \"entity_name\" : \"topic_name/client_id\"}." + " or " +
"{\"version\" : 2, \"entity_path\":\"entity_type/entity_name\"}." +
" Received: " + json)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c1b723f..c6c8dbd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -755,7 +755,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
java.util.Collections.emptyList())
} catch {
- case e: TopicExistsException => // let it go, possibly another broker created this topic
+ case _: TopicExistsException => // let it go, possibly another broker created this topic
new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
java.util.Collections.emptyList())
case ex: Throwable => // Catch all to prevent unhandled errors
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 117899b..0ae9124 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -43,7 +43,6 @@ class KafkaHealthcheck(brokerId: Int,
rack: Option[String],
interBrokerProtocolVersion: ApiVersion) extends Logging {
- private val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
private[server] val sessionExpireListener = new SessionExpireListener
def startup() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index df46336..a39fe49 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -111,7 +111,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
}
} catch {
- case e: NumberFormatException => throw malformedLineException(line)
+ case _: NumberFormatException => throw malformedLineException(line)
} finally {
reader.close()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 32bc660..b43695a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -223,20 +223,20 @@ class ReplicaManager(val config: KafkaConfig,
deletePartition.toString, topic, partitionId))
val errorCode = Errors.NONE.code
getPartition(topic, partitionId) match {
- case Some(partition) =>
- if(deletePartition) {
+ case Some(_) =>
+ if (deletePartition) {
val removedPartition = allPartitions.remove((topic, partitionId))
if (removedPartition != null) {
removedPartition.delete() // this will delete the local log
val topicHasPartitions = allPartitions.keys.exists { case (t, _) => topic == t }
if (!topicHasPartitions)
- BrokerTopicStats.removeMetrics(topic)
+ BrokerTopicStats.removeMetrics(topic)
}
}
case None =>
// Delete log and corresponding folders in case replica manager doesn't hold them anymore.
// This could happen when topic is being deleted while broker is down and recovers.
- if(deletePartition) {
+ if (deletePartition) {
val topicAndPartition = TopicAndPartition(topic, partitionId)
if(logManager.getLog(topicAndPartition).isDefined) {
@@ -358,10 +358,9 @@ class ReplicaManager(val config: KafkaConfig,
} else {
// If required.acks is outside accepted range, something is wrong with the client
// Just return an error and don't handle the request at all
- val responseStatus = messagesPerPartition.map {
- case (topicAndPartition, messageSet) =>
- topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
- LogAppendInfo.UnknownLogAppendInfo.firstOffset, Message.NoTimestamp)
+ val responseStatus = messagesPerPartition.map { case (topicAndPartition, _) =>
+ topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
+ LogAppendInfo.UnknownLogAppendInfo.firstOffset, Message.NoTimestamp)
}
responseCallback(responseStatus)
}
@@ -657,11 +656,9 @@ class ReplicaManager(val config: KafkaConfig,
replicaStateChangeLock synchronized {
val responseMap = new mutable.HashMap[TopicPartition, Short]
if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
- leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
"its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
- }
BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
} else {
val controllerId = leaderAndISRRequest.controllerId
@@ -694,7 +691,7 @@ class ReplicaManager(val config: KafkaConfig,
}
}
- val partitionsTobeLeader = partitionState.filter { case (partition, stateInfo) =>
+ val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
stateInfo.leader == config.brokerId
}
val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
@@ -830,7 +827,7 @@ class ReplicaManager(val config: KafkaConfig,
val newLeaderBrokerId = partitionStateInfo.leader
metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
// Only change partition state when the leader is available
- case Some(leaderBroker) =>
+ case Some(_) =>
if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
partitionsToMakeFollower += partition
else
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 5c487bf..bb6caa0 100755
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -83,7 +83,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
leaderId = brokerId
onBecomingLeader()
} catch {
- case e: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
// If someone else has written the path, then
leaderId = getControllerID
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 56ae0c9..eea66f8 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -119,11 +119,11 @@ object ConsoleConsumer extends Logging {
val msg: BaseConsumerRecord = try {
consumer.receive()
} catch {
- case nse: StreamEndException =>
+ case _: StreamEndException =>
trace("Caught StreamEndException because consumer is shutdown, ignore and terminate.")
// Consumer is already closed
return
- case nse: WakeupException =>
+ case _: WakeupException =>
trace("Caught WakeupException because consumer is shutdown, ignore and terminate.")
// Consumer will be closed
return
@@ -358,7 +358,7 @@ object ConsoleConsumer extends Logging {
val offset =
try offsetString.toLong
catch {
- case e: NumberFormatException => invalidOffset(offsetString)
+ case _: NumberFormatException => invalidOffset(offsetString)
}
if (offset < 0) invalidOffset(offsetString)
offset
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 63a04c9..2b3f56d 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -71,7 +71,7 @@ object ConsumerPerformance {
val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads))
var threadList = List[ConsumerPerfThread]()
- for ((topic, streamList) <- topicMessageStreams)
+ for (streamList <- topicMessageStreams.values)
for (i <- 0 until streamList.length)
threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, totalMessagesRead, totalBytesRead, consumerTimeout)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index f4f7acf..c299676 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -414,12 +414,10 @@ object DumpLogSegments {
}
}
- shallowOffsetNotFound.foreach {
- case (fileName, listOfShallowOffsetNotFound) => {
- System.err.println("The following indexed offsets are not found in the log.")
- listOfShallowOffsetNotFound.foreach(m => {
- System.err.println("Indexed offset: %s, found log offset: %s".format(m._1, m._2))
- })
+ shallowOffsetNotFound.values.foreach { listOfShallowOffsetNotFound =>
+ System.err.println("The following indexed offsets are not found in the log.")
+ listOfShallowOffsetNotFound.foreach { case (indexedOffset, logOffset) =>
+ System.err.println(s"Indexed offset: $indexedOffset, found log offset: $logOffset")
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/EndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 1c92088..9aaad3e 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -122,8 +122,7 @@ object EndToEndLatency {
//Check we only got the one message
if (recordIter.hasNext) {
- var count = 1
- for (elem <- recordIter) count += 1
+ val count = 1 + recordIter.size
throw new RuntimeException(s"Only one result was expected during this test. We found [$count]")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 17b8f0b..1a6ba69 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -375,7 +375,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
mirrorMakerConsumer.commit()
throw e
- case e: CommitFailedException =>
+ case _: CommitFailedException =>
warn("Failed to commit offsets because the consumer group has rebalanced and assigned partitions to " +
"another instance. If you see this regularly, it could indicate that you need to either increase " +
s"the consumer's ${consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG} or reduce the number of records " +
@@ -435,9 +435,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
maybeFlushAndCommitOffsets()
}
} catch {
- case cte: ConsumerTimeoutException =>
+ case _: ConsumerTimeoutException =>
trace("Caught ConsumerTimeoutException, continue iteration.")
- case we: WakeupException =>
+ case _: WakeupException =>
trace("Caught ConsumerWakeupException, continue iteration.")
}
maybeFlushAndCommitOffsets()
@@ -485,7 +485,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
mirrorMakerConsumer.stop()
}
catch {
- case ie: InterruptedException =>
+ case _: InterruptedException =>
warn("Interrupt during shutdown of the mirror maker thread")
}
}
@@ -495,7 +495,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
shutdownLatch.await()
info("Mirror maker thread shutdown complete")
} catch {
- case ie: InterruptedException =>
+ case _: InterruptedException =>
warn("Shutdown of the mirror maker thread interrupted")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index d88ec41..4e2c7ef 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -32,9 +32,6 @@ object ReplayLogProducer extends Logging {
def main(args: Array[String]) {
val config = new Config(args)
- val executor = Executors.newFixedThreadPool(config.numThreads)
- val allDone = new CountDownLatch(config.numThreads)
-
// if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
ZkUtils.maybeDeletePath(config.zkConnect, "/consumers/" + GroupId)
Thread.sleep(500)
@@ -51,7 +48,7 @@ object ReplayLogProducer extends Logging {
val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.inputTopic -> config.numThreads))
var threadList = List[ZKConsumerThread]()
- for ((topic, streamList) <- topicMessageStreams)
+ for (streamList <- topicMessageStreams.values)
for (stream <- streamList)
threadList ::= new ZKConsumerThread(config, stream)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 9a059df..01d3aa8 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -107,7 +107,7 @@ object ReplicaVerificationTool extends Logging {
Pattern.compile(regex)
}
catch {
- case e: PatternSyntaxException =>
+ case _: PatternSyntaxException =>
throw new RuntimeException(regex + " is an invalid regex.")
}
@@ -151,14 +151,13 @@ object ReplicaVerificationTool extends Logging {
topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId))
.map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size }
debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition)
- val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap(
- topicMetadataResponse =>
- topicMetadataResponse.partitionsMetadata.map(
- partitionMetadata =>
- (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id))
- ).groupBy(_._2)
- .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map {
- case(topicAndPartition, leaderId) => topicAndPartition })
+ val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap { topicMetadataResponse =>
+ topicMetadataResponse.partitionsMetadata.map { partitionMetadata =>
+ (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)
+ }
+ }.groupBy(_._2).mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { case (topicAndPartition, _) =>
+ topicAndPartition
+ })
debug("Leaders per broker: " + leadersPerBroker)
val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition,
@@ -236,8 +235,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
}
private def offsetResponseStringWithError(offsetResponse: OffsetResponse): String = {
- offsetResponse.partitionErrorAndOffsets.filter {
- case (topicAndPartition, partitionOffsetsResponse) => partitionOffsetsResponse.error != Errors.NONE.code
+ offsetResponse.partitionErrorAndOffsets.filter { case (_, partitionOffsetsResponse) =>
+ partitionOffsetsResponse.error != Errors.NONE.code
}.mkString
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 1c059bb..99b5aae 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -271,8 +271,8 @@ object CoreUtils extends Logging {
*/
def duplicates[T](s: Traversable[T]): Iterable[T] = {
s.groupBy(identity)
- .map{ case (k,l) => (k,l.size)}
- .filter{ case (k,l) => l > 1 }
+ .map { case (k, l) => (k, l.size)}
+ .filter { case (_, l) => l > 1 }
.keys
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/FileLock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/FileLock.scala b/core/src/main/scala/kafka/utils/FileLock.scala
index b43b4b1..896c300 100644
--- a/core/src/main/scala/kafka/utils/FileLock.scala
+++ b/core/src/main/scala/kafka/utils/FileLock.scala
@@ -52,7 +52,7 @@ class FileLock(val file: File) extends Logging {
flock = channel.tryLock()
flock != null
} catch {
- case e: OverlappingFileLockException => false
+ case _: OverlappingFileLockException => false
}
}
}
@@ -77,4 +77,4 @@ class FileLock(val file: File) extends Logging {
channel.close()
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/Mx4jLoader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
index aa120ab..5d2549e 100644
--- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala
+++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
@@ -61,12 +61,10 @@ object Mx4jLoader extends Logging {
true
}
catch {
- case e: ClassNotFoundException => {
+ case _: ClassNotFoundException =>
info("Will not load MX4J, mx4j-tools.jar is not in the classpath")
- }
- case e: Throwable => {
+ case e: Throwable =>
warn("Could not start register mbean in JMX", e)
- }
}
false
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 31e8a92..369bb23 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -58,13 +58,13 @@ object ReplicationUtils extends Logging {
(expectedLeader,writtenLeader) match {
case (Some(expectedLeader),Some(writtenLeader)) =>
if(expectedLeader == writtenLeader)
- return (true,writtenStat.getVersion())
+ return (true, writtenStat.getVersion())
case _ =>
}
case None =>
}
} catch {
- case e1: Exception =>
+ case _: Exception =>
}
(false,-1)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/VerifiableProperties.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index f57245f..9600b0a 100755
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -181,7 +181,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
/**
* Get a Map[String, String] from a property list in the form k1:v2, k2:v2, ...
*/
- def getMap(name: String, valid: String => Boolean = s => true): Map[String, String] = {
+ def getMap(name: String, valid: String => Boolean = _ => true): Map[String, String] = {
try {
val m = CoreUtils.parseCsvMap(getString(name, ""))
m.foreach {
@@ -208,7 +208,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
CompressionCodec.getCompressionCodec(prop.toInt)
}
catch {
- case nfe: NumberFormatException =>
+ case _: NumberFormatException =>
CompressionCodec.getCompressionCodec(prop)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 80a9f1a..787cb8f 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -237,7 +237,7 @@ class ZkUtils(val zkClient: ZkClient,
createPersistentPath(ClusterIdPath, ClusterId.toJson(proposedClusterId))
proposedClusterId
} catch {
- case e: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
getClusterId.getOrElse(throw new KafkaException("Failed to get cluster id from Zookeeper. This can only happen if /cluster/id is deleted from Zookeeper."))
}
}
@@ -389,7 +389,7 @@ class ZkUtils(val zkClient: ZkClient,
isSecure)
zkCheckedEphemeral.create()
} catch {
- case e: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
throw new RuntimeException("A broker is already registered on the path " + brokerIdPath
+ ". This probably " + "indicates that you either have configured a brokerid that is already in use, or "
+ "else you have shutdown this broker and restarted it faster than the zookeeper "
@@ -445,7 +445,7 @@ class ZkUtils(val zkClient: ZkClient,
try {
ZkPath.createEphemeral(zkClient, path, data, acls)
} catch {
- case e: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
createParentPath(path)
ZkPath.createEphemeral(zkClient, path, data, acls)
}
@@ -465,7 +465,7 @@ class ZkUtils(val zkClient: ZkClient,
try {
storedData = readData(path)._1
} catch {
- case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
+ case _: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
}
if (storedData == null || storedData != data) {
info("conflict in " + path + " data: " + data + " stored data: " + storedData)
@@ -484,7 +484,7 @@ class ZkUtils(val zkClient: ZkClient,
try {
ZkPath.createPersistent(zkClient, path, data, acls)
} catch {
- case e: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
createParentPath(path)
ZkPath.createPersistent(zkClient, path, data, acls)
}
@@ -503,12 +503,12 @@ class ZkUtils(val zkClient: ZkClient,
try {
zkClient.writeData(path, data)
} catch {
- case e: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
createParentPath(path)
try {
ZkPath.createPersistent(zkClient, path, data, acls)
} catch {
- case e: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
zkClient.writeData(path, data)
}
}
@@ -573,7 +573,7 @@ class ZkUtils(val zkClient: ZkClient,
try {
zkClient.writeData(path, data)
} catch {
- case e: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
createParentPath(path)
ZkPath.createEphemeral(zkClient, path, data, acls)
}
@@ -583,7 +583,7 @@ class ZkUtils(val zkClient: ZkClient,
try {
zkClient.delete(path)
} catch {
- case e: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
false
@@ -599,7 +599,7 @@ class ZkUtils(val zkClient: ZkClient,
zkClient.delete(path, expectedVersion)
true
} catch {
- case e: ZkBadVersionException => false
+ case _: ZkBadVersionException => false
}
}
@@ -607,7 +607,7 @@ class ZkUtils(val zkClient: ZkClient,
try {
zkClient.deleteRecursive(path)
} catch {
- case e: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
}
@@ -624,7 +624,7 @@ class ZkUtils(val zkClient: ZkClient,
val dataAndStat = try {
(Some(zkClient.readData(path, stat)), stat)
} catch {
- case e: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
(None, stat)
}
dataAndStat
@@ -642,7 +642,7 @@ class ZkUtils(val zkClient: ZkClient,
try {
zkClient.getChildren(path)
} catch {
- case e: ZkNoNodeException => Nil
+ case _: ZkNoNodeException => Nil
}
}
@@ -754,7 +754,7 @@ class ZkUtils(val zkClient: ZkClient,
updatePersistentPath(zkPath, jsonData)
debug("Updated partition reassignment path with %s".format(jsonData))
} catch {
- case nne: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
createPersistentPath(zkPath, jsonData)
debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
case e2: Throwable => throw new AdminOperationException(e2.toString)
@@ -835,7 +835,7 @@ class ZkUtils(val zkClient: ZkClient,
try {
writeToZk
} catch {
- case e1: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
makeSurePersistentPathExists(path)
writeToZk
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index ce91a30..f13f59f 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -111,7 +111,6 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
@Test
def testDescribeConsumerGroupForNonExistentGroup() {
val nonExistentGroup = "non" + groupId
- val sum = client.describeConsumerGroup(nonExistentGroup).consumers
assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).consumers.get.isEmpty)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 55e8e4f..8502ae0 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -149,11 +149,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
- for (i <- 0 until producerCount)
+ for (_ <- 0 until producerCount)
producers += TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
maxBlockMs = 3000,
acks = 1)
- for (i <- 0 until consumerCount)
+ for (_ <- 0 until consumerCount)
consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
// create the consumer offset topic
@@ -339,7 +339,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
sendRecords(numRecords, tp)
fail("should have thrown exception")
} catch {
- case e: TimeoutException => //expected
+ case _: TimeoutException => //expected
}
}
@@ -517,7 +517,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumeRecords(consumer)
Assert.fail("Expected TopicAuthorizationException")
} catch {
- case e: TopicAuthorizationException => //expected
+ case _: TopicAuthorizationException => //expected
}
}
@@ -595,7 +595,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumeRecords(consumer)
Assert.fail("Expected TopicAuthorizationException")
} catch {
- case e: TopicAuthorizationException => //expected
+ case _: TopicAuthorizationException => //expected
} finally consumer.close()
}
@@ -813,11 +813,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) ++ acls, servers.head.apis.authorizer.get, resource)
}
- private def removeAndVerifyAcls(acls: Set[Acl], resource: Resource) = {
- servers.head.apis.authorizer.get.removeAcls(acls, resource)
- TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) -- acls, servers.head.apis.authorizer.get, resource)
- }
-
private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
numRecords: Int = 1,
startingOffset: Int = 0,
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 102b7cf..732b99f 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -280,7 +280,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
try {
consumer.poll(50)
} catch {
- case e: WakeupException => // ignore for shutdown
+ case _: WakeupException => // ignore for shutdown
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 816f36a..b5aaaf4 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -142,12 +142,11 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
producer.send(record4, callback)
fail("Should not allow sending a record without topic")
} catch {
- case iae: IllegalArgumentException => // this is ok
- case e: Throwable => fail("Only expecting IllegalArgumentException", e)
+ case _: IllegalArgumentException => // this is ok
}
// non-blocking send a list of records
- for (i <- 1 to numRecords)
+ for (_ <- 1 to numRecords)
producer.send(record0, callback)
// check that all messages have been acked via offset
@@ -234,7 +233,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
// non-blocking send a list of records
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
- for (i <- 1 to numRecords)
+ for (_ <- 1 to numRecords)
producer.send(record0)
val response0 = producer.send(record0)
@@ -328,8 +327,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
producer.send(new ProducerRecord(topic, partition1, null, "value".getBytes))
fail("Should not allow sending a record to a partition not present in the metadata")
} catch {
- case ke: KafkaException => // this is ok
- case e: Throwable => fail("Only expecting KafkaException", e)
+ case _: KafkaException => // this is ok
}
AdminUtils.addPartitions(zkUtils, topic, 2)
@@ -370,8 +368,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
try {
TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes)
- for (i <- 0 until 50) {
- val responses = (0 until numRecords) map (i => producer.send(record))
+ for (_ <- 0 until 50) {
+ val responses = (0 until numRecords) map (_ => producer.send(record))
assertTrue("No request is complete.", responses.forall(!_.isDone()))
producer.flush()
assertTrue("All requests are complete.", responses.forall(_.isDone()))
@@ -389,15 +387,14 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
// create topic
val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
val leader0 = leaders(0)
- val leader1 = leaders(1)
// create record
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
// Test closing from caller thread.
- for (i <- 0 until 50) {
+ for (_ <- 0 until 50) {
val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
- val responses = (0 until numRecords) map (i => producer.send(record0))
+ val responses = (0 until numRecords) map (_ => producer.send(record0))
assertTrue("No request is complete.", responses.forall(!_.isDone()))
producer.close(0, TimeUnit.MILLISECONDS)
responses.foreach { future =>
@@ -436,7 +433,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
// Trigger another batch in accumulator before close the producer. These messages should
// not be sent.
if (sendRecords)
- (0 until numRecords) foreach (i => producer.send(record))
+ (0 until numRecords) foreach (_ => producer.send(record))
// The close call will be called by all the message callbacks. This tests idempotence of the close call.
producer.close(0, TimeUnit.MILLISECONDS)
// Test close with non zero timeout. Should not block at all.
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 4e6c740..479e749 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -276,7 +276,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
AclCommand.main(deleteDescribeAclArgs)
AclCommand.main(deleteWriteAclArgs)
- servers.foreach { s =>
+ servers.foreach { _ =>
TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
index b26b242..d15a01d 100644
--- a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
+++ b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
@@ -29,7 +29,7 @@ import kafka.utils.TestUtils
object FixedPortTestUtils {
def choosePorts(count: Int): Seq[Int] = {
try {
- val sockets = (0 until count).map(i => new ServerSocket(0))
+ val sockets = (0 until count).map(_ => new ServerSocket(0))
val ports = sockets.map(_.getLocalPort())
sockets.foreach(_.close())
ports
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index ca020a6..83280dc 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -65,9 +65,9 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
consumerConfig.putAll(consumerSecurityProps)
- for (i <- 0 until producerCount)
+ for (_ <- 0 until producerCount)
producers += createNewProducer
- for (i <- 0 until consumerCount) {
+ for (_ <- 0 until consumerCount) {
consumers += createNewConsumer
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index d18dc3a..aefe5bd 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -782,7 +782,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// create a group of consumers, subscribe the consumers to all the topics and start polling
// for the topic partition assignment
- val (rrConsumers, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions)
+ val (_, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions)
try {
validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
@@ -862,10 +862,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
testProducer.send(null, null)
fail("Should not allow sending a null record")
} catch {
- case e: Throwable => {
+ case _: Throwable =>
assertEquals("Interceptor should be notified about exception", 1, MockProducerInterceptor.ON_ERROR_COUNT.intValue())
assertEquals("Interceptor should not receive metadata with an exception when record is null", 0, MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue())
- }
}
// create consumer with interceptor
@@ -1222,7 +1221,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1))
TestUtils.waitUntilTrue(() => {
- val records = consumer0.poll(50)
+ consumer0.poll(50)
consumer0.assignment() == newAssignment.asJava
}, s"Expected partitions ${newAssignment.asJava} but actually got ${consumer0.assignment()}")
@@ -1335,7 +1334,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
subscriptions: Set[TopicPartition]): (Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], Buffer[ConsumerAssignmentPoller]) = {
assertTrue(consumerCount <= subscriptions.size)
val consumerGroup = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
- for (i <- 0 until consumerCount)
+ for (_ <- 0 until consumerCount)
consumerGroup += new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
consumers ++= consumerGroup
@@ -1364,7 +1363,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
topicsToSubscribe: List[String],
subscriptions: Set[TopicPartition]): Unit = {
assertTrue(consumerGroup.size + numOfConsumersToAdd <= subscriptions.size)
- for (i <- 0 until numOfConsumersToAdd) {
+ for (_ <- 0 until numOfConsumersToAdd) {
val newConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
consumerGroup += newConsumer
consumerPollers += subscribeConsumerAndStartPolling(newConsumer, topicsToSubscribe)
@@ -1415,7 +1414,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
rebalanceListener: ConsumerRebalanceListener): Unit = {
consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
TestUtils.waitUntilTrue(() => {
- val records = consumer.poll(50)
+ consumer.poll(50)
consumer.assignment() == subscriptions.asJava
}, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer.assignment()}")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 734eb66..a75e7c7 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -39,7 +39,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
createNewProducerWithNoSerializer(brokerList)
fail("Instantiating a producer without specifying a serializer should cause a ConfigException")
} catch {
- case ce : ConfigException => // this is ok
+ case _ : ConfigException => // this is ok
}
// create a producer with explicit serializers should succeed
@@ -67,7 +67,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
producer.send(record5)
fail("Should have gotten a SerializationException")
} catch {
- case se: SerializationException => // this is ok
+ case _: SerializationException => // this is ok
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 5994a1d..8d676d1 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -88,7 +88,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
scheduler.start
// rolling bounce brokers
- for (i <- 0 until numServers) {
+ for (_ <- 0 until numServers) {
for (server <- servers) {
server.shutdown()
server.awaitShutdown()
@@ -143,7 +143,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
futures.map(_.get)
sent += numRecords
} catch {
- case e : Exception => failed = true
+ case _ : Exception => failed = true
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
index 14807bc..cebfb04 100644
--- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
+++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
@@ -102,7 +102,6 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
private val orgName = config.getProperty(MiniKdc.OrgName)
private val orgDomain = config.getProperty(MiniKdc.OrgDomain)
- private val dnString = s"dc=$orgName,dc=$orgDomain"
private val realm = s"${orgName.toUpperCase(Locale.ENGLISH)}.${orgDomain.toUpperCase(Locale.ENGLISH)}"
private val krb5conf = new File(workDir, "krb5.conf")
@@ -163,7 +162,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
val partition = new JdbmPartition(ds.getSchemaManager, ds.getDnFactory)
partition.setId(orgName)
partition.setPartitionPath(new File(ds.getInstanceLayout.getPartitionsDirectory, orgName).toURI)
- val dn = new Dn(dnString)
+ val dn = new Dn(s"dc=$orgName,dc=$orgDomain")
partition.setSuffixDn(dn)
ds.addPartition(partition)
@@ -207,7 +206,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
val kerberosConfig = new KerberosConfig
kerberosConfig.setMaximumRenewableLifetime(config.getProperty(MiniKdc.MaxRenewableLifetime).toLong)
kerberosConfig.setMaximumTicketLifetime(config.getProperty(MiniKdc.MaxTicketLifetime).toLong)
- kerberosConfig.setSearchBaseDn(dnString)
+ kerberosConfig.setSearchBaseDn(s"dc=$orgName,dc=$orgDomain")
kerberosConfig.setPaEncTimestampRequired(false)
kdc = new KdcServer(kerberosConfig)
kdc.setDirectoryService(ds)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
index 6556100..51f02d1 100755
--- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
@@ -301,7 +301,7 @@ object TestLogCleaning {
consumedWriter.newLine()
}
} catch {
- case e: ConsumerTimeoutException =>
+ case _: ConsumerTimeoutException =>
}
}
consumedWriter.close()
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 8adc7e2..f5cee0c 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -69,7 +69,6 @@ object StressTestLog {
abstract class WorkerThread extends Thread {
override def run() {
try {
- var offset = 0
while(running.get)
work()
} catch {
@@ -107,7 +106,7 @@ object StressTestLog {
case _ =>
}
} catch {
- case e: OffsetOutOfRangeException => // this is okay
+ case _: OffsetOutOfRangeException => // this is okay
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestCrcPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestCrcPerformance.scala b/core/src/test/scala/other/kafka/TestCrcPerformance.scala
index 0c1e1ad..daeecbd 100755
--- a/core/src/test/scala/other/kafka/TestCrcPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestCrcPerformance.scala
@@ -18,7 +18,6 @@ package kafka.log
import java.util.Random
import kafka.message._
-import kafka.utils.TestUtils
import org.apache.kafka.common.utils.Utils
object TestCrcPerformance {
@@ -28,21 +27,18 @@ object TestCrcPerformance {
Utils.croak("USAGE: java " + getClass().getName() + " num_messages message_size")
val numMessages = args(0).toInt
val messageSize = args(1).toInt
- //val numMessages = 100000000
- //val messageSize = 32
- val dir = TestUtils.tempDir()
val content = new Array[Byte](messageSize)
new Random(1).nextBytes(content)
// create message test
val start = System.nanoTime
- for(i <- 0 until numMessages) {
+ for (_ <- 0 until numMessages)
new Message(content)
- }
- val ellapsed = System.nanoTime - start
- println("%d messages created in %.2f seconds + (%.2f ns per message).".format(numMessages, ellapsed/(1000.0*1000.0*1000.0),
- ellapsed / numMessages.toDouble))
+
+ val elapsed = System.nanoTime - start
+ println("%d messages created in %.2f seconds + (%.2f ns per message).".format(numMessages, elapsed / (1000.0*1000.0*1000.0),
+ elapsed / numMessages.toDouble))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestKafkaAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestKafkaAppender.scala b/core/src/test/scala/other/kafka/TestKafkaAppender.scala
index ab807a1..72c7f28 100644
--- a/core/src/test/scala/other/kafka/TestKafkaAppender.scala
+++ b/core/src/test/scala/other/kafka/TestKafkaAppender.scala
@@ -33,12 +33,13 @@ object TestKafkaAppender extends Logging {
try {
PropertyConfigurator.configure(args(0))
} catch {
- case e: Exception => System.err.println("KafkaAppender could not be initialized ! Exiting..")
- e.printStackTrace()
- System.exit(1)
+ case e: Exception =>
+ System.err.println("KafkaAppender could not be initialized ! Exiting..")
+ e.printStackTrace()
+ System.exit(1)
}
- for(i <- 1 to 10)
+ for (_ <- 1 to 10)
info("test")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index db281bf..6bd8e4f 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -101,7 +101,8 @@ object TestLinearWriteSpeed {
val rand = new Random
rand.nextBytes(buffer.array)
val numMessages = bufferSize / (messageSize + MessageSet.LogOverhead)
- val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, messages = (0 until numMessages).map(x => new Message(new Array[Byte](messageSize))): _*)
+ val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec,
+ messages = (0 until numMessages).map(_ => new Message(new Array[Byte](messageSize))): _*)
val writables = new Array[Writable](numFiles)
val scheduler = new KafkaScheduler(1)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 9445191..9db2ffd 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -94,7 +94,7 @@ object TestOffsetManager {
offset += 1
}
catch {
- case e1: ClosedByInterruptException =>
+ case _: ClosedByInterruptException =>
offsetsChannel.disconnect()
case e2: IOException =>
println("Commit thread %d: Error while committing offsets to %s:%d for group %s due to %s.".format(id, offsetsChannel.host, offsetsChannel.port, groupId, e2))
@@ -158,7 +158,7 @@ object TestOffsetManager {
}
}
catch {
- case e1: ClosedByInterruptException =>
+ case _: ClosedByInterruptException =>
channel.disconnect()
channels.remove(coordinatorId)
case e2: IOException =>
@@ -168,7 +168,7 @@ object TestOffsetManager {
}
}
catch {
- case e: IOException =>
+ case _: IOException =>
println("Error while querying %s:%d - shutting down query channel.".format(metadataChannel.host, metadataChannel.port))
metadataChannel.disconnect()
println("Creating new query channel.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
index ba89fc8..6ccac29 100644
--- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
@@ -104,7 +104,7 @@ object TestPurgatoryPerformance {
val latch = new CountDownLatch(numRequests)
val start = System.currentTimeMillis
val rand = new Random()
- val keys = (0 until numKeys).map(i => "fakeKey%d".format(rand.nextInt(numPossibleKeys)))
+ val keys = (0 until numKeys).map(_ => "fakeKey%d".format(rand.nextInt(numPossibleKeys)))
@volatile var requestArrivalTime = start
@volatile var end = 0L
val generator = new Runnable {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 763e4ec..0f846e1 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -69,8 +69,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
AdminUtils.addPartitions(zkUtils, "Blah", 1)
fail("Topic should not exist")
} catch {
- case e: AdminOperationException => //this is good
- case e2: Throwable => throw e2
+ case _: AdminOperationException => //this is good
}
}
@@ -80,8 +79,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
AdminUtils.addPartitions(zkUtils, topic1, 2, "0:1,0:1:2")
fail("Add partitions should fail")
} catch {
- case e: AdminOperationException => //this is good
- case e2: Throwable => throw e2
+ case _: AdminOperationException => //this is good
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index ddfbb51..ff86693 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -250,7 +250,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
ConfigCommand.parseEntity(opts)
fail("Did not fail with invalid argument list")
} catch {
- case e: IllegalArgumentException => // expected exception
+ case _: IllegalArgumentException => // expected exception
}
}
@@ -315,7 +315,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--alter", "--add-config", "a=b,c=d")
fail("Did not fail with invalid client-id")
} catch {
- case e: InvalidConfigException => // expected
+ case _: InvalidConfigException => // expected
}
checkEntity("users", QuotaId.sanitize("CN=user1") + "/clients/client1",
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index ccb3618..d1fcbc0 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -164,7 +164,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
@Test
def testAddPartitionDuringDeleteTopic() {
val topic = "test"
- val topicAndPartition = TopicAndPartition(topic, 0)
val servers = createTestTopicAndCluster(topic)
// start topic deletion
AdminUtils.deleteTopic(zkUtils, topic)
@@ -208,7 +207,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
AdminUtils.deleteTopic(zkUtils, "test2")
fail("Expected UnknownTopicOrPartitionException")
} catch {
- case e: UnknownTopicOrPartitionException => // expected exception
+ case _: UnknownTopicOrPartitionException => // expected exception
}
// verify delete topic path for test2 is removed from zookeeper
TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers)
@@ -270,7 +269,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
fail("Expected TopicAlreadyMarkedForDeletionException")
}
catch {
- case e: TopicAlreadyMarkedForDeletionException => // expected exception
+ case _: TopicAlreadyMarkedForDeletionException => // expected exception
}
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
@@ -300,7 +299,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
private def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
var counter = 0
- for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
+ for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
val count = counter
log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
counter += 1
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 3691919..39bcb7a 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -87,7 +87,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
// action/test
TestUtils.waitUntilTrue(() => {
- val (state, assignments) = consumerGroupCommand.describeGroup()
+ val (_, assignments) = consumerGroupCommand.describeGroup()
assignments.isDefined &&
assignments.get.filter(_.group == group).size == 1 &&
assignments.get.filter(_.group == group).head.consumerId.isDefined
@@ -113,7 +113,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
// action/test
TestUtils.waitUntilTrue(() => {
- val (state, assignments) = consumerGroupCommand.describeGroup()
+ val (_, assignments) = consumerGroupCommand.describeGroup()
assignments.isDefined &&
assignments.get.filter(_.group == group).size == 2 &&
assignments.get.filter{ x => x.group == group && x.partition.isDefined}.size == 1 &&
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 2a3724e..90a354e 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -45,7 +45,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging wi
kafka.admin.TopicCommand.createTopic(zkUtils, createOpts)
val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
- val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkUtils,
+ val (proposedAssignment, _) = ReassignPartitionsCommand.generateAssignment(zkUtils,
rackInfo.keys.toSeq.sorted, topicJson, disableRackAware = false)
val assignment = proposedAssignment map { case (topicPartition, replicas) =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala b/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
index fff3e7b..b71b00b 100644
--- a/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
@@ -34,7 +34,7 @@ class ApiUtilsTest extends JUnitSuite {
@Test
def testShortStringNonASCII() {
// Random-length strings
- for(i <- 0 to 100) {
+ for(_ <- 0 to 100) {
// Since we're using UTF-8 encoding, each encoded byte will be one to four bytes long
val s: String = ApiUtilsTest.rnd.nextString(math.abs(ApiUtilsTest.rnd.nextInt()) % (Short.MaxValue / 4))
val bb: ByteBuffer = ByteBuffer.allocate(ApiUtils.shortStringLength(s))
@@ -47,7 +47,7 @@ class ApiUtilsTest extends JUnitSuite {
@Test
def testShortStringASCII() {
// Random-length strings
- for(i <- 0 to 100) {
+ for(_ <- 0 to 100) {
val s: String = TestUtils.randomString(math.abs(ApiUtilsTest.rnd.nextInt()) % Short.MaxValue)
val bb: ByteBuffer = ByteBuffer.allocate(ApiUtils.shortStringLength(s))
ApiUtils.writeShortString(bb, s)
@@ -68,17 +68,13 @@ class ApiUtilsTest extends JUnitSuite {
ApiUtils.shortStringLength(s2)
fail
} catch {
- case e: KafkaException => {
- // ok
- }
+ case _: KafkaException => // ok
}
try {
ApiUtils.writeShortString(bb, s2)
fail
} catch {
- case e: KafkaException => {
- // ok
- }
+ case _: KafkaException => // ok
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index f8fbae7..16fe788 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -23,7 +23,6 @@ import kafka.common._
import kafka.message.{Message, ByteBufferMessageSet}
import kafka.utils.SystemTime
-import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.common.TopicAndPartition
import java.nio.ByteBuffer
@@ -37,10 +36,6 @@ import org.junit.Assert._
object SerializationTestUtils {
private val topic1 = "test1"
private val topic2 = "test2"
- private val leader1 = 0
- private val isr1 = List(0, 1, 2)
- private val leader2 = 0
- private val isr2 = List(0, 2, 3)
private val partitionDataFetchResponse0 = new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes)))
private val partitionDataFetchResponse1 = new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("second message".getBytes)))
private val partitionDataFetchResponse2 = new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("third message".getBytes)))
@@ -84,37 +79,6 @@ object SerializationTestUtils {
private val brokers = List(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1011, SecurityProtocol.PLAINTEXT))),
new Broker(1, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1012, SecurityProtocol.PLAINTEXT))),
new Broker(2, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1013, SecurityProtocol.PLAINTEXT))))
- private val brokerEndpoints = brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
-
- private val partitionMetaData0 = new PartitionMetadata(0, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints, errorCode = 0)
- private val partitionMetaData1 = new PartitionMetadata(1, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints.tail, errorCode = 1)
- private val partitionMetaData2 = new PartitionMetadata(2, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints, errorCode = 2)
- private val partitionMetaData3 = new PartitionMetadata(3, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints.tail.tail, errorCode = 3)
- private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3)
- private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
- private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
-
- private val leaderAndIsr0 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.map(_.id))
- private val leaderAndIsr1 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.tail.map(_.id))
- private val leaderAndIsr2 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.map(_.id))
- private val leaderAndIsr3 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.tail.map(_.id))
-
- private val leaderIsrAndControllerEpoch0 = new LeaderIsrAndControllerEpoch(leaderAndIsr0, controllerEpoch = 0)
- private val leaderIsrAndControllerEpoch1 = new LeaderIsrAndControllerEpoch(leaderAndIsr1, controllerEpoch = 0)
- private val leaderIsrAndControllerEpoch2 = new LeaderIsrAndControllerEpoch(leaderAndIsr2, controllerEpoch = 0)
- private val leaderIsrAndControllerEpoch3 = new LeaderIsrAndControllerEpoch(leaderAndIsr3, controllerEpoch = 0)
-
- private val partitionStateInfo0 = new PartitionStateInfo(leaderIsrAndControllerEpoch0, brokers.map(_.id).toSet)
- private val partitionStateInfo1 = new PartitionStateInfo(leaderIsrAndControllerEpoch1, brokers.map(_.id).toSet)
- private val partitionStateInfo2 = new PartitionStateInfo(leaderIsrAndControllerEpoch2, brokers.map(_.id).toSet)
- private val partitionStateInfo3 = new PartitionStateInfo(leaderIsrAndControllerEpoch3, brokers.map(_.id).toSet)
-
- private val updateMetadataRequestPartitionStateInfo = collection.immutable.Map(
- TopicAndPartition(topic1,0) -> partitionStateInfo0,
- TopicAndPartition(topic1,1) -> partitionStateInfo1,
- TopicAndPartition(topic1,2) -> partitionStateInfo2,
- TopicAndPartition(topic1,3) -> partitionStateInfo3
- )
def createTestProducerRequest: ProducerRequest = {
new ProducerRequest(1, "client 1", 0, 1000, topicDataProducerRequest)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/common/ConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ConfigTest.scala b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
index 26154f2..2d20b1e 100644
--- a/core/src/test/scala/unit/kafka/common/ConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
@@ -40,7 +40,7 @@ class ConfigTest {
fail("Should throw InvalidClientIdException.")
}
catch {
- case e: InvalidConfigException => "This is good."
+ case _: InvalidConfigException => // This is good
}
}
@@ -51,7 +51,7 @@ class ConfigTest {
ProducerConfig.validateClientId(validClientIds(i))
}
catch {
- case e: Exception => fail("Should not throw exception.")
+ case _: Exception => fail("Should not throw exception.")
}
}
}
@@ -70,7 +70,7 @@ class ConfigTest {
fail("Should throw InvalidGroupIdException.")
}
catch {
- case e: InvalidConfigException => "This is good."
+ case _: InvalidConfigException => // This is good
}
}
@@ -81,7 +81,7 @@ class ConfigTest {
ConsumerConfig.validateGroupId(validGroupIds(i))
}
catch {
- case e: Exception => fail("Should not throw exception.")
+ case _: Exception => fail("Should not throw exception.")
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/common/TopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/TopicTest.scala b/core/src/test/scala/unit/kafka/common/TopicTest.scala
index 66549af..39eb315 100644
--- a/core/src/test/scala/unit/kafka/common/TopicTest.scala
+++ b/core/src/test/scala/unit/kafka/common/TopicTest.scala
@@ -28,7 +28,7 @@ class TopicTest {
val invalidTopicNames = new ArrayBuffer[String]()
invalidTopicNames += ("", ".", "..")
var longName = "ATCG"
- for (i <- 1 to 6)
+ for (_ <- 1 to 6)
longName += longName
invalidTopicNames += longName
invalidTopicNames += longName.drop(6)
@@ -43,7 +43,7 @@ class TopicTest {
fail("Should throw InvalidTopicException.")
}
catch {
- case e: org.apache.kafka.common.errors.InvalidTopicException => // This is good.
+ case _: org.apache.kafka.common.errors.InvalidTopicException => // This is good.
}
}
@@ -54,7 +54,7 @@ class TopicTest {
Topic.validate(validTopicNames(i))
}
catch {
- case e: Exception => fail("Should not throw exception.")
+ case _: Exception => fail("Should not throw exception.")
}
}
}