You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/05/26 07:54:53 UTC
[kafka] branch trunk updated: MINOR: Replace unused variables by underscore (#5003)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8d1e961 MINOR: Replace unused variables by underscore (#5003)
8d1e961 is described below
commit 8d1e96181da777800e82b5c34457dc313539eced
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Sat May 26 15:54:41 2018 +0800
MINOR: Replace unused variables by underscore (#5003)
And remove one unused expression.
Reviewers: Ismael Juma <is...@juma.me.uk>
---
core/src/main/scala/kafka/admin/LogDirsCommand.scala | 2 +-
core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala | 4 ++--
core/src/main/scala/kafka/consumer/PartitionAssignor.scala | 2 +-
core/src/main/scala/kafka/controller/PartitionStateMachine.scala | 8 ++++----
core/src/main/scala/kafka/controller/ReplicaStateMachine.scala | 2 +-
.../main/scala/kafka/coordinator/group/GroupMetadataManager.scala | 2 +-
.../kafka/coordinator/transaction/TransactionStateManager.scala | 2 +-
core/src/main/scala/kafka/log/Log.scala | 2 +-
core/src/main/scala/kafka/log/LogManager.scala | 4 ++--
core/src/main/scala/kafka/log/ProducerStateManager.scala | 2 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala | 6 +++---
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala | 2 +-
core/src/main/scala/kafka/server/ReplicaManager.scala | 4 ++--
core/src/main/scala/kafka/zk/AdminZkClient.scala | 3 +--
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 2 +-
16 files changed, 24 insertions(+), 25 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
index d8e1beb..9257942 100644
--- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala
+++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
@@ -65,7 +65,7 @@ object LogDirsCommand {
Map(
"logDir" -> logDir,
"error" -> logDirInfo.error.exceptionName(),
- "partitions" -> logDirInfo.replicaInfos.asScala.filter { case (topicPartition, replicaInfo) =>
+ "partitions" -> logDirInfo.replicaInfos.asScala.filter { case (topicPartition, _) =>
topicSet.isEmpty || topicSet.contains(topicPartition.topic)
}.map { case (topicPartition, replicaInfo) =>
Map(
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index ed9414b..f765b94 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -256,7 +256,7 @@ object ReassignPartitionsCommand extends Logging {
val newReplicas = partitionFields("replicas").to[Seq[Int]]
val newLogDirs = partitionFields.get("log_dirs") match {
case Some(jsonValue) => jsonValue.to[Seq[String]]
- case None => newReplicas.map(r => AnyLogDir)
+ case None => newReplicas.map(_ => AnyLogDir)
}
if (newReplicas.size != newLogDirs.size)
throw new AdminCommandFailedException(s"Size of replicas list $newReplicas is different from " +
@@ -569,7 +569,7 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
} catch {
case t: ExecutionException =>
t.getCause match {
- case e: ReplicaNotAvailableException => None // It is OK if the replica is not available at this moment
+ case _: ReplicaNotAvailableException => None // It is OK if the replica is not available at this moment
case e: Throwable => throw new AdminCommandFailedException(s"Failed to alter dir for $replica", e)
}
}
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index 5d4fb8b..7d49b99 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -79,7 +79,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
if (ctx.consumersForTopic.nonEmpty) {
// Collect consumer thread ids across all topics, remove duplicates, and sort to ensure determinism
- val allThreadIds = ctx.consumersForTopic.flatMap { case (topic, threadIds) =>
+ val allThreadIds = ctx.consumersForTopic.flatMap { case (_, threadIds) =>
threadIds
}.toSet.toSeq.sorted
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 6805e32..db4c716 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -300,7 +300,7 @@ class PartitionStateMachine(config: KafkaConfig,
failedElections.put(partition, getDataResponse.resultException.get)
}
}
- val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (partition, leaderIsrAndControllerEpoch) =>
+ val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (_, leaderIsrAndControllerEpoch) =>
leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch
}
invalidPartitionsForElection.foreach { case (partition, leaderIsrAndControllerEpoch) =>
@@ -323,12 +323,12 @@ class PartitionStateMachine(config: KafkaConfig,
case ControlledShutdownPartitionLeaderElectionStrategy =>
leaderForControlledShutdown(validPartitionsForElection, shuttingDownBrokers).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
}
- partitionsWithoutLeaders.foreach { case (partition, leaderAndIsrOpt, recipients) =>
+ partitionsWithoutLeaders.foreach { case (partition, _, _) =>
val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
failedElections.put(partition, new StateChangeFailedException(failMsg))
}
- val recipientsPerPartition = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> recipients }.toMap
- val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> leaderAndIsrOpt.get }.toMap
+ val recipientsPerPartition = partitionsWithLeaders.map { case (partition, _, recipients) => partition -> recipients }.toMap
+ val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, _) => partition -> leaderAndIsrOpt.get }.toMap
val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs, controllerContext.epoch)
successfulUpdates.foreach { case (partition, leaderAndIsr) =>
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 5fafcc4..c9f0640 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -292,7 +292,7 @@ class ReplicaStateMachine(config: KafkaConfig,
Seq[TopicPartition],
Map[TopicPartition, Exception]) = {
val (leaderAndIsrs, partitionsWithNoLeaderAndIsrInZk, failedStateReads) = getTopicPartitionStatesFromZk(partitions)
- val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (partition, leaderAndIsr) => leaderAndIsr.isr.contains(replicaId) }
+ val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (_, leaderAndIsr) => leaderAndIsr.isr.contains(replicaId) }
val adjustedLeaderAndIsrs = leaderAndIsrsWithReplica.mapValues { leaderAndIsr =>
val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index c31735b..2787251 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -465,7 +465,7 @@ class GroupMetadataManager(brokerId: Int,
}
case Some(topicPartitions) =>
- topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+ topicPartitions.map { topicPartition =>
val partitionData = group.offset(topicPartition) match {
case None =>
new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE)
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index da61077..5b82be4 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -140,7 +140,7 @@ class TransactionStateManager(brokerId: Int,
val now = time.milliseconds()
inReadLock(stateLock) {
val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
- transactionMetadataCache.flatMap { case (partition, entry) =>
+ transactionMetadataCache.flatMap { case (_, entry) =>
entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match {
case Empty | CompleteCommit | CompleteAbort => true
case _ => false
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index af83775..118288b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -755,7 +755,7 @@ class Log(@volatile var dir: File,
records = validRecords)
// update the producer state
- for ((producerId, producerAppendInfo) <- updatedProducers) {
+ for ((_, producerAppendInfo) <- updatedProducers) {
producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
producerStateManager.update(producerAppendInfo)
}
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index f26a84c..c0ac3b8 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -878,9 +878,9 @@ class LogManager(logDirs: Seq[File],
def allLogs: Iterable[Log] = currentLogs.values ++ futureLogs.values
def logsByTopic(topic: String): Seq[Log] = {
- (currentLogs.toList ++ futureLogs.toList).filter { case (topicPartition, log) =>
+ (currentLogs.toList ++ futureLogs.toList).filter { case (topicPartition, _) =>
topicPartition.topic() == topic
- }.map { case (topicPartition, log) => log }
+ }.map { case (_, log) => log }
}
/**
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index d2c3b39..abeac6e 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -582,7 +582,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
* Expire any producer ids which have been idle longer than the configured maximum expiration timeout.
*/
def removeExpiredProducers(currentTimeMs: Long) {
- producers.retain { case (producerId, lastEntry) =>
+ producers.retain { case (_, lastEntry) =>
!isProducerExpired(currentTimeMs, lastEntry)
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8c17a82..f86026d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -512,7 +512,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
})
} else {
- fetchContext.foreachPartition((part, data) => {
+ fetchContext.foreachPartition((part, _) => {
erroneous += part -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 30e6c07..ba7203e 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -150,7 +150,7 @@ class ReplicaAlterLogDirsThread(name: String,
.filter { case (_, state) => state.isTruncatingLog }
.map { case (tp, _) => tp -> epochCacheOpt(tp) }.toMap
- val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (tp, epochCacheOpt) => epochCacheOpt.nonEmpty }
+ val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, epochCacheOpt) => epochCacheOpt.nonEmpty }
val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
@@ -217,7 +217,7 @@ class ReplicaAlterLogDirsThread(name: String,
def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[FetchRequest] = {
// Only include replica in the fetch request if it is not throttled.
- val maxPartitionOpt = partitionMap.filter { case (topicPartition, partitionFetchState) =>
+ val maxPartitionOpt = partitionMap.filter { case (_, partitionFetchState) =>
partitionFetchState.isReadyForFetch && !quota.isQuotaExceeded
}.reduceLeftOption { (left, right) =>
if ((left._1.topic > right._1.topic()) || (left._1.topic == right._1.topic() && left._1.partition() >= right._1.partition()))
@@ -237,7 +237,7 @@ class ReplicaAlterLogDirsThread(name: String,
val logStartOffset = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId).logStartOffset
requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize))
} catch {
- case e: KafkaStorageException =>
+ case _: KafkaStorageException =>
partitionsWithError += topicPartition
}
}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 68fa873..72b6616 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -333,7 +333,7 @@ class ReplicaFetcherThread(name: String,
.filter { case (_, state) => state.isTruncatingLog }
.map { case (tp, _) => tp -> epochCacheOpt(tp) }.toMap
- val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (tp, epochCacheOpt) => epochCacheOpt.nonEmpty }
+ val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, epochCacheOpt) => epochCacheOpt.nonEmpty }
debug(s"Build leaderEpoch request $partitionsWithEpoch")
val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0518e03..5dbe25b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -560,7 +560,7 @@ class ReplicaManager(val config: KafkaConfig,
// 1. the delete records operation on this partition is successful
// 2. low watermark of this partition is smaller than the specified offset
private def delayedDeleteRecordsRequired(localDeleteRecordsResults: Map[TopicPartition, LogDeleteRecordsResult]): Boolean = {
- localDeleteRecordsResults.exists{ case (tp, deleteRecordsResult) =>
+ localDeleteRecordsResults.exists{ case (_, deleteRecordsResult) =>
deleteRecordsResult.exception.isEmpty && deleteRecordsResult.lowWatermark < deleteRecordsResult.requestedOffset
}
}
@@ -654,7 +654,7 @@ class ReplicaManager(val config: KafkaConfig,
}
} catch {
- case e: KafkaStorageException =>
+ case _: KafkaStorageException =>
(absolutePath, new LogDirInfo(Errors.KAFKA_STORAGE_ERROR, Map.empty[TopicPartition, ReplicaInfo].asJava))
case t: Throwable =>
error(s"Error while describing replica in dir $absolutePath", t)
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index 6d7df3f..2f8da36 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -372,8 +372,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
*/
def changeBrokerConfig(broker: Option[Int], configs: Properties): Unit = {
validateBrokerConfig(configs)
- val entityName = broker.map(_.toString).getOrElse(ConfigEntityName.Default)
- changeEntityConfig(ConfigType.Broker, broker.map(String.valueOf).getOrElse(ConfigEntityName.Default), configs)
+ changeEntityConfig(ConfigType.Broker, broker.map(_.toString).getOrElse(ConfigEntityName.Default), configs)
}
/**
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 42f352b..0cf158e 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -1315,7 +1315,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
createRecursive(ClusterIdZNode.path, ClusterIdZNode.toJson(proposedClusterId))
proposedClusterId
} catch {
- case e: NodeExistsException => getClusterId.getOrElse(
+ case _: NodeExistsException => getClusterId.getOrElse(
throw new KafkaException("Failed to get cluster id from Zookeeper. This can happen if /cluster/id is deleted from Zookeeper."))
}
}
--
To stop receiving notification emails like this one, please contact
ijuma@apache.org.