You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/29 08:01:47 UTC
kafka git commit: KAFKA-3765; Kafka Code style corrections
Repository: kafka
Updated Branches:
refs/heads/trunk 0aff45096 -> 404b696be
KAFKA-3765; Kafka Code style corrections
Removed explicit returns and unnecessary parentheses, simplified interpolated variables (`${x}` to `$x`), and removed unused imports.
Used isEmpty/nonEmpty instead of size checks, head instead of indexing with 0, and flatMap instead of map followed by flatten.
Author: Joshi <re...@gmail.com>
Author: Rekha Joshi <re...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #1442 from rekhajoshm/KAFKA-3765
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/404b696b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/404b696b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/404b696b
Branch: refs/heads/trunk
Commit: 404b696bea58aca17fbe528aed03cb3c94516c39
Parents: 0aff450
Author: Joshi <re...@gmail.com>
Authored: Sun May 29 09:01:20 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sun May 29 09:01:20 2016 +0100
----------------------------------------------------------------------
core/src/main/scala/kafka/admin/AclCommand.scala | 2 +-
core/src/main/scala/kafka/admin/AdminClient.scala | 14 +++++++-------
core/src/main/scala/kafka/admin/AdminUtils.scala | 12 ++++++------
.../scala/kafka/admin/ReassignPartitionsCommand.scala | 2 +-
core/src/main/scala/kafka/admin/TopicCommand.scala | 4 ++--
.../main/scala/kafka/admin/ZkSecurityMigrator.scala | 14 ++++++--------
.../scala/kafka/api/ControlledShutdownRequest.scala | 2 +-
core/src/main/scala/kafka/log/FileMessageSet.scala | 4 ++--
core/src/main/scala/kafka/log/Log.scala | 4 ++--
core/src/main/scala/kafka/log/LogCleaner.scala | 4 ++--
core/src/main/scala/kafka/log/LogCleanerManager.scala | 2 +-
core/src/main/scala/kafka/log/LogManager.scala | 5 ++---
core/src/main/scala/kafka/log/OffsetIndex.scala | 8 ++++++--
core/src/main/scala/kafka/tools/ConsoleProducer.scala | 2 +-
core/src/main/scala/kafka/tools/GetOffsetShell.scala | 2 +-
core/src/main/scala/kafka/tools/JmxTool.scala | 6 +++---
core/src/main/scala/kafka/tools/MirrorMaker.scala | 2 +-
17 files changed, 45 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 966c4be..080f809 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -117,7 +117,7 @@ object AclCommand {
val resourceToAcls: Iterable[(Resource, Set[Acl])] =
if (resources.isEmpty) authorizer.getAcls()
- else resources.map(resource => (resource -> authorizer.getAcls(resource)))
+ else resources.map(resource => resource -> authorizer.getAcls(resource))
for ((resource, acls) <- resourceToAcls)
println(s"Current ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 8572ceb..556a02b 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -49,7 +49,7 @@ class AdminClient(val time: Time,
client.poll(future)
if (future.succeeded())
- return future.value().responseBody()
+ future.value().responseBody()
else
throw future.exception()
}
@@ -61,10 +61,10 @@ class AdminClient(val time: Time,
return send(broker, api, request)
} catch {
case e: Exception =>
- debug(s"Request ${api} failed against node ${broker}", e)
+ debug(s"Request $api failed against node $broker", e)
}
}
- throw new RuntimeException(s"Request ${api} failed on brokers ${bootstrapBrokers}")
+ throw new RuntimeException(s"Request $api failed on brokers $bootstrapBrokers")
}
private def findCoordinator(groupId: String): Node = {
@@ -88,7 +88,7 @@ class AdminClient(val time: Time,
val response = new MetadataResponse(responseBody)
val errors = response.errors()
if (!errors.isEmpty)
- debug(s"Metadata request contained errors: ${errors}")
+ debug(s"Metadata request contained errors: $errors")
response.cluster().nodes().asScala.toList
}
@@ -100,7 +100,7 @@ class AdminClient(val time: Time,
listGroups(broker)
} catch {
case e: Exception =>
- debug(s"Failed to find groups from broker ${broker}", e)
+ debug(s"Failed to find groups from broker $broker", e)
List[GroupOverview]()
}
}
@@ -127,7 +127,7 @@ class AdminClient(val time: Time,
val response = new DescribeGroupsResponse(responseBody)
val metadata = response.groups().get(groupId)
if (metadata == null)
- throw new KafkaException(s"Response from broker contained no metadata for group ${groupId}")
+ throw new KafkaException(s"Response from broker contained no metadata for group $groupId")
Errors.forCode(metadata.errorCode()).maybeThrow()
val members = metadata.members().map { member =>
@@ -149,7 +149,7 @@ class AdminClient(val time: Time,
return None
if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
- throw new IllegalArgumentException(s"Group ${groupId} with protocol type '${group.protocolType}' is not a valid consumer group")
+ throw new IllegalArgumentException(s"Group $groupId with protocol type '${group.protocolType}' is not a valid consumer group")
if (group.state == "Stable") {
Some(group.members.map { member =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index a8a282e..53b6dd7 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -250,7 +250,7 @@ object AdminUtils extends Logging {
checkBrokerAvailable: Boolean = true,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic))
- if (existingPartitionsReplicaList.size == 0)
+ if (existingPartitionsReplicaList.isEmpty)
throw new AdminOperationException("The topic %s does not exist".format(topic))
val existingReplicaListForPartitionZero = existingPartitionsReplicaList.find(p => p._1.partition == 0) match {
@@ -274,8 +274,8 @@ object AdminUtils extends Logging {
existingPartitionsReplicaList.size, checkBrokerAvailable)
// check if manual assignment has the right replication factor
- val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaListForPartitionZero.size))
- if (unmatchedRepFactorList.size != 0)
+ val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => p.size != existingReplicaListForPartitionZero.size)
+ if (unmatchedRepFactorList.nonEmpty)
throw new AdminOperationException("The replication factor in manual replication assignment " +
" is not equal to the existing replication factor for the topic " + existingReplicaListForPartitionZero.size)
@@ -291,9 +291,9 @@ object AdminUtils extends Logging {
val ret = new mutable.HashMap[Int, List[Int]]()
var partitionId = startPartitionId
partitionList = partitionList.takeRight(partitionList.size - partitionId)
- for (i <- 0 until partitionList.size) {
+ for (i <- partitionList.indices) {
val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
- if (brokerList.size <= 0)
+ if (brokerList.isEmpty)
throw new AdminOperationException("replication factor must be larger than 0")
if (brokerList.size != brokerList.toSet.size)
throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
@@ -443,7 +443,7 @@ object AdminUtils extends Logging {
private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
try {
val zkPath = getTopicPath(topic)
- val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)))
+ val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => e._1.toString -> e._2))
if (!update) {
info("Topic creation " + jsonPartitionData.toString)
http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 1bf351a..fae0a40 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -113,7 +113,7 @@ object ReassignPartitionsCommand extends Logging {
val (_, replicas) = assignment.head
val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
partitionsToBeReassigned ++= assignedReplicas.map { case (partition, replicas) =>
- (TopicAndPartition(topic, partition) -> replicas)
+ TopicAndPartition(topic, partition) -> replicas
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index e6ebb96..c643a9d 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -118,7 +118,7 @@ object TopicCommand extends Logging {
def alterTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
val topics = getTopics(zkUtils, opts)
val ifExists = if (opts.options.has(opts.ifExistsOpt)) true else false
- if (topics.length == 0 && !ifExists) {
+ if (topics.isEmpty && !ifExists) {
throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
opts.options.valueOf(opts.zkConnectOpt)))
}
@@ -165,7 +165,7 @@ object TopicCommand extends Logging {
def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
val topics = getTopics(zkUtils, opts)
val ifExists = if (opts.options.has(opts.ifExistsOpt)) true else false
- if (topics.length == 0 && !ifExists) {
+ if (topics.isEmpty && !ifExists) {
throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
opts.options.valueOf(opts.zkConnectOpt)))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index 2080879..a87e5b7 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -18,20 +18,18 @@
package kafka.admin
import java.util.concurrent.LinkedBlockingQueue
-import java.util.concurrent.ThreadPoolExecutor
-import java.util.concurrent.TimeUnit
+
import joptsimple.OptionParser
import org.I0Itec.zkclient.exception.ZkException
-import kafka.utils.{Logging, ZkUtils, CommandLineUtils}
-import org.apache.log4j.Level
+import kafka.utils.{CommandLineUtils, Logging, ZkUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code
+
import scala.annotation.tailrec
import scala.collection.JavaConverters._
-import scala.collection._
import scala.collection.mutable.Queue
import scala.concurrent._
import scala.concurrent.duration._
@@ -83,9 +81,9 @@ object ZkSecurityMigrator extends Logging {
if (options.has(helpOpt))
CommandLineUtils.printUsageAndDie(parser, usageMessage)
- if ((jaasFile == null)) {
- val errorMsg = ("No JAAS configuration file has been specified. Please make sure that you have set " +
- "the system property %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))
+ if (jaasFile == null) {
+ val errorMsg = "No JAAS configuration file has been specified. Please make sure that you have set " +
+ "the system property %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
System.out.println("ERROR: %s".format(errorMsg))
throw new IllegalArgumentException("Incorrect configuration")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index b875e3e..42a17e6 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -19,7 +19,7 @@ package kafka.api
import java.nio.ByteBuffer
-import kafka.common.{TopicAndPartition}
+import kafka.common.TopicAndPartition
import kafka.api.ApiUtils._
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index a164b4b..a454f2c 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -83,7 +83,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
this(file,
channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate),
start = 0,
- end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue),
+ end = if (!fileAlreadyExists && preallocate) 0 else Int.MaxValue,
isSlice = false)
/**
@@ -224,7 +224,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
}
}
- if (sizeInBytes > 0 && newMessages.size == 0) {
+ if (sizeInBytes > 0 && newMessages.isEmpty) {
// This indicates that the message is too large. We just return all the bytes in the file message set.
this
} else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index a7549dc..62dc7a1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -35,7 +35,7 @@ import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.utils.Utils
object LogAppendInfo {
- val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, false)
+ val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
}
/**
@@ -228,7 +228,7 @@ class Log(val dir: File,
replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)
}
- if(logSegments.size == 0) {
+ if(logSegments.isEmpty) {
// no existing segments, create a new mutable segment beginning at offset 0
segments.put(0L, new LogSegment(dir = dir,
startOffset = 0,
http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c6636be..4c0db0d 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -578,12 +578,12 @@ private[log] class Cleaner(val id: Int,
private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]] = {
var grouped = List[List[LogSegment]]()
var segs = segments.toList
- while(!segs.isEmpty) {
+ while(segs.nonEmpty) {
var group = List(segs.head)
var logSize = segs.head.size
var indexSize = segs.head.index.sizeInBytes
segs = segs.tail
- while(!segs.isEmpty &&
+ while(segs.nonEmpty &&
logSize + segs.head.size <= maxSize &&
indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index f92db4e..72757c0 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -100,7 +100,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
LogToClean(topicAndPartition, log, firstDirtyOffset)
}.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
- this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
+ this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
// and must meet the minimum threshold for dirty byte ratio
val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
if(cleanableLogs.isEmpty) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 749c622..4357ef4 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -132,10 +132,9 @@ class LogManager(val logDirs: Array[File],
try {
recoveryPoints = this.recoveryPointCheckpoints(dir).read
} catch {
- case e: Exception => {
+ case e: Exception =>
warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e)
warn("Resetting the recovery checkpoint to 0")
- }
}
val jobsForDir = for {
@@ -282,7 +281,7 @@ class LogManager(val logDirs: Array[File],
// If the log does not exist, skip it
if (log != null) {
//May need to abort and pause the cleaning of the log, and resume after truncation is done.
- val needToStopCleaner: Boolean = (truncateOffset < log.activeSegment.baseOffset)
+ val needToStopCleaner: Boolean = truncateOffset < log.activeSegment.baseOffset
if (needToStopCleaner && cleaner != null)
cleaner.abortAndPauseCleaning(topicAndPartition)
log.truncateTo(truncateOffset)
http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index ce35d68..f432732 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -24,9 +24,11 @@ import java.io._
import java.nio._
import java.nio.channels._
import java.util.concurrent.locks._
+
import kafka.utils._
import kafka.utils.CoreUtils.inLock
import kafka.common.InvalidOffsetException
+import sun.nio.ch.DirectBuffer
/**
* An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
@@ -306,8 +308,10 @@ class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long,
*/
private def forceUnmap(m: MappedByteBuffer) {
try {
- if(m.isInstanceOf[sun.nio.ch.DirectBuffer])
- (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean()
+ m match {
+ case buffer: DirectBuffer => buffer.cleaner().clean()
+ case _ =>
+ }
} catch {
case t: Throwable => warn("Error when freeing index buffer", t)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index e647601..4cc7c20 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -311,7 +311,7 @@ object ConsoleProducer {
line.indexOf(keySeparator) match {
case -1 =>
if (ignoreError) new ProducerRecord(topic, line.getBytes)
- else throw new KafkaException(s"No key found on line ${lineNumber}: $line")
+ else throw new KafkaException(s"No key found on line $lineNumber: $line")
case n =>
val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes
new ProducerRecord(topic, line.substring(0, n).getBytes, value)
http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 30c7afe..f7207ec 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -77,7 +77,7 @@ object GetOffsetShell {
val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
- if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
+ if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
"kafka-list-topic.sh to verify")
System.exit(1)
http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index bd7ca0e..8112f9e 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -89,7 +89,7 @@ object JmxTool extends Logging {
else
List(null)
- val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]).flatten
+ val names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName])
val numExpectedAttributes: Map[ObjectName, Int] =
attributesWhitelistExists match {
@@ -101,7 +101,7 @@ object JmxTool extends Logging {
// print csv header
val keys = List("time") ++ queryAttributes(mbsc, names, attributesWhitelist).keys.toArray.sorted
- if(keys.size == numExpectedAttributes.map(_._2).sum + 1)
+ if(keys.size == numExpectedAttributes.values.sum + 1)
println(keys.map("\"" + _ + "\"").mkString(","))
while(true) {
@@ -111,7 +111,7 @@ object JmxTool extends Logging {
case Some(dFormat) => dFormat.format(new Date)
case None => System.currentTimeMillis().toString
}
- if(attributes.keySet.size == numExpectedAttributes.map(_._2).sum + 1)
+ if(attributes.keySet.size == numExpectedAttributes.values.sum + 1)
println(keys.map(attributes(_)).mkString(","))
val sleep = max(0, interval - (System.currentTimeMillis - start))
Thread.sleep(sleep)
http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 87f3cc5..9d5f7e6 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -494,7 +494,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
// Creating one stream per each connector instance
val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
require(streams.size == 1)
- val stream = streams(0)
+ val stream = streams.head
iter = stream.iterator()
}