Posted to commits@kafka.apache.org by rs...@apache.org on 2018/06/20 16:21:03 UTC

[kafka] branch 2.0 updated: KAFKA-6546: Use LISTENER_NOT_FOUND error for missing listener (#5189)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new a123bc0  KAFKA-6546: Use LISTENER_NOT_FOUND error for missing listener (#5189)
a123bc0 is described below

commit a123bc021bfd39b745028a5346b07afbd66bcce5
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Wed Jun 20 17:08:58 2018 +0100

    KAFKA-6546: Use LISTENER_NOT_FOUND error for missing listener (#5189)
    
    For metadata request version 6 and above, use a different error code to indicate a missing listener on the leader broker, enabling diagnosis of listener configuration issues.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../org/apache/kafka/clients/NetworkClient.java    | 16 +++++++
 .../common/errors/ListenerNotFoundException.java   | 38 ++++++++++++++++
 .../org/apache/kafka/common/protocol/Errors.java   | 10 ++++-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 11 +++--
 .../main/scala/kafka/server/MetadataCache.scala    | 23 +++++++---
 .../server/DynamicBrokerReconfigurationTest.scala  | 39 ++++++++++++-----
 .../unit/kafka/server/MetadataCacheTest.scala      | 50 +++++++++++++++++++---
 7 files changed, 159 insertions(+), 28 deletions(-)
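
For illustration only (not part of this patch): with a listener configured on only a
subset of brokers, a client bootstrapping through that listener sees partitions whose
leader lacks the listener as leaderless, and NetworkClient logs the new warning. A
minimal Java sketch, assuming a hypothetical topic "test" and bootstrap address
"host1:9093":

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.PartitionInfo;

    public class ListenerCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "host1:9093");  // a broker that does expose the listener
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                // Partitions whose leader has no endpoint for our listener come back leaderless.
                for (PartitionInfo p : producer.partitionsFor("test"))
                    if (p.leader() == null)
                        System.out.println("No reachable leader for partition " + p.partition());
            }
        }
    }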

diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 619f7bd..ea16ac9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Sensor;
@@ -52,6 +53,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.stream.Collectors;
 
 /**
  * A network client for asynchronous request/response network i/o. This is an internal class used to implement the
@@ -945,6 +947,20 @@ public class NetworkClient implements KafkaClient {
         public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
             this.metadataFetchInProgress = false;
             Cluster cluster = response.cluster();
+
+            // If any partition has a leader without a matching listener, log a few to help diagnose
+            // broker configuration issues. This can be transient if listeners were added to brokers dynamically.
+            List<TopicPartition> missingListenerPartitions = response.topicMetadata().stream().flatMap(topicMetadata ->
+                topicMetadata.partitionMetadata().stream()
+                    .filter(partitionMetadata -> partitionMetadata.error() == Errors.LISTENER_NOT_FOUND)
+                    .map(partitionMetadata -> new TopicPartition(topicMetadata.topic(), partitionMetadata.partition())))
+                .collect(Collectors.toList());
+            if (!missingListenerPartitions.isEmpty()) {
+                int count = missingListenerPartitions.size();
+                log.warn("{} partitions have leader brokers without a matching listener, including {}",
+                        count, missingListenerPartitions.subList(0, Math.min(10, count)));
+            }
+
             // check if any topics metadata failed to get updated
             Map<String, Errors> errors = response.errors();
             if (!errors.isEmpty())
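
The subList cap keeps the warning bounded at ten partitions regardless of topic count; an
illustrative message (topic names are examples, not real output) would read:

    3 partitions have leader brokers without a matching listener, including [test-0, test-1, test-2]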
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ListenerNotFoundException.java b/clients/src/main/java/org/apache/kafka/common/errors/ListenerNotFoundException.java
new file mode 100644
index 0000000..82c5d89
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ListenerNotFoundException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * The leader does not have an endpoint corresponding to the listener on which metadata was requested.
+ * This could indicate a broker configuration error or a transient error when listeners are updated
+ * dynamically and client requests are processed before all brokers have updated their listeners.
+ * This is currently used only for missing listeners on leader brokers, but may be used for followers
+ * in the future.
+ */
+public class ListenerNotFoundException extends InvalidMetadataException {
+
+    private static final long serialVersionUID = 1L;
+
+    public ListenerNotFoundException(String message) {
+        super(message);
+    }
+
+    public ListenerNotFoundException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
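
Because ListenerNotFoundException extends InvalidMetadataException, which in turn extends
RetriableException, clients treat it like other metadata errors: refresh metadata and
retry rather than fail fast. A quick illustrative check (not part of the patch):

    ApiException e = new ListenerNotFoundException("no SSL listener on leader 1");
    assert e instanceof org.apache.kafka.common.errors.RetriableException;  // holds by inheritance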
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 5db1d31..9c522df 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.DelegationTokenDisabledException;
 import org.apache.kafka.common.errors.DelegationTokenExpiredException;
 import org.apache.kafka.common.errors.DelegationTokenNotFoundException;
 import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException;
+import org.apache.kafka.common.errors.ListenerNotFoundException;
 import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
@@ -624,7 +625,14 @@ public enum Errors {
             public ApiException build(String message) {
                 return new InvalidFetchSessionEpochException(message);
             }
-    });
+    }),
+    LISTENER_NOT_FOUND(72, "There is no listener on the leader broker that matches the listener on which metadata request was processed.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new ListenerNotFoundException(message);
+            }
+    });
 
     private interface ApiExceptionBuilder {
         ApiException build(String message);
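
The new constant round-trips through the existing Errors helpers; for example (illustrative):

    short code = Errors.LISTENER_NOT_FOUND.code();   // 72 on the wire
    assert Errors.forCode(code) == Errors.LISTENER_NOT_FOUND;
    assert Errors.forException(new ListenerNotFoundException("x")) == Errors.LISTENER_NOT_FOUND;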
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ebdf141..0c88be9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -991,8 +991,10 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], listenerName: ListenerName,
-                               errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = {
-    val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, errorUnavailableEndpoints)
+                               errorUnavailableEndpoints: Boolean,
+                               errorUnavailableListeners: Boolean): Seq[MetadataResponse.TopicMetadata] = {
+    val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
+        errorUnavailableEndpoints, errorUnavailableListeners)
     if (topics.isEmpty || topicResponses.size == topics.size) {
       topicResponses
     } else {
@@ -1068,12 +1070,15 @@ class KafkaApis(val requestChannel: RequestChannel,
     // In version 0, we returned an error when brokers with replicas were unavailable,
     // while in higher versions we simply don't include the broker in the returned broker list
     val errorUnavailableEndpoints = requestVersion == 0
+    // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader.
+    // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors.
+    val errorUnavailableListeners = requestVersion >= 6
     val topicMetadata =
       if (authorizedTopics.isEmpty)
         Seq.empty[MetadataResponse.TopicMetadata]
       else
         getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName,
-          errorUnavailableEndpoints)
+          errorUnavailableEndpoints, errorUnavailableListeners)
 
     val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
 
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 43fe352..b0603b8 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -65,19 +65,29 @@ class MetadataCache(brokerId: Int) extends Logging {
   }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  private def getPartitionMetadata(topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
+  // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if the listener is missing on the leader broker.
+  // Otherwise, return LEADER_NOT_AVAILABLE both for an unavailable broker and for a missing listener (Metadata response v5 and below).
+  private def getPartitionMetadata(topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
+                                   errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
     cache.get(topic).map { partitions =>
       partitions.map { case (partitionId, partitionState) =>
         val topicPartition = TopicAndPartition(topic, partitionId)
-        val maybeLeader = getAliveEndpoint(partitionState.basePartitionState.leader, listenerName)
+        val leaderBrokerId = partitionState.basePartitionState.leader
+        val maybeLeader = getAliveEndpoint(leaderBrokerId, listenerName)
         val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt)
         val replicaInfo = getEndpoints(replicas, listenerName, errorUnavailableEndpoints)
         val offlineReplicaInfo = getEndpoints(partitionState.offlineReplicas.asScala.map(_.toInt), listenerName, errorUnavailableEndpoints)
 
         maybeLeader match {
           case None =>
-            debug(s"Error while fetching metadata for $topicPartition: leader not available")
-            new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, partitionId, Node.noNode(),
+            val error = if (!aliveBrokers.contains(leaderBrokerId)) { // we are already holding the read lock
+              debug(s"Error while fetching metadata for $topicPartition: leader not available")
+              Errors.LEADER_NOT_AVAILABLE
+            } else {
+              debug(s"Error while fetching metadata for $topicPartition: listener $listenerName not found on leader $leaderBrokerId")
+              if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
+            }
+            new MetadataResponse.PartitionMetadata(error, partitionId, Node.noNode(),
               replicaInfo.asJava, java.util.Collections.emptyList(), offlineReplicaInfo.asJava)
 
           case Some(leader) =>
@@ -112,10 +122,11 @@ class MetadataCache(brokerId: Int) extends Logging {
     }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
+  def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false,
+                       errorUnavailableListeners: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
     inReadLock(partitionMetadataLock) {
       topics.toSeq.flatMap { topic =>
-        getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints).map { partitionMetadata =>
+        getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
           new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava)
         }
       }
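
The error selection reduces to a small decision table: a leader broker that is down yields
LEADER_NOT_AVAILABLE on every version, while a live leader missing the listener yields
LISTENER_NOT_FOUND only when errorUnavailableListeners is set (i.e. Metadata v6+, per the
KafkaApis change above). A Java paraphrase (illustrative, not the patch's code):

    static Errors errorForMissingLeaderEndpoint(boolean leaderAlive, boolean errorUnavailableListeners) {
        if (!leaderAlive)
            return Errors.LEADER_NOT_AVAILABLE;  // broker gone: all versions agree
        return errorUnavailableListeners
            ? Errors.LISTENER_NOT_FOUND          // v6+: surface the configuration problem
            : Errors.LEADER_NOT_AVAILABLE;       // v5 and below: legacy, ambiguous code
    }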
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index cb57bbf..96cca23 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -817,10 +817,23 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     verifyAddListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI"))
     //verifyRemoveListener("SASL_SSL", SecurityProtocol.SASL_SSL, Seq("SCRAM-SHA-512", "SCRAM-SHA-256", "PLAIN"))
     verifyRemoveListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI"))
+
+    // Verify that a listener added to a subset of servers doesn't cause any issues
+    // when metadata is processed by the client.
+    addListener(servers.tail, "SCRAM_LISTENER", SecurityProtocol.SASL_PLAINTEXT, Seq("SCRAM-SHA-256"))
+    val bootstrap = TestUtils.bootstrapServers(servers.tail, new ListenerName("SCRAM_LISTENER"))
+    val producer = ProducerBuilder().bootstrapServers(bootstrap)
+      .securityProtocol(SecurityProtocol.SASL_PLAINTEXT)
+      .saslMechanism("SCRAM-SHA-256")
+      .maxRetries(1000)
+      .build()
+    val partitions = producer.partitionsFor(topic).asScala
+    assertEquals(0, partitions.count(p => p.leader != null && p.leader.id == servers.head.config.brokerId))
+    assertTrue("Did not find partitions with no leader", partitions.exists(_.leader == null))
   }
 
-  private def verifyAddListener(listenerName: String, securityProtocol: SecurityProtocol,
-                                saslMechanisms: Seq[String]): Unit = {
+  private def addListener(servers: Seq[KafkaServer], listenerName: String, securityProtocol: SecurityProtocol,
+                          saslMechanisms: Seq[String]): Unit = {
     val config = servers.head.config
     val existingListenerCount = config.listeners.size
     val listeners = config.listeners
@@ -860,14 +873,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       }
     }), "Listener not created")
 
-    if (saslMechanisms.nonEmpty)
-      saslMechanisms.foreach { mechanism =>
-        verifyListener(securityProtocol, Some(mechanism), s"add-listener-group-$securityProtocol-$mechanism")
-      }
-    else
-      verifyListener(securityProtocol, None, s"add-listener-group-$securityProtocol")
-
-    val brokerConfigs = describeConfig(adminClients.head).entries.asScala
+    val brokerConfigs = describeConfig(adminClients.head, servers).entries.asScala
     props.asScala.foreach { case (name, value) =>
       val entry = brokerConfigs.find(_.name == name).getOrElse(throw new IllegalArgumentException(s"Config not found $name"))
       if (DynamicBrokerConfig.isPasswordConfig(name) || name == unknownConfig)
@@ -877,6 +883,17 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     }
   }
 
+  private def verifyAddListener(listenerName: String, securityProtocol: SecurityProtocol,
+                                saslMechanisms: Seq[String]): Unit = {
+    addListener(servers, listenerName, securityProtocol, saslMechanisms)
+    if (saslMechanisms.nonEmpty)
+      saslMechanisms.foreach { mechanism =>
+        verifyListener(securityProtocol, Some(mechanism), s"add-listener-group-$securityProtocol-$mechanism")
+      }
+    else
+      verifyListener(securityProtocol, None, s"add-listener-group-$securityProtocol")
+  }
+
   private def verifyRemoveListener(listenerName: String, securityProtocol: SecurityProtocol,
                                    saslMechanisms: Seq[String]): Unit = {
     val saslMechanism = if (saslMechanisms.isEmpty) "" else saslMechanisms.head
@@ -1006,7 +1023,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     }
   }
 
-  private def describeConfig(adminClient: AdminClient): Config = {
+  private def describeConfig(adminClient: AdminClient, servers: Seq[KafkaServer] = this.servers): Config = {
     val configResources = servers.map { server =>
       new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
     }
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 0ee7365..82c14ee 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -110,7 +110,47 @@ class MetadataCacheTest {
   }
 
   @Test
-  def getTopicMetadataPartitionLeaderNotAvailable() {
+  def getTopicMetadataPartitionLeaderNotAvailable(): Unit = {
+    val securityProtocol = SecurityProtocol.PLAINTEXT
+    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+    val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, securityProtocol, listenerName)).asJava, null))
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, listenerName,
+      leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false)
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, listenerName,
+      leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = true)
+  }
+
+  @Test
+  def getTopicMetadataPartitionListenerNotAvailableOnLeader(): Unit = {
+    val plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    val sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
+    val broker0Endpoints = Seq(
+      new EndPoint("host0", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName),
+      new EndPoint("host0", 9093, SecurityProtocol.SSL, sslListenerName))
+    val broker1Endpoints = Seq(new EndPoint("host1", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName))
+    val brokers = Set(new Broker(0, broker0Endpoints.asJava, null), new Broker(1, broker1Endpoints.asJava, null))
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, sslListenerName,
+      leader = 1, Errors.LISTENER_NOT_FOUND, errorUnavailableListeners = true)
+  }
+
+  @Test
+  def getTopicMetadataPartitionListenerNotAvailableOnLeaderOldMetadataVersion(): Unit = {
+    val plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    val sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
+    val broker0Endpoints = Seq(
+      new EndPoint("host0", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName),
+      new EndPoint("host0", 9093, SecurityProtocol.SSL, sslListenerName))
+    val broker1Endpoints = Seq(new EndPoint("host1", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName))
+    val brokers = Set(new Broker(0, broker0Endpoints.asJava, null), new Broker(1, broker1Endpoints.asJava, null))
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, sslListenerName,
+      leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false)
+  }
+
+  private def verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers: Set[Broker],
+                                                                       listenerName: ListenerName,
+                                                                       leader: Int,
+                                                                       expectedError: Errors,
+                                                                       errorUnavailableListeners: Boolean): Unit = {
     val topic = "topic"
 
     val cache = new MetadataCache(1)
@@ -118,11 +158,7 @@ class MetadataCacheTest {
     val zkVersion = 3
     val controllerId = 2
     val controllerEpoch = 1
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-    val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, securityProtocol, listenerName)).asJava, null))
 
-    val leader = 1
     val leaderEpoch = 1
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, asList(0), zkVersion, asList(0), asList()))
@@ -132,7 +168,7 @@ class MetadataCacheTest {
       partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
 
-    val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName)
+    val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableListeners = errorUnavailableListeners)
     assertEquals(1, topicMetadatas.size)
 
     val topicMetadata = topicMetadatas.head
@@ -143,7 +179,7 @@ class MetadataCacheTest {
 
     val partitionMetadata = partitionMetadatas.get(0)
     assertEquals(0, partitionMetadata.partition)
-    assertEquals(Errors.LEADER_NOT_AVAILABLE, partitionMetadata.error)
+    assertEquals(expectedError, partitionMetadata.error)
     assertTrue(partitionMetadata.isr.isEmpty)
     assertEquals(1, partitionMetadata.replicas.size)
     assertEquals(0, partitionMetadata.replicas.get(0).id)