You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/06/06 07:46:08 UTC
[1/3] kafka git commit: KAFKA-3771; Improving Kafka core code
Repository: kafka
Updated Branches:
refs/heads/trunk 2c7fae0a4 -> 79aaf19f2
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index d37de76..40ad0f3 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -55,8 +55,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
val message = "hello"
var producer: KafkaProducer[Integer, String] = null
- def hwFile1 = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename))
- def hwFile2 = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
+ def hwFile1 = new OffsetCheckpoint(new File(configProps1.logDirs.head, ReplicaManager.HighWatermarkFilename))
+ def hwFile2 = new OffsetCheckpoint(new File(configProps2.logDirs.head, ReplicaManager.HighWatermarkFilename))
var servers = Seq.empty[KafkaServer]
// Some tests restart the brokers then produce more data. But since test brokers use random ports, we need
@@ -95,7 +95,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
producer.close()
for (server <- servers) {
server.shutdown()
- Utils.delete(new File(server.config.logDirs(0)))
+ Utils.delete(new File(server.config.logDirs.head))
}
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 312edd4..e0b6db4 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -177,8 +177,8 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = {
for(logDir <- logDirs) {
- val brokerMetadataOpt = (new BrokerMetadataCheckpoint(
- new File(logDir + File.separator + brokerMetaPropsFile))).read()
+ val brokerMetadataOpt = new BrokerMetadataCheckpoint(
+ new File(logDir + File.separator + brokerMetaPropsFile)).read()
brokerMetadataOpt match {
case Some(brokerMetadata: BrokerMetadata) =>
if (brokerMetadata.brokerId != brokerId) return false
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 689b70b..7741698 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -105,7 +105,7 @@ class SimpleFetchTest {
val partition = replicaManager.getOrCreatePartition(topic, partitionId)
// create the leader replica with the local log
- val leaderReplica = new Replica(configs(0).brokerId, partition, time, 0, Some(log))
+ val leaderReplica = new Replica(configs.head.brokerId, partition, time, 0, Some(log))
leaderReplica.highWatermark = new LogOffsetMetadata(partitionHW)
partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId)
@@ -144,15 +144,15 @@ class SimpleFetchTest {
*/
@Test
def testReadFromLog() {
- val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count();
- val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count();
+ val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count()
+ val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()
assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW,
replicaManager.readFromLocalLog(true, true, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message)
assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO,
replicaManager.readFromLocalLog(true, false, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message)
- assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count());
- assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count());
+ assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count())
+ assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count())
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
index 37d334b..741eec9 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
@@ -69,10 +69,10 @@ class ConsoleProducerTest {
@Test
def testParseKeyProp(): Unit = {
val config = new ConsoleProducer.ProducerConfig(validArgs)
- val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[LineMessageReader];
+ val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[LineMessageReader]
reader.init(System.in,ConsoleProducer.getReaderProps(config))
assert(reader.keySeparator == "#")
- assert(reader.parseKey == true)
+ assert(reader.parseKey)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
index 56f5905..6a40510 100644
--- a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
@@ -22,7 +22,7 @@ import org.junit.{Test, After, Before}
class IteratorTemplateTest extends Assertions {
- val lst = (0 until 10)
+ val lst = 0 until 10
val iterator = new IteratorTemplate[Int]() {
var i = 0
override def makeNext() = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index 7c4b951..f39fa6b 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -51,7 +51,7 @@ object JaasTestUtils {
entries = Map(
"username" -> username,
"password" -> password
- ) ++ validUsers.map { case (user, pass) => (s"user_$user"-> pass)}
+ ) ++ validUsers.map { case (user, pass) => s"user_$user" -> pass }
)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
index 434c22a..e9dbbb1 100644
--- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
@@ -56,7 +56,7 @@ class MockScheduler(val time: Time) extends Scheduler {
def tick() {
this synchronized {
val now = time.milliseconds
- while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
+ while(tasks.nonEmpty && tasks.head.nextExecution <= now) {
/* pop and execute the task with the lowest next execution time */
val curr = tasks.dequeue
curr.fun()
@@ -78,7 +78,7 @@ class MockScheduler(val time: Time) extends Scheduler {
}
-case class MockTask(val name: String, val fun: () => Unit, var nextExecution: Long, val period: Long) extends Ordered[MockTask] {
+case class MockTask(name: String, fun: () => Unit, var nextExecution: Long, period: Long) extends Ordered[MockTask] {
def periodic = period >= 0
def compare(t: MockTask): Int = {
if(t.nextExecution == nextExecution)
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/utils/MockTime.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/MockTime.scala b/core/src/test/scala/unit/kafka/utils/MockTime.scala
index 0858e04..21fb4d9 100644
--- a/core/src/test/scala/unit/kafka/utils/MockTime.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockTime.scala
@@ -46,7 +46,7 @@ class MockTime(@volatile private var currentMs: Long) extends Time {
scheduler.tick()
}
- override def toString() = "MockTime(%d)".format(milliseconds)
+ override def toString = "MockTime(%d)".format(milliseconds)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7df87fc..b42a6ba 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -94,7 +94,7 @@ object TestUtils extends Logging {
val parentFile = new File(parent)
parentFile.mkdirs()
- org.apache.kafka.test.TestUtils.tempDirectory(parentFile.toPath, "kafka-");
+ org.apache.kafka.test.TestUtils.tempDirectory(parentFile.toPath, "kafka-")
}
/**
@@ -335,12 +335,12 @@ object TestUtils extends Logging {
// check if the expected iterator is longer
if (expected.hasNext) {
- var length1 = length;
+ var length1 = length
while (expected.hasNext) {
expected.next
length1 += 1
}
- assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true);
+ assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true)
}
// check if the actual iterator was longer
@@ -350,7 +350,7 @@ object TestUtils extends Logging {
actual.next
length2 += 1
}
- assertFalse("Iterators have uneven length-- second has more: "+length2 + " > " + length, true);
+ assertFalse("Iterators have uneven length-- second has more: "+length2 + " > " + length, true)
}
}
@@ -671,7 +671,7 @@ object TestUtils extends Logging {
try{
val currentLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition)
var newLeaderAndIsr: LeaderAndIsr = null
- if(currentLeaderAndIsrOpt == None)
+ if(currentLeaderAndIsrOpt.isEmpty)
newLeaderAndIsr = new LeaderAndIsr(leader, List(leader))
else{
newLeaderAndIsr = currentLeaderAndIsrOpt.get
@@ -716,7 +716,7 @@ object TestUtils extends Logging {
} else if (oldLeaderOpt.isDefined && oldLeaderOpt.get != l) {
trace("Leader for partition [%s,%d] is changed from %d to %d".format(topic, partition, oldLeaderOpt.get, l))
isLeaderElectedOrChanged = true
- } else if (!oldLeaderOpt.isDefined) {
+ } else if (oldLeaderOpt.isEmpty) {
trace("Leader %d is elected for partition [%s,%d]".format(l, topic, partition))
isLeaderElectedOrChanged = true
} else {
@@ -856,7 +856,7 @@ object TestUtils extends Logging {
// in sync replicas should not have any replica that is not in the new assigned replicas
val phantomInSyncReplicas = inSyncReplicas.toSet -- assignedReplicas.toSet
assertTrue("All in sync replicas %s must be in the assigned replica list %s".format(inSyncReplicas, assignedReplicas),
- phantomInSyncReplicas.size == 0)
+ phantomInSyncReplicas.isEmpty)
}
def ensureNoUnderReplicatedPartitions(zkUtils: ZkUtils, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int],
@@ -1031,7 +1031,7 @@ object TestUtils extends Logging {
"Topic path /brokers/topics/%s not deleted after /admin/delete_topic/%s path is deleted".format(topic, topic))
// ensure that the topic-partition has been deleted from all brokers' replica managers
TestUtils.waitUntilTrue(() =>
- servers.forall(server => topicAndPartitions.forall(tp => server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
+ servers.forall(server => topicAndPartitions.forall(tp => server.replicaManager.getPartition(tp.topic, tp.partition).isEmpty)),
"Replica manager's should have deleted all of this topic's partitions")
// ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
assertTrue("Replica logs not deleted after delete topic is complete",
@@ -1146,7 +1146,7 @@ class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
@deprecated("This class is deprecated and it will be removed in a future release.", "0.10.0.0")
class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner {
def partition(data: Any, numPartitions: Int): Int = {
- (data.asInstanceOf[String].length % numPartitions)
+ data.asInstanceOf[String].length % numPartitions
}
}
[2/3] kafka git commit: KAFKA-3771; Improving Kafka core code
Posted by ij...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
index 8b523e7..1f148de 100755
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -108,7 +108,7 @@ object StateChangeLogMerger extends Logging {
val fileNameIndex = regex.lastIndexOf('/') + 1
val dirName = if (fileNameIndex == 0) "." else regex.substring(0, fileNameIndex - 1)
val fileNameRegex = new Regex(regex.substring(fileNameIndex))
- files :::= new java.io.File(dirName).listFiles.filter(f => fileNameRegex.findFirstIn(f.getName) != None).map(dirName + "/" + _.getName).toList
+ files :::= new java.io.File(dirName).listFiles.filter(f => fileNameRegex.findFirstIn(f.getName).isDefined).map(dirName + "/" + _.getName).toList
}
if (options.has(topicOpt)) {
topic = options.valueOf(topicOpt)
@@ -141,9 +141,9 @@ object StateChangeLogMerger extends Logging {
if (!lineItr.isEmpty)
lines ::= lineItr
}
- if (!lines.isEmpty) pqueue.enqueue(lines:_*)
+ if (lines.nonEmpty) pqueue.enqueue(lines:_*)
- while (!pqueue.isEmpty) {
+ while (pqueue.nonEmpty) {
val lineItr = pqueue.dequeue()
output.write((lineItr.line + "\n").getBytes)
val nextLineItr = getNextLine(lineItr.itr)
@@ -182,7 +182,7 @@ object StateChangeLogMerger extends Logging {
class LineIterator(val line: String, val itr: Iterator[String]) {
def this() = this("", null)
- def isEmpty = (line == "" && itr == null)
+ def isEmpty = line == "" && itr == null
}
implicit object dateBasedOrdering extends Ordering[LineIterator] {
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index 3077896..5f39402 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -89,7 +89,7 @@ object VerifyConsumerRebalance extends Logging {
info("Alive partitions for topic %s are %s ".format(topic, partitions.toString))
info("Alive consumers for topic %s => %s ".format(topic, consumersPerTopicMap.get(topic)))
val partitionsWithOwners = zkUtils.getChildrenParentMayNotExist(topicDirs.consumerOwnerDir)
- if(partitionsWithOwners.size == 0) {
+ if(partitionsWithOwners.isEmpty) {
error("No owners for any partitions for topic " + topic)
rebalanceSucceeded = false
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 5b6c59f..21658d3 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -258,7 +258,7 @@ object CoreUtils extends Logging {
* Per RFC4627, section 2.5, we're not technically required to
* encode the C1 codes, but we do to be safe.
*/
- case c if ((c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f')) => "\\u%04x".format(c: Int)
+ case c if (c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f') => "\\u%04x".format(c: Int)
case c => c
}.mkString
}
@@ -269,7 +269,7 @@ object CoreUtils extends Logging {
def duplicates[T](s: Traversable[T]): Iterable[T] = {
s.groupBy(identity)
.map{ case (k,l) => (k,l.size)}
- .filter{ case (k,l) => (l > 1) }
+ .filter{ case (k,l) => l > 1 }
.keys
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/utils/ToolsUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala
index fef9392..65758d8 100644
--- a/core/src/main/scala/kafka/utils/ToolsUtils.scala
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -29,7 +29,7 @@ object ToolsUtils {
hostPortData =>
org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null
}
- val isValid = !(validHostPort.isEmpty) && validHostPort.size == hostPorts.length
+ val isValid = !validHostPort.isEmpty && validHostPort.size == hostPorts.length
if(!isValid)
CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092\n ")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/utils/VerifiableProperties.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index 34cab87..f57245f 100755
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -227,6 +227,6 @@ class VerifiableProperties(val props: Properties) extends Logging {
}
}
- override def toString(): String = props.toString
+ override def toString: String = props.toString
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 1278a70..f02ab20 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -653,7 +653,7 @@ class ZkUtils(val zkClient: ZkClient,
val topic = topicAndPartitionMap._1
val partitionMap = topicAndPartitionMap._2
debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
- (topic -> partitionMap.keys.toSeq.sortWith((s,t) => s < t))
+ topic -> partitionMap.keys.toSeq.sortWith((s, t) => s < t)
}
}
@@ -663,7 +663,7 @@ class ZkUtils(val zkClient: ZkClient,
jsonPartitionMapOpt match {
case Some(jsonPartitionMap) =>
val reassignedPartitions = parsePartitionReassignmentData(jsonPartitionMap)
- reassignedPartitions.map(p => (p._1 -> new ReassignedPartitionsContext(p._2)))
+ reassignedPartitions.map(p => p._1 -> new ReassignedPartitionsContext(p._2))
case None => Map.empty[TopicAndPartition, ReassignedPartitionsContext]
}
}
@@ -828,9 +828,9 @@ class ZkUtils(val zkClient: ZkClient,
val topics = getChildrenParentMayNotExist(BrokerTopicsPath)
if(topics == null) Set.empty[TopicAndPartition]
else {
- topics.map { topic =>
+ topics.flatMap { topic =>
getChildren(getTopicPartitionsPath(topic)).map(_.toInt).map(TopicAndPartition(topic, _))
- }.flatten.toSet
+ }.toSet
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index 3d39475..891a72c 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -63,25 +63,25 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
@Test
def testListGroups() {
- consumers(0).subscribe(List(topic))
+ consumers.head.subscribe(List(topic))
TestUtils.waitUntilTrue(() => {
- consumers(0).poll(0)
- !consumers(0).assignment().isEmpty
+ consumers.head.poll(0)
+ !consumers.head.assignment().isEmpty
}, "Expected non-empty assignment")
val groups = client.listAllGroupsFlattened
assertFalse(groups.isEmpty)
- val group = groups(0)
+ val group = groups.head
assertEquals(groupId, group.groupId)
assertEquals("consumer", group.protocolType)
}
@Test
def testDescribeGroup() {
- consumers(0).subscribe(List(topic))
+ consumers.head.subscribe(List(topic))
TestUtils.waitUntilTrue(() => {
- consumers(0).poll(0)
- !consumers(0).assignment().isEmpty
+ consumers.head.poll(0)
+ !consumers.head.assignment().isEmpty
}, "Expected non-empty assignment")
val group = client.describeGroup(groupId)
@@ -90,7 +90,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
assertEquals("Stable", group.state)
assertFalse(group.members.isEmpty)
- val member = group.members(0)
+ val member = group.members.head
assertEquals(clientId, member.clientId)
assertFalse(member.clientHost.isEmpty)
assertFalse(member.memberId.isEmpty)
@@ -98,10 +98,10 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
@Test
def testDescribeConsumerGroup() {
- consumers(0).subscribe(List(topic))
+ consumers.head.subscribe(List(topic))
TestUtils.waitUntilTrue(() => {
- consumers(0).poll(0)
- !consumers(0).assignment().isEmpty
+ consumers.head.poll(0)
+ !consumers.head.assignment().isEmpty
}, "Expected non-empty assignment")
val consumerSummaries = client.describeConsumerGroup(groupId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 2d5900f..10e0bae 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -413,8 +413,8 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
addAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
addAndVerifyAcls(ClusterAcl(Resource.ClusterResource), Resource.ClusterResource)
try {
- this.consumers(0).assign(List(topicPartition).asJava)
- consumeRecords(this.consumers(0))
+ this.consumers.head.assign(List(topicPartition).asJava)
+ consumeRecords(this.consumers.head)
Assert.fail("should have thrown exception")
} catch {
case e: TopicAuthorizationException =>
@@ -425,7 +425,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
sendRecords(numRecords, topicPartition)
- consumeRecords(this.consumers(0), topic = newTopic, part = 0)
+ consumeRecords(this.consumers.head, topic = newTopic, part = 0)
}
@Test(expected = classOf[AuthorizationException])
@@ -505,7 +505,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
@Test
def testListOffsetsWithNoTopicAccess() {
val e = intercept[TopicAuthorizationException] {
- this.consumers.head.partitionsFor(topic);
+ this.consumers.head.partitionsFor(topic)
}
assertEquals(Set(topic), e.unauthorizedTopics().asScala)
}
@@ -513,7 +513,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
@Test
def testListOfsetsWithTopicDescribe() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
- this.consumers.head.partitionsFor(topic);
+ this.consumers.head.partitionsFor(topic)
}
def removeAllAcls() = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 23fcfa6..ea74d5d 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -68,17 +68,17 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
val numRecords = 10000
sendRecords(numRecords)
- assertEquals(0, this.consumers(0).assignment.size)
- this.consumers(0).assign(List(tp).asJava)
- assertEquals(1, this.consumers(0).assignment.size)
+ assertEquals(0, this.consumers.head.assignment.size)
+ this.consumers.head.assign(List(tp).asJava)
+ assertEquals(1, this.consumers.head.assignment.size)
- this.consumers(0).seek(tp, 0)
- consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, startingOffset = 0)
+ this.consumers.head.seek(tp, 0)
+ consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, startingOffset = 0)
// check async commit callbacks
val commitCallback = new CountConsumerCommitCallback()
- this.consumers(0).commitAsync(commitCallback)
- awaitCommitCallback(this.consumers(0), commitCallback)
+ this.consumers.head.commitAsync(commitCallback)
+ awaitCommitCallback(this.consumers.head, commitCallback)
}
@Test
@@ -132,28 +132,28 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
sendRecords(5, tp)
sendRecords(7, tp2)
- this.consumers(0).assign(List(tp, tp2).asJava)
+ this.consumers.head.assign(List(tp, tp2).asJava)
// Need to poll to join the group
- this.consumers(0).poll(50)
- val pos1 = this.consumers(0).position(tp)
- val pos2 = this.consumers(0).position(tp2)
- this.consumers(0).commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
- assertEquals(3, this.consumers(0).committed(tp).offset)
- assertNull(this.consumers(0).committed(tp2))
+ this.consumers.head.poll(50)
+ val pos1 = this.consumers.head.position(tp)
+ val pos2 = this.consumers.head.position(tp2)
+ this.consumers.head.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
+ assertEquals(3, this.consumers.head.committed(tp).offset)
+ assertNull(this.consumers.head.committed(tp2))
// Positions should not change
- assertEquals(pos1, this.consumers(0).position(tp))
- assertEquals(pos2, this.consumers(0).position(tp2))
- this.consumers(0).commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
- assertEquals(3, this.consumers(0).committed(tp).offset)
- assertEquals(5, this.consumers(0).committed(tp2).offset)
+ assertEquals(pos1, this.consumers.head.position(tp))
+ assertEquals(pos2, this.consumers.head.position(tp2))
+ this.consumers.head.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
+ assertEquals(3, this.consumers.head.committed(tp).offset)
+ assertEquals(5, this.consumers.head.committed(tp2).offset)
// Using async should pick up the committed changes after commit completes
val commitCallback = new CountConsumerCommitCallback()
- this.consumers(0).commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback)
- awaitCommitCallback(this.consumers(0), commitCallback)
- assertEquals(7, this.consumers(0).committed(tp2).offset)
+ this.consumers.head.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback)
+ awaitCommitCallback(this.consumers.head, commitCallback)
+ assertEquals(7, this.consumers.head.committed(tp2).offset)
}
@Test
@@ -194,10 +194,10 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
while (parts == null)
parts = consumer0.partitionsFor(TopicConstants.GROUP_METADATA_TOPIC_NAME).asScala
assertEquals(1, parts.size)
- assertNotNull(parts(0).leader())
+ assertNotNull(parts.head.leader())
// shutdown the coordinator
- val coordinator = parts(0).leader().id()
+ val coordinator = parts.head.leader().id()
this.servers(coordinator).shutdown()
// this should cause another callback execution
@@ -269,7 +269,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
}
protected def sendRecords(numRecords: Int, tp: TopicPartition) {
- sendRecords(this.producers(0), numRecords, tp)
+ sendRecords(this.producers.head, numRecords, tp)
}
protected def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int, tp: TopicPartition) {
@@ -416,7 +416,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
*/
def isPartitionAssignmentValid(assignments: Buffer[Set[TopicPartition]],
partitions: Set[TopicPartition]): Boolean = {
- val allNonEmptyAssignments = assignments forall (assignment => assignment.size > 0)
+ val allNonEmptyAssignments = assignments forall (assignment => assignment.nonEmpty)
if (!allNonEmptyAssignments) {
// at least one consumer got empty assignment
return false
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 0a2b49a..8eaf827 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -56,7 +56,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
super.setUp()
// TODO: we need to migrate to new consumers when 0.9 is final
- consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 1024 * 1024, "")
+ consumer1 = new SimpleConsumer("localhost", servers.head.boundPort(), 100, 1024 * 1024, "")
consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024 * 1024, "")
}
@@ -298,7 +298,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
// make sure the fetched messages also respect the partitioning and ordering
- val fetchResponse1 = if (leader1.get == configs(0).brokerId) {
+ val fetchResponse1 = if (leader1.get == configs.head.brokerId) {
consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
} else {
consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
@@ -307,7 +307,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size)
// TODO: also check topic and partition after they are added in the return messageSet
- for (i <- 0 to numRecords - 1) {
+ for (i <- 0 until numRecords) {
assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes, now, Message.MagicValue_V1), messageSet1(i).message)
assertEquals(i.toLong, messageSet1(i).offset)
}
@@ -386,7 +386,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.", e.getMessage)
}
}
- val fetchResponse = if (leader0.get == configs(0).brokerId) {
+ val fetchResponse = if (leader0.get == configs.head.brokerId) {
consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
} else {
consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
@@ -423,13 +423,13 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
try {
// send message to partition 0
- val responses = ((0 until numRecords) map (i => producer.send(record, new CloseCallback(producer))))
+ val responses = (0 until numRecords) map (i => producer.send(record, new CloseCallback(producer)))
assertTrue("No request is complete.", responses.forall(!_.isDone()))
// flush the messages.
producer.flush()
assertTrue("All request are complete.", responses.forall(_.isDone()))
// Check the messages received by broker.
- val fetchResponse = if (leader.get == configs(0).brokerId) {
+ val fetchResponse = if (leader.get == configs.head.brokerId) {
consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
} else {
consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
@@ -446,7 +446,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
@Test
def testSendWithInvalidCreateTime() {
val topicProps = new Properties()
- topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000");
+ topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000")
TestUtils.createTopic(zkUtils, topic, 1, 2, servers, topicProps)
val producer = createProducer(brokerList = brokerList)
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 8424340..c76a216 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -79,7 +79,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
this.producers.foreach(_.close)
var consumed = 0L
- val consumer = this.consumers(0)
+ val consumer = this.consumers.head
consumer.subscribe(List(topic), new ConsumerRebalanceListener {
override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) {
@@ -124,7 +124,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
sendRecords(numRecords)
this.producers.foreach(_.close)
- val consumer = this.consumers(0)
+ val consumer = this.consumers.head
consumer.assign(List(tp))
consumer.seek(tp, 0)
@@ -174,7 +174,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
private def sendRecords(numRecords: Int) {
val futures = (0 until numRecords).map { i =>
- this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
+ this.producers.head.send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
}
futures.map(_.get)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 6e76f90..29d3bd6 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -81,7 +81,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
servers,
- servers(0).groupCoordinator.offsetsTopicConfigs)
+ servers.head.groupCoordinator.offsetsTopicConfigs)
}
@After
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index b22ccde..a5a6cd6 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -79,8 +79,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumer0.close()
// now we should see the committed positions from another consumer
- assertEquals(300, this.consumers(0).committed(tp).offset)
- assertEquals(500, this.consumers(0).committed(tp2).offset)
+ assertEquals(300, this.consumers.head.committed(tp).offset)
+ assertEquals(500, this.consumers.head.committed(tp2).offset)
}
@Test
@@ -109,22 +109,22 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumer0.close()
// now we should see the committed positions from another consumer
- assertEquals(300, this.consumers(0).committed(tp).offset)
- assertEquals(500, this.consumers(0).committed(tp2).offset)
+ assertEquals(300, this.consumers.head.committed(tp).offset)
+ assertEquals(500, this.consumers.head.committed(tp2).offset)
}
@Test
def testAutoOffsetReset() {
sendRecords(1)
- this.consumers(0).assign(List(tp).asJava)
- consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 1, startingOffset = 0)
+ this.consumers.head.assign(List(tp).asJava)
+ consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 1, startingOffset = 0)
}
@Test
def testGroupConsumption() {
sendRecords(10)
- this.consumers(0).subscribe(List(topic).asJava)
- consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 1, startingOffset = 0)
+ this.consumers.head.subscribe(List(topic).asJava)
+ consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 1, startingOffset = 0)
}
@Test
@@ -147,11 +147,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
sendRecords(1000, new TopicPartition(topic3, 0))
sendRecords(1000, new TopicPartition(topic3, 1))
- assertEquals(0, this.consumers(0).assignment().size)
+ assertEquals(0, this.consumers.head.assignment().size)
val pattern = Pattern.compile("t.*c")
- this.consumers(0).subscribe(pattern, new TestConsumerReassignmentListener)
- this.consumers(0).poll(50)
+ this.consumers.head.subscribe(pattern, new TestConsumerReassignmentListener)
+ this.consumers.head.poll(50)
var subscriptions = Set(
new TopicPartition(topic, 0),
@@ -160,9 +160,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
new TopicPartition(topic1, 1))
TestUtils.waitUntilTrue(() => {
- this.consumers(0).poll(50)
- this.consumers(0).assignment() == subscriptions.asJava
- }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
+ this.consumers.head.poll(50)
+ this.consumers.head.assignment() == subscriptions.asJava
+ }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment()}")
val topic4 = "tsomec" // matches subscribed pattern
TestUtils.createTopic(this.zkUtils, topic4, 2, serverCount, this.servers)
@@ -175,12 +175,12 @@ class PlaintextConsumerTest extends BaseConsumerTest {
TestUtils.waitUntilTrue(() => {
- this.consumers(0).poll(50)
- this.consumers(0).assignment() == subscriptions.asJava
- }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
+ this.consumers.head.poll(50)
+ this.consumers.head.assignment() == subscriptions.asJava
+ }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment()}")
- this.consumers(0).unsubscribe()
- assertEquals(0, this.consumers(0).assignment().size)
+ this.consumers.head.unsubscribe()
+ assertEquals(0, this.consumers.head.assignment().size)
}
@Test
@@ -193,10 +193,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
sendRecords(1000, new TopicPartition(topic1, 0))
sendRecords(1000, new TopicPartition(topic1, 1))
- assertEquals(0, this.consumers(0).assignment().size)
+ assertEquals(0, this.consumers.head.assignment().size)
- this.consumers(0).subscribe(Pattern.compile("t.*c"), new TestConsumerReassignmentListener)
- this.consumers(0).poll(50)
+ this.consumers.head.subscribe(Pattern.compile("t.*c"), new TestConsumerReassignmentListener)
+ this.consumers.head.poll(50)
val subscriptions = Set(
new TopicPartition(topic, 0),
@@ -205,39 +205,39 @@ class PlaintextConsumerTest extends BaseConsumerTest {
new TopicPartition(topic1, 1))
TestUtils.waitUntilTrue(() => {
- this.consumers(0).poll(50)
- this.consumers(0).assignment() == subscriptions.asJava
- }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
+ this.consumers.head.poll(50)
+ this.consumers.head.assignment() == subscriptions.asJava
+ }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment()}")
- this.consumers(0).unsubscribe()
- assertEquals(0, this.consumers(0).assignment().size)
+ this.consumers.head.unsubscribe()
+ assertEquals(0, this.consumers.head.assignment().size)
}
@Test
def testCommitMetadata() {
- this.consumers(0).assign(List(tp).asJava)
+ this.consumers.head.assign(List(tp).asJava)
// sync commit
val syncMetadata = new OffsetAndMetadata(5, "foo")
- this.consumers(0).commitSync(Map((tp, syncMetadata)).asJava)
- assertEquals(syncMetadata, this.consumers(0).committed(tp))
+ this.consumers.head.commitSync(Map((tp, syncMetadata)).asJava)
+ assertEquals(syncMetadata, this.consumers.head.committed(tp))
// async commit
val asyncMetadata = new OffsetAndMetadata(10, "bar")
val callback = new CountConsumerCommitCallback
- this.consumers(0).commitAsync(Map((tp, asyncMetadata)).asJava, callback)
- awaitCommitCallback(this.consumers(0), callback)
- assertEquals(asyncMetadata, this.consumers(0).committed(tp))
+ this.consumers.head.commitAsync(Map((tp, asyncMetadata)).asJava, callback)
+ awaitCommitCallback(this.consumers.head, callback)
+ assertEquals(asyncMetadata, this.consumers.head.committed(tp))
// handle null metadata
val nullMetadata = new OffsetAndMetadata(5, null)
- this.consumers(0).commitSync(Map((tp, nullMetadata)).asJava)
- assertEquals(nullMetadata, this.consumers(0).committed(tp))
+ this.consumers.head.commitSync(Map((tp, nullMetadata)).asJava)
+ assertEquals(nullMetadata, this.consumers.head.committed(tp))
}
@Test
def testAsyncCommit() {
- val consumer = this.consumers(0)
+ val consumer = this.consumers.head
consumer.assign(List(tp).asJava)
consumer.poll(0)
@@ -255,18 +255,18 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val otherTopic = "other"
val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
val expandedSubscriptions = subscriptions ++ Set(new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
- this.consumers(0).subscribe(List(topic).asJava)
+ this.consumers.head.subscribe(List(topic).asJava)
TestUtils.waitUntilTrue(() => {
- this.consumers(0).poll(50)
- this.consumers(0).assignment == subscriptions.asJava
- }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment}")
+ this.consumers.head.poll(50)
+ this.consumers.head.assignment == subscriptions.asJava
+ }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment}")
TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers)
- this.consumers(0).subscribe(List(topic, otherTopic).asJava)
+ this.consumers.head.subscribe(List(topic, otherTopic).asJava)
TestUtils.waitUntilTrue(() => {
- this.consumers(0).poll(50)
- this.consumers(0).assignment == expandedSubscriptions.asJava
- }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers(0).assignment}")
+ this.consumers.head.poll(50)
+ this.consumers.head.assignment == expandedSubscriptions.asJava
+ }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers.head.assignment}")
}
@Test
@@ -275,42 +275,42 @@ class PlaintextConsumerTest extends BaseConsumerTest {
TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers)
val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
val shrunkenSubscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
- this.consumers(0).subscribe(List(topic, otherTopic).asJava)
+ this.consumers.head.subscribe(List(topic, otherTopic).asJava)
TestUtils.waitUntilTrue(() => {
- this.consumers(0).poll(50)
- this.consumers(0).assignment == subscriptions.asJava
- }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment}")
+ this.consumers.head.poll(50)
+ this.consumers.head.assignment == subscriptions.asJava
+ }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment}")
- this.consumers(0).subscribe(List(topic).asJava)
+ this.consumers.head.subscribe(List(topic).asJava)
TestUtils.waitUntilTrue(() => {
- this.consumers(0).poll(50)
- this.consumers(0).assignment == shrunkenSubscriptions.asJava
- }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers(0).assignment}")
+ this.consumers.head.poll(50)
+ this.consumers.head.assignment == shrunkenSubscriptions.asJava
+ }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers.head.assignment}")
}
@Test
def testPartitionsFor() {
val numParts = 2
TestUtils.createTopic(this.zkUtils, "part-test", numParts, 1, this.servers)
- val parts = this.consumers(0).partitionsFor("part-test")
+ val parts = this.consumers.head.partitionsFor("part-test")
assertNotNull(parts)
assertEquals(2, parts.size)
}
@Test
def testPartitionsForAutoCreate() {
- val partitions = this.consumers(0).partitionsFor("non-exist-topic")
+ val partitions = this.consumers.head.partitionsFor("non-exist-topic")
assertFalse(partitions.isEmpty)
}
@Test(expected = classOf[InvalidTopicException])
def testPartitionsForInvalidTopic() {
- this.consumers(0).partitionsFor(";3# ads,{234")
+ this.consumers.head.partitionsFor(";3# ads,{234")
}
@Test
def testSeek() {
- val consumer = this.consumers(0)
+ val consumer = this.consumers.head
val totalRecords = 50L
val mid = totalRecords / 2
@@ -366,23 +366,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
def testPositionAndCommit() {
sendRecords(5)
- assertNull(this.consumers(0).committed(new TopicPartition(topic, 15)))
+ assertNull(this.consumers.head.committed(new TopicPartition(topic, 15)))
// position() on a partition that we aren't subscribed to throws an exception
intercept[IllegalArgumentException] {
- this.consumers(0).position(new TopicPartition(topic, 15))
+ this.consumers.head.position(new TopicPartition(topic, 15))
}
- this.consumers(0).assign(List(tp).asJava)
+ this.consumers.head.assign(List(tp).asJava)
- assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
- this.consumers(0).commitSync()
- assertEquals(0L, this.consumers(0).committed(tp).offset)
+ assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers.head.position(tp))
+ this.consumers.head.commitSync()
+ assertEquals(0L, this.consumers.head.committed(tp).offset)
- consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 5, startingOffset = 0)
- assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
- this.consumers(0).commitSync()
- assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp).offset)
+ consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 5, startingOffset = 0)
+ assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers.head.position(tp))
+ this.consumers.head.commitSync()
+ assertEquals("Committed offset should be returned", 5L, this.consumers.head.committed(tp).offset)
sendRecords(1)
@@ -395,18 +395,18 @@ class PlaintextConsumerTest extends BaseConsumerTest {
def testPartitionPauseAndResume() {
val partitions = List(tp).asJava
sendRecords(5)
- this.consumers(0).assign(partitions)
- consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 5, startingOffset = 0)
- this.consumers(0).pause(partitions)
+ this.consumers.head.assign(partitions)
+ consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 5, startingOffset = 0)
+ this.consumers.head.pause(partitions)
sendRecords(5)
- assertTrue(this.consumers(0).poll(0).isEmpty)
- this.consumers(0).resume(partitions)
- consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 5, startingOffset = 5)
+ assertTrue(this.consumers.head.poll(0).isEmpty)
+ this.consumers.head.resume(partitions)
+ consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 5, startingOffset = 5)
}
@Test
def testFetchInvalidOffset() {
- this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+ this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
consumers += consumer0
@@ -441,7 +441,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// produce a record that is larger than the configured fetch size
val record = new ProducerRecord[Array[Byte], Array[Byte]](tp.topic(), tp.partition(), "key".getBytes, new Array[Byte](maxFetchBytes + 1))
- this.producers(0).send(record)
+ this.producers.head.send(record)
// consuming a too-large record should fail
consumer0.assign(List(tp).asJava)
@@ -713,14 +713,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val numRecords = 50
// Test non-compressed messages
sendRecords(numRecords, tp)
- this.consumers(0).assign(List(tp).asJava)
- consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, startingOffset = 0, startingKeyAndValueIndex = 0,
+ this.consumers.head.assign(List(tp).asJava)
+ consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, startingOffset = 0, startingKeyAndValueIndex = 0,
startingTimestamp = 0)
// Test compressed messages
sendCompressedMessages(numRecords, tp2)
- this.consumers(0).assign(List(tp2).asJava)
- consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, tp = tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
+ this.consumers.head.assign(List(tp2).asJava)
+ consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, tp = tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
startingTimestamp = 0)
}
@@ -737,15 +737,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// Test non-compressed messages
val tp1 = new TopicPartition(topicName, 0)
sendRecords(numRecords, tp1)
- this.consumers(0).assign(List(tp1).asJava)
- consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, tp = tp1, startingOffset = 0, startingKeyAndValueIndex = 0,
+ this.consumers.head.assign(List(tp1).asJava)
+ consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, tp = tp1, startingOffset = 0, startingKeyAndValueIndex = 0,
startingTimestamp = startTime, timestampType = TimestampType.LOG_APPEND_TIME)
// Test compressed messages
val tp2 = new TopicPartition(topicName, 1)
sendCompressedMessages(numRecords, tp2)
- this.consumers(0).assign(List(tp2).asJava)
- consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = numRecords, tp = tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
+ this.consumers.head.assign(List(tp2).asJava)
+ consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, tp = tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
startingTimestamp = startTime, timestampType = TimestampType.LOG_APPEND_TIME)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
index fc79c60..5814e94 100644
--- a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
@@ -32,8 +32,8 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslTestHarne
@Test
def testMultipleBrokerMechanisms() {
- val plainSaslProducer = producers(0)
- val plainSaslConsumer = consumers(0)
+ val plainSaslProducer = producers.head
+ val plainSaslConsumer = consumers.head
val gssapiSaslProperties = kafkaSaslProperties("GSSAPI")
val gssapiSaslProducer = TestUtils.createNewProducer(brokerList,
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
index 2e288ec..6556100 100755
--- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
@@ -311,9 +311,9 @@ object TestLogCleaning {
}
-case class TestRecord(val topic: String, val key: Int, val value: Long, val delete: Boolean) {
+case class TestRecord(topic: String, key: Int, value: Long, delete: Boolean) {
def this(pieces: Array[String]) = this(pieces(0), pieces(1).toInt, pieces(2).toLong, pieces(3) == "d")
def this(line: String) = this(line.split("\t"))
- override def toString() = topic + "\t" + key + "\t" + value + "\t" + (if(delete) "d" else "u")
+ override def toString = topic + "\t" + key + "\t" + value + "\t" + (if(delete) "d" else "u")
def topicAndKey = topic + key
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 5c2f1ae..9445191 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -255,7 +255,7 @@ object TestOffsetManager {
var statsThread: StatsThread = null
try {
zkUtils = ZkUtils(zookeeper, 6000, 2000, false)
- commitThreads = (0 to (threadCount-1)).map { threadId =>
+ commitThreads = (0 until threadCount).map { threadId =>
new CommitThread(threadId, partitionCount, commitIntervalMs, zkUtils)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index ab8d363..763e4ec 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -134,7 +134,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
assertEquals(partitionDataForTopic2(2).partitionId, 2)
val replicas = partitionDataForTopic2(1).replicas
assertEquals(replicas.size, 2)
- assert(replicas(0).id == 0 || replicas(0).id == 1)
+ assert(replicas.head.id == 0 || replicas.head.id == 1)
assert(replicas(1).id == 0 || replicas(1).id == 1)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 21bb6ab..7df1411 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -127,7 +127,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
// create leaders for all partitions
TestUtils.makeLeaderForPartition(zkUtils, topic, leaderForPartitionMap, 1)
- val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> zkUtils.getReplicasForPartition(topic, p))).toMap
+ val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => p -> zkUtils.getReplicasForPartition(topic, p)).toMap
assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
for(i <- 0 until actualReplicaList.size)
assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
@@ -174,9 +174,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions())
// wait until reassignment is completed
TestUtils.waitUntilTrue(() => {
- val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas);
+ val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas,
- Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
+ Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
},
"Partition reassignment should complete")
val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)
@@ -205,9 +205,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
// wait until reassignment is completed
TestUtils.waitUntilTrue(() => {
- val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas);
+ val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas,
- Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
+ Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
},
"Partition reassignment should complete")
val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)
@@ -236,9 +236,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
// wait until reassignment is completed
TestUtils.waitUntilTrue(() => {
- val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas);
+ val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas,
- Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
+ Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
},
"Partition reassignment should complete")
val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index bcfcfad..7c71aed 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -63,6 +63,6 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
val deletedProps = ConfigCommand.parseConfigsToBeDeleted(createOpts)
assertEquals(1, deletedProps.size)
- assertEquals("a", deletedProps(0))
+ assertEquals("a", deletedProps.head)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 477dcc8..1e1a98c 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -121,9 +121,9 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions())
// wait until reassignment is completed
TestUtils.waitUntilTrue(() => {
- val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas);
+ val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas,
- Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed;
+ Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed
}, "Partition reassignment shouldn't complete.")
val controllerId = zkUtils.getController()
val controller = servers.filter(s => s.config.brokerId == controllerId).head
@@ -223,17 +223,17 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val topic = topicAndPartition.topic
val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
- brokerConfigs(0).setProperty("delete.topic.enable", "true")
- brokerConfigs(0).setProperty("log.cleaner.enable","true")
- brokerConfigs(0).setProperty("log.cleanup.policy","compact")
- brokerConfigs(0).setProperty("log.segment.bytes","100")
- brokerConfigs(0).setProperty("log.segment.delete.delay.ms","1000")
- brokerConfigs(0).setProperty("log.cleaner.dedupe.buffer.size","1048577")
+ brokerConfigs.head.setProperty("delete.topic.enable", "true")
+ brokerConfigs.head.setProperty("log.cleaner.enable","true")
+ brokerConfigs.head.setProperty("log.cleanup.policy","compact")
+ brokerConfigs.head.setProperty("log.segment.bytes","100")
+ brokerConfigs.head.setProperty("log.segment.delete.delay.ms","1000")
+ brokerConfigs.head.setProperty("log.cleaner.dedupe.buffer.size","1048577")
val servers = createTestTopicAndCluster(topic,brokerConfigs)
// for simplicity, we are validating cleaner offsets on a single broker
- val server = servers(0)
+ val server = servers.head
val log = server.logManager.getLog(topicAndPartition).get
// write to the topic to activate cleaner
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index ac7ce51..653b40c 100755
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -68,7 +68,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
val messages = messageStrings.map(s => new Message(s.getBytes))
val messageSet = new ByteBufferMessageSet(DefaultCompressionCodec, new LongRef(0), messages:_*)
- topicInfos(0).enqueue(messageSet)
+ topicInfos.head.enqueue(messageSet)
assertEquals(1, queue.size)
queue.put(ZookeeperConsumerConnector.shutdownCommand)
@@ -92,7 +92,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
val messages = messageStrings.map(s => new Message(s.getBytes))
val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new LongRef(0), messages:_*)
- topicInfos(0).enqueue(messageSet)
+ topicInfos.head.enqueue(messageSet)
assertEquals(1, queue.size)
val iter = new ConsumerIterator[String, String](queue,
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index a69fba1..b054794 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -430,7 +430,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
private class TestConsumerRebalanceListener extends ConsumerRebalanceListener {
var beforeReleasingPartitionsCalled: Boolean = false
var beforeStartingFetchersCalled: Boolean = false
- var consumerId: String = "";
+ var consumerId: String = ""
var partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]] = null
var globalPartitionOwnership: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]] = null
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 91ac1f6..699715b 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -65,7 +65,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
@Test
def testMetadataUpdate() {
log.setLevel(Level.INFO)
- var controller: KafkaServer = this.servers.head;
+ var controller: KafkaServer = this.servers.head
// Find the current controller
val epochMap: mutable.Map[Int, Int] = mutable.Map.empty
for (server <- this.servers) {
@@ -121,7 +121,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
var counter = 0
while (!found && counter < 10) {
for (server <- this.servers) {
- val previousEpoch = (epochMap get server.config.brokerId) match {
+ val previousEpoch = epochMap get server.config.brokerId match {
case Some(epoch) =>
epoch
case None =>
@@ -130,7 +130,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
}
if (server.kafkaController.isActive
- && (previousEpoch) < server.kafkaController.epoch) {
+ && previousEpoch < server.kafkaController.epoch) {
controller = server
found = true
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index beab1b5..dc343fa 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -802,7 +802,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
val (error, groups) = groupCoordinator.handleListGroups()
assertEquals(Errors.NONE, error)
assertEquals(1, groups.size)
- assertEquals(GroupOverview("groupId", "consumer"), groups(0))
+ assertEquals(GroupOverview("groupId", "consumer"), groups.head)
}
@Test
@@ -814,7 +814,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
val (error, groups) = groupCoordinator.handleListGroups()
assertEquals(Errors.NONE, error)
assertEquals(1, groups.size)
- assertEquals(GroupOverview("groupId", "consumer"), groups(0))
+ assertEquals(GroupOverview("groupId", "consumer"), groups.head)
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 85e9cad..140f615 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -62,7 +62,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
val partitionRequests = immutable.Map[TopicAndPartition, PartitionFetchInfo]()
val request = new FetchRequest(requestInfo = partitionRequests)
val fetched = consumer.fetch(request)
- assertTrue(!fetched.hasError && fetched.data.size == 0)
+ assertTrue(!fetched.hasError && fetched.data.isEmpty)
}
@Test
@@ -152,7 +152,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
fail("Expected exception when fetching message with invalid offset")
} catch {
- case e: OffsetOutOfRangeException => "this is good"
+ case e: OffsetOutOfRangeException => // This is good.
}
}
@@ -168,7 +168,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
fail("Expected exception when fetching message with invalid partition")
} catch {
- case e: UnknownTopicOrPartitionException => "this is good"
+ case e: UnknownTopicOrPartitionException => // This is good.
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
index 2fdfc48..bdf116f 100644
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
@@ -36,7 +36,7 @@ trait ProducerConsumerTestHarness extends KafkaServerTestHarness {
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[StringEncoder].getName,
partitioner = classOf[StaticPartitioner].getName)
- consumer = new SimpleConsumer(host, servers(0).boundPort(), 1000000, 64 * 1024, "")
+ consumer = new SimpleConsumer(host, servers.head.boundPort(), 1000000, 64 * 1024, "")
}
@After
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 752a260..8212121 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -67,7 +67,7 @@ class CleanerTest extends JUnitSuite {
while(log.numberOfSegments < 4)
log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
val keysFound = keysInLog(log)
- assertEquals((0L until log.logEndOffset), keysFound)
+ assertEquals(0L until log.logEndOffset, keysFound)
// pretend we have the following keys
val keys = immutable.ListSet(1, 3, 5, 7, 9)
@@ -211,7 +211,7 @@ class CleanerTest extends JUnitSuite {
// grouping by very large values should result in a single group with all the segments in it
var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
assertEquals(1, groups.size)
- assertEquals(log.numberOfSegments, groups(0).size)
+ assertEquals(log.numberOfSegments, groups.head.size)
checkSegmentOrder(groups)
// grouping by very small values should result in all groups having one entry
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index 534443c..417aa75 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -145,7 +145,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
*/
@Test
def testTruncate() {
- val message = messageSet.toList(0)
+ val message = messageSet.toList.head
val end = messageSet.searchFor(1, 0).position
messageSet.truncateTo(end)
assertEquals(List(message), messageSet.toList)
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index cc9873c..a862cb1 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -75,7 +75,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
cleaner.awaitCleaned("log", 0, firstDirty2)
val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get
- assertTrue(s"log cleaner should have processed up to offset $firstDirty2", lastCleaned2 >= firstDirty2);
+ assertTrue(s"log cleaner should have processed up to offset $firstDirty2", lastCleaned2 >= firstDirty2)
val read2 = readFromLog(log)
assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap)
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index f290d54..7b52a09 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -107,7 +107,7 @@ class LogManagerTest {
log.read(0, 1024)
fail("Should get exception from fetching earlier.")
} catch {
- case e: OffsetOutOfRangeException => "This is good."
+ case e: OffsetOutOfRangeException => // This is good.
}
// log should still be appendable
log.append(TestUtils.singleMessageSet("test".getBytes()))
@@ -152,7 +152,7 @@ class LogManagerTest {
log.read(0, 1024)
fail("Should get exception from fetching earlier.")
} catch {
- case e: OffsetOutOfRangeException => "This is good."
+ case e: OffsetOutOfRangeException => // This is good.
}
// log should still be appendable
log.append(TestUtils.singleMessageSet("test".getBytes()))
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index f48f6b1..33dd68e 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -617,10 +617,10 @@ class LogTest extends JUnitSuite {
for (i<- 1 to msgPerSeg)
log.append(set)
assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments)
- assertEquals("The index of the first segment should be trimmed to empty", 0, log.logSegments.toList(0).index.maxEntries)
+ assertEquals("The index of the first segment should be trimmed to empty", 0, log.logSegments.toList.head.index.maxEntries)
log.truncateTo(0)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
- assertEquals("The index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/8, log.logSegments.toList(0).index.maxEntries)
+ assertEquals("The index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/8, log.logSegments.toList.head.index.maxEntries)
for (i<- 1 to msgPerSeg)
log.append(set)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 758dad2..8f66d62 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -383,7 +383,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
/* check that offsets are assigned based on byte offset from the given base offset */
def checkOffsets(messages: ByteBufferMessageSet, baseOffset: Long) {
- assertTrue("Message set should not be empty", messages.size > 0)
+ assertTrue("Message set should not be empty", messages.nonEmpty)
var offset = baseOffset
for(entry <- messages) {
assertEquals("Unexpected offset in message set iterator", offset, entry.offset)
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/message/MessageTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala
index 3c8a41f..5c02125 100755
--- a/core/src/test/scala/unit/kafka/message/MessageTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala
@@ -28,12 +28,12 @@ import org.junit.{Before, Test}
import kafka.utils.TestUtils
import org.apache.kafka.common.utils.Utils
-case class MessageTestVal(val key: Array[Byte],
- val payload: Array[Byte],
- val codec: CompressionCodec,
- val timestamp: Long,
- val magicValue: Byte,
- val message: Message)
+case class MessageTestVal(key: Array[Byte],
+ payload: Array[Byte],
+ codec: CompressionCodec,
+ timestamp: Long,
+ magicValue: Byte,
+ message: Message)
class MessageTest extends JUnitSuite {
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index d215430..e60f350 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package kafka.network;
+package kafka.network
import java.net._
import javax.net.ssl._
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index cf25cdb..dc73db3 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -186,11 +186,11 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
}
assertEquals("Should have fetched 2 messages", 2, messageSet.size)
// Message 1
- assertTrue(ByteBuffer.wrap("test1".getBytes).equals(messageSet(0).message.payload))
- assertTrue(ByteBuffer.wrap("test".getBytes).equals(messageSet(0).message.key))
- assertTrue(messageSet(0).message.timestamp >= startTime && messageSet(0).message.timestamp < endTime)
- assertEquals(TimestampType.CREATE_TIME, messageSet(0).message.timestampType)
- assertEquals(Message.MagicValue_V1, messageSet(0).message.magic)
+ assertTrue(ByteBuffer.wrap("test1".getBytes).equals(messageSet.head.message.payload))
+ assertTrue(ByteBuffer.wrap("test".getBytes).equals(messageSet.head.message.key))
+ assertTrue(messageSet.head.message.timestamp >= startTime && messageSet.head.message.timestamp < endTime)
+ assertEquals(TimestampType.CREATE_TIME, messageSet.head.message.timestampType)
+ assertEquals(Message.MagicValue_V1, messageSet.head.message.magic)
// Message 2
assertTrue(ByteBuffer.wrap("test2".getBytes).equals(messageSet(1).message.payload))
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 8e234d2..270a794 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -99,7 +99,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
val producer = new SyncProducer(new SyncProducerConfig(props))
val response = producer.send(emptyRequest)
assertTrue(response != null)
- assertTrue(!response.hasError && response.status.size == 0)
+ assertTrue(!response.hasError && response.status.isEmpty)
}
@Test
@@ -110,7 +110,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
val producer = new SyncProducer(new SyncProducerConfig(props))
TestUtils.createTopic(zkUtils, "test", numPartitions = 1, replicationFactor = 1, servers = servers)
- val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))
+ val message1 = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))
val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
val response1 = producer.send(produceRequest("test", 0, messageSet1, acks = 1))
@@ -118,7 +118,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
assertEquals(Errors.MESSAGE_TOO_LARGE.code, response1.status(TopicAndPartition("test", 0)).error)
assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).offset)
- val safeSize = configs(0).messageMaxBytes - Message.MinMessageOverhead - Message.TimestampLength - MessageSet.LogOverhead - 1
+ val safeSize = configs.head.messageMaxBytes - Message.MinMessageOverhead - Message.TimestampLength - MessageSet.LogOverhead - 1
val message2 = new Message(new Array[Byte](safeSize))
val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
val response2 = producer.send(produceRequest("test", 0, messageSet2, acks = 1))
@@ -142,14 +142,14 @@ class SyncProducerTest extends KafkaServerTestHarness {
// This message will be dropped silently since message size too large.
producer.send(produceRequest("test", 0,
- new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
+ new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))), acks = 0))
// Send another message whose size is large enough to exceed the buffer size so
// the socket buffer will be flushed immediately;
// this send should fail since the socket has been closed
try {
producer.send(produceRequest("test", 0,
- new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
+ new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))), acks = 0))
} catch {
case e : java.io.IOException => // success
case e2: Throwable => throw e2
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index bbec5b1..9203130 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -80,7 +80,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
for (path <- zkUtils.persistentZkPaths) {
zkUtils.makeSurePersistentPathExists(path)
if(!path.equals(ZkUtils.ConsumersPath)) {
- val aclList = (zkUtils.zkConnection.getAcl(path)).getKey
+ val aclList = zkUtils.zkConnection.getAcl(path).getKey
assertTrue(aclList.size == 2)
for (acl: ACL <- aclList.asScala) {
assertTrue(isAclSecure(acl))
@@ -207,15 +207,15 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkUrl"))
info("Done with migration")
for (path <- secondZk.securePersistentZkPaths) {
- val listParent = (secondZk.zkConnection.getAcl(path)).getKey
+ val listParent = secondZk.zkConnection.getAcl(path).getKey
assertTrue(path, isAclCorrect(listParent, secondZk.isSecure))
val childPath = path + "/fpjwashere"
- val listChild = (secondZk.zkConnection.getAcl(childPath)).getKey
+ val listChild = secondZk.zkConnection.getAcl(childPath).getKey
assertTrue(childPath, isAclCorrect(listChild, secondZk.isSecure))
}
// Check consumers path.
- val consumersAcl = (firstZk.zkConnection.getAcl(ZkUtils.ConsumersPath)).getKey
+ val consumersAcl = firstZk.zkConnection.getAcl(ZkUtils.ConsumersPath).getKey
assertTrue(ZkUtils.ConsumersPath, isAclCorrect(consumersAcl, false))
}
@@ -223,7 +223,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
* Verifies that the path has the appropriate secure ACL.
*/
private def verify(path: String): Boolean = {
- val list = (zkUtils.zkConnection.getAcl(path)).getKey
+ val list = zkUtils.zkConnection.getAcl(path).getKey
list.asScala.forall(isAclSecure)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
index c5b61de..591fcf7 100644
--- a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
@@ -78,7 +78,7 @@ abstract class BaseReplicaFetchTest extends ZooKeeperTestHarness {
val topicAndPart = TopicAndPartition(topic, partition)
val expectedOffset = brokers.head.getLogManager().getLog(topicAndPart).get.logEndOffset
result = result && expectedOffset > 0 && brokers.forall { item =>
- (expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset)
+ expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset
}
}
result
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index d1ad3a3..af979e4 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -37,7 +37,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
@Test
def testConfigChange() {
assertTrue("Should contain a ConfigHandler for topics",
- this.servers(0).dynamicConfigHandlers.contains(ConfigType.Topic))
+ this.servers.head.dynamicConfigHandlers.contains(ConfigType.Topic))
val oldVal: java.lang.Long = 100000L
val newVal: java.lang.Long = 200000L
val tp = TopicAndPartition("test", 0)
@@ -45,21 +45,21 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
logProps.put(LogConfig.FlushMessagesProp, oldVal.toString)
AdminUtils.createTopic(zkUtils, tp.topic, 1, 1, logProps)
TestUtils.retry(10000) {
- val logOpt = this.servers(0).logManager.getLog(tp)
+ val logOpt = this.servers.head.logManager.getLog(tp)
assertTrue(logOpt.isDefined)
assertEquals(oldVal, logOpt.get.config.flushInterval)
}
logProps.put(LogConfig.FlushMessagesProp, newVal.toString)
AdminUtils.changeTopicConfig(zkUtils, tp.topic, logProps)
TestUtils.retry(10000) {
- assertEquals(newVal, this.servers(0).logManager.getLog(tp).get.config.flushInterval)
+ assertEquals(newVal, this.servers.head.logManager.getLog(tp).get.config.flushInterval)
}
}
@Test
def testClientQuotaConfigChange() {
assertTrue("Should contain a ConfigHandler for topics",
- this.servers(0).dynamicConfigHandlers.contains(ConfigType.Client))
+ this.servers.head.dynamicConfigHandlers.contains(ConfigType.Client))
val clientId = "testClient"
val props = new Properties()
props.put(ClientConfigOverride.ProducerOverride, "1000")
@@ -67,8 +67,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
AdminUtils.changeClientIdConfig(zkUtils, clientId, props)
TestUtils.retry(10000) {
- val configHandler = this.servers(0).dynamicConfigHandlers(ConfigType.Client).asInstanceOf[ClientIdConfigHandler]
- val quotaManagers: Map[Short, ClientQuotaManager] = servers(0).apis.quotaManagers
+ val configHandler = this.servers.head.dynamicConfigHandlers(ConfigType.Client).asInstanceOf[ClientIdConfigHandler]
+ val quotaManagers: Map[Short, ClientQuotaManager] = servers.head.apis.quotaManagers
val overrideProducerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(clientId)
val overrideConsumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(clientId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 26e2817..f5b515b 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -56,7 +56,7 @@ class HighwatermarkPersistenceTest {
val metrics = new Metrics
// create replica manager
val replicaManager = new ReplicaManager(configs.head, metrics, new MockTime, new JMockTime, zkUtils, scheduler,
- logManagers(0), new AtomicBoolean(false))
+ logManagers.head, new AtomicBoolean(false))
replicaManager.startup()
try {
replicaManager.checkpointHighWatermarks()
@@ -64,7 +64,7 @@ class HighwatermarkPersistenceTest {
assertEquals(0L, fooPartition0Hw)
val partition0 = replicaManager.getOrCreatePartition(topic, 0)
// create leader and follower replicas
- val log0 = logManagers(0).createLog(TopicAndPartition(topic, 0), LogConfig())
+ val log0 = logManagers.head.createLog(TopicAndPartition(topic, 0), LogConfig())
val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0))
partition0.addReplicaIfNotExists(leaderReplicaPartition0)
val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime)
@@ -99,7 +99,7 @@ class HighwatermarkPersistenceTest {
val metrics = new Metrics
// create replica manager
val replicaManager = new ReplicaManager(configs.head, metrics, new MockTime(), new JMockTime, zkUtils,
- scheduler, logManagers(0), new AtomicBoolean(false))
+ scheduler, logManagers.head, new AtomicBoolean(false))
replicaManager.startup()
try {
replicaManager.checkpointHighWatermarks()
@@ -107,7 +107,7 @@ class HighwatermarkPersistenceTest {
assertEquals(0L, topic1Partition0Hw)
val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0)
// create leader log
- val topic1Log0 = logManagers(0).createLog(TopicAndPartition(topic1, 0), LogConfig())
+ val topic1Log0 = logManagers.head.createLog(TopicAndPartition(topic1, 0), LogConfig())
// create a local replica for topic1
val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0))
topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
@@ -123,7 +123,7 @@ class HighwatermarkPersistenceTest {
// add another partition and set highwatermark
val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0)
// create leader log
- val topic2Log0 = logManagers(0).createLog(TopicAndPartition(topic2, 0), LogConfig())
+ val topic2Log0 = logManagers.head.createLog(TopicAndPartition(topic2, 0), LogConfig())
// create a local replica for topic2
val leaderReplicaTopic2Partition0 = new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0))
topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)
@@ -153,7 +153,7 @@ class HighwatermarkPersistenceTest {
}
def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = {
- replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs(0)).getAbsolutePath).read.getOrElse(TopicAndPartition(topic, partition), 0L)
+ replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath).read.getOrElse(TopicAndPartition(topic, partition), 0L)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 89a8fd9..c34e4f0 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -126,7 +126,7 @@ class IsrExpirationTest {
val leaderReplica = partition0.getReplica(configs.head.brokerId).get
// Make the remote replica not read to the end of log. It should be not be out of sync for at least 100 ms
- for(replica <- (partition0.assignedReplicas() - leaderReplica))
+ for(replica <- partition0.assignedReplicas() - leaderReplica)
replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MessageSet.Empty), -1L, -1, false))
// Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log.
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 7258980..3c30b6b 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -149,8 +149,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
controllerChannelManager.sendRequest(brokerId2, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest,
staleControllerEpochCallback)
- TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true,
- "Controller epoch should be stale")
+ TestUtils.waitUntilTrue(() => staleControllerEpochDetected, "Controller epoch should be stale")
assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected)
} finally {
controllerChannelManager.shutdown()
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 463cd8a..0885709 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -132,7 +132,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val consumerOffsets =
simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
- if(consumerOffsets(0) == 1) {
+ if(consumerOffsets.head == 1) {
offsetChanged = true
}
}
[3/3] kafka git commit: KAFKA-3771; Improving Kafka core code
Posted by ij...@apache.org.
KAFKA-3771; Improving Kafka core code
- Used flatMap instead of map and flatten
- Use isEmpty, NonEmpty, isDefined as appropriate
- Used head, keys and keySet where appropriate
- Used contains, diff and find where appropriate
- Removed redundant val modifier for case class constructor
- toString has no parameters, no side effect hence without () consistent usage
- Removed unnecessary return , parentheses and semi colons.
Author: Joshi <re...@gmail.com>
Author: Rekha Joshi <re...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #1451 from rekhajoshm/KAFKA-3771
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/79aaf19f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/79aaf19f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/79aaf19f
Branch: refs/heads/trunk
Commit: 79aaf19f24bb48f90404a3e3896d115107991f4c
Parents: 2c7fae0
Author: Rekha Joshi <re...@gmail.com>
Authored: Mon Jun 6 08:08:06 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Jun 6 08:08:47 2016 +0100
----------------------------------------------------------------------
.../main/scala/kafka/admin/TopicCommand.scala | 2 +-
core/src/main/scala/kafka/api/ApiVersion.scala | 2 +-
.../kafka/api/ControlledShutdownRequest.scala | 2 +-
.../src/main/scala/kafka/api/FetchRequest.scala | 2 +-
.../kafka/api/GenericRequestAndHeader.scala | 2 +-
.../kafka/api/GenericResponseAndHeader.scala | 2 +-
.../src/main/scala/kafka/api/LeaderAndIsr.scala | 4 +-
.../main/scala/kafka/api/OffsetRequest.scala | 2 +-
.../main/scala/kafka/api/OffsetResponse.scala | 2 +-
.../main/scala/kafka/api/ProducerRequest.scala | 2 +-
.../scala/kafka/api/RequestOrResponse.scala | 2 +-
.../main/scala/kafka/api/TopicMetadata.scala | 4 +-
.../scala/kafka/api/TopicMetadataRequest.scala | 2 +-
.../main/scala/kafka/client/ClientUtils.scala | 4 +-
core/src/main/scala/kafka/cluster/Cluster.scala | 2 +-
.../main/scala/kafka/cluster/Partition.scala | 14 +-
core/src/main/scala/kafka/cluster/Replica.scala | 4 +-
core/src/main/scala/kafka/common/AppInfo.scala | 2 +-
.../ZkNodeChangeNotificationListener.scala | 5 +-
.../main/scala/kafka/consumer/KafkaStream.scala | 2 +-
.../kafka/consumer/PartitionAssignor.scala | 2 +-
.../kafka/consumer/PartitionTopicInfo.scala | 2 +-
.../controller/ControllerChannelManager.scala | 12 +-
.../kafka/controller/KafkaController.scala | 70 ++++----
.../controller/PartitionLeaderSelector.scala | 2 +-
.../controller/PartitionStateMachine.scala | 12 +-
.../kafka/controller/ReplicaStateMachine.scala | 6 +-
.../kafka/controller/TopicDeletionManager.scala | 16 +-
.../kafka/coordinator/GroupCoordinator.scala | 2 +-
.../coordinator/GroupMetadataManager.scala | 2 +-
.../kafka/coordinator/MemberMetadata.scala | 2 +-
.../kafka/javaapi/TopicMetadataRequest.scala | 2 +-
core/src/main/scala/kafka/log/Log.scala | 2 +-
core/src/main/scala/kafka/log/LogSegment.scala | 2 +-
core/src/main/scala/kafka/message/Message.scala | 2 +-
.../kafka/metrics/KafkaMetricsReporter.scala | 8 +-
.../kafka/network/RequestOrResponseSend.scala | 2 +-
.../kafka/producer/BrokerPartitionInfo.scala | 2 +-
.../kafka/producer/ProducerRequestStats.scala | 2 +-
.../producer/async/DefaultEventHandler.scala | 14 +-
.../security/auth/SimpleAclAuthorizer.scala | 6 +-
.../scala/kafka/server/ClientQuotaManager.scala | 2 +-
.../scala/kafka/server/DelayedOperation.scala | 2 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../main/scala/kafka/server/KafkaConfig.scala | 2 +-
.../main/scala/kafka/server/KafkaServer.scala | 4 +-
.../scala/kafka/server/ReplicaManager.scala | 13 +-
.../scala/kafka/tools/ConsumerPerformance.scala | 4 +-
.../scala/kafka/tools/ProducerPerformance.scala | 2 +-
.../kafka/tools/ReplicaVerificationTool.scala | 2 +-
.../kafka/tools/SimpleConsumerPerformance.scala | 4 +-
.../scala/kafka/tools/SimpleConsumerShell.scala | 10 +-
.../kafka/tools/StateChangeLogMerger.scala | 8 +-
.../kafka/tools/VerifyConsumerRebalance.scala | 2 +-
core/src/main/scala/kafka/utils/CoreUtils.scala | 4 +-
.../src/main/scala/kafka/utils/ToolsUtils.scala | 2 +-
.../kafka/utils/VerifiableProperties.scala | 2 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 8 +-
.../integration/kafka/api/AdminClientTest.scala | 22 +--
.../kafka/api/AuthorizerIntegrationTest.scala | 10 +-
.../kafka/api/BaseConsumerTest.scala | 52 +++---
.../kafka/api/BaseProducerSendTest.scala | 14 +-
.../kafka/api/ConsumerBounceTest.scala | 6 +-
.../kafka/api/IntegrationTestHarness.scala | 2 +-
.../kafka/api/PlaintextConsumerTest.scala | 164 +++++++++----------
.../api/SaslMultiMechanismConsumerTest.scala | 4 +-
.../scala/kafka/tools/TestLogCleaning.scala | 4 +-
.../scala/other/kafka/TestOffsetManager.scala | 2 +-
.../unit/kafka/admin/AddPartitionsTest.scala | 2 +-
.../test/scala/unit/kafka/admin/AdminTest.scala | 14 +-
.../unit/kafka/admin/ConfigCommandTest.scala | 2 +-
.../unit/kafka/admin/DeleteTopicTest.scala | 18 +-
.../kafka/consumer/ConsumerIteratorTest.scala | 4 +-
.../ZookeeperConsumerConnectorTest.scala | 2 +-
.../controller/ControllerFailoverTest.scala | 6 +-
.../GroupCoordinatorResponseTest.scala | 4 +-
.../kafka/integration/PrimitiveApiTest.scala | 6 +-
.../ProducerConsumerTestHarness.scala | 2 +-
.../test/scala/unit/kafka/log/CleanerTest.scala | 4 +-
.../unit/kafka/log/FileMessageSetTest.scala | 2 +-
.../kafka/log/LogCleanerIntegrationTest.scala | 2 +-
.../scala/unit/kafka/log/LogManagerTest.scala | 4 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 4 +-
.../message/ByteBufferMessageSetTest.scala | 2 +-
.../scala/unit/kafka/message/MessageTest.scala | 12 +-
.../unit/kafka/network/SocketServerTest.scala | 2 +-
.../unit/kafka/producer/ProducerTest.scala | 10 +-
.../unit/kafka/producer/SyncProducerTest.scala | 10 +-
.../security/auth/ZkAuthorizationTest.scala | 10 +-
.../kafka/server/BaseReplicaFetchTest.scala | 2 +-
.../kafka/server/DynamicConfigChangeTest.scala | 12 +-
.../server/HighwatermarkPersistenceTest.scala | 12 +-
.../unit/kafka/server/ISRExpirationTest.scala | 2 +-
.../unit/kafka/server/LeaderElectionTest.scala | 3 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 2 +-
.../unit/kafka/server/LogRecoveryTest.scala | 6 +-
.../server/ServerGenerateBrokerIdTest.scala | 4 +-
.../unit/kafka/server/SimpleFetchTest.scala | 10 +-
.../unit/kafka/tools/ConsoleProducerTest.scala | 4 +-
.../unit/kafka/utils/IteratorTemplateTest.scala | 2 +-
.../scala/unit/kafka/utils/JaasTestUtils.scala | 2 +-
.../scala/unit/kafka/utils/MockScheduler.scala | 4 +-
.../test/scala/unit/kafka/utils/MockTime.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 18 +-
104 files changed, 401 insertions(+), 392 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index c643a9d..39bfe62 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -216,7 +216,7 @@ object TopicCommand extends Logging {
val leader = zkUtils.getLeaderForPartition(topic, partitionId)
if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
(reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
- (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
+ (reportUnavailablePartitions && (leader.isEmpty || !liveBrokers.contains(leader.get)))) {
print("\tTopic: " + topic)
print("\tPartition: " + partitionId)
print("\tLeader: " + (if(leader.isDefined) leader.get else "none"))
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 2417d79..666d0e7 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -72,7 +72,7 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
override def compare(that: ApiVersion): Int =
ApiVersion.orderingByVersion.compare(this, that)
- override def toString(): String = version
+ override def toString: String = version
}
// Keep the IDs in order of versions
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index 42a17e6..52c8828 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -63,7 +63,7 @@ case class ControlledShutdownRequest(versionId: Short,
4 /* broker id */
}
- override def toString(): String = {
+ override def toString: String = {
describe(true)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 83e139a..f74bd1c 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -141,7 +141,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
def numPartitions = requestInfo.size
- override def toString(): String = {
+ override def toString: String = {
describe(true)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
index b0c6d7a..cb5b95e 100644
--- a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
+++ b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
@@ -39,7 +39,7 @@ private[kafka] abstract class GenericRequestAndHeader(val versionId: Short,
body.sizeOf()
}
- override def toString(): String = {
+ override def toString: String = {
describe(true)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
index 748b5e9..2835fb6 100644
--- a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
+++ b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
@@ -32,7 +32,7 @@ private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int,
body.sizeOf()
}
- override def toString(): String = {
+ override def toString: String = {
describe(true)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/LeaderAndIsr.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
index 5de527c..e5813a5 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
@@ -35,7 +35,7 @@ object LeaderAndIsr {
case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int) {
def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion)
- override def toString(): String = {
+ override def toString: String = {
Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" -> isr))
}
}
@@ -83,7 +83,7 @@ case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControlle
size
}
- override def toString(): String = {
+ override def toString: String = {
val partitionStateInfo = new StringBuilder
partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
partitionStateInfo.append(",ReplicationFactor:" + replicationFactor + ")")
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 59181d1..b15cf5a 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -109,7 +109,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
def isFromOrdinaryClient = replicaId == Request.OrdinaryConsumerId
def isFromDebuggingClient = replicaId == Request.DebuggingConsumerId
- override def toString(): String = {
+ override def toString: String = {
describe(true)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/OffsetResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala
index bfb270f..b767c08 100644
--- a/core/src/main/scala/kafka/api/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetResponse.scala
@@ -46,7 +46,7 @@ object OffsetResponse {
case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long]) {
- override def toString(): String = {
+ override def toString: String = {
new String("error: " + Errors.forCode(error).exceptionName + " offsets: " + offsets.mkString)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 30af841..aad2fa5 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -124,7 +124,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
def numPartitions = data.size
- override def toString(): String = {
+ override def toString: String = {
describe(true)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/RequestOrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala
index 73ec1d9..65b37fd 100644
--- a/core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -26,7 +26,7 @@ object Request {
val DebuggingConsumerId: Int = -2
// Broker ids are non-negative int.
- def isValidBrokerId(brokerId: Int): Boolean = (brokerId >= 0)
+ def isValidBrokerId(brokerId: Int): Boolean = brokerId >= 0
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index ae5ea58..815de21 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -57,7 +57,7 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat
partitionsMetadata.foreach(m => m.writeTo(buffer))
}
- override def toString(): String = {
+ override def toString: String = {
val topicMetadataInfo = new StringBuilder
topicMetadataInfo.append("{TopicMetadata for topic %s -> ".format(topic))
Errors.forCode(errorCode) match {
@@ -138,7 +138,7 @@ case class PartitionMetadata(partitionId: Int,
isr.foreach(r => buffer.putInt(r.id))
}
- override def toString(): String = {
+ override def toString: String = {
val partitionMetadataString = new StringBuilder
partitionMetadataString.append("\tpartition " + partitionId)
partitionMetadataString.append("\tleader: " + (if(leader.isDefined) leader.get.toString else "none"))
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 0654e3d..107696d 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -57,7 +57,7 @@ case class TopicMetadataRequest(versionId: Short,
topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
}
- override def toString(): String = {
+ override def toString: String = {
describe(true)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index fd1fc26..f61a978 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -142,11 +142,11 @@ object ClientUtils extends Logging{
var offsetManagerChannelOpt: Option[BlockingChannel] = None
- while (!offsetManagerChannelOpt.isDefined) {
+ while (offsetManagerChannelOpt.isEmpty) {
var coordinatorOpt: Option[BrokerEndPoint] = None
- while (!coordinatorOpt.isDefined) {
+ while (coordinatorOpt.isEmpty) {
try {
if (!queryChannel.isConnected)
queryChannel = channelToAnyBroker(zkUtils)
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/cluster/Cluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Cluster.scala b/core/src/main/scala/kafka/cluster/Cluster.scala
index 992c54e..75bbec0 100644
--- a/core/src/main/scala/kafka/cluster/Cluster.scala
+++ b/core/src/main/scala/kafka/cluster/Cluster.scala
@@ -40,6 +40,6 @@ private[kafka] class Cluster {
def size = brokers.size
- override def toString(): String =
+ override def toString: String =
"Cluster(" + brokers.values.mkString(", ") + ")"
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ea22e87..a561a97 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -64,7 +64,7 @@ class Partition(val topic: String,
private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
- private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
+ private def isReplicaLocal(replicaId: Int) : Boolean = replicaId == localBrokerId
val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
newGauge("UnderReplicated",
@@ -158,7 +158,7 @@ class Partition(val topic: String,
}
def getLeaderEpoch(): Int = {
- return this.leaderEpoch
+ this.leaderEpoch
}
/**
@@ -381,9 +381,9 @@ class Partition(val topic: String,
leaderReplicaIfLocal() match {
case Some(leaderReplica) =>
val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
- if(outOfSyncReplicas.size > 0) {
+ if(outOfSyncReplicas.nonEmpty) {
val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
- assert(newInSyncReplicas.size > 0)
+ assert(newInSyncReplicas.nonEmpty)
info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
// update ISR in zk and in cache
@@ -421,7 +421,7 @@ class Partition(val topic: String,
val candidateReplicas = inSyncReplicas - leaderReplica
val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
- if(laggingReplicas.size > 0)
+ if(laggingReplicas.nonEmpty)
debug("Lagging replicas for partition %s are %s".format(TopicAndPartition(topic, partitionId), laggingReplicas.map(_.brokerId).mkString(",")))
laggingReplicas
@@ -484,7 +484,7 @@ class Partition(val topic: String,
}
override def equals(that: Any): Boolean = {
- if(!(that.isInstanceOf[Partition]))
+ if(!that.isInstanceOf[Partition])
return false
val other = that.asInstanceOf[Partition]
if(topic.equals(other.topic) && partitionId == other.partitionId)
@@ -496,7 +496,7 @@ class Partition(val topic: String,
31 + topic.hashCode() + 17*partitionId
}
- override def toString(): String = {
+ override def toString: String = {
val partitionString = new StringBuilder
partitionString.append("Topic: " + topic)
partitionString.append("; Partition: " + partitionId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 740e835..dfb203a 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -98,7 +98,7 @@ class Replica(val brokerId: Int,
}
override def equals(that: Any): Boolean = {
- if(!(that.isInstanceOf[Replica]))
+ if(!that.isInstanceOf[Replica])
return false
val other = that.asInstanceOf[Replica]
if(topic.equals(other.topic) && brokerId == other.brokerId && partition.equals(other.partition))
@@ -111,7 +111,7 @@ class Replica(val brokerId: Int,
}
- override def toString(): String = {
+ override def toString: String = {
val replicaString = new StringBuilder
replicaString.append("ReplicaId: " + brokerId)
replicaString.append("; Topic: " + topic)
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/common/AppInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/AppInfo.scala b/core/src/main/scala/kafka/common/AppInfo.scala
index 8e2f49d..f77bdf5 100644
--- a/core/src/main/scala/kafka/common/AppInfo.scala
+++ b/core/src/main/scala/kafka/common/AppInfo.scala
@@ -42,7 +42,7 @@ object AppInfo extends KafkaMetricsGroup {
newGauge("CommitID",
new Gauge[String] {
def value = {
- AppInfoParser.getCommitId();
+ AppInfoParser.getCommitId()
}
})
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index baddecc..580ae33 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -92,7 +92,9 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
if (changeId > lastExecutedChange) {
val changeZnode = seqNodeRoot + "/" + notification
val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
- data map (notificationHandler.processNotification(_)) getOrElse (logger.warn(s"read null data from $changeZnode when processing notification $notification"))
+ data.map(notificationHandler.processNotification(_)).getOrElse {
+ logger.warn(s"read null data from $changeZnode when processing notification $notification")
+ }
}
lastExecutedChange = changeId
}
@@ -107,6 +109,7 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
/**
* Purges expired notifications.
+ *
* @param now
* @param notifications
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/consumer/KafkaStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala
index 805e916..aebf3ea 100644
--- a/core/src/main/scala/kafka/consumer/KafkaStream.scala
+++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -45,7 +45,7 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
iter.clearCurrentChunk()
}
- override def toString(): String = {
+ override def toString: String = {
"%s kafka stream".format(clientId)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index 5a1bdd0..96fe690 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -76,7 +76,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
val partitionAssignment =
new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
- if (ctx.consumersForTopic.size > 0) {
+ if (ctx.consumersForTopic.nonEmpty) {
// check conditions (a) and (b)
val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet)
ctx.consumersForTopic.foreach { case (topic, threadIds) =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 9c779ce..c7c7836 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -67,7 +67,7 @@ class PartitionTopicInfo(val topic: String,
}
}
- override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
+ override def toString: String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
": consumed offset = " + consumedOffset.get
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index b4059a4..32478ca 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -257,13 +257,13 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
def newBatch() {
// raise error if the previous batch is not empty
- if (leaderAndIsrRequestMap.size > 0)
+ if (leaderAndIsrRequestMap.nonEmpty)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
"a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
- if (stopReplicaRequestMap.size > 0)
+ if (stopReplicaRequestMap.nonEmpty)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
"new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString()))
- if (updateMetadataRequestMap.size > 0)
+ if (updateMetadataRequestMap.nonEmpty)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
"new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
}
@@ -424,15 +424,15 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
stopReplicaRequestMap.clear()
} catch {
case e : Throwable => {
- if (leaderAndIsrRequestMap.size > 0) {
+ if (leaderAndIsrRequestMap.nonEmpty) {
error("Haven't been able to send leader and isr requests, current state of " +
s"the map is $leaderAndIsrRequestMap. Exception message: $e")
}
- if (updateMetadataRequestMap.size > 0) {
+ if (updateMetadataRequestMap.nonEmpty) {
error("Haven't been able to send metadata update requests, current state of " +
s"the map is $updateMetadataRequestMap. Exception message: $e")
}
- if (stopReplicaRequestMap.size > 0) {
+ if (stopReplicaRequestMap.nonEmpty) {
error("Haven't been able to send stop replica requests, current state of " +
s"the map is $stopReplicaRequestMap. Exception message: $e")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index d533a85..1584cc9 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -71,7 +71,7 @@ class ControllerContext(val zkUtils: ZkUtils,
// getter
def liveBrokers = liveBrokersUnderlying.filter(broker => !shuttingDownBrokerIds.contains(broker.id))
- def liveBrokerIds = liveBrokerIdsUnderlying.filter(brokerId => !shuttingDownBrokerIds.contains(brokerId))
+ def liveBrokerIds = liveBrokerIdsUnderlying -- shuttingDownBrokerIds
def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying
def liveOrShuttingDownBrokers = liveBrokersUnderlying
@@ -84,22 +84,23 @@ class ControllerContext(val zkUtils: ZkUtils,
}
def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
- brokerIds.map { brokerId =>
+ brokerIds.flatMap { brokerId =>
partitionReplicaAssignment
- .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
- .map { case(topicAndPartition, replicas) =>
- new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId) }
- }.flatten.toSet
+ .filter { case (topicAndPartition, replicas) => replicas.contains(brokerId) }
+ .map { case (topicAndPartition, replicas) =>
+ new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId)
+ }
+ }.toSet
}
def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
partitionReplicaAssignment
- .filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }
- .map { case(topicAndPartition, replicas) =>
+ .filter { case (topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }
+ .flatMap { case (topicAndPartition, replicas) =>
replicas.map { r =>
new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
}
- }.flatten.toSet
+ }.toSet
}
def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] = {
@@ -112,10 +113,10 @@ class ControllerContext(val zkUtils: ZkUtils,
}
def replicasForPartition(partitions: collection.Set[TopicAndPartition]): collection.Set[PartitionAndReplica] = {
- partitions.map { p =>
+ partitions.flatMap { p =>
val replicas = partitionReplicaAssignment(p)
replicas.map(r => new PartitionAndReplica(p.topic, p.partition, r))
- }.flatten
+ }
}
def removeTopic(topic: String) = {
@@ -139,7 +140,7 @@ object KafkaController extends Logging {
Json.parseFull(controllerInfoString) match {
case Some(m) =>
val controllerInfo = m.asInstanceOf[Map[String, Any]]
- return controllerInfo.get("brokerid").get.asInstanceOf[Int]
+ controllerInfo.get("brokerid").get.asInstanceOf[Int]
case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
}
} catch {
@@ -148,7 +149,7 @@ object KafkaController extends Logging {
warn("Failed to parse the controller info as json. "
+ "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
try {
- return controllerInfoString.toInt
+ controllerInfoString.toInt
} catch {
case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
}
@@ -298,7 +299,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
controllerContext.partitionLeadershipInfo.filter {
case (topicAndPartition, leaderIsrAndControllerEpoch) =>
leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
- }.map(_._1)
+ }.keys
}
replicatedPartitionsBrokerLeads().toSet
}
@@ -439,7 +440,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
// check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
// on the newly restarted brokers, there is a chance that topic deletion can resume
val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
- if(replicasForTopicsToBeDeleted.size > 0) {
+ if(replicasForTopicsToBeDeleted.nonEmpty) {
info(("Some replicas %s for topics scheduled for deletion %s are on the newly restarted brokers %s. " +
"Signaling restart of topic deletion for these topics").format(replicasForTopicsToBeDeleted.mkString(","),
deleteTopicManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
@@ -479,7 +480,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
// check if topic deletion state for the dead replicas needs to be updated
val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
- if(replicasForTopicsToBeDeleted.size > 0) {
+ if(replicasForTopicsToBeDeleted.nonEmpty) {
// it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
// deleted when the broker is down. This will prevent the replica from being in TopicDeletionStarted state indefinitely
// since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
@@ -780,9 +781,9 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
val reassignedPartitions = partitionsBeingReassigned.filter { partition =>
val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1)
val topicDeleted = replicasOpt.isEmpty
- val successful = if(!topicDeleted) replicasOpt.get == partition._2.newReplicas else false
+ val successful = if (!topicDeleted) replicasOpt.get == partition._2.newReplicas else false
topicDeleted || successful
- }.map(_._1)
+ }.keys
reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p))
var partitionsToReassign: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
partitionsToReassign ++= partitionsBeingReassigned
@@ -992,7 +993,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) {
try {
val zkPath = getTopicPath(topicAndPartition.topic)
- val jsonPartitionMap = zkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)))
+ val jsonPartitionMap = zkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => e._1.partition.toString -> e._2))
zkUtils.updatePersistentPath(zkPath, jsonPartitionMap)
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
} catch {
@@ -1021,6 +1022,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
/**
* Send the leader information for selected partitions to selected brokers so that they can correctly respond to
* metadata requests
+ *
* @param brokers The brokers that the update metadata request should be sent to
*/
def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
@@ -1043,6 +1045,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
/**
* Removes a given partition replica from the ISR; if it is not the current
* leader and there are sufficient remaining replicas in ISR.
+ *
* @param topic topic
* @param partition partition
* @param replicaId replica Id
@@ -1109,6 +1112,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
/**
* Does not change leader or isr, but just increments the leader epoch
+ *
* @param topic topic
* @param partition partition
* @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty.
@@ -1162,8 +1166,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
* Called after the zookeeper session has expired and a new session has been created. You would have to re-create
* any ephemeral nodes here.
*
- * @throws Exception
- * On any error.
+ * @throws Exception On any error.
*/
@throws(classOf[Exception])
def handleNewSession() {
@@ -1219,8 +1222,8 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
// do this check only if the broker is live and there are no partitions being reassigned currently
// and preferred replica election is not in progress
if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
- controllerContext.partitionsBeingReassigned.size == 0 &&
- controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
+ controllerContext.partitionsBeingReassigned.isEmpty &&
+ controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
!deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
controllerContext.allTopics.contains(topicPartition.topic)) {
onPreferredReplicaElection(Set(topicPartition), true)
@@ -1250,6 +1253,7 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
/**
* Invoked when some partitions are reassigned by the admin command
+ *
* @throws Exception On any error.
*/
@throws(classOf[Exception])
@@ -1276,8 +1280,8 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
/**
* Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
- * @throws Exception
- * On any error.
+ *
+ * @throws Exception On any error.
*/
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
@@ -1293,6 +1297,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
/**
* Invoked when some partitions need to move leader to preferred replica
+ *
* @throws Exception On any error.
*/
@throws(classOf[Exception])
@@ -1343,6 +1348,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
/**
* Called when leader intimates of isr change
+ *
* @param controller
*/
class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging {
@@ -1354,7 +1360,7 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
debug("[IsrChangeNotificationListener] Fired!!!")
val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala
try {
- val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.map(x => getTopicAndPartition(x)).flatten.toSet
+ val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.flatMap(x => getTopicAndPartition(x)).toSet
if (topicAndPartitions.nonEmpty) {
controller.updateLeaderAndIsrCache(topicAndPartitions)
processUpdateNotifications(topicAndPartitions)
@@ -1417,6 +1423,7 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
/**
* Invoked when some partitions are reassigned by the admin command
+ *
* @throws Exception On any error.
*/
@throws(classOf[Exception])
@@ -1425,12 +1432,12 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
.format(dataPath, data.toString))
inLock(controllerContext.controllerLock) {
val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
- if(controllerContext.partitionsUndergoingPreferredReplicaElection.size > 0)
+ if(controllerContext.partitionsUndergoingPreferredReplicaElection.nonEmpty)
info("These partitions are already undergoing preferred replica election: %s"
.format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
- if(partitionsForTopicsToBeDeleted.size > 0) {
+ if(partitionsForTopicsToBeDeleted.nonEmpty) {
error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
.format(partitionsForTopicsToBeDeleted))
}
@@ -1439,8 +1446,7 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
}
/**
- * @throws Exception
- * On any error.
+ * @throws Exception On any error.
*/
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
@@ -1451,13 +1457,13 @@ case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
var isrChangeListener: ReassignedPartitionsIsrChangeListener = null)
case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
- override def toString(): String = {
+ override def toString: String = {
"[Topic=%s,Partition=%d,Replica=%d]".format(topic, partition, replica)
}
}
case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
- override def toString(): String = {
+ override def toString: String = {
val leaderAndIsrInfo = new StringBuilder
leaderAndIsrInfo.append("(Leader:" + leaderAndIsr.leader)
leaderAndIsrInfo.append(",ISR:" + leaderAndIsr.isr.mkString(","))
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 682ce1d..9517523 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -183,7 +183,7 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
- liveAssignedReplicas.filter(newIsr.contains).headOption match {
+ liveAssignedReplicas.find(newIsr.contains) match {
case Some(newLeader) =>
debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader))
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 47efc51..bf5fde4 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -115,7 +115,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
// try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
// that belong to topics to be deleted
for((topicAndPartition, partitionState) <- partitionState
- if(!controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic))) {
+ if !controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)) {
if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector,
(new CallbackBuilder).build)
@@ -432,7 +432,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
deletedTopics, addedPartitionReplicaAssignment))
- if(newTopics.size > 0)
+ if(newTopics.nonEmpty)
controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
} catch {
case e: Throwable => error("Error while handling new topic", e )
@@ -463,13 +463,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
(children: Buffer[String]).toSet
}
debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
- val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
- if(nonExistentTopics.size > 0) {
+ val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
+ if(nonExistentTopics.nonEmpty) {
warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
}
topicsToBeDeleted --= nonExistentTopics
- if(topicsToBeDeleted.size > 0) {
+ if(topicsToBeDeleted.nonEmpty) {
info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
// mark topic ineligible for deletion if other state changes are in progress
topicsToBeDeleted.foreach { topic =>
@@ -512,7 +512,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
error("Skipping adding partitions %s for topic %s since it is currently being deleted"
.format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
else {
- if (partitionsToBeAdded.size > 0) {
+ if (partitionsToBeAdded.nonEmpty) {
info("New partitions to be added %s".format(partitionsToBeAdded))
controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index d49b6af..d4e9bb4 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -107,7 +107,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
*/
def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
callbacks: Callbacks = (new CallbackBuilder).build) {
- if(replicas.size > 0) {
+ if(replicas.nonEmpty) {
info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
try {
brokerRequestBatch.newBatch()
@@ -370,9 +370,9 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
.format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
- if(newBrokerIds.size > 0)
+ if(newBrokerIds.nonEmpty)
controller.onBrokerStartup(newBrokerIdsSorted)
- if(deadBrokerIds.size > 0)
+ if(deadBrokerIds.nonEmpty)
controller.onBrokerFailure(deadBrokerIdsSorted)
} catch {
case e: Throwable => error("Error while handling broker changes", e)
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index c6f80ac..f24c69c 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -95,7 +95,7 @@ class TopicDeletionManager(controller: KafkaController,
def start() {
if (isDeleteTopicEnabled) {
deleteTopicsThread = new DeleteTopicsThread()
- if (topicsToBeDeleted.size > 0)
+ if (topicsToBeDeleted.nonEmpty)
deleteTopicStateChanged.set(true)
deleteTopicsThread.start()
}
@@ -142,7 +142,7 @@ class TopicDeletionManager(controller: KafkaController,
def resumeDeletionForTopics(topics: Set[String] = Set.empty) {
if(isDeleteTopicEnabled) {
val topicsToResumeDeletion = topics & topicsToBeDeleted
- if(topicsToResumeDeletion.size > 0) {
+ if(topicsToResumeDeletion.nonEmpty) {
topicsIneligibleForDeletion --= topicsToResumeDeletion
resumeTopicDeletionThread()
}
@@ -160,7 +160,7 @@ class TopicDeletionManager(controller: KafkaController,
def failReplicaDeletion(replicas: Set[PartitionAndReplica]) {
if(isDeleteTopicEnabled) {
val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
- if(replicasThatFailedToDelete.size > 0) {
+ if(replicasThatFailedToDelete.nonEmpty) {
val topics = replicasThatFailedToDelete.map(_.topic)
debug("Deletion failed for replicas %s. Halting deletion for topics %s"
.format(replicasThatFailedToDelete.mkString(","), topics))
@@ -182,7 +182,7 @@ class TopicDeletionManager(controller: KafkaController,
if(isDeleteTopicEnabled) {
val newTopicsToHaltDeletion = topicsToBeDeleted & topics
topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
- if(newTopicsToHaltDeletion.size > 0)
+ if(newTopicsToHaltDeletion.nonEmpty)
info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
}
}
@@ -310,7 +310,7 @@ class TopicDeletionManager(controller: KafkaController,
controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
topics.foreach { topic =>
- onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).map(_._1).toSet)
+ onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).keySet)
}
}
@@ -343,7 +343,7 @@ class TopicDeletionManager(controller: KafkaController,
debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
- if(deadReplicasForTopic.size > 0) {
+ if(deadReplicasForTopic.nonEmpty) {
debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
markTopicIneligibleForDeletion(Set(topic))
}
@@ -373,7 +373,7 @@ class TopicDeletionManager(controller: KafkaController,
val responseMap = stopReplicaResponse.responses.asScala
val partitionsInError =
if (stopReplicaResponse.errorCode != Errors.NONE.code) responseMap.keySet
- else responseMap.filter { case (_, error) => error != Errors.NONE.code }.map(_._1).toSet
+ else responseMap.filter { case (_, error) => error != Errors.NONE.code }.keySet
val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
inLock(controllerContext.controllerLock) {
// move all the failed replicas to ReplicaDeletionIneligible
@@ -397,7 +397,7 @@ class TopicDeletionManager(controller: KafkaController,
inLock(controllerContext.controllerLock) {
val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
- if(!topicsQueuedForDeletion.isEmpty)
+ if(topicsQueuedForDeletion.nonEmpty)
info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))
topicsQueuedForDeletion.foreach { topic =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index f445764..e9bbbd3 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -647,7 +647,7 @@ class GroupCoordinator(val brokerId: Int,
def onCompleteJoin(group: GroupMetadata) {
group synchronized {
val failedMembers = group.notYetRejoinedMembers
- if (group.isEmpty || !failedMembers.isEmpty) {
+ if (group.isEmpty || failedMembers.nonEmpty) {
failedMembers.foreach { failedMember =>
group.remove(failedMember.memberId)
// TODO: cut the socket connection to the client
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index c6bc44e..b968f97 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -108,7 +108,7 @@ class GroupMetadataManager(val brokerId: Int,
def isGroupLoading(groupId: String): Boolean = loadingPartitions synchronized loadingPartitions.contains(partitionFor(groupId))
- def isLoading(): Boolean = loadingPartitions synchronized !loadingPartitions.isEmpty
+ def isLoading(): Boolean = loadingPartitions synchronized loadingPartitions.nonEmpty
/**
* Get the group associated with the given groupId, or null if not found
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
index 1d799f2..c57b990 100644
--- a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
@@ -90,7 +90,7 @@ private[coordinator] class MemberMetadata(val memberId: String,
if (p1._1 != p2._1 || !util.Arrays.equals(p1._2, p2._2))
return false
}
- return true
+ true
}
def summary(protocol: String): MemberSummary = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index 92d9073..f625ba0 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -44,7 +44,7 @@ class TopicMetadataRequest(val versionId: Short,
def sizeInBytes: Int = underlying.sizeInBytes()
- override def toString(): String = {
+ override def toString: String = {
describe(true)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 62dc7a1..76cd86e 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -802,7 +802,7 @@ class Log(val dir: File,
}
}
- override def toString() = "Log(" + dir + ")"
+ override def toString = "Log(" + dir + ")"
/**
* This method performs an asynchronous log segment delete by doing the following:
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 37f7579..6bbc50c 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -203,7 +203,7 @@ class LogSegment(val log: FileMessageSet,
truncated
}
- override def toString() = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")"
+ override def toString = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")"
/**
* Truncate off all index and log entries with offsets >= the given offset.
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index 2ab2e0c..bb91078 100755
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -402,7 +402,7 @@ class Message(val buffer: ByteBuffer,
throw new IllegalArgumentException(s"Invalid timestamp $timestamp. Timestamp must be ${NoTimestamp} when magic = ${MagicValue_V0}")
}
- override def toString(): String = {
+ override def toString: String = {
if (magic == MagicValue_V0)
s"Message(magic = $magic, attributes = $attributes, crc = $checksum, key = $key, payload = $payload)"
else
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
index 0d6da34..999b2a4 100755
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
@@ -54,12 +54,14 @@ object KafkaMetricsReporter {
ReporterStarted synchronized {
if (!ReporterStarted.get()) {
val metricsConfig = new KafkaMetricsConfig(verifiableProps)
- if(metricsConfig.reporters.size > 0) {
+ if(metricsConfig.reporters.nonEmpty) {
metricsConfig.reporters.foreach(reporterType => {
val reporter = CoreUtils.createObject[KafkaMetricsReporter](reporterType)
reporter.init(verifiableProps)
- if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
- CoreUtils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
+ reporter match {
+ case bean: KafkaMetricsReporterMBean => CoreUtils.registerMBean(reporter, bean.getMBeanName)
+ case _ =>
+ }
})
ReporterStarted.set(true)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
index 364f24b..153d636 100644
--- a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
+++ b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.network.NetworkSend
object RequestOrResponseSend {
def serialize(request: RequestOrResponse): ByteBuffer = {
- val buffer = ByteBuffer.allocate(request.sizeInBytes + (if(request.requestId != None) 2 else 0))
+ val buffer = ByteBuffer.allocate(request.sizeInBytes + (if(request.requestId.isDefined) 2 else 0))
request.requestId match {
case Some(requestId) =>
buffer.putShort(requestId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 4616c7e..97289a1 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -55,7 +55,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
}
}
val partitionMetadata = metadata.partitionsMetadata
- if(partitionMetadata.size == 0) {
+ if(partitionMetadata.isEmpty) {
if(metadata.errorCode != Errors.NONE.code) {
throw new KafkaException(Errors.forCode(metadata.errorCode).exception)
} else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index 8ab948a..92bbbcf 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -30,7 +30,7 @@ class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup
val requestTimer = new KafkaTimer(newTimer("ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags))
val requestSizeHist = newHistogram("ProducerRequestSize", biased = true, tags)
- val throttleTimeStats = newTimer("ProducerRequestThrottleRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags);
+ val throttleTimeStats = newTimer("ProducerRequestThrottleRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index b79e64b..f9591ad 100755
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -66,7 +66,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
var remainingRetries = config.messageSendMaxRetries + 1
val correlationIdStart = correlationId.get()
debug("Handling %d events".format(events.size))
- while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
+ while (remainingRetries > 0 && outstandingProduceRequests.nonEmpty) {
topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
if (topicMetadataRefreshInterval >= 0 &&
SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
@@ -76,7 +76,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
lastTopicMetadataRefreshTime = SystemTime.milliseconds
}
outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
- if (outstandingProduceRequests.size > 0) {
+ if (outstandingProduceRequests.nonEmpty) {
info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
// back off and update the topic metadata cache before attempting another send operation
Thread.sleep(config.retryBackoffMs)
@@ -87,7 +87,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
producerStats.resendRate.mark()
}
}
- if(outstandingProduceRequests.size > 0) {
+ if(outstandingProduceRequests.nonEmpty) {
producerStats.failedSendRate.mark()
val correlationIdEnd = correlationId.get()
error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
@@ -261,9 +261,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
*/
private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
if(brokerId < 0) {
- warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
+ warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.keys.mkString(",")))
messagesPerTopic.keys.toSeq
- } else if(messagesPerTopic.size > 0) {
+ } else if(messagesPerTopic.nonEmpty) {
val currentCorrelationId = correlationId.getAndIncrement
val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
config.requestTimeoutMs, messagesPerTopic)
@@ -285,7 +285,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}
val failedPartitionsAndStatus = response.status.filter(_._2.error != Errors.NONE.code).toSeq
failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
- if(failedTopicPartitions.size > 0) {
+ if(failedTopicPartitions.nonEmpty) {
val errorString = failedPartitionsAndStatus
.sortWith((p1, p2) => p1._1.topic.compareTo(p2._1.topic) < 0 ||
(p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))
@@ -302,7 +302,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
} catch {
case t: Throwable =>
warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"
- .format(currentCorrelationId, brokerId, messagesPerTopic.map(_._1).mkString(",")), t)
+ .format(currentCorrelationId, brokerId, messagesPerTopic.keys.mkString(",")), t)
messagesPerTopic.keys.toSeq
}
} else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 18fff45..a36a07d 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -154,7 +154,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
}
def isSuperUser(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String): Boolean = {
- if (superUsers.exists( _ == principal)) {
+ if (superUsers.contains(principal)) {
authorizerLogger.debug(s"principal = $principal is a super user, allowing operation without checking acls.")
true
} else false
@@ -275,7 +275,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
val newAcls = getNewAcls(currentVersionedAcls.acls)
val data = Json.encode(Acl.toJsonCompatibleMap(newAcls))
val (updateSucceeded, updateVersion) =
- if (!newAcls.isEmpty) {
+ if (newAcls.nonEmpty) {
updatePath(path, data, currentVersionedAcls.zkVersion)
} else {
trace(s"Deleting path for $resource because it had no ACLs remaining")
@@ -285,7 +285,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
if (!updateSucceeded) {
trace(s"Failed to update ACLs for $resource. Used version ${currentVersionedAcls.zkVersion}. Reading data and retrying update.")
Thread.sleep(backoffTime)
- currentVersionedAcls = getAclsFromZk(resource);
+ currentVersionedAcls = getAclsFromZk(resource)
retries += 1
} else {
newVersionedAcls = VersionedAcls(newAcls, updateVersion)
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 5863c72..c99ba97 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -157,7 +157,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* Returns the quota for the specified clientId
*/
def quota(clientId: String): Quota =
- if (overriddenQuota.containsKey(clientId)) overriddenQuota.get(clientId) else defaultQuota;
+ if (overriddenQuota.containsKey(clientId)) overriddenQuota.get(clientId) else defaultQuota
/*
* This function either returns the sensors for a given client id or creates them if they don't exist
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 2205568..5248edf 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -175,7 +175,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
* @return true iff the delayed operations can be completed by the caller
*/
def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
- assert(watchKeys.size > 0, "The watch key list can't be empty")
+ assert(watchKeys.nonEmpty, "The watch key list can't be empty")
// The cost of tryComplete() is typically proportional to the number of keys. Calling
// tryComplete() for each key is going to be expensive if there are many keys. Instead,
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1edc162..ebd1732 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -293,7 +293,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val currentTimestamp = SystemTime.milliseconds
val defaultExpireTimestamp = offsetRetention + currentTimestamp
val partitionData = authorizedRequestInfo.mapValues { partitionData =>
- val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata;
+ val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
new OffsetAndMetadata(
offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
commitTimestamp = currentTimestamp,
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a664484..ca66f9d 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1002,7 +1002,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater than 0")
require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, equal or greater than 1")
- require(logDirs.size > 0)
+ require(logDirs.nonEmpty)
require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
" to prevent unnecessary socket timeouts")
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f95d9ef..994e28e 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -475,7 +475,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
response = channel.receive()
val shutdownResponse = kafka.api.ControlledShutdownResponse.readFrom(response.payload())
if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining != null &&
- shutdownResponse.partitionsRemaining.size == 0) {
+ shutdownResponse.partitionsRemaining.isEmpty) {
shutdownSucceeded = true
info ("Controlled shutdown succeeded")
}
@@ -649,7 +649,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
s"Configured broker.id $brokerId doesn't match stored broker.id ${brokerIdSet.last} in meta.properties. " +
s"If you moved your data, make sure your configured broker.id matches. " +
s"If you intend to create a new broker, you should remove all data in your data directories (log.dirs).")
- else if(brokerIdSet.size == 0 && brokerId < 0 && config.brokerIdGenerationEnable) // generate a new brokerId from Zookeeper
+ else if(brokerIdSet.isEmpty && brokerId < 0 && config.brokerIdGenerationEnable) // generate a new brokerId from Zookeeper
brokerId = generateBrokerId
else if(brokerIdSet.size == 1) // pick broker.id from meta.properties
brokerId = brokerIdSet.last
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 68f2385..447fb40 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -359,9 +359,8 @@ class ReplicaManager(val config: KafkaConfig,
// Just return an error and don't handle the request at all
val responseStatus = messagesPerPartition.map {
case (topicAndPartition, messageSet) =>
- (topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
- LogAppendInfo.UnknownLogAppendInfo.firstOffset,
- Message.NoTimestamp))
+ topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
+ LogAppendInfo.UnknownLogAppendInfo.firstOffset, Message.NoTimestamp)
}
responseCallback(responseStatus)
}
@@ -375,7 +374,7 @@ class ReplicaManager(val config: KafkaConfig,
private def delayedRequestRequired(requiredAcks: Short, messagesPerPartition: Map[TopicPartition, MessageSet],
localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
requiredAcks == -1 &&
- messagesPerPartition.size > 0 &&
+ messagesPerPartition.nonEmpty &&
localProduceResults.values.count(_.error.isDefined) < messagesPerPartition.size
}
@@ -639,13 +638,13 @@ class ReplicaManager(val config: KafkaConfig,
val partitionsTobeLeader = partitionState.filter { case (partition, stateInfo) =>
stateInfo.leader == config.brokerId
}
- val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
+ val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
- val partitionsBecomeLeader = if (!partitionsTobeLeader.isEmpty)
+ val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
else
Set.empty[Partition]
- val partitionsBecomeFollower = if (!partitionsToBeFollower.isEmpty)
+ val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
else
Set.empty[Partition]
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 6480ff5..8e5dcc8 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -91,7 +91,7 @@ object ConsumerPerformance {
val elapsedSecs = (endMs - startMs) / 1000.0
if (!config.showDetailedStats) {
val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024)
- println(("%s, %s, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
+ println("%s, %s, %.4f, %.4f, %d, %.4f".format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, totalMessagesRead.get / elapsedSecs))
}
}
@@ -156,7 +156,7 @@ object ConsumerPerformance {
val elapsedMs: Double = endMs - startMs
val totalMBRead = (bytesRead * 1.0) / (1024 * 1024)
val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
- println(("%s, %d, %.4f, %.4f, %d, %.4f").format(dateFormat.format(endMs), id, totalMBRead,
+ println("%s, %d, %.4f, %.4f, %d, %.4f".format(dateFormat.format(endMs), id, totalMBRead,
1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index cf2000b..d4c0f34 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -63,7 +63,7 @@ object ProducerPerformance extends Logging {
val endMs = System.currentTimeMillis
val elapsedSecs = (endMs - startMs) / 1000.0
val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024)
- println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(
+ println("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f".format(
config.dateFormat.format(startMs), config.dateFormat.format(endMs),
config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent,
totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs))
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 71bf0c0..9a059df 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -180,7 +180,7 @@ object ReplicaVerificationTool extends Logging {
fetchSize = fetchSize,
maxWait = maxWaitMs,
minBytes = 1,
- doVerification = (brokerId == verificationBrokerId))
+ doVerification = brokerId == verificationBrokerId)
}
Runtime.getRuntime.addShutdownHook(new Thread() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
index 5e3c605..3abbc40 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -92,7 +92,7 @@ object SimpleConsumerPerformance {
val reportTime = System.currentTimeMillis
val elapsed = (reportTime - lastReportTime)/1000.0
val totalMBRead = ((totalBytesRead-lastBytesRead)*1.0)/(1024*1024)
- println(("%s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(reportTime), config.fetchSize,
+ println("%s, %d, %.4f, %.4f, %d, %.4f".format(config.dateFormat.format(reportTime), config.fetchSize,
(totalBytesRead*1.0)/(1024*1024), totalMBRead/elapsed,
totalMessagesRead, (totalMessagesRead-lastMessagesRead)/elapsed))
}
@@ -107,7 +107,7 @@ object SimpleConsumerPerformance {
if(!config.showDetailedStats) {
val totalMBRead = (totalBytesRead*1.0)/(1024*1024)
- println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs),
+ println("%s, %s, %d, %.4f, %.4f, %d, %.4f".format(config.dateFormat.format(startMs),
config.dateFormat.format(reportTime), config.fetchSize, totalMBRead, totalMBRead/elapsed,
totalMessagesRead, totalMessagesRead/elapsed))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 6ad68b6..c975d24 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -131,15 +131,15 @@ object SimpleConsumerShell extends Logging {
ToolsUtils.validatePortOrDie(parser,brokerList)
val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
- if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
+ if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
System.exit(1)
}
// validating partition id
- val partitionsMetadata = topicsMetadata(0).partitionsMetadata
+ val partitionsMetadata = topicsMetadata.head.partitionsMetadata
val partitionMetadataOpt = partitionsMetadata.find(p => p.partitionId == partitionId)
- if (!partitionMetadataOpt.isDefined) {
+ if (partitionMetadataOpt.isEmpty) {
System.err.println("Error: partition %d does not exist for topic %s".format(partitionId, topic))
System.exit(1)
}
@@ -149,7 +149,7 @@ object SimpleConsumerShell extends Logging {
var replicaOpt: Option[BrokerEndPoint] = null
if (replicaId == UseLeaderReplica) {
replicaOpt = partitionMetadataOpt.get.leader
- if (!replicaOpt.isDefined) {
+ if (replicaOpt.isEmpty) {
System.err.println("Error: user specifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(topic, partitionId))
System.exit(1)
}
@@ -157,7 +157,7 @@ object SimpleConsumerShell extends Logging {
else {
val replicasForPartition = partitionMetadataOpt.get.replicas
replicaOpt = replicasForPartition.find(r => r.id == replicaId)
- if(!replicaOpt.isDefined) {
+ if(replicaOpt.isEmpty) {
System.err.println("Error: replica %d does not exist for partition (%s, %d)".format(replicaId, topic, partitionId))
System.exit(1)
}