You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/08/24 18:20:38 UTC

[kafka] branch trunk updated: KAFKA-10312; Fix error code returned in Metadata response when leader is not available (#9112)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f19cd6c  KAFKA-10312; Fix error code returned in Metadata response when leader is not available (#9112)
f19cd6c is described below

commit f19cd6ca48ccd2971948c9589279bb8ee20e5d88
Author: Raman Verma <rv...@confluent.io>
AuthorDate: Mon Aug 24 11:18:59 2020 -0700

    KAFKA-10312; Fix error code returned in Metadata response when leader is not available (#9112)
    
    MetadataCache#getPartitionMetadata returns an error when the topic's leader id
    is present in the MetadataCache but no listener endpoint is present for that leader.
    For older metadata versions, LEADER_NOT_AVAILABLE is returned, while
    LISTENER_NOT_FOUND is returned for newer metadata versions.
    
    The problem is that getPartitionMetadata was looking up MetadataCache's host brokerId rather
    than the topic's leader id while determining what error to return. This
    could result in the call returning LISTENER_NOT_FOUND when it should
    have returned LEADER_NOT_AVAILABLE. This commit corrects this behavior.
    
    Unit tests already existed to check the error codes returned in these
    situations, but they were passing falsely: the test used the same broker id
    for both the MetadataCache's host and the topic's leader. The error only
    manifests when the MetadataCache's host id differs from the leader's id.
    The test has been improved to cover this case.
    
    This commit also consolidates a couple of related tests to reduce code
    duplication.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../main/scala/kafka/server/MetadataCache.scala    |  2 +-
 .../unit/kafka/server/MetadataCacheTest.scala      | 49 +++++++---------------
 2 files changed, 15 insertions(+), 36 deletions(-)

diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 79d84ea..8460446 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -97,7 +97,7 @@ class MetadataCache(brokerId: Int) extends Logging {
 
         maybeLeader match {
           case None =>
-            val error = if (!snapshot.aliveBrokers.contains(brokerId)) { // we are already holding the read lock
+            val error = if (!snapshot.aliveBrokers.contains(leaderBrokerId)) { // we are already holding the read lock
               debug(s"Error while fetching metadata for $topicPartition: leader not available")
               Errors.LEADER_NOT_AVAILABLE
             } else {
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index d252936..1a62065 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -154,14 +154,18 @@ class MetadataCacheTest {
         .setPort(9092)
         .setSecurityProtocol(securityProtocol.id)
         .setListener(listenerName.value)).asJava))
-    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, listenerName,
+    val metadataCacheBrokerId = 0
+    // leader is not available. expect LEADER_NOT_AVAILABLE for any metadata version.
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId, brokers, listenerName,
       leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false)
-    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, listenerName,
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId, brokers, listenerName,
       leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = true)
   }
 
   @Test
   def getTopicMetadataPartitionListenerNotAvailableOnLeader(): Unit = {
+    // when listener name is not present in the metadata cache for a broker, getTopicMetadata should
+    // return LEADER_NOT_AVAILABLE or LISTENER_NOT_FOUND errors for old and new versions respectively.
     val plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
     val sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
     val broker0Endpoints = Seq(
@@ -187,49 +191,24 @@ class MetadataCacheTest {
       new UpdateMetadataBroker()
         .setId(1)
         .setEndpoints(broker1Endpoints.asJava))
-    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, sslListenerName,
+    val metadataCacheBrokerId = 0
+    // leader available in cache but listener name not present. expect LISTENER_NOT_FOUND error for new metadata version
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId, brokers, sslListenerName,
       leader = 1, Errors.LISTENER_NOT_FOUND, errorUnavailableListeners = true)
-  }
-
-  @Test
-  def getTopicMetadataPartitionListenerNotAvailableOnLeaderOldMetadataVersion(): Unit = {
-    val plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
-    val sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
-    val broker0Endpoints = Seq(
-      new UpdateMetadataEndpoint()
-        .setHost("host0")
-        .setPort(9092)
-        .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
-        .setListener(plaintextListenerName.value),
-      new UpdateMetadataEndpoint()
-        .setHost("host0")
-        .setPort(9093)
-        .setSecurityProtocol(SecurityProtocol.SSL.id)
-        .setListener(sslListenerName.value))
-    val broker1Endpoints = Seq(new UpdateMetadataEndpoint()
-      .setHost("host1")
-      .setPort(9092)
-      .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
-      .setListener(plaintextListenerName.value))
-    val brokers = Seq(
-      new UpdateMetadataBroker()
-        .setId(0)
-        .setEndpoints(broker0Endpoints.asJava),
-      new UpdateMetadataBroker()
-        .setId(1)
-        .setEndpoints(broker1Endpoints.asJava))
-    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, sslListenerName,
+    // leader available in cache but listener name not present. expect LEADER_NOT_AVAILABLE error for old metadata version
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId, brokers, sslListenerName,
       leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false)
   }
 
-  private def verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers: Seq[UpdateMetadataBroker],
+  private def verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId: Int,
+                                                                       brokers: Seq[UpdateMetadataBroker],
                                                                        listenerName: ListenerName,
                                                                        leader: Int,
                                                                        expectedError: Errors,
                                                                        errorUnavailableListeners: Boolean): Unit = {
     val topic = "topic"
 
-    val cache = new MetadataCache(1)
+    val cache = new MetadataCache(metadataCacheBrokerId)
 
     val zkVersion = 3
     val controllerId = 2