You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/04/27 20:25:26 UTC
[kafka] branch 2.5 updated: KAFKA-9839; Broker should accept control requests with newer broker epoch (#8509)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new be210d8 KAFKA-9839; Broker should accept control requests with newer broker epoch (#8509)
be210d8 is described below
commit be210d8219fdceee364f0a15414ca5f0ee7f4982
Author: Anna Povzner <an...@confluent.io>
AuthorDate: Mon Apr 27 12:41:30 2020 -0700
KAFKA-9839; Broker should accept control requests with newer broker epoch (#8509)
A broker throws an IllegalStateException if the broker epoch in the LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest is larger than its current broker epoch. However, there is no guarantee that the broker learns its latest broker epoch before the controller does: when the broker registers with ZK, there are a few more instructions to process before this broker "knows" about its epoch, while the controller may already get notified and send an UPDATE_METADATA request (as an example) with [...]
With this PR, a broker also accepts LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest requests whose broker epoch is newer than its current epoch. Requests carrying an older epoch are still rejected as stale.
Reviewers: David Jacot <dj...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 7 +-
.../kafka/server/BrokerEpochIntegrationTest.scala | 26 ++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 177 ++++++++++++++++++++-
3 files changed, 190 insertions(+), 20 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index e613ab5..6bff265 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2961,10 +2961,9 @@ class KafkaApis(val requestChannel: RequestChannel,
// if the controller hasn't been upgraded to use KIP-380
if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) false
else {
- val curBrokerEpoch = controller.brokerEpoch
- if (brokerEpochInRequest < curBrokerEpoch) true
- else if (brokerEpochInRequest == curBrokerEpoch) false
- else throw new IllegalStateException(s"Epoch $brokerEpochInRequest larger than current broker epoch $curBrokerEpoch")
+ // brokerEpochInRequest > controller.brokerEpoch is possible in rare scenarios where the controller gets notified
+ // about the new broker epoch and sends a control request with this epoch before the broker learns about it
+ brokerEpochInRequest < controller.brokerEpoch
}
}
diff --git a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
index 23836dd..e7b43f5 100755
--- a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
@@ -94,15 +94,20 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
@Test
def testControlRequestWithCorrectBrokerEpoch(): Unit = {
- testControlRequestWithBrokerEpoch(false)
+ testControlRequestWithBrokerEpoch(0)
}
@Test
def testControlRequestWithStaleBrokerEpoch(): Unit = {
- testControlRequestWithBrokerEpoch(true)
+ testControlRequestWithBrokerEpoch(-1)
}
- private def testControlRequestWithBrokerEpoch(isEpochInRequestStale: Boolean): Unit = {
+ @Test
+ def testControlRequestWithNewerBrokerEpoch(): Unit = {
+ testControlRequestWithBrokerEpoch(1)
+ }
+
+ private def testControlRequestWithBrokerEpoch(epochInRequestDiffFromCurrentEpoch: Long): Unit = {
val tp = new TopicPartition("new-topic", 0)
// create topic with 1 partition, 2 replicas, one on each broker
@@ -127,8 +132,7 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
controllerChannelManager.startup()
val broker2 = servers(brokerId2)
- val epochInRequest =
- if (isEpochInRequestStale) broker2.kafkaController.brokerEpoch - 1 else broker2.kafkaController.brokerEpoch
+ val epochInRequest = broker2.kafkaController.brokerEpoch + epochInRequestDiffFromCurrentEpoch
try {
// Send LeaderAndIsr request with correct broker epoch
@@ -150,10 +154,12 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
epochInRequest,
partitionStates.asJava, nodes.toSet.asJava)
- if (isEpochInRequestStale) {
+ if (epochInRequestDiffFromCurrentEpoch < 0) {
+ // stale broker epoch in LEADER_AND_ISR
sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder)
}
else {
+ // broker epoch in LEADER_AND_ISR >= current broker epoch
sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
TestUtils.waitUntilLeaderIsKnown(Seq(broker2), tp, 10000)
}
@@ -190,10 +196,12 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
epochInRequest,
partitionStates.asJava, liveBrokers.asJava)
- if (isEpochInRequestStale) {
+ if (epochInRequestDiffFromCurrentEpoch < 0) {
+ // stale broker epoch in UPDATE_METADATA
sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder)
}
else {
+ // broker epoch in UPDATE_METADATA >= current broker epoch
sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
TestUtils.waitUntilMetadataIsPropagated(Seq(broker2), tp.topic, tp.partition, 10000)
assertEquals(brokerId2,
@@ -208,9 +216,11 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
epochInRequest, // Correct broker epoch
true, Set(tp).asJava)
- if (isEpochInRequestStale) {
+ if (epochInRequestDiffFromCurrentEpoch < 0) {
+ // stale broker epoch in STOP_REPLICA
sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder)
} else {
+ // broker epoch in STOP_REPLICA >= current broker epoch
sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
assertEquals(HostedPartition.None, broker2.replicaManager.getPartition(tp))
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 44632fb..fd8da07 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -21,8 +21,7 @@ import java.net.InetAddress
import java.nio.charset.StandardCharsets
import java.util
import java.util.Arrays.asList
-import java.util.Random
-import java.util.{Collections, Optional}
+import java.util.{Collections, Optional, Random}
import java.util.concurrent.TimeUnit
import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1}
@@ -40,7 +39,7 @@ import kafka.network.RequestChannel.SendResponse
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{MockTime, TestUtils}
import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.{IsolationLevel, TopicPartition}
+import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition}
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.memory.MemoryPool
@@ -294,7 +293,7 @@ class KafkaApisTest {
val expectedErrors = Map(tp1 -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, tp2 -> Errors.NONE).asJava
val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
- val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+ val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
EasyMock.expect(replicaManager.getMagic(tp1))
.andReturn(Some(RecordBatch.MAGIC_VALUE_V1))
@@ -1196,7 +1195,7 @@ class KafkaApisTest {
val leaderEpoch = 0
val tp0 = new TopicPartition("tp", 0)
- val fetchData = Collections.singletonMap(tp0, new FetchRequest.PartitionData(0,0, Int.MaxValue, Optional.of(leaderEpoch)))
+ val fetchData = Collections.singletonMap(tp0, new FetchRequest.PartitionData(0, 0, Int.MaxValue, Optional.of(leaderEpoch)))
val fetchFromFollower = buildRequest(new FetchRequest.Builder(
ApiKeys.FETCH.oldestVersion(), ApiKeys.FETCH.latestVersion(), 1, 1000, 0, fetchData
).build())
@@ -1283,6 +1282,164 @@ class KafkaApisTest {
assertEquals(Errors.INVALID_REQUEST, response.error)
}
+ @Test
+ def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = {
+ val currentBrokerEpoch = 1239875L
+ testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)
+ }
+
+ @Test
+ def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = {
+ val currentBrokerEpoch = 1239875L
+ testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE)
+ }
+
+ @Test
+ def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = {
+ val currentBrokerEpoch = 1239875L
+ testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
+ }
+
+ def testUpdateMetadataRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
+ val updateMetadataRequest = createBasicMetadataRequest("topicA", 1, brokerEpochInRequest)
+ val request = buildRequest(updateMetadataRequest)
+
+ val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
+
+ EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+ EasyMock.expect(replicaManager.maybeUpdateMetadataCache(
+ EasyMock.eq(request.context.correlationId),
+ EasyMock.anyObject()
+ )).andStubReturn(
+ Seq()
+ )
+
+ EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+ EasyMock.replay(replicaManager, controller, requestChannel)
+
+ createKafkaApis().handleUpdateMetadataRequest(request)
+ val updateMetadataResponse = readResponse(ApiKeys.UPDATE_METADATA, updateMetadataRequest, capturedResponse)
+ .asInstanceOf[UpdateMetadataResponse]
+ assertEquals(expectedError, updateMetadataResponse.error())
+ EasyMock.verify(replicaManager)
+ }
+
+ @Test
+ def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = {
+ val currentBrokerEpoch = 1239875L
+ testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)
+ }
+
+ @Test
+ def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = {
+ val currentBrokerEpoch = 1239875L
+ testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE)
+ }
+
+ @Test
+ def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = {
+ val currentBrokerEpoch = 1239875L
+ testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
+ }
+
+ def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
+ val controllerId = 2
+ val controllerEpoch = 6
+ val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
+ val partitionStates = Seq(
+ new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
+ .setTopicName("topicW")
+ .setPartitionIndex(1)
+ .setControllerEpoch(1)
+ .setLeader(0)
+ .setLeaderEpoch(1)
+ .setIsr(asList(0, 1))
+ .setZkVersion(2)
+ .setReplicas(asList(0, 1, 2))
+ .setIsNew(false)
+ ).asJava
+ val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+ ApiKeys.LEADER_AND_ISR.latestVersion,
+ controllerId,
+ controllerEpoch,
+ brokerEpochInRequest,
+ partitionStates,
+ asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091))
+ ).build()
+ val request = buildRequest(leaderAndIsrRequest)
+ val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+ .setErrorCode(Errors.NONE.code)
+ .setPartitionErrors(asList()))
+
+ EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+ EasyMock.expect(replicaManager.becomeLeaderOrFollower(
+ EasyMock.eq(request.context.correlationId),
+ EasyMock.anyObject(),
+ EasyMock.anyObject()
+ )).andStubReturn(
+ response
+ )
+
+ EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+ EasyMock.replay(replicaManager, controller, requestChannel)
+
+ createKafkaApis().handleLeaderAndIsrRequest(request)
+ val leaderAndIsrResponse = readResponse(ApiKeys.LEADER_AND_ISR, leaderAndIsrRequest, capturedResponse)
+ .asInstanceOf[LeaderAndIsrResponse]
+ assertEquals(expectedError, leaderAndIsrResponse.error())
+ EasyMock.verify(replicaManager)
+ }
+
+ @Test
+ def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = {
+ val currentBrokerEpoch = 1239875L
+ testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)
+ }
+
+ @Test
+ def testStopReplicaRequestWithNewerBrokerEpochIsValid(): Unit = {
+ val currentBrokerEpoch = 1239875L
+ testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE)
+ }
+
+ @Test
+ def testStopReplicaRequestWithStaleBrokerEpochIsRejected(): Unit = {
+ val currentBrokerEpoch = 1239875L
+ testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
+ }
+
+ def testStopReplicaRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
+ val controllerId = 0
+ val controllerEpoch = 5
+ val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
+ val fooPartition = new TopicPartition("foo", 0)
+ val stopReplicaRequest = new StopReplicaRequest.Builder(
+ ApiKeys.STOP_REPLICA.latestVersion,
+ controllerId,
+ controllerEpoch,
+ brokerEpochInRequest,
+ false,
+ Seq(fooPartition).asJava
+ ).build()
+ val request = buildRequest(stopReplicaRequest)
+
+ EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+ EasyMock.expect(replicaManager.stopReplicas(EasyMock.anyObject())).andStubReturn(
+ (mutable.Map(
+ fooPartition -> Errors.NONE
+ ), Errors.NONE)
+ )
+ EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+
+ EasyMock.replay(controller, replicaManager, requestChannel)
+
+ createKafkaApis().handleStopReplicaRequest(request)
+ val stopReplicaResponse = readResponse(ApiKeys.STOP_REPLICA, stopReplicaRequest, capturedResponse)
+ .asInstanceOf[StopReplicaResponse]
+ assertEquals(expectedError, stopReplicaResponse.error())
+ EasyMock.verify(replicaManager)
+ }
+
/**
* Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively.
*/
@@ -1410,7 +1567,7 @@ class KafkaApisTest {
capturedResponse
}
- private def setupBasicMetadataCache(topic: String, numPartitions: Int): Unit = {
+ private def createBasicMetadataRequest(topic: String, numPartitions: Int, brokerEpoch: Long): UpdateMetadataRequest = {
val replicas = List(0.asInstanceOf[Integer]).asJava
def createPartitionState(partition: Int) = new UpdateMetadataPartitionState()
@@ -1433,8 +1590,12 @@ class KafkaApisTest {
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setListener(plaintextListener.value)).asJava)
val partitionStates = (0 until numPartitions).map(createPartitionState)
- val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
- 0, 0, partitionStates.asJava, Seq(broker).asJava).build()
+ new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
+ 0, brokerEpoch, partitionStates.asJava, Seq(broker).asJava).build()
+ }
+
+ private def setupBasicMetadataCache(topic: String, numPartitions: Int): Unit = {
+ val updateMetadataRequest = createBasicMetadataRequest(topic, numPartitions, 0)
metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest)
}
}