You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/10/25 02:17:46 UTC
[3/4] kafka git commit: MINOR: A bunch of clean-ups related to usage
of unused variables
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 1ef91b9..850b0e0 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -110,7 +110,7 @@ object ByteBufferMessageSet {
while (true)
innerMessageAndOffsets.add(readMessageFromStream(compressed))
} catch {
- case eofe: EOFException =>
+ case _: EOFException =>
// we don't do anything at all here, because the finally
// will close the compressed input stream, and we simply
// want to return the innerMessageAndOffsets
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 7daab67..13b57e3 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -158,23 +158,16 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
)
private def toMBeanName(tags: collection.Map[String, String]): Option[String] = {
- val filteredTags = tags
- .filter { case (tagKey, tagValue) => tagValue != ""}
+ val filteredTags = tags.filter { case (_, tagValue) => tagValue != "" }
if (filteredTags.nonEmpty) {
- val tagsString = filteredTags
- .map { case (key, value) => "%s=%s".format(key, value)}
- .mkString(",")
-
+ val tagsString = filteredTags.map { case (key, value) => "%s=%s".format(key, value) }.mkString(",")
Some(tagsString)
}
- else {
- None
- }
+ else None
}
private def toScope(tags: collection.Map[String, String]): Option[String] = {
- val filteredTags = tags
- .filter { case (tagKey, tagValue) => tagValue != ""}
+ val filteredTags = tags.filter { case (_, tagValue) => tagValue != ""}
if (filteredTags.nonEmpty) {
// convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
val tagsString = filteredTags
@@ -184,9 +177,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
Some(tagsString)
}
- else {
- None
- }
+ else None
}
def removeAllConsumerMetrics(clientId: String) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index 5408e0d..0f10577 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -82,7 +82,7 @@ class BlockingChannel( val host: String,
connectTimeoutMs))
} catch {
- case e: Throwable => disconnect()
+ case _: Throwable => disconnect()
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
index 6b10e51..f793811 100755
--- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
@@ -24,8 +24,6 @@ import org.apache.kafka.common.utils.Utils
@deprecated("This class has been deprecated and will be removed in a future release. " +
"It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0")
class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
- private val random = new java.util.Random
-
def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index c2f95ea..2d2bfdb 100755
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -104,7 +104,7 @@ class Producer[K,V](val config: ProducerConfig,
}
}
catch {
- case e: InterruptedException =>
+ case _: InterruptedException =>
false
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/security/auth/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
index 797c77b..17d09ce 100644
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -25,7 +25,7 @@ object Resource {
def fromString(str: String): Resource = {
str.split(Separator, 2) match {
case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name)
- case s => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
+ case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 72f79d5..5cfdcd6 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -316,13 +316,13 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
try {
zkUtils.conditionalUpdatePersistentPathIfExists(path, data, expectedVersion)
} catch {
- case e: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
try {
debug(s"Node $path does not exist, attempting to create it.")
zkUtils.createPersistentPath(path, data)
(true, 0)
} catch {
- case e: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
debug(s"Failed to create node for $path because it already exists.")
(false, 0)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index d87a8cf..5e584ab 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -97,7 +97,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
def removeFetcherForPartitions(partitions: Set[TopicPartition]) {
mapLock synchronized {
- for ((key, fetcher) <- fetcherThreadMap)
+ for (fetcher <- fetcherThreadMap.values)
fetcher.removePartitions(partitions)
}
info("Removed fetcher for partitions %s".format(partitions.mkString(",")))
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 8cb2270..325c7af 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -127,7 +127,7 @@ class AdminManager(val config: KafkaConfig,
AdminUtils.deleteTopic(zkUtils, topic)
DeleteTopicMetadata(topic, Errors.NONE)
} catch {
- case e: TopicAlreadyMarkedForDeletionException =>
+ case _: TopicAlreadyMarkedForDeletionException =>
// swallow the exception, and still track deletion allowing multiple calls to wait for deletion
DeleteTopicMetadata(topic, Errors.NONE)
case e: Throwable =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index 00e5d0c..cc2c4cd 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -65,7 +65,7 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
throw new IOException("Unrecognized version of the server meta.properties file: " + version)
}
} catch {
- case e: FileNotFoundException =>
+ case _: FileNotFoundException =>
warn("No meta.properties file under dir %s".format(file.getAbsolutePath()))
None
case e1: Exception =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index c4472c6..0c7c26b 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -183,7 +183,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
// trigger the callback immediately if quota is not violated
callback(0)
} catch {
- case qve: QuotaViolationException =>
+ case _: QuotaViolationException =>
// Compute the delay
val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota))
@@ -412,9 +412,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
logger.info(s"Changing ${apiKey} quota for ${userInfo}${clientIdInfo} to ${newQuota.bound}")
overriddenQuota.put(quotaId, newQuota)
(sanitizedUser, clientId) match {
- case (Some(u), Some(c)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
- case (Some(u), None) => quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
- case (None, Some(c)) => quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
+ case (Some(_), Some(_)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
+ case (Some(_), None) => quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
+ case (None, Some(_)) => quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
case (None, None) =>
}
case None =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 2feeae8..4bf04e6 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -112,10 +112,10 @@ class DelayedFetch(delayMs: Long,
}
}
} catch {
- case utpe: UnknownTopicOrPartitionException => // Case B
+ case _: UnknownTopicOrPartitionException => // Case B
debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
return forceComplete()
- case nle: NotLeaderForPartitionException => // Case A
+ case _: NotLeaderForPartitionException => // Case A
debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
return forceComplete()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index b31d838..2e9e714 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -88,7 +88,6 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
private val configHandlers: Map[String, ConfigHandler],
private val changeExpirationMs: Long = 15*60*1000,
private val time: Time = SystemTime) extends Logging {
- private var lastExecutedChange = -1L
object ConfigChangedNotificationHandler extends NotificationHandler {
override def processNotification(json: String) = {
@@ -106,7 +105,7 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
"Supported versions are 1 and 2.")
}
- case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
+ case _ => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
"{\"version\" : 1, \"entity_type\":\"topics/clients\", \"entity_name\" : \"topic_name/client_id\"}." + " or " +
"{\"version\" : 2, \"entity_path\":\"entity_type/entity_name\"}." +
" Received: " + json)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c1b723f..c6c8dbd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -755,7 +755,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
java.util.Collections.emptyList())
} catch {
- case e: TopicExistsException => // let it go, possibly another broker created this topic
+ case _: TopicExistsException => // let it go, possibly another broker created this topic
new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
java.util.Collections.emptyList())
case ex: Throwable => // Catch all to prevent unhandled errors
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 117899b..0ae9124 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -43,7 +43,6 @@ class KafkaHealthcheck(brokerId: Int,
rack: Option[String],
interBrokerProtocolVersion: ApiVersion) extends Logging {
- private val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
private[server] val sessionExpireListener = new SessionExpireListener
def startup() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index df46336..a39fe49 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -111,7 +111,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
}
} catch {
- case e: NumberFormatException => throw malformedLineException(line)
+ case _: NumberFormatException => throw malformedLineException(line)
} finally {
reader.close()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 32bc660..b43695a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -223,20 +223,20 @@ class ReplicaManager(val config: KafkaConfig,
deletePartition.toString, topic, partitionId))
val errorCode = Errors.NONE.code
getPartition(topic, partitionId) match {
- case Some(partition) =>
- if(deletePartition) {
+ case Some(_) =>
+ if (deletePartition) {
val removedPartition = allPartitions.remove((topic, partitionId))
if (removedPartition != null) {
removedPartition.delete() // this will delete the local log
val topicHasPartitions = allPartitions.keys.exists { case (t, _) => topic == t }
if (!topicHasPartitions)
- BrokerTopicStats.removeMetrics(topic)
+ BrokerTopicStats.removeMetrics(topic)
}
}
case None =>
// Delete log and corresponding folders in case replica manager doesn't hold them anymore.
// This could happen when topic is being deleted while broker is down and recovers.
- if(deletePartition) {
+ if (deletePartition) {
val topicAndPartition = TopicAndPartition(topic, partitionId)
if(logManager.getLog(topicAndPartition).isDefined) {
@@ -358,10 +358,9 @@ class ReplicaManager(val config: KafkaConfig,
} else {
// If required.acks is outside accepted range, something is wrong with the client
// Just return an error and don't handle the request at all
- val responseStatus = messagesPerPartition.map {
- case (topicAndPartition, messageSet) =>
- topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
- LogAppendInfo.UnknownLogAppendInfo.firstOffset, Message.NoTimestamp)
+ val responseStatus = messagesPerPartition.map { case (topicAndPartition, _) =>
+ topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
+ LogAppendInfo.UnknownLogAppendInfo.firstOffset, Message.NoTimestamp)
}
responseCallback(responseStatus)
}
@@ -657,11 +656,9 @@ class ReplicaManager(val config: KafkaConfig,
replicaStateChangeLock synchronized {
val responseMap = new mutable.HashMap[TopicPartition, Short]
if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
- leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
"its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
- }
BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
} else {
val controllerId = leaderAndISRRequest.controllerId
@@ -694,7 +691,7 @@ class ReplicaManager(val config: KafkaConfig,
}
}
- val partitionsTobeLeader = partitionState.filter { case (partition, stateInfo) =>
+ val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
stateInfo.leader == config.brokerId
}
val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
@@ -830,7 +827,7 @@ class ReplicaManager(val config: KafkaConfig,
val newLeaderBrokerId = partitionStateInfo.leader
metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
// Only change partition state when the leader is available
- case Some(leaderBroker) =>
+ case Some(_) =>
if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
partitionsToMakeFollower += partition
else
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 5c487bf..bb6caa0 100755
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -83,7 +83,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
leaderId = brokerId
onBecomingLeader()
} catch {
- case e: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
// If someone else has written the path, then
leaderId = getControllerID
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 56ae0c9..eea66f8 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -119,11 +119,11 @@ object ConsoleConsumer extends Logging {
val msg: BaseConsumerRecord = try {
consumer.receive()
} catch {
- case nse: StreamEndException =>
+ case _: StreamEndException =>
trace("Caught StreamEndException because consumer is shutdown, ignore and terminate.")
// Consumer is already closed
return
- case nse: WakeupException =>
+ case _: WakeupException =>
trace("Caught WakeupException because consumer is shutdown, ignore and terminate.")
// Consumer will be closed
return
@@ -358,7 +358,7 @@ object ConsoleConsumer extends Logging {
val offset =
try offsetString.toLong
catch {
- case e: NumberFormatException => invalidOffset(offsetString)
+ case _: NumberFormatException => invalidOffset(offsetString)
}
if (offset < 0) invalidOffset(offsetString)
offset
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 63a04c9..2b3f56d 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -71,7 +71,7 @@ object ConsumerPerformance {
val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads))
var threadList = List[ConsumerPerfThread]()
- for ((topic, streamList) <- topicMessageStreams)
+ for (streamList <- topicMessageStreams.values)
for (i <- 0 until streamList.length)
threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, totalMessagesRead, totalBytesRead, consumerTimeout)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index f4f7acf..c299676 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -414,12 +414,10 @@ object DumpLogSegments {
}
}
- shallowOffsetNotFound.foreach {
- case (fileName, listOfShallowOffsetNotFound) => {
- System.err.println("The following indexed offsets are not found in the log.")
- listOfShallowOffsetNotFound.foreach(m => {
- System.err.println("Indexed offset: %s, found log offset: %s".format(m._1, m._2))
- })
+ shallowOffsetNotFound.values.foreach { listOfShallowOffsetNotFound =>
+ System.err.println("The following indexed offsets are not found in the log.")
+ listOfShallowOffsetNotFound.foreach { case (indexedOffset, logOffset) =>
+ System.err.println(s"Indexed offset: $indexedOffset, found log offset: $logOffset")
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/EndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 1c92088..9aaad3e 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -122,8 +122,7 @@ object EndToEndLatency {
//Check we only got the one message
if (recordIter.hasNext) {
- var count = 1
- for (elem <- recordIter) count += 1
+ val count = 1 + recordIter.size
throw new RuntimeException(s"Only one result was expected during this test. We found [$count]")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 17b8f0b..1a6ba69 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -375,7 +375,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
mirrorMakerConsumer.commit()
throw e
- case e: CommitFailedException =>
+ case _: CommitFailedException =>
warn("Failed to commit offsets because the consumer group has rebalanced and assigned partitions to " +
"another instance. If you see this regularly, it could indicate that you need to either increase " +
s"the consumer's ${consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG} or reduce the number of records " +
@@ -435,9 +435,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
maybeFlushAndCommitOffsets()
}
} catch {
- case cte: ConsumerTimeoutException =>
+ case _: ConsumerTimeoutException =>
trace("Caught ConsumerTimeoutException, continue iteration.")
- case we: WakeupException =>
+ case _: WakeupException =>
trace("Caught ConsumerWakeupException, continue iteration.")
}
maybeFlushAndCommitOffsets()
@@ -485,7 +485,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
mirrorMakerConsumer.stop()
}
catch {
- case ie: InterruptedException =>
+ case _: InterruptedException =>
warn("Interrupt during shutdown of the mirror maker thread")
}
}
@@ -495,7 +495,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
shutdownLatch.await()
info("Mirror maker thread shutdown complete")
} catch {
- case ie: InterruptedException =>
+ case _: InterruptedException =>
warn("Shutdown of the mirror maker thread interrupted")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index d88ec41..4e2c7ef 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -32,9 +32,6 @@ object ReplayLogProducer extends Logging {
def main(args: Array[String]) {
val config = new Config(args)
- val executor = Executors.newFixedThreadPool(config.numThreads)
- val allDone = new CountDownLatch(config.numThreads)
-
// if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
ZkUtils.maybeDeletePath(config.zkConnect, "/consumers/" + GroupId)
Thread.sleep(500)
@@ -51,7 +48,7 @@ object ReplayLogProducer extends Logging {
val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.inputTopic -> config.numThreads))
var threadList = List[ZKConsumerThread]()
- for ((topic, streamList) <- topicMessageStreams)
+ for (streamList <- topicMessageStreams.values)
for (stream <- streamList)
threadList ::= new ZKConsumerThread(config, stream)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 9a059df..01d3aa8 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -107,7 +107,7 @@ object ReplicaVerificationTool extends Logging {
Pattern.compile(regex)
}
catch {
- case e: PatternSyntaxException =>
+ case _: PatternSyntaxException =>
throw new RuntimeException(regex + " is an invalid regex.")
}
@@ -151,14 +151,13 @@ object ReplicaVerificationTool extends Logging {
topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId))
.map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size }
debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition)
- val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap(
- topicMetadataResponse =>
- topicMetadataResponse.partitionsMetadata.map(
- partitionMetadata =>
- (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id))
- ).groupBy(_._2)
- .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map {
- case(topicAndPartition, leaderId) => topicAndPartition })
+ val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap { topicMetadataResponse =>
+ topicMetadataResponse.partitionsMetadata.map { partitionMetadata =>
+ (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)
+ }
+ }.groupBy(_._2).mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { case (topicAndPartition, _) =>
+ topicAndPartition
+ })
debug("Leaders per broker: " + leadersPerBroker)
val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition,
@@ -236,8 +235,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
}
private def offsetResponseStringWithError(offsetResponse: OffsetResponse): String = {
- offsetResponse.partitionErrorAndOffsets.filter {
- case (topicAndPartition, partitionOffsetsResponse) => partitionOffsetsResponse.error != Errors.NONE.code
+ offsetResponse.partitionErrorAndOffsets.filter { case (_, partitionOffsetsResponse) =>
+ partitionOffsetsResponse.error != Errors.NONE.code
}.mkString
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 1c059bb..99b5aae 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -271,8 +271,8 @@ object CoreUtils extends Logging {
*/
def duplicates[T](s: Traversable[T]): Iterable[T] = {
s.groupBy(identity)
- .map{ case (k,l) => (k,l.size)}
- .filter{ case (k,l) => l > 1 }
+ .map { case (k, l) => (k, l.size)}
+ .filter { case (_, l) => l > 1 }
.keys
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/FileLock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/FileLock.scala b/core/src/main/scala/kafka/utils/FileLock.scala
index b43b4b1..896c300 100644
--- a/core/src/main/scala/kafka/utils/FileLock.scala
+++ b/core/src/main/scala/kafka/utils/FileLock.scala
@@ -52,7 +52,7 @@ class FileLock(val file: File) extends Logging {
flock = channel.tryLock()
flock != null
} catch {
- case e: OverlappingFileLockException => false
+ case _: OverlappingFileLockException => false
}
}
}
@@ -77,4 +77,4 @@ class FileLock(val file: File) extends Logging {
channel.close()
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/Mx4jLoader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
index aa120ab..5d2549e 100644
--- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala
+++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
@@ -61,12 +61,10 @@ object Mx4jLoader extends Logging {
true
}
catch {
- case e: ClassNotFoundException => {
+ case _: ClassNotFoundException =>
info("Will not load MX4J, mx4j-tools.jar is not in the classpath")
- }
- case e: Throwable => {
+ case e: Throwable =>
warn("Could not start register mbean in JMX", e)
- }
}
false
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 31e8a92..369bb23 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -58,13 +58,13 @@ object ReplicationUtils extends Logging {
(expectedLeader,writtenLeader) match {
case (Some(expectedLeader),Some(writtenLeader)) =>
if(expectedLeader == writtenLeader)
- return (true,writtenStat.getVersion())
+ return (true, writtenStat.getVersion())
case _ =>
}
case None =>
}
} catch {
- case e1: Exception =>
+ case _: Exception =>
}
(false,-1)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/VerifiableProperties.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index f57245f..9600b0a 100755
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -181,7 +181,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
/**
* Get a Map[String, String] from a property list in the form k1:v2, k2:v2, ...
*/
- def getMap(name: String, valid: String => Boolean = s => true): Map[String, String] = {
+ def getMap(name: String, valid: String => Boolean = _ => true): Map[String, String] = {
try {
val m = CoreUtils.parseCsvMap(getString(name, ""))
m.foreach {
@@ -208,7 +208,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
CompressionCodec.getCompressionCodec(prop.toInt)
}
catch {
- case nfe: NumberFormatException =>
+ case _: NumberFormatException =>
CompressionCodec.getCompressionCodec(prop)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 80a9f1a..787cb8f 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -237,7 +237,7 @@ class ZkUtils(val zkClient: ZkClient,
createPersistentPath(ClusterIdPath, ClusterId.toJson(proposedClusterId))
proposedClusterId
} catch {
- case e: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
getClusterId.getOrElse(throw new KafkaException("Failed to get cluster id from Zookeeper. This can only happen if /cluster/id is deleted from Zookeeper."))
}
}
@@ -389,7 +389,7 @@ class ZkUtils(val zkClient: ZkClient,
isSecure)
zkCheckedEphemeral.create()
} catch {
- case e: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
throw new RuntimeException("A broker is already registered on the path " + brokerIdPath
+ ". This probably " + "indicates that you either have configured a brokerid that is already in use, or "
+ "else you have shutdown this broker and restarted it faster than the zookeeper "
@@ -445,7 +445,7 @@ class ZkUtils(val zkClient: ZkClient,
try {
ZkPath.createEphemeral(zkClient, path, data, acls)
} catch {
- case e: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
createParentPath(path)
ZkPath.createEphemeral(zkClient, path, data, acls)
}
@@ -465,7 +465,7 @@ class ZkUtils(val zkClient: ZkClient,
try {
storedData = readData(path)._1
} catch {
- case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
+ case _: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
}
if (storedData == null || storedData != data) {
info("conflict in " + path + " data: " + data + " stored data: " + storedData)
@@ -484,7 +484,7 @@ class ZkUtils(val zkClient: ZkClient,
try {
ZkPath.createPersistent(zkClient, path, data, acls)
} catch {
- case e: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
createParentPath(path)
ZkPath.createPersistent(zkClient, path, data, acls)
}
@@ -503,12 +503,12 @@ class ZkUtils(val zkClient: ZkClient,
try {
zkClient.writeData(path, data)
} catch {
- case e: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
createParentPath(path)
try {
ZkPath.createPersistent(zkClient, path, data, acls)
} catch {
- case e: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
zkClient.writeData(path, data)
}
}
@@ -573,7 +573,7 @@ class ZkUtils(val zkClient: ZkClient,
try {
zkClient.writeData(path, data)
} catch {
- case e: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
createParentPath(path)
ZkPath.createEphemeral(zkClient, path, data, acls)
}
@@ -583,7 +583,7 @@ class ZkUtils(val zkClient: ZkClient,
try {
zkClient.delete(path)
} catch {
- case e: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
false
@@ -599,7 +599,7 @@ class ZkUtils(val zkClient: ZkClient,
zkClient.delete(path, expectedVersion)
true
} catch {
- case e: ZkBadVersionException => false
+ case _: ZkBadVersionException => false
}
}
@@ -607,7 +607,7 @@ class ZkUtils(val zkClient: ZkClient,
try {
zkClient.deleteRecursive(path)
} catch {
- case e: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
}
@@ -624,7 +624,7 @@ class ZkUtils(val zkClient: ZkClient,
val dataAndStat = try {
(Some(zkClient.readData(path, stat)), stat)
} catch {
- case e: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
(None, stat)
}
dataAndStat
@@ -642,7 +642,7 @@ class ZkUtils(val zkClient: ZkClient,
try {
zkClient.getChildren(path)
} catch {
- case e: ZkNoNodeException => Nil
+ case _: ZkNoNodeException => Nil
}
}
@@ -754,7 +754,7 @@ class ZkUtils(val zkClient: ZkClient,
updatePersistentPath(zkPath, jsonData)
debug("Updated partition reassignment path with %s".format(jsonData))
} catch {
- case nne: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
createPersistentPath(zkPath, jsonData)
debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
case e2: Throwable => throw new AdminOperationException(e2.toString)
@@ -835,7 +835,7 @@ class ZkUtils(val zkClient: ZkClient,
try {
writeToZk
} catch {
- case e1: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
makeSurePersistentPathExists(path)
writeToZk
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index ce91a30..f13f59f 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -111,7 +111,6 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
@Test
def testDescribeConsumerGroupForNonExistentGroup() {
val nonExistentGroup = "non" + groupId
- val sum = client.describeConsumerGroup(nonExistentGroup).consumers
assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).consumers.get.isEmpty)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 55e8e4f..8502ae0 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -149,11 +149,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
- for (i <- 0 until producerCount)
+ for (_ <- 0 until producerCount)
producers += TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
maxBlockMs = 3000,
acks = 1)
- for (i <- 0 until consumerCount)
+ for (_ <- 0 until consumerCount)
consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
// create the consumer offset topic
@@ -339,7 +339,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
sendRecords(numRecords, tp)
fail("should have thrown exception")
} catch {
- case e: TimeoutException => //expected
+ case _: TimeoutException => //expected
}
}
@@ -517,7 +517,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumeRecords(consumer)
Assert.fail("Expected TopicAuthorizationException")
} catch {
- case e: TopicAuthorizationException => //expected
+ case _: TopicAuthorizationException => //expected
}
}
@@ -595,7 +595,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumeRecords(consumer)
Assert.fail("Expected TopicAuthorizationException")
} catch {
- case e: TopicAuthorizationException => //expected
+ case _: TopicAuthorizationException => //expected
} finally consumer.close()
}
@@ -813,11 +813,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) ++ acls, servers.head.apis.authorizer.get, resource)
}
- private def removeAndVerifyAcls(acls: Set[Acl], resource: Resource) = {
- servers.head.apis.authorizer.get.removeAcls(acls, resource)
- TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) -- acls, servers.head.apis.authorizer.get, resource)
- }
-
private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
numRecords: Int = 1,
startingOffset: Int = 0,
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 102b7cf..732b99f 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -280,7 +280,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
try {
consumer.poll(50)
} catch {
- case e: WakeupException => // ignore for shutdown
+ case _: WakeupException => // ignore for shutdown
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 816f36a..b5aaaf4 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -142,12 +142,11 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
producer.send(record4, callback)
fail("Should not allow sending a record without topic")
} catch {
- case iae: IllegalArgumentException => // this is ok
- case e: Throwable => fail("Only expecting IllegalArgumentException", e)
+ case _: IllegalArgumentException => // this is ok
}
// non-blocking send a list of records
- for (i <- 1 to numRecords)
+ for (_ <- 1 to numRecords)
producer.send(record0, callback)
// check that all messages have been acked via offset
@@ -234,7 +233,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
// non-blocking send a list of records
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
- for (i <- 1 to numRecords)
+ for (_ <- 1 to numRecords)
producer.send(record0)
val response0 = producer.send(record0)
@@ -328,8 +327,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
producer.send(new ProducerRecord(topic, partition1, null, "value".getBytes))
fail("Should not allow sending a record to a partition not present in the metadata")
} catch {
- case ke: KafkaException => // this is ok
- case e: Throwable => fail("Only expecting KafkaException", e)
+ case _: KafkaException => // this is ok
}
AdminUtils.addPartitions(zkUtils, topic, 2)
@@ -370,8 +368,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
try {
TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes)
- for (i <- 0 until 50) {
- val responses = (0 until numRecords) map (i => producer.send(record))
+ for (_ <- 0 until 50) {
+ val responses = (0 until numRecords) map (_ => producer.send(record))
assertTrue("No request is complete.", responses.forall(!_.isDone()))
producer.flush()
assertTrue("All requests are complete.", responses.forall(_.isDone()))
@@ -389,15 +387,14 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
// create topic
val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
val leader0 = leaders(0)
- val leader1 = leaders(1)
// create record
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
// Test closing from caller thread.
- for (i <- 0 until 50) {
+ for (_ <- 0 until 50) {
val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
- val responses = (0 until numRecords) map (i => producer.send(record0))
+ val responses = (0 until numRecords) map (_ => producer.send(record0))
assertTrue("No request is complete.", responses.forall(!_.isDone()))
producer.close(0, TimeUnit.MILLISECONDS)
responses.foreach { future =>
@@ -436,7 +433,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
// Trigger another batch in accumulator before close the producer. These messages should
// not be sent.
if (sendRecords)
- (0 until numRecords) foreach (i => producer.send(record))
+ (0 until numRecords) foreach (_ => producer.send(record))
// The close call will be called by all the message callbacks. This tests idempotence of the close call.
producer.close(0, TimeUnit.MILLISECONDS)
// Test close with non zero timeout. Should not block at all.
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 4e6c740..479e749 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -276,7 +276,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
AclCommand.main(deleteDescribeAclArgs)
AclCommand.main(deleteWriteAclArgs)
- servers.foreach { s =>
+ servers.foreach { _ =>
TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
index b26b242..d15a01d 100644
--- a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
+++ b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
@@ -29,7 +29,7 @@ import kafka.utils.TestUtils
object FixedPortTestUtils {
def choosePorts(count: Int): Seq[Int] = {
try {
- val sockets = (0 until count).map(i => new ServerSocket(0))
+ val sockets = (0 until count).map(_ => new ServerSocket(0))
val ports = sockets.map(_.getLocalPort())
sockets.foreach(_.close())
ports
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index ca020a6..83280dc 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -65,9 +65,9 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
consumerConfig.putAll(consumerSecurityProps)
- for (i <- 0 until producerCount)
+ for (_ <- 0 until producerCount)
producers += createNewProducer
- for (i <- 0 until consumerCount) {
+ for (_ <- 0 until consumerCount) {
consumers += createNewConsumer
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index d18dc3a..aefe5bd 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -782,7 +782,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// create a group of consumers, subscribe the consumers to all the topics and start polling
// for the topic partition assignment
- val (rrConsumers, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions)
+ val (_, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions)
try {
validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
@@ -862,10 +862,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
testProducer.send(null, null)
fail("Should not allow sending a null record")
} catch {
- case e: Throwable => {
+ case _: Throwable =>
assertEquals("Interceptor should be notified about exception", 1, MockProducerInterceptor.ON_ERROR_COUNT.intValue())
assertEquals("Interceptor should not receive metadata with an exception when record is null", 0, MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue())
- }
}
// create consumer with interceptor
@@ -1222,7 +1221,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1))
TestUtils.waitUntilTrue(() => {
- val records = consumer0.poll(50)
+ consumer0.poll(50)
consumer0.assignment() == newAssignment.asJava
}, s"Expected partitions ${newAssignment.asJava} but actually got ${consumer0.assignment()}")
@@ -1335,7 +1334,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
subscriptions: Set[TopicPartition]): (Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], Buffer[ConsumerAssignmentPoller]) = {
assertTrue(consumerCount <= subscriptions.size)
val consumerGroup = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
- for (i <- 0 until consumerCount)
+ for (_ <- 0 until consumerCount)
consumerGroup += new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
consumers ++= consumerGroup
@@ -1364,7 +1363,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
topicsToSubscribe: List[String],
subscriptions: Set[TopicPartition]): Unit = {
assertTrue(consumerGroup.size + numOfConsumersToAdd <= subscriptions.size)
- for (i <- 0 until numOfConsumersToAdd) {
+ for (_ <- 0 until numOfConsumersToAdd) {
val newConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
consumerGroup += newConsumer
consumerPollers += subscribeConsumerAndStartPolling(newConsumer, topicsToSubscribe)
@@ -1415,7 +1414,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
rebalanceListener: ConsumerRebalanceListener): Unit = {
consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
TestUtils.waitUntilTrue(() => {
- val records = consumer.poll(50)
+ consumer.poll(50)
consumer.assignment() == subscriptions.asJava
}, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer.assignment()}")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 734eb66..a75e7c7 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -39,7 +39,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
createNewProducerWithNoSerializer(brokerList)
fail("Instantiating a producer without specifying a serializer should cause a ConfigException")
} catch {
- case ce : ConfigException => // this is ok
+ case _ : ConfigException => // this is ok
}
// create a producer with explicit serializers should succeed
@@ -67,7 +67,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
producer.send(record5)
fail("Should have gotten a SerializationException")
} catch {
- case se: SerializationException => // this is ok
+ case _: SerializationException => // this is ok
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 5994a1d..8d676d1 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -88,7 +88,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
scheduler.start
// rolling bounce brokers
- for (i <- 0 until numServers) {
+ for (_ <- 0 until numServers) {
for (server <- servers) {
server.shutdown()
server.awaitShutdown()
@@ -143,7 +143,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
futures.map(_.get)
sent += numRecords
} catch {
- case e : Exception => failed = true
+ case _ : Exception => failed = true
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
index 14807bc..cebfb04 100644
--- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
+++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
@@ -102,7 +102,6 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
private val orgName = config.getProperty(MiniKdc.OrgName)
private val orgDomain = config.getProperty(MiniKdc.OrgDomain)
- private val dnString = s"dc=$orgName,dc=$orgDomain"
private val realm = s"${orgName.toUpperCase(Locale.ENGLISH)}.${orgDomain.toUpperCase(Locale.ENGLISH)}"
private val krb5conf = new File(workDir, "krb5.conf")
@@ -163,7 +162,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
val partition = new JdbmPartition(ds.getSchemaManager, ds.getDnFactory)
partition.setId(orgName)
partition.setPartitionPath(new File(ds.getInstanceLayout.getPartitionsDirectory, orgName).toURI)
- val dn = new Dn(dnString)
+ val dn = new Dn(s"dc=$orgName,dc=$orgDomain")
partition.setSuffixDn(dn)
ds.addPartition(partition)
@@ -207,7 +206,7 @@ class MiniKdc(config: Properties, workDir: File) extends Logging {
val kerberosConfig = new KerberosConfig
kerberosConfig.setMaximumRenewableLifetime(config.getProperty(MiniKdc.MaxRenewableLifetime).toLong)
kerberosConfig.setMaximumTicketLifetime(config.getProperty(MiniKdc.MaxTicketLifetime).toLong)
- kerberosConfig.setSearchBaseDn(dnString)
+ kerberosConfig.setSearchBaseDn(s"dc=$orgName,dc=$orgDomain")
kerberosConfig.setPaEncTimestampRequired(false)
kdc = new KdcServer(kerberosConfig)
kdc.setDirectoryService(ds)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
index 6556100..51f02d1 100755
--- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
@@ -301,7 +301,7 @@ object TestLogCleaning {
consumedWriter.newLine()
}
} catch {
- case e: ConsumerTimeoutException =>
+ case _: ConsumerTimeoutException =>
}
}
consumedWriter.close()
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 8adc7e2..f5cee0c 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -69,7 +69,6 @@ object StressTestLog {
abstract class WorkerThread extends Thread {
override def run() {
try {
- var offset = 0
while(running.get)
work()
} catch {
@@ -107,7 +106,7 @@ object StressTestLog {
case _ =>
}
} catch {
- case e: OffsetOutOfRangeException => // this is okay
+ case _: OffsetOutOfRangeException => // this is okay
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestCrcPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestCrcPerformance.scala b/core/src/test/scala/other/kafka/TestCrcPerformance.scala
index 0c1e1ad..daeecbd 100755
--- a/core/src/test/scala/other/kafka/TestCrcPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestCrcPerformance.scala
@@ -18,7 +18,6 @@ package kafka.log
import java.util.Random
import kafka.message._
-import kafka.utils.TestUtils
import org.apache.kafka.common.utils.Utils
object TestCrcPerformance {
@@ -28,21 +27,18 @@ object TestCrcPerformance {
Utils.croak("USAGE: java " + getClass().getName() + " num_messages message_size")
val numMessages = args(0).toInt
val messageSize = args(1).toInt
- //val numMessages = 100000000
- //val messageSize = 32
- val dir = TestUtils.tempDir()
val content = new Array[Byte](messageSize)
new Random(1).nextBytes(content)
// create message test
val start = System.nanoTime
- for(i <- 0 until numMessages) {
+ for (_ <- 0 until numMessages)
new Message(content)
- }
- val ellapsed = System.nanoTime - start
- println("%d messages created in %.2f seconds + (%.2f ns per message).".format(numMessages, ellapsed/(1000.0*1000.0*1000.0),
- ellapsed / numMessages.toDouble))
+
+ val elapsed = System.nanoTime - start
+ println("%d messages created in %.2f seconds + (%.2f ns per message).".format(numMessages, elapsed / (1000.0*1000.0*1000.0),
+ elapsed / numMessages.toDouble))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestKafkaAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestKafkaAppender.scala b/core/src/test/scala/other/kafka/TestKafkaAppender.scala
index ab807a1..72c7f28 100644
--- a/core/src/test/scala/other/kafka/TestKafkaAppender.scala
+++ b/core/src/test/scala/other/kafka/TestKafkaAppender.scala
@@ -33,12 +33,13 @@ object TestKafkaAppender extends Logging {
try {
PropertyConfigurator.configure(args(0))
} catch {
- case e: Exception => System.err.println("KafkaAppender could not be initialized ! Exiting..")
- e.printStackTrace()
- System.exit(1)
+ case e: Exception =>
+ System.err.println("KafkaAppender could not be initialized ! Exiting..")
+ e.printStackTrace()
+ System.exit(1)
}
- for(i <- 1 to 10)
+ for (_ <- 1 to 10)
info("test")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index db281bf..6bd8e4f 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -101,7 +101,8 @@ object TestLinearWriteSpeed {
val rand = new Random
rand.nextBytes(buffer.array)
val numMessages = bufferSize / (messageSize + MessageSet.LogOverhead)
- val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, messages = (0 until numMessages).map(x => new Message(new Array[Byte](messageSize))): _*)
+ val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec,
+ messages = (0 until numMessages).map(_ => new Message(new Array[Byte](messageSize))): _*)
val writables = new Array[Writable](numFiles)
val scheduler = new KafkaScheduler(1)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 9445191..9db2ffd 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -94,7 +94,7 @@ object TestOffsetManager {
offset += 1
}
catch {
- case e1: ClosedByInterruptException =>
+ case _: ClosedByInterruptException =>
offsetsChannel.disconnect()
case e2: IOException =>
println("Commit thread %d: Error while committing offsets to %s:%d for group %s due to %s.".format(id, offsetsChannel.host, offsetsChannel.port, groupId, e2))
@@ -158,7 +158,7 @@ object TestOffsetManager {
}
}
catch {
- case e1: ClosedByInterruptException =>
+ case _: ClosedByInterruptException =>
channel.disconnect()
channels.remove(coordinatorId)
case e2: IOException =>
@@ -168,7 +168,7 @@ object TestOffsetManager {
}
}
catch {
- case e: IOException =>
+ case _: IOException =>
println("Error while querying %s:%d - shutting down query channel.".format(metadataChannel.host, metadataChannel.port))
metadataChannel.disconnect()
println("Creating new query channel.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
index ba89fc8..6ccac29 100644
--- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
@@ -104,7 +104,7 @@ object TestPurgatoryPerformance {
val latch = new CountDownLatch(numRequests)
val start = System.currentTimeMillis
val rand = new Random()
- val keys = (0 until numKeys).map(i => "fakeKey%d".format(rand.nextInt(numPossibleKeys)))
+ val keys = (0 until numKeys).map(_ => "fakeKey%d".format(rand.nextInt(numPossibleKeys)))
@volatile var requestArrivalTime = start
@volatile var end = 0L
val generator = new Runnable {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 763e4ec..0f846e1 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -69,8 +69,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
AdminUtils.addPartitions(zkUtils, "Blah", 1)
fail("Topic should not exist")
} catch {
- case e: AdminOperationException => //this is good
- case e2: Throwable => throw e2
+ case _: AdminOperationException => //this is good
}
}
@@ -80,8 +79,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
AdminUtils.addPartitions(zkUtils, topic1, 2, "0:1,0:1:2")
fail("Add partitions should fail")
} catch {
- case e: AdminOperationException => //this is good
- case e2: Throwable => throw e2
+ case _: AdminOperationException => //this is good
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index ddfbb51..ff86693 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -250,7 +250,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
ConfigCommand.parseEntity(opts)
fail("Did not fail with invalid argument list")
} catch {
- case e: IllegalArgumentException => // expected exception
+ case _: IllegalArgumentException => // expected exception
}
}
@@ -315,7 +315,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--alter", "--add-config", "a=b,c=d")
fail("Did not fail with invalid client-id")
} catch {
- case e: InvalidConfigException => // expected
+ case _: InvalidConfigException => // expected
}
checkEntity("users", QuotaId.sanitize("CN=user1") + "/clients/client1",
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index ccb3618..d1fcbc0 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -164,7 +164,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
@Test
def testAddPartitionDuringDeleteTopic() {
val topic = "test"
- val topicAndPartition = TopicAndPartition(topic, 0)
val servers = createTestTopicAndCluster(topic)
// start topic deletion
AdminUtils.deleteTopic(zkUtils, topic)
@@ -208,7 +207,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
AdminUtils.deleteTopic(zkUtils, "test2")
fail("Expected UnknownTopicOrPartitionException")
} catch {
- case e: UnknownTopicOrPartitionException => // expected exception
+ case _: UnknownTopicOrPartitionException => // expected exception
}
// verify delete topic path for test2 is removed from zookeeper
TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers)
@@ -270,7 +269,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
fail("Expected TopicAlreadyMarkedForDeletionException")
}
catch {
- case e: TopicAlreadyMarkedForDeletionException => // expected exception
+ case _: TopicAlreadyMarkedForDeletionException => // expected exception
}
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
@@ -300,7 +299,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
private def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
var counter = 0
- for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
+ for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
val count = counter
log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
counter += 1
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 3691919..39bcb7a 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -87,7 +87,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
// action/test
TestUtils.waitUntilTrue(() => {
- val (state, assignments) = consumerGroupCommand.describeGroup()
+ val (_, assignments) = consumerGroupCommand.describeGroup()
assignments.isDefined &&
assignments.get.filter(_.group == group).size == 1 &&
assignments.get.filter(_.group == group).head.consumerId.isDefined
@@ -113,7 +113,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
// action/test
TestUtils.waitUntilTrue(() => {
- val (state, assignments) = consumerGroupCommand.describeGroup()
+ val (_, assignments) = consumerGroupCommand.describeGroup()
assignments.isDefined &&
assignments.get.filter(_.group == group).size == 2 &&
assignments.get.filter{ x => x.group == group && x.partition.isDefined}.size == 1 &&
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 2a3724e..90a354e 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -45,7 +45,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging wi
kafka.admin.TopicCommand.createTopic(zkUtils, createOpts)
val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
- val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkUtils,
+ val (proposedAssignment, _) = ReassignPartitionsCommand.generateAssignment(zkUtils,
rackInfo.keys.toSeq.sorted, topicJson, disableRackAware = false)
val assignment = proposedAssignment map { case (topicPartition, replicas) =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala b/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
index fff3e7b..b71b00b 100644
--- a/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala
@@ -34,7 +34,7 @@ class ApiUtilsTest extends JUnitSuite {
@Test
def testShortStringNonASCII() {
// Random-length strings
- for(i <- 0 to 100) {
+ for(_ <- 0 to 100) {
// Since we're using UTF-8 encoding, each encoded byte will be one to four bytes long
val s: String = ApiUtilsTest.rnd.nextString(math.abs(ApiUtilsTest.rnd.nextInt()) % (Short.MaxValue / 4))
val bb: ByteBuffer = ByteBuffer.allocate(ApiUtils.shortStringLength(s))
@@ -47,7 +47,7 @@ class ApiUtilsTest extends JUnitSuite {
@Test
def testShortStringASCII() {
// Random-length strings
- for(i <- 0 to 100) {
+ for(_ <- 0 to 100) {
val s: String = TestUtils.randomString(math.abs(ApiUtilsTest.rnd.nextInt()) % Short.MaxValue)
val bb: ByteBuffer = ByteBuffer.allocate(ApiUtils.shortStringLength(s))
ApiUtils.writeShortString(bb, s)
@@ -68,17 +68,13 @@ class ApiUtilsTest extends JUnitSuite {
ApiUtils.shortStringLength(s2)
fail
} catch {
- case e: KafkaException => {
- // ok
- }
+ case _: KafkaException => // ok
}
try {
ApiUtils.writeShortString(bb, s2)
fail
} catch {
- case e: KafkaException => {
- // ok
- }
+ case _: KafkaException => // ok
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index f8fbae7..16fe788 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -23,7 +23,6 @@ import kafka.common._
import kafka.message.{Message, ByteBufferMessageSet}
import kafka.utils.SystemTime
-import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.common.TopicAndPartition
import java.nio.ByteBuffer
@@ -37,10 +36,6 @@ import org.junit.Assert._
object SerializationTestUtils {
private val topic1 = "test1"
private val topic2 = "test2"
- private val leader1 = 0
- private val isr1 = List(0, 1, 2)
- private val leader2 = 0
- private val isr2 = List(0, 2, 3)
private val partitionDataFetchResponse0 = new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes)))
private val partitionDataFetchResponse1 = new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("second message".getBytes)))
private val partitionDataFetchResponse2 = new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("third message".getBytes)))
@@ -84,37 +79,6 @@ object SerializationTestUtils {
private val brokers = List(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1011, SecurityProtocol.PLAINTEXT))),
new Broker(1, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1012, SecurityProtocol.PLAINTEXT))),
new Broker(2, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1013, SecurityProtocol.PLAINTEXT))))
- private val brokerEndpoints = brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
-
- private val partitionMetaData0 = new PartitionMetadata(0, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints, errorCode = 0)
- private val partitionMetaData1 = new PartitionMetadata(1, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints.tail, errorCode = 1)
- private val partitionMetaData2 = new PartitionMetadata(2, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints, errorCode = 2)
- private val partitionMetaData3 = new PartitionMetadata(3, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints.tail.tail, errorCode = 3)
- private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3)
- private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
- private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
-
- private val leaderAndIsr0 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.map(_.id))
- private val leaderAndIsr1 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.tail.map(_.id))
- private val leaderAndIsr2 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.map(_.id))
- private val leaderAndIsr3 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.tail.map(_.id))
-
- private val leaderIsrAndControllerEpoch0 = new LeaderIsrAndControllerEpoch(leaderAndIsr0, controllerEpoch = 0)
- private val leaderIsrAndControllerEpoch1 = new LeaderIsrAndControllerEpoch(leaderAndIsr1, controllerEpoch = 0)
- private val leaderIsrAndControllerEpoch2 = new LeaderIsrAndControllerEpoch(leaderAndIsr2, controllerEpoch = 0)
- private val leaderIsrAndControllerEpoch3 = new LeaderIsrAndControllerEpoch(leaderAndIsr3, controllerEpoch = 0)
-
- private val partitionStateInfo0 = new PartitionStateInfo(leaderIsrAndControllerEpoch0, brokers.map(_.id).toSet)
- private val partitionStateInfo1 = new PartitionStateInfo(leaderIsrAndControllerEpoch1, brokers.map(_.id).toSet)
- private val partitionStateInfo2 = new PartitionStateInfo(leaderIsrAndControllerEpoch2, brokers.map(_.id).toSet)
- private val partitionStateInfo3 = new PartitionStateInfo(leaderIsrAndControllerEpoch3, brokers.map(_.id).toSet)
-
- private val updateMetadataRequestPartitionStateInfo = collection.immutable.Map(
- TopicAndPartition(topic1,0) -> partitionStateInfo0,
- TopicAndPartition(topic1,1) -> partitionStateInfo1,
- TopicAndPartition(topic1,2) -> partitionStateInfo2,
- TopicAndPartition(topic1,3) -> partitionStateInfo3
- )
def createTestProducerRequest: ProducerRequest = {
new ProducerRequest(1, "client 1", 0, 1000, topicDataProducerRequest)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/common/ConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ConfigTest.scala b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
index 26154f2..2d20b1e 100644
--- a/core/src/test/scala/unit/kafka/common/ConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ConfigTest.scala
@@ -40,7 +40,7 @@ class ConfigTest {
fail("Should throw InvalidClientIdException.")
}
catch {
- case e: InvalidConfigException => "This is good."
+ case _: InvalidConfigException => // This is good
}
}
@@ -51,7 +51,7 @@ class ConfigTest {
ProducerConfig.validateClientId(validClientIds(i))
}
catch {
- case e: Exception => fail("Should not throw exception.")
+ case _: Exception => fail("Should not throw exception.")
}
}
}
@@ -70,7 +70,7 @@ class ConfigTest {
fail("Should throw InvalidGroupIdException.")
}
catch {
- case e: InvalidConfigException => "This is good."
+ case _: InvalidConfigException => // This is good
}
}
@@ -81,7 +81,7 @@ class ConfigTest {
ConsumerConfig.validateGroupId(validGroupIds(i))
}
catch {
- case e: Exception => fail("Should not throw exception.")
+ case _: Exception => fail("Should not throw exception.")
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/common/TopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/TopicTest.scala b/core/src/test/scala/unit/kafka/common/TopicTest.scala
index 66549af..39eb315 100644
--- a/core/src/test/scala/unit/kafka/common/TopicTest.scala
+++ b/core/src/test/scala/unit/kafka/common/TopicTest.scala
@@ -28,7 +28,7 @@ class TopicTest {
val invalidTopicNames = new ArrayBuffer[String]()
invalidTopicNames += ("", ".", "..")
var longName = "ATCG"
- for (i <- 1 to 6)
+ for (_ <- 1 to 6)
longName += longName
invalidTopicNames += longName
invalidTopicNames += longName.drop(6)
@@ -43,7 +43,7 @@ class TopicTest {
fail("Should throw InvalidTopicException.")
}
catch {
- case e: org.apache.kafka.common.errors.InvalidTopicException => // This is good.
+ case _: org.apache.kafka.common.errors.InvalidTopicException => // This is good.
}
}
@@ -54,7 +54,7 @@ class TopicTest {
Topic.validate(validTopicNames(i))
}
catch {
- case e: Exception => fail("Should not throw exception.")
+ case _: Exception => fail("Should not throw exception.")
}
}
}