You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/12/08 17:29:47 UTC

[kafka] branch hachikuji/KAFKA-9212 created (now cfb88f4)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch hachikuji/KAFKA-9212
in repository https://gitbox.apache.org/repos/asf/kafka.git.


      at cfb88f4  Fix test and minor tweaks

This branch includes the following new commits:

     new cfb88f4  Fix test and minor tweaks

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[kafka] 01/01: Fix test and minor tweaks

Posted by ij...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch hachikuji/KAFKA-9212
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit cfb88f4f116e0e2932c5dd2e416841824a915257
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sun Dec 8 09:27:59 2019 -0800

    Fix test and minor tweaks
---
 clients/src/test/java/org/apache/kafka/clients/MetadataTest.java | 7 ++++++-
 core/src/main/scala/kafka/controller/KafkaController.scala       | 8 ++++----
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index fc51957..7067e88 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -158,6 +158,11 @@ public class MetadataTest {
         assertEquals(0, metadata.timeToNextUpdate(now + 1));
     }
 
+    /**
+     * Prior to Kafka version 2.4 (which coincides with Metadata version 9), the broker does not propagate leader epoch
+     * information accurately while a reassignment is in progress, so we cannot rely on it. This is explained in more
+     * detail in MetadataResponse's constructor.
+     */
     @Test
     public void testIgnoreLeaderEpochInOlderMetadataResponse() {
         TopicPartition tp = new TopicPartition("topic", 0);
@@ -196,7 +201,7 @@ public class MetadataTest {
             assertEquals(-1, info.epoch());
         }
 
-        for (short version = 9; version <= ApiKeys.METADATA.oldestVersion(); version++) {
+        for (short version = 9; version <= ApiKeys.METADATA.latestVersion(); version++) {
             Struct struct = data.toStruct(version);
             MetadataResponse response = new MetadataResponse(struct, version);
             assertTrue(response.hasReliableLeaderEpochs());
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 6133cc6..444e74d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1081,15 +1081,15 @@ class KafkaController(val config: KafkaConfig,
           val UpdateLeaderAndIsrResult(finishedUpdates, _) =
             zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch, controllerContext.epochZkVersion)
 
-          finishedUpdates.get(partition).exists {
-            case Right(leaderAndIsr) =>
+          finishedUpdates.get(partition) match {
+            case Some(Right(leaderAndIsr)) =>
               val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, epoch)
               controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
               finalLeaderIsrAndControllerEpoch = Some(leaderIsrAndControllerEpoch)
               info(s"Updated leader epoch for partition $partition to ${leaderAndIsr.leaderEpoch}")
               true
-            case Left(e) =>
-              throw e
+            case Some(Left(e)) => throw e
+            case None => false
           }
         case None =>
           throw new IllegalStateException(s"Cannot update leader epoch for partition $partition as " +