You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/22 21:15:11 UTC

[GitHub] [kafka] apovzner commented on a change in pull request #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

apovzner commented on a change in pull request #8509:
URL: https://github.com/apache/kafka/pull/8509#discussion_r413337972



##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -1553,6 +1552,179 @@ class KafkaApisTest {
     assertEquals(Errors.INVALID_REQUEST, response.error)
   }
 
+  @Test
+  def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)
+  }
+
+  @Test
+  def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE)
+  }
+
+  @Test
+  def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testUpdateMetadataRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
+    val updateMetadataRequest = createBasicMetadataRequest("topicA", 1, brokerEpochInRequest)
+    val request = buildRequest(updateMetadataRequest)
+
+    val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
+
+    EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+    EasyMock.expect(replicaManager.maybeUpdateMetadataCache(
+      EasyMock.eq(request.context.correlationId),
+      EasyMock.anyObject()
+    )).andStubReturn(
+      Seq()
+    )
+
+    EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(replicaManager, controller, requestChannel)
+
+    createKafkaApis().handleUpdateMetadataRequest(request)
+    val updateMetadataResponse = readResponse(ApiKeys.UPDATE_METADATA, updateMetadataRequest, capturedResponse)
+      .asInstanceOf[UpdateMetadataResponse]
+    assertEquals(expectedError, updateMetadataResponse.error())
+    EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
+    val controllerId = 2
+    val controllerEpoch = 6
+    val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
+    val partitionStates = Seq(
+      new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
+        .setTopicName("topicW")
+        .setPartitionIndex(1)
+        .setControllerEpoch(1)
+        .setLeader(0)
+        .setLeaderEpoch(1)
+        .setIsr(asList(0, 1))
+        .setZkVersion(2)
+        .setReplicas(asList(0, 1, 2))
+        .setIsNew(false)
+    ).asJava
+    val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+      ApiKeys.LEADER_AND_ISR.latestVersion,
+      controllerId,
+      controllerEpoch,
+      brokerEpochInRequest,
+      partitionStates,
+      asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091))
+    ).build()
+    val request = buildRequest(leaderAndIsrRequest)
+    val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+      .setErrorCode(Errors.NONE.code)
+      .setPartitionErrors(asList()))
+
+    EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+    EasyMock.expect(replicaManager.becomeLeaderOrFollower(
+      EasyMock.eq(request.context.correlationId),
+      EasyMock.anyObject(),
+      EasyMock.anyObject()
+    )).andStubReturn(
+      response
+    )
+
+    EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(replicaManager, controller, requestChannel)
+
+    createKafkaApis().handleLeaderAndIsrRequest(request)
+    val leaderAndIsrResponse = readResponse(ApiKeys.LEADER_AND_ISR, leaderAndIsrRequest, capturedResponse)
+      .asInstanceOf[LeaderAndIsrResponse]
+    assertEquals(expectedError, leaderAndIsrResponse.error())
+    EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = {
+    val currentBrokerEpoch = 1239875L
+    testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)

Review comment:
       Thanks for catching this and the others below!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org