Posted to commits@kafka.apache.org by ch...@apache.org on 2021/05/02 16:46:21 UTC

[kafka] branch trunk updated: MINOR: Clean up some redundant code from ReplicaManager (#10623)

This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 325cb88  MINOR: Clean up some redundant code from ReplicaManager (#10623)
325cb88 is described below

commit 325cb8853b7682bfe065a96d8b7808f0ef6cb89e
Author: wenbingshen <ol...@gmail.com>
AuthorDate: Mon May 3 00:44:32 2021 +0800

    MINOR: Clean up some redundant code from ReplicaManager (#10623)
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>
---
 core/src/main/scala/kafka/server/KafkaApis.scala             |  1 -
 core/src/main/scala/kafka/server/ReplicaManager.scala        |  4 +---
 core/src/test/scala/unit/kafka/server/KafkaApisTest.scala    |  2 --
 .../test/scala/unit/kafka/server/ReplicaManagerTest.scala    | 12 ++++++------
 4 files changed, 7 insertions(+), 12 deletions(-)
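
In short, this cleanup removes the unused brokerEpoch parameter from
ReplicaManager.stopReplicas (the callers in KafkaApis and in the tests are
updated to match) and drops a duplicated responseMap.put(topicPartition,
Errors.forException(e)) in the StopReplica error path. A minimal,
illustrative-only sketch of that second redundancy in plain Scala (no Kafka
classes; the object and value names are made up for the example):

    import scala.collection.mutable

    // Before the patch, the error path recorded the same error for a partition
    // twice: once inside the exception handler's inner block and once again
    // right after it. A single put is sufficient, so the inner one was removed.
    object DuplicatePutSketch extends App {
      val responseMap = mutable.Map.empty[String, String]
      val topicPartition = "topic-0"
      try throw new IllegalStateException("simulated storage error")
      catch {
        case e: Throwable =>
          // responseMap.put(topicPartition, e.getMessage)  // redundant first put, now removed
          responseMap.put(topicPartition, e.getMessage)
      }
      println(responseMap) // prints the single recorded error for topic-0
    }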

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ef84763..8abd7db 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -284,7 +284,6 @@ class KafkaApis(val requestChannel: RequestChannel,
         request.context.correlationId,
         stopReplicaRequest.controllerId,
         stopReplicaRequest.controllerEpoch,
-        stopReplicaRequest.brokerEpoch,
         partitionStates)
       // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
       // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5d05b9b..d813241 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -325,7 +325,7 @@ class ReplicaManager(val config: KafkaConfig,
 
     // If inter-broker protocol (IBP) < 1.0, the controller will send LeaderAndIsrRequest V0 which does not include isNew field.
     // In this case, the broker receiving the request cannot determine whether it is safe to create a partition if a log directory has failed.
-    // Thus, we choose to halt the broker on any log diretory failure if IBP < 1.0
+    // Thus, we choose to halt the broker on any log directory failure if IBP < 1.0
     val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0
     logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
     logDirFailureHandler.start()
@@ -349,7 +349,6 @@ class ReplicaManager(val config: KafkaConfig,
   def stopReplicas(correlationId: Int,
                    controllerId: Int,
                    controllerEpoch: Int,
-                   brokerEpoch: Long,
                    partitionStates: Map[TopicPartition, StopReplicaPartitionState]
                   ): (mutable.Map[TopicPartition, Errors], Errors) = {
     replicaStateChangeLock synchronized {
@@ -434,7 +433,6 @@ class ReplicaManager(val config: KafkaConfig,
                 s"controller $controllerId with correlation id $correlationId " +
                 s"epoch $controllerEpoch for partition $topicPartition due to an unexpected " +
                 s"${e.getClass.getName} exception: ${e.getMessage}")
-              responseMap.put(topicPartition, Errors.forException(e))
           }
           responseMap.put(topicPartition, Errors.forException(e))
         }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 7813c2d..bd0de39 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1674,7 +1674,6 @@ class KafkaApisTest {
       EasyMock.eq(request.context.correlationId),
       EasyMock.eq(controllerId),
       EasyMock.eq(controllerEpoch),
-      EasyMock.eq(brokerEpoch),
       EasyMock.eq(stopReplicaRequest.partitionStates().asScala)
     )).andReturn(
       (mutable.Map(
@@ -2900,7 +2899,6 @@ class KafkaApisTest {
       EasyMock.eq(request.context.correlationId),
       EasyMock.eq(controllerId),
       EasyMock.eq(controllerEpoch),
-      EasyMock.eq(brokerEpochInRequest),
       EasyMock.eq(stopReplicaRequest.partitionStates().asScala)
     )).andStubReturn(
       (mutable.Map(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index deb8827..5a6ea2e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -1358,7 +1358,7 @@ class ReplicaManagerTest {
 
     // We have a fetch in purgatory, now receive a stop replica request and
     // assert that the fetch returns with a NOT_LEADER error
-    replicaManager.stopReplicas(2, 0, 0, brokerEpoch,
+    replicaManager.stopReplicas(2, 0, 0,
       mutable.Map(tp0 -> new StopReplicaPartitionState()
         .setPartitionIndex(tp0.partition)
         .setDeletePartition(true)
@@ -1398,7 +1398,7 @@ class ReplicaManagerTest {
 
     Mockito.when(replicaManager.metadataCache.contains(tp0)).thenReturn(true)
 
-    replicaManager.stopReplicas(2, 0, 0, brokerEpoch,
+    replicaManager.stopReplicas(2, 0, 0,
       mutable.Map(tp0 -> new StopReplicaPartitionState()
         .setPartitionIndex(tp0.partition)
         .setDeletePartition(true)
@@ -2022,7 +2022,7 @@ class ReplicaManagerTest {
       .setDeletePartition(false)
     )
 
-    val (_, error) = replicaManager.stopReplicas(1, 0, 0, brokerEpoch, partitionStates)
+    val (_, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates)
     assertEquals(Errors.STALE_CONTROLLER_EPOCH, error)
   }
 
@@ -2050,7 +2050,7 @@ class ReplicaManagerTest {
       .setDeletePartition(false)
     )
 
-    val (result, error) = replicaManager.stopReplicas(1, 0, 0, brokerEpoch, partitionStates)
+    val (result, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates)
     assertEquals(Errors.NONE, error)
     assertEquals(Map(tp0 -> Errors.KAFKA_STORAGE_ERROR), result)
   }
@@ -2090,7 +2090,7 @@ class ReplicaManagerTest {
       .setDeletePartition(deletePartitions)
     )
 
-    val (result, error) = replicaManager.stopReplicas(1, 0, 0, brokerEpoch, partitionStates)
+    val (result, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates)
     assertEquals(Errors.NONE, error)
 
     if (throwIOException && deletePartitions) {
@@ -2217,7 +2217,7 @@ class ReplicaManagerTest {
       .setDeletePartition(deletePartition)
     )
 
-    val (result, error) = replicaManager.stopReplicas(1, 0, 0, brokerEpoch, partitionStates)
+    val (result, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates)
     assertEquals(Errors.NONE, error)
     assertEquals(Map(tp0 -> expectedOutput), result)