You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ch...@apache.org on 2021/05/02 16:46:21 UTC
[kafka] branch trunk updated: MINOR: Clean up some redundant code
from ReplicaManager (#10623)
This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 325cb88 MINOR: Clean up some redundant code from ReplicaManager (#10623)
325cb88 is described below
commit 325cb8853b7682bfe065a96d8b7808f0ef6cb89e
Author: wenbingshen <ol...@gmail.com>
AuthorDate: Mon May 3 00:44:32 2021 +0800
MINOR: Clean up some redundant code from ReplicaManager (#10623)
Reviewers: Chia-Ping Tsai <ch...@gmail.com>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 1 -
core/src/main/scala/kafka/server/ReplicaManager.scala | 4 +---
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 2 --
.../test/scala/unit/kafka/server/ReplicaManagerTest.scala | 12 ++++++------
4 files changed, 7 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ef84763..8abd7db 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -284,7 +284,6 @@ class KafkaApis(val requestChannel: RequestChannel,
request.context.correlationId,
stopReplicaRequest.controllerId,
stopReplicaRequest.controllerEpoch,
- stopReplicaRequest.brokerEpoch,
partitionStates)
// Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
// cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5d05b9b..d813241 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -325,7 +325,7 @@ class ReplicaManager(val config: KafkaConfig,
// If inter-broker protocol (IBP) < 1.0, the controller will send LeaderAndIsrRequest V0 which does not include isNew field.
// In this case, the broker receiving the request cannot determine whether it is safe to create a partition if a log directory has failed.
- // Thus, we choose to halt the broker on any log diretory failure if IBP < 1.0
+ // Thus, we choose to halt the broker on any log directory failure if IBP < 1.0
val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0
logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
logDirFailureHandler.start()
@@ -349,7 +349,6 @@ class ReplicaManager(val config: KafkaConfig,
def stopReplicas(correlationId: Int,
controllerId: Int,
controllerEpoch: Int,
- brokerEpoch: Long,
partitionStates: Map[TopicPartition, StopReplicaPartitionState]
): (mutable.Map[TopicPartition, Errors], Errors) = {
replicaStateChangeLock synchronized {
@@ -434,7 +433,6 @@ class ReplicaManager(val config: KafkaConfig,
s"controller $controllerId with correlation id $correlationId " +
s"epoch $controllerEpoch for partition $topicPartition due to an unexpected " +
s"${e.getClass.getName} exception: ${e.getMessage}")
- responseMap.put(topicPartition, Errors.forException(e))
}
responseMap.put(topicPartition, Errors.forException(e))
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 7813c2d..bd0de39 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1674,7 +1674,6 @@ class KafkaApisTest {
EasyMock.eq(request.context.correlationId),
EasyMock.eq(controllerId),
EasyMock.eq(controllerEpoch),
- EasyMock.eq(brokerEpoch),
EasyMock.eq(stopReplicaRequest.partitionStates().asScala)
)).andReturn(
(mutable.Map(
@@ -2900,7 +2899,6 @@ class KafkaApisTest {
EasyMock.eq(request.context.correlationId),
EasyMock.eq(controllerId),
EasyMock.eq(controllerEpoch),
- EasyMock.eq(brokerEpochInRequest),
EasyMock.eq(stopReplicaRequest.partitionStates().asScala)
)).andStubReturn(
(mutable.Map(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index deb8827..5a6ea2e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -1358,7 +1358,7 @@ class ReplicaManagerTest {
// We have a fetch in purgatory, now receive a stop replica request and
// assert that the fetch returns with a NOT_LEADER error
- replicaManager.stopReplicas(2, 0, 0, brokerEpoch,
+ replicaManager.stopReplicas(2, 0, 0,
mutable.Map(tp0 -> new StopReplicaPartitionState()
.setPartitionIndex(tp0.partition)
.setDeletePartition(true)
@@ -1398,7 +1398,7 @@ class ReplicaManagerTest {
Mockito.when(replicaManager.metadataCache.contains(tp0)).thenReturn(true)
- replicaManager.stopReplicas(2, 0, 0, brokerEpoch,
+ replicaManager.stopReplicas(2, 0, 0,
mutable.Map(tp0 -> new StopReplicaPartitionState()
.setPartitionIndex(tp0.partition)
.setDeletePartition(true)
@@ -2022,7 +2022,7 @@ class ReplicaManagerTest {
.setDeletePartition(false)
)
- val (_, error) = replicaManager.stopReplicas(1, 0, 0, brokerEpoch, partitionStates)
+ val (_, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates)
assertEquals(Errors.STALE_CONTROLLER_EPOCH, error)
}
@@ -2050,7 +2050,7 @@ class ReplicaManagerTest {
.setDeletePartition(false)
)
- val (result, error) = replicaManager.stopReplicas(1, 0, 0, brokerEpoch, partitionStates)
+ val (result, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates)
assertEquals(Errors.NONE, error)
assertEquals(Map(tp0 -> Errors.KAFKA_STORAGE_ERROR), result)
}
@@ -2090,7 +2090,7 @@ class ReplicaManagerTest {
.setDeletePartition(deletePartitions)
)
- val (result, error) = replicaManager.stopReplicas(1, 0, 0, brokerEpoch, partitionStates)
+ val (result, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates)
assertEquals(Errors.NONE, error)
if (throwIOException && deletePartitions) {
@@ -2217,7 +2217,7 @@ class ReplicaManagerTest {
.setDeletePartition(deletePartition)
)
- val (result, error) = replicaManager.stopReplicas(1, 0, 0, brokerEpoch, partitionStates)
+ val (result, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates)
assertEquals(Errors.NONE, error)
assertEquals(Map(tp0 -> expectedOutput), result)