You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/04/27 20:25:26 UTC

[kafka] branch 2.5 updated: KAFKA-9839; Broker should accept control requests with newer broker epoch (#8509)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new be210d8  KAFKA-9839; Broker should accept control requests with newer broker epoch (#8509)
be210d8 is described below

commit be210d8219fdceee364f0a15414ca5f0ee7f4982
Author: Anna Povzner <an...@confluent.io>
AuthorDate: Mon Apr 27 12:41:30 2020 -0700

    KAFKA-9839; Broker should accept control requests with newer broker epoch (#8509)
    
    A broker throws IllegalStateException if the broker epoch in the LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest is larger than its current broker epoch. However, there is no guarantee that the broker would receive the latest broker epoch before the controller: when the broker registers with ZK, there are few more instructions to process before this broker "knows" about its epoch, while the controller may already get notified and send UPDATE_METADATA request (as an example) with [...]
    
    With this PR, a broker accepts LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest if the broker epoch is newer than the current epoch.
    
    Reviewers: David Jacot <dj...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |   7 +-
 .../kafka/server/BrokerEpochIntegrationTest.scala  |  26 ++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 177 ++++++++++++++++++++-
 3 files changed, 190 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index e613ab5..6bff265 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2961,10 +2961,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     // if the controller hasn't been upgraded to use KIP-380
     if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) false
     else {
-      val curBrokerEpoch = controller.brokerEpoch
-      if (brokerEpochInRequest < curBrokerEpoch) true
-      else if (brokerEpochInRequest == curBrokerEpoch) false
-      else throw new IllegalStateException(s"Epoch $brokerEpochInRequest larger than current broker epoch $curBrokerEpoch")
+      // brokerEpochInRequest > controller.brokerEpoch is possible in rare scenarios where the controller gets notified
+      // about the new broker epoch and sends a control request with this epoch before the broker learns about it
+      brokerEpochInRequest < controller.brokerEpoch
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
index 23836dd..e7b43f5 100755
--- a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
@@ -94,15 +94,20 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
 
   @Test
   def testControlRequestWithCorrectBrokerEpoch(): Unit = {
-    testControlRequestWithBrokerEpoch(false)
+    testControlRequestWithBrokerEpoch(0)
   }
 
   @Test
   def testControlRequestWithStaleBrokerEpoch(): Unit = {
-    testControlRequestWithBrokerEpoch(true)
+    testControlRequestWithBrokerEpoch(-1)
   }
 
-  private def testControlRequestWithBrokerEpoch(isEpochInRequestStale: Boolean): Unit = {
+  @Test
+  def testControlRequestWithNewerBrokerEpoch(): Unit = {
+    testControlRequestWithBrokerEpoch(1)
+  }
+
+  private def testControlRequestWithBrokerEpoch(epochInRequestDiffFromCurrentEpoch: Long): Unit = {
     val tp = new TopicPartition("new-topic", 0)
 
     // create topic with 1 partition, 2 replicas, one on each broker
@@ -127,8 +132,7 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
     controllerChannelManager.startup()
 
     val broker2 = servers(brokerId2)
-    val epochInRequest =
-      if (isEpochInRequestStale) broker2.kafkaController.brokerEpoch - 1 else broker2.kafkaController.brokerEpoch
+    val epochInRequest = broker2.kafkaController.brokerEpoch + epochInRequestDiffFromCurrentEpoch
 
     try {
       // Send LeaderAndIsr request with correct broker epoch
@@ -150,10 +154,12 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
           epochInRequest,
           partitionStates.asJava, nodes.toSet.asJava)
 
-        if (isEpochInRequestStale) {
+        if (epochInRequestDiffFromCurrentEpoch < 0) {
+          // stale broker epoch in LEADER_AND_ISR
           sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder)
         }
         else {
+          // broker epoch in LEADER_AND_ISR >= current broker epoch
           sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
           TestUtils.waitUntilLeaderIsKnown(Seq(broker2), tp, 10000)
         }
@@ -190,10 +196,12 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
           epochInRequest,
           partitionStates.asJava, liveBrokers.asJava)
 
-        if (isEpochInRequestStale) {
+        if (epochInRequestDiffFromCurrentEpoch < 0) {
+          // stale broker epoch in UPDATE_METADATA
           sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder)
         }
         else {
+          // broker epoch in UPDATE_METADATA >= current broker epoch
           sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
           TestUtils.waitUntilMetadataIsPropagated(Seq(broker2), tp.topic, tp.partition, 10000)
           assertEquals(brokerId2,
@@ -208,9 +216,11 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
           epochInRequest, // Correct broker epoch
           true, Set(tp).asJava)
 
-        if (isEpochInRequestStale) {
+        if (epochInRequestDiffFromCurrentEpoch < 0) {
+          // stale broker epoch in STOP_REPLICA
           sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder)
         } else {
+          // broker epoch in STOP_REPLICA >= current broker epoch
           sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
           assertEquals(HostedPartition.None, broker2.replicaManager.getPartition(tp))
         }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 44632fb..fd8da07 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -21,8 +21,7 @@ import java.net.InetAddress
 import java.nio.charset.StandardCharsets
 import java.util
 import java.util.Arrays.asList
-import java.util.Random
-import java.util.{Collections, Optional}
+import java.util.{Collections, Optional, Random}
 import java.util.concurrent.TimeUnit
 
 import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1}
@@ -40,7 +39,7 @@ import kafka.network.RequestChannel.SendResponse
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.{MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.{IsolationLevel, TopicPartition}
+import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition}
 import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.memory.MemoryPool
@@ -294,7 +293,7 @@ class KafkaApisTest {
     val expectedErrors = Map(tp1 -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, tp2 -> Errors.NONE).asJava
 
     val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
-    val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit]  = EasyMock.newCapture()
+    val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.getMagic(tp1))
       .andReturn(Some(RecordBatch.MAGIC_VALUE_V1))
@@ -1196,7 +1195,7 @@ class KafkaApisTest {
     val leaderEpoch = 0
     val tp0 = new TopicPartition("tp", 0)
 
-    val fetchData = Collections.singletonMap(tp0, new FetchRequest.PartitionData(0,0, Int.MaxValue, Optional.of(leaderEpoch)))
+    val fetchData = Collections.singletonMap(tp0, new FetchRequest.PartitionData(0, 0, Int.MaxValue, Optional.of(leaderEpoch)))
     val fetchFromFollower = buildRequest(new FetchRequest.Builder(
       ApiKeys.FETCH.oldestVersion(), ApiKeys.FETCH.latestVersion(), 1, 1000, 0, fetchData
     ).build())
@@ -1283,6 +1282,164 @@ class KafkaApisTest {
     assertEquals(Errors.INVALID_REQUEST, response.error)
   }
 
+  @Test
+  def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)
+  }
+
+  @Test
+  def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE)
+  }
+
+  @Test
+  def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testUpdateMetadataRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
+    val updateMetadataRequest = createBasicMetadataRequest("topicA", 1, brokerEpochInRequest)
+    val request = buildRequest(updateMetadataRequest)
+
+    val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
+
+    EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+    EasyMock.expect(replicaManager.maybeUpdateMetadataCache(
+      EasyMock.eq(request.context.correlationId),
+      EasyMock.anyObject()
+    )).andStubReturn(
+      Seq()
+    )
+
+    EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(replicaManager, controller, requestChannel)
+
+    createKafkaApis().handleUpdateMetadataRequest(request)
+    val updateMetadataResponse = readResponse(ApiKeys.UPDATE_METADATA, updateMetadataRequest, capturedResponse)
+      .asInstanceOf[UpdateMetadataResponse]
+    assertEquals(expectedError, updateMetadataResponse.error())
+    EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
+    val controllerId = 2
+    val controllerEpoch = 6
+    val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
+    val partitionStates = Seq(
+      new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
+        .setTopicName("topicW")
+        .setPartitionIndex(1)
+        .setControllerEpoch(1)
+        .setLeader(0)
+        .setLeaderEpoch(1)
+        .setIsr(asList(0, 1))
+        .setZkVersion(2)
+        .setReplicas(asList(0, 1, 2))
+        .setIsNew(false)
+    ).asJava
+    val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+      ApiKeys.LEADER_AND_ISR.latestVersion,
+      controllerId,
+      controllerEpoch,
+      brokerEpochInRequest,
+      partitionStates,
+      asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091))
+    ).build()
+    val request = buildRequest(leaderAndIsrRequest)
+    val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+      .setErrorCode(Errors.NONE.code)
+      .setPartitionErrors(asList()))
+
+    EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+    EasyMock.expect(replicaManager.becomeLeaderOrFollower(
+      EasyMock.eq(request.context.correlationId),
+      EasyMock.anyObject(),
+      EasyMock.anyObject()
+    )).andStubReturn(
+      response
+    )
+
+    EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(replicaManager, controller, requestChannel)
+
+    createKafkaApis().handleLeaderAndIsrRequest(request)
+    val leaderAndIsrResponse = readResponse(ApiKeys.LEADER_AND_ISR, leaderAndIsrRequest, capturedResponse)
+      .asInstanceOf[LeaderAndIsrResponse]
+    assertEquals(expectedError, leaderAndIsrResponse.error())
+    EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)
+  }
+
+  @Test
+  def testStopReplicaRequestWithNewerBrokerEpochIsValid(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE)
+  }
+
+  @Test
+  def testStopReplicaRequestWithStaleBrokerEpochIsRejected(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testStopReplicaRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
+    val controllerId = 0
+    val controllerEpoch = 5
+    val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
+    val fooPartition = new TopicPartition("foo", 0)
+    val stopReplicaRequest = new StopReplicaRequest.Builder(
+      ApiKeys.STOP_REPLICA.latestVersion,
+      controllerId,
+      controllerEpoch,
+      brokerEpochInRequest,
+      false,
+      Seq(fooPartition).asJava
+    ).build()
+    val request = buildRequest(stopReplicaRequest)
+
+    EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+    EasyMock.expect(replicaManager.stopReplicas(EasyMock.anyObject())).andStubReturn(
+      (mutable.Map(
+        fooPartition -> Errors.NONE
+      ), Errors.NONE)
+    )
+    EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+
+    EasyMock.replay(controller, replicaManager, requestChannel)
+
+    createKafkaApis().handleStopReplicaRequest(request)
+    val stopReplicaResponse = readResponse(ApiKeys.STOP_REPLICA, stopReplicaRequest, capturedResponse)
+      .asInstanceOf[StopReplicaResponse]
+    assertEquals(expectedError, stopReplicaResponse.error())
+    EasyMock.verify(replicaManager)
+  }
+
   /**
    * Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively.
    */
@@ -1410,7 +1567,7 @@ class KafkaApisTest {
     capturedResponse
   }
 
-  private def setupBasicMetadataCache(topic: String, numPartitions: Int): Unit = {
+  private def createBasicMetadataRequest(topic: String, numPartitions: Int, brokerEpoch: Long): UpdateMetadataRequest = {
     val replicas = List(0.asInstanceOf[Integer]).asJava
 
     def createPartitionState(partition: Int) = new UpdateMetadataPartitionState()
@@ -1433,8 +1590,12 @@ class KafkaApisTest {
         .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
         .setListener(plaintextListener.value)).asJava)
     val partitionStates = (0 until numPartitions).map(createPartitionState)
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
-      0, 0, partitionStates.asJava, Seq(broker).asJava).build()
+    new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
+      0, brokerEpoch, partitionStates.asJava, Seq(broker).asJava).build()
+  }
+
+  private def setupBasicMetadataCache(topic: String, numPartitions: Int): Unit = {
+    val updateMetadataRequest = createBasicMetadataRequest(topic, numPartitions, 0)
     metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest)
   }
 }