Posted to commits@kafka.apache.org by ij...@apache.org on 2018/05/26 07:54:53 UTC

[kafka] branch trunk updated: MINOR: Replace unused variables by underscore (#5003)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8d1e961  MINOR: Replace unused variables by underscore (#5003)
8d1e961 is described below

commit 8d1e96181da777800e82b5c34457dc313539eced
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Sat May 26 15:54:41 2018 +0800

    MINOR: Replace unused variables by underscore (#5003)
    
    And remove one unused expression.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/admin/LogDirsCommand.scala              | 2 +-
 core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala   | 4 ++--
 core/src/main/scala/kafka/consumer/PartitionAssignor.scala        | 2 +-
 core/src/main/scala/kafka/controller/PartitionStateMachine.scala  | 8 ++++----
 core/src/main/scala/kafka/controller/ReplicaStateMachine.scala    | 2 +-
 .../main/scala/kafka/coordinator/group/GroupMetadataManager.scala | 2 +-
 .../kafka/coordinator/transaction/TransactionStateManager.scala   | 2 +-
 core/src/main/scala/kafka/log/Log.scala                           | 2 +-
 core/src/main/scala/kafka/log/LogManager.scala                    | 4 ++--
 core/src/main/scala/kafka/log/ProducerStateManager.scala          | 2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala                  | 2 +-
 core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala  | 6 +++---
 core/src/main/scala/kafka/server/ReplicaFetcherThread.scala       | 2 +-
 core/src/main/scala/kafka/server/ReplicaManager.scala             | 4 ++--
 core/src/main/scala/kafka/zk/AdminZkClient.scala                  | 3 +--
 core/src/main/scala/kafka/zk/KafkaZkClient.scala                  | 2 +-
 16 files changed, 24 insertions(+), 25 deletions(-)
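
The change is mechanical: wherever a pattern match binds a name that the
body never reads, the name becomes the wildcard "_", which makes the ignored
position explicit and can silence unused-variable warnings under strict
linting flags. A minimal standalone sketch of the idiom (names are
illustrative, not from the patch):

    object UnderscoreIdiom extends App {
      val sizesByTopic = Map("payments" -> 3, "audit" -> 0)

      // Before: "topic" is bound but never read in the body.
      val verbose = sizesByTopic.filter { case (topic, size) => size > 0 }

      // After: the wildcard marks the ignored position explicitly; the
      // same works for wider tuples, e.g. case (partition, _, _) => ...
      val nonEmpty = sizesByTopic.filter { case (_, size) => size > 0 }

      println(nonEmpty) // Map(payments -> 3)
    }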

diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
index d8e1beb..9257942 100644
--- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala
+++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
@@ -65,7 +65,7 @@ object LogDirsCommand {
                         Map(
                             "logDir" -> logDir,
                             "error" -> logDirInfo.error.exceptionName(),
-                            "partitions" -> logDirInfo.replicaInfos.asScala.filter { case (topicPartition, replicaInfo) =>
+                            "partitions" -> logDirInfo.replicaInfos.asScala.filter { case (topicPartition, _) =>
                                 topicSet.isEmpty || topicSet.contains(topicPartition.topic)
                             }.map { case (topicPartition, replicaInfo) =>
                                 Map(
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index ed9414b..f765b94 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -256,7 +256,7 @@ object ReassignPartitionsCommand extends Logging {
       val newReplicas = partitionFields("replicas").to[Seq[Int]]
       val newLogDirs = partitionFields.get("log_dirs") match {
         case Some(jsonValue) => jsonValue.to[Seq[String]]
-        case None => newReplicas.map(r => AnyLogDir)
+        case None => newReplicas.map(_ => AnyLogDir)
       }
       if (newReplicas.size != newLogDirs.size)
         throw new AdminCommandFailedException(s"Size of replicas list $newReplicas is different from " +
@@ -569,7 +569,7 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
       } catch {
         case t: ExecutionException =>
           t.getCause match {
-            case e: ReplicaNotAvailableException => None // It is OK if the replica is not available at this moment
+            case _: ReplicaNotAvailableException => None // It is OK if the replica is not available at this moment
             case e: Throwable => throw new AdminCommandFailedException(s"Failed to alter dir for $replica", e)
           }
       }
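
The same wildcard serves two other positions touched above: a function
literal whose parameter is ignored (r => AnyLogDir becomes _ => AnyLogDir)
and a catch pattern whose exception value is never read (case e: ... becomes
case _: ...); later hunks such as KafkaApis apply it to one parameter of a
two-parameter function literal. A minimal sketch with illustrative names:

    object WildcardForms extends App {
      // Parameter ignored: one placeholder log dir per replica.
      val dirs = Seq(1, 2, 3).map(_ => "any")

      // Exception value unused in the first handler, still bound where used.
      def parsePort(s: String): Option[Int] =
        try Some(s.toInt) catch {
          case _: NumberFormatException => None
          case e: Throwable => throw new RuntimeException(s"unexpected: $s", e)
        }

      println(dirs)            // List(any, any, any)
      println(parsePort("42")) // Some(42)
      println(parsePort("x"))  // None
    }
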
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index 5d4fb8b..7d49b99 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -79,7 +79,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
 
     if (ctx.consumersForTopic.nonEmpty) {
       // Collect consumer thread ids across all topics, remove duplicates, and sort to ensure determinism
-      val allThreadIds = ctx.consumersForTopic.flatMap { case (topic, threadIds) =>
+      val allThreadIds = ctx.consumersForTopic.flatMap { case (_, threadIds) =>
          threadIds
       }.toSet.toSeq.sorted
 
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 6805e32..db4c716 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -300,7 +300,7 @@ class PartitionStateMachine(config: KafkaConfig,
         failedElections.put(partition, getDataResponse.resultException.get)
       }
     }
-    val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (partition, leaderIsrAndControllerEpoch) =>
+    val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (_, leaderIsrAndControllerEpoch) =>
       leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch
     }
     invalidPartitionsForElection.foreach { case (partition, leaderIsrAndControllerEpoch) =>
@@ -323,12 +323,12 @@ class PartitionStateMachine(config: KafkaConfig,
       case ControlledShutdownPartitionLeaderElectionStrategy =>
         leaderForControlledShutdown(validPartitionsForElection, shuttingDownBrokers).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
     }
-    partitionsWithoutLeaders.foreach { case (partition, leaderAndIsrOpt, recipients) =>
+    partitionsWithoutLeaders.foreach { case (partition, _, _) =>
       val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
       failedElections.put(partition, new StateChangeFailedException(failMsg))
     }
-    val recipientsPerPartition = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> recipients }.toMap
-    val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> leaderAndIsrOpt.get }.toMap
+    val recipientsPerPartition = partitionsWithLeaders.map { case (partition, _, recipients) => partition -> recipients }.toMap
+    val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, _) => partition -> leaderAndIsrOpt.get }.toMap
     val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
       adjustedLeaderAndIsrs, controllerContext.epoch)
     successfulUpdates.foreach { case (partition, leaderAndIsr) =>
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 5fafcc4..c9f0640 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -292,7 +292,7 @@ class ReplicaStateMachine(config: KafkaConfig,
     Seq[TopicPartition],
     Map[TopicPartition, Exception]) = {
     val (leaderAndIsrs, partitionsWithNoLeaderAndIsrInZk, failedStateReads) = getTopicPartitionStatesFromZk(partitions)
-    val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (partition, leaderAndIsr) => leaderAndIsr.isr.contains(replicaId) }
+    val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (_, leaderAndIsr) => leaderAndIsr.isr.contains(replicaId) }
     val adjustedLeaderAndIsrs = leaderAndIsrsWithReplica.mapValues { leaderAndIsr =>
       val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
       val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index c31735b..2787251 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -465,7 +465,7 @@ class GroupMetadataManager(brokerId: Int,
               }
 
             case Some(topicPartitions) =>
-              topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+              topicPartitions.map { topicPartition =>
                 val partitionData = group.offset(topicPartition) match {
                   case None =>
                     new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE)
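
Note that this hunk fixes a small redundancy as well as the naming: inside
the Some(topicPartitions) branch the Option has already been unwrapped, so
reading topicPartitionsOpt.getOrElse(Seq.empty) re-derived a value that was
already in hand. A minimal sketch of the shape (illustrative names):

    object SomeBranch extends App {
      def describe(opt: Option[Seq[String]]): Seq[String] = opt match {
        case None     => Seq("nothing requested")
        // xs is already the payload; no need for opt.getOrElse(Seq.empty)
        case Some(xs) => xs.map(x => s"requested: $x")
      }
      println(describe(Some(Seq("a", "b")))) // List(requested: a, requested: b)
      println(describe(None))                // List(nothing requested)
    }
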
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index da61077..5b82be4 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -140,7 +140,7 @@ class TransactionStateManager(brokerId: Int,
       val now = time.milliseconds()
       inReadLock(stateLock) {
         val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-          transactionMetadataCache.flatMap { case (partition, entry) =>
+          transactionMetadataCache.flatMap { case (_, entry) =>
             entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match {
               case Empty | CompleteCommit | CompleteAbort => true
               case _ => false
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index af83775..118288b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -755,7 +755,7 @@ class Log(@volatile var dir: File,
           records = validRecords)
 
         // update the producer state
-        for ((producerId, producerAppendInfo) <- updatedProducers) {
+        for ((_, producerAppendInfo) <- updatedProducers) {
           producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
           producerStateManager.update(producerAppendInfo)
         }
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index f26a84c..c0ac3b8 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -878,9 +878,9 @@ class LogManager(logDirs: Seq[File],
   def allLogs: Iterable[Log] = currentLogs.values ++ futureLogs.values
 
   def logsByTopic(topic: String): Seq[Log] = {
-    (currentLogs.toList ++ futureLogs.toList).filter { case (topicPartition, log) =>
+    (currentLogs.toList ++ futureLogs.toList).filter { case (topicPartition, _) =>
       topicPartition.topic() == topic
-    }.map { case (topicPartition, log) => log }
+    }.map { case (_, log) => log }
   }
 
   /**
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index d2c3b39..abeac6e 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -582,7 +582,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * Expire any producer ids which have been idle longer than the configured maximum expiration timeout.
    */
   def removeExpiredProducers(currentTimeMs: Long) {
-    producers.retain { case (producerId, lastEntry) =>
+    producers.retain { case (_, lastEntry) =>
       !isProducerExpired(currentTimeMs, lastEntry)
     }
   }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8c17a82..f86026d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -512,7 +512,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           }
         })
       } else {
-        fetchContext.foreachPartition((part, data) => {
+        fetchContext.foreachPartition((part, _) => {
           erroneous += part -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
             FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
             FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 30e6c07..ba7203e 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -150,7 +150,7 @@ class ReplicaAlterLogDirsThread(name: String,
       .filter { case (_, state) => state.isTruncatingLog }
       .map { case (tp, _) => tp -> epochCacheOpt(tp) }.toMap
 
-    val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (tp, epochCacheOpt) => epochCacheOpt.nonEmpty }
+    val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, epochCacheOpt) => epochCacheOpt.nonEmpty }
 
     val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
     ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
@@ -217,7 +217,7 @@ class ReplicaAlterLogDirsThread(name: String,
 
   def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[FetchRequest] = {
     // Only include replica in the fetch request if it is not throttled.
-    val maxPartitionOpt = partitionMap.filter { case (topicPartition, partitionFetchState) =>
+    val maxPartitionOpt = partitionMap.filter { case (_, partitionFetchState) =>
       partitionFetchState.isReadyForFetch && !quota.isQuotaExceeded
     }.reduceLeftOption { (left, right) =>
       if ((left._1.topic > right._1.topic()) || (left._1.topic == right._1.topic() && left._1.partition() >= right._1.partition()))
@@ -237,7 +237,7 @@ class ReplicaAlterLogDirsThread(name: String,
         val logStartOffset = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId).logStartOffset
         requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize))
       } catch {
-        case e: KafkaStorageException =>
+        case _: KafkaStorageException =>
           partitionsWithError += topicPartition
       }
     }
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 68fa873..72b6616 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -333,7 +333,7 @@ class ReplicaFetcherThread(name: String,
       .filter { case (_, state) => state.isTruncatingLog }
       .map { case (tp, _) => tp -> epochCacheOpt(tp) }.toMap
 
-    val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (tp, epochCacheOpt) => epochCacheOpt.nonEmpty }
+    val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, epochCacheOpt) => epochCacheOpt.nonEmpty }
 
     debug(s"Build leaderEpoch request $partitionsWithEpoch")
     val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0518e03..5dbe25b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -560,7 +560,7 @@ class ReplicaManager(val config: KafkaConfig,
   // 1. the delete records operation on this partition is successful
   // 2. low watermark of this partition is smaller than the specified offset
   private def delayedDeleteRecordsRequired(localDeleteRecordsResults: Map[TopicPartition, LogDeleteRecordsResult]): Boolean = {
-    localDeleteRecordsResults.exists{ case (tp, deleteRecordsResult) =>
+    localDeleteRecordsResults.exists{ case (_, deleteRecordsResult) =>
       deleteRecordsResult.exception.isEmpty && deleteRecordsResult.lowWatermark < deleteRecordsResult.requestedOffset
     }
   }
@@ -654,7 +654,7 @@ class ReplicaManager(val config: KafkaConfig,
         }
 
       } catch {
-        case e: KafkaStorageException =>
+        case _: KafkaStorageException =>
           (absolutePath, new LogDirInfo(Errors.KAFKA_STORAGE_ERROR, Map.empty[TopicPartition, ReplicaInfo].asJava))
         case t: Throwable =>
           error(s"Error while describing replica in dir $absolutePath", t)
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index 6d7df3f..2f8da36 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -372,8 +372,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
     */
   def changeBrokerConfig(broker: Option[Int], configs: Properties): Unit = {
     validateBrokerConfig(configs)
-    val entityName = broker.map(_.toString).getOrElse(ConfigEntityName.Default)
-    changeEntityConfig(ConfigType.Broker, broker.map(String.valueOf).getOrElse(ConfigEntityName.Default), configs)
+    changeEntityConfig(ConfigType.Broker, broker.map(_.toString).getOrElse(ConfigEntityName.Default), configs)
   }
 
   /**
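
This hunk appears to be the "one unused expression" from the commit message:
entityName was computed but never read, because the call on the next line
rebuilt the same String via String.valueOf. The fix keeps a single inline
computation. A minimal sketch of the shape (illustrative names):

    object DeadVal extends App {
      val broker: Option[Int] = Some(7)
      def use(name: String): Unit = println(s"changing config for $name")

      // Before (shape): val entityName = broker.map(_.toString).getOrElse("default")
      //                 use(broker.map(String.valueOf).getOrElse("default"))
      // After: compute the entity name once, inline.
      use(broker.map(_.toString).getOrElse("default"))
    }
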
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 42f352b..0cf158e 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -1315,7 +1315,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
       createRecursive(ClusterIdZNode.path, ClusterIdZNode.toJson(proposedClusterId))
       proposedClusterId
     } catch {
-      case e: NodeExistsException => getClusterId.getOrElse(
+      case _: NodeExistsException => getClusterId.getOrElse(
         throw new KafkaException("Failed to get cluster id from Zookeeper. This can happen if /cluster/id is deleted from Zookeeper."))
     }
   }
