You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2019/06/16 02:21:42 UTC

[kafka] branch trunk updated: MINOR: fix some warnings in the broker

This is an automated email from the ASF dual-hosted git repository.

gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e047864  MINOR: fix some warnings in the broker
e047864 is described below

commit e047864f30fa47c6bcb2a0e6d9da86fa053fe6f6
Author: Colin P. Mccabe <cm...@confluent.io>
AuthorDate: Sat Jun 15 19:21:10 2019 -0700

    MINOR: fix some warnings in the broker
    
    Author: Colin P. Mccabe <cm...@confluent.io>
    
    Reviewers: Gwen Shapira
    
    Closes #6942 from cmccabe/fix-scala-warnings
---
 clients/src/main/java/org/apache/kafka/common/Cluster.java            | 2 +-
 core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala    | 3 +++
 core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala      | 1 -
 core/src/main/scala/kafka/log/LogCleaner.scala                        | 2 +-
 .../test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala  | 1 -
 .../test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala | 4 ++--
 core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala              | 2 +-
 .../scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala     | 1 -
 core/src/test/scala/unit/kafka/server/KafkaApisTest.scala             | 2 +-
 9 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 0b01d22..e69be42 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -189,7 +189,7 @@ public final class Cluster {
      * Get the node by node id if the replica for the given partition is online
      * @param partition
      * @param id
-     * @return
+     * @return the node
      */
     public Optional<Node> nodeIfOnline(TopicPartition partition, int id) {
         Node node = nodeById(id);
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index a9ac67f..0bd1572 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -642,6 +642,9 @@ class GroupCoordinator(val brokerId: Int,
             // the latest group generation information from the JoinResponse.
             // So let's return a REBALANCE_IN_PROGRESS to let consumer handle it gracefully.
             responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
+
+          case _ =>
+            throw new RuntimeException(s"Logic error: unexpected group state ${group.currentState}")
         }
       }
     }
diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
index 83ff709..1014f40 100644
--- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -20,7 +20,6 @@ package kafka.coordinator.group
 import java.util
 
 import kafka.utils.nonthreadsafe
-import org.apache.kafka.common.protocol.Errors
 
 case class MemberSummary(memberId: String,
                          groupInstanceId: Option[String],
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 3180f3d..cdeb4e6 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -1061,7 +1061,7 @@ private[log] class CleanedTransactionMetadata {
   private val ongoingCommittedTxns = mutable.Set.empty[Long]
   private val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata]
   // Minheap of aborted transactions sorted by the transaction first offset
-  private var abortedTransactions = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] {
+  private val abortedTransactions = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] {
     override def compare(x: AbortedTxn, y: AbortedTxn): Int = x.firstOffset compare y.firstOffset
   }.reverse)
 
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
index 55671ba..a126c83 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
@@ -22,7 +22,6 @@ import java.time.Duration
 import java.util
 import java.util.Collections
 
-import kafka.api.IntegrationTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.NewTopic
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index 5db4309..de7fd6b 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -54,8 +54,8 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
     defaultReplicationFactor = defaultReplicationFactor
     ).map(KafkaConfig.fromProps)
 
-  private var numPartitions = 1
-  private var defaultReplicationFactor = 1.toShort
+  private val numPartitions = 1
+  private val defaultReplicationFactor = 1.toShort
 
   private var topicService: AdminClientTopicService = _
   private var adminClient: JAdminClient = _
diff --git a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
index bacdc81..64e298f 100644
--- a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
@@ -19,7 +19,7 @@ package kafka.cluster
 import java.util.Properties
 
 import kafka.log.{Log, LogConfig, LogManager}
-import kafka.server.{BrokerTopicStats, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 4cd91dd..2cf3e5d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -2346,7 +2346,6 @@ class GroupCoordinatorTest {
     val offset = offsetAndMetadata(0)
 
     val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
-    val assignedMemberId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.NONE, joinGroupError)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f2cdd42..aac7ad1 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -510,7 +510,7 @@ class KafkaApisTest {
   }
 
   @Test
-  def testJoinGroupProtocolsOrder: Unit = {
+  def testJoinGroupProtocolsOrder(): Unit = {
     val protocols = List(
       new JoinGroupRequestProtocol().setName("first").setMetadata("first".getBytes()),
       new JoinGroupRequestProtocol().setName("second").setMetadata("second".getBytes())