You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/06/20 16:21:03 UTC
[kafka] branch 2.0 updated: KAFKA-6546: Use LISTENER_NOT_FOUND error for missing listener (#5189)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new a123bc0 KAFKA-6546: Use LISTENER_NOT_FOUND error for missing listener (#5189)
a123bc0 is described below
commit a123bc021bfd39b745028a5346b07afbd66bcce5
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Wed Jun 20 17:08:58 2018 +0100
KAFKA-6546: Use LISTENER_NOT_FOUND error for missing listener (#5189)
For metadata request version 6 and above, use a different error code to indicate missing listener on leader broker to enable diagnosis of listener configuration issues.
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../org/apache/kafka/clients/NetworkClient.java | 16 +++++++
.../common/errors/ListenerNotFoundException.java | 38 ++++++++++++++++
.../org/apache/kafka/common/protocol/Errors.java | 10 ++++-
core/src/main/scala/kafka/server/KafkaApis.scala | 11 +++--
.../main/scala/kafka/server/MetadataCache.scala | 23 +++++++---
.../server/DynamicBrokerReconfigurationTest.scala | 39 ++++++++++++-----
.../unit/kafka/server/MetadataCacheTest.scala | 50 +++++++++++++++++++---
7 files changed, 159 insertions(+), 28 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 619f7bd..ea16ac9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Sensor;
@@ -52,6 +53,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.stream.Collectors;
/**
* A network client for asynchronous request/response network i/o. This is an internal class used to implement the
@@ -945,6 +947,20 @@ public class NetworkClient implements KafkaClient {
public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
this.metadataFetchInProgress = false;
Cluster cluster = response.cluster();
+
+ // If any partition has leader with missing listeners, log a few for diagnosing broker configuration
+ // issues. This could be a transient issue if listeners were added dynamically to brokers.
+ List<TopicPartition> missingListenerPartitions = response.topicMetadata().stream().flatMap(topicMetadata ->
+ topicMetadata.partitionMetadata().stream()
+ .filter(partitionMetadata -> partitionMetadata.error() == Errors.LISTENER_NOT_FOUND)
+ .map(partitionMetadata -> new TopicPartition(topicMetadata.topic(), partitionMetadata.partition())))
+ .collect(Collectors.toList());
+ if (!missingListenerPartitions.isEmpty()) {
+ int count = missingListenerPartitions.size();
+ log.warn("{} partitions have leader brokers without a matching listener, including {}",
+ count, missingListenerPartitions.subList(0, Math.min(10, count)));
+ }
+
// check if any topics metadata failed to get updated
Map<String, Errors> errors = response.errors();
if (!errors.isEmpty())
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ListenerNotFoundException.java b/clients/src/main/java/org/apache/kafka/common/errors/ListenerNotFoundException.java
new file mode 100644
index 0000000..82c5d89
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ListenerNotFoundException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * The leader does not have an endpoint corresponding to the listener on which metadata was requested.
+ * This could indicate a broker configuration error or a transient error when listeners are updated
+ * dynamically and client requests are processed before all brokers have updated their listeners.
+ * This is currently used only for missing listeners on leader brokers, but may be used for followers
+ * in future.
+ */
+public class ListenerNotFoundException extends InvalidMetadataException {
+
+ private static final long serialVersionUID = 1L;
+
+ public ListenerNotFoundException(String message) {
+ super(message);
+ }
+
+ public ListenerNotFoundException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 5db1d31..9c522df 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.DelegationTokenDisabledException;
import org.apache.kafka.common.errors.DelegationTokenExpiredException;
import org.apache.kafka.common.errors.DelegationTokenNotFoundException;
import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException;
+import org.apache.kafka.common.errors.ListenerNotFoundException;
import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
@@ -624,7 +625,14 @@ public enum Errors {
public ApiException build(String message) {
return new InvalidFetchSessionEpochException(message);
}
- });
+ }),
+ LISTENER_NOT_FOUND(72, "There is no listener on the leader broker that matches the listener on which metadata request was processed",
+ new ApiExceptionBuilder() {
+ @Override
+ public ApiException build(String message) {
+ return new ListenerNotFoundException(message);
+ }
+ }),;
private interface ApiExceptionBuilder {
ApiException build(String message);
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ebdf141..0c88be9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -991,8 +991,10 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], listenerName: ListenerName,
- errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = {
- val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, errorUnavailableEndpoints)
+ errorUnavailableEndpoints: Boolean,
+ errorUnavailableListeners: Boolean): Seq[MetadataResponse.TopicMetadata] = {
+ val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
+ errorUnavailableEndpoints, errorUnavailableListeners)
if (topics.isEmpty || topicResponses.size == topics.size) {
topicResponses
} else {
@@ -1068,12 +1070,15 @@ class KafkaApis(val requestChannel: RequestChannel,
// In version 0, we returned an error when brokers with replicas were unavailable,
// while in higher versions we simply don't include the broker in the returned broker list
val errorUnavailableEndpoints = requestVersion == 0
+ // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader.
+ // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors.
+ val errorUnavailableListeners = requestVersion >= 6
val topicMetadata =
if (authorizedTopics.isEmpty)
Seq.empty[MetadataResponse.TopicMetadata]
else
getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName,
- errorUnavailableEndpoints)
+ errorUnavailableEndpoints, errorUnavailableListeners)
val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 43fe352..b0603b8 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -65,19 +65,29 @@ class MetadataCache(brokerId: Int) extends Logging {
}
// errorUnavailableEndpoints exists to support v0 MetadataResponses
- private def getPartitionMetadata(topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
+ // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker.
+ // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below).
+ private def getPartitionMetadata(topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
+ errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
cache.get(topic).map { partitions =>
partitions.map { case (partitionId, partitionState) =>
val topicPartition = TopicAndPartition(topic, partitionId)
- val maybeLeader = getAliveEndpoint(partitionState.basePartitionState.leader, listenerName)
+ val leaderBrokerId = partitionState.basePartitionState.leader
+ val maybeLeader = getAliveEndpoint(leaderBrokerId, listenerName)
val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt)
val replicaInfo = getEndpoints(replicas, listenerName, errorUnavailableEndpoints)
val offlineReplicaInfo = getEndpoints(partitionState.offlineReplicas.asScala.map(_.toInt), listenerName, errorUnavailableEndpoints)
maybeLeader match {
case None =>
- debug(s"Error while fetching metadata for $topicPartition: leader not available")
- new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, partitionId, Node.noNode(),
+ val error = if (!aliveBrokers.contains(brokerId)) { // we are already holding the read lock
+ debug(s"Error while fetching metadata for $topicPartition: leader not available")
+ Errors.LEADER_NOT_AVAILABLE
+ } else {
+ debug(s"Error while fetching metadata for $topicPartition: listener $listenerName not found on leader $leaderBrokerId")
+ if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
+ }
+ new MetadataResponse.PartitionMetadata(error, partitionId, Node.noNode(),
replicaInfo.asJava, java.util.Collections.emptyList(), offlineReplicaInfo.asJava)
case Some(leader) =>
@@ -112,10 +122,11 @@ class MetadataCache(brokerId: Int) extends Logging {
}
// errorUnavailableEndpoints exists to support v0 MetadataResponses
- def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
+ def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false,
+ errorUnavailableListeners: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
inReadLock(partitionMetadataLock) {
topics.toSeq.flatMap { topic =>
- getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints).map { partitionMetadata =>
+ getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava)
}
}
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index cb57bbf..96cca23 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -817,10 +817,23 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
verifyAddListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI"))
//verifyRemoveListener("SASL_SSL", SecurityProtocol.SASL_SSL, Seq("SCRAM-SHA-512", "SCRAM-SHA-256", "PLAIN"))
verifyRemoveListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI"))
+
+ // Verify that a listener added to a subset of servers doesn't cause any issues
+ // when metadata is processed by the client.
+ addListener(servers.tail, "SCRAM_LISTENER", SecurityProtocol.SASL_PLAINTEXT, Seq("SCRAM-SHA-256"))
+ val bootstrap = TestUtils.bootstrapServers(servers.tail, new ListenerName("SCRAM_LISTENER"))
+ val producer = ProducerBuilder().bootstrapServers(bootstrap)
+ .securityProtocol(SecurityProtocol.SASL_PLAINTEXT)
+ .saslMechanism("SCRAM-SHA-256")
+ .maxRetries(1000)
+ .build()
+ val partitions = producer.partitionsFor(topic).asScala
+ assertEquals(0, partitions.count(p => p.leader != null && p.leader.id == servers.head.config.brokerId))
+ assertTrue("Did not find partitions with no leader", partitions.exists(_.leader == null))
}
- private def verifyAddListener(listenerName: String, securityProtocol: SecurityProtocol,
- saslMechanisms: Seq[String]): Unit = {
+ private def addListener(servers: Seq[KafkaServer], listenerName: String, securityProtocol: SecurityProtocol,
+ saslMechanisms: Seq[String]): Unit = {
val config = servers.head.config
val existingListenerCount = config.listeners.size
val listeners = config.listeners
@@ -860,14 +873,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
}), "Listener not created")
- if (saslMechanisms.nonEmpty)
- saslMechanisms.foreach { mechanism =>
- verifyListener(securityProtocol, Some(mechanism), s"add-listener-group-$securityProtocol-$mechanism")
- }
- else
- verifyListener(securityProtocol, None, s"add-listener-group-$securityProtocol")
-
- val brokerConfigs = describeConfig(adminClients.head).entries.asScala
+ val brokerConfigs = describeConfig(adminClients.head, servers).entries.asScala
props.asScala.foreach { case (name, value) =>
val entry = brokerConfigs.find(_.name == name).getOrElse(throw new IllegalArgumentException(s"Config not found $name"))
if (DynamicBrokerConfig.isPasswordConfig(name) || name == unknownConfig)
@@ -877,6 +883,17 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
}
+ private def verifyAddListener(listenerName: String, securityProtocol: SecurityProtocol,
+ saslMechanisms: Seq[String]): Unit = {
+ addListener(servers, listenerName, securityProtocol, saslMechanisms)
+ if (saslMechanisms.nonEmpty)
+ saslMechanisms.foreach { mechanism =>
+ verifyListener(securityProtocol, Some(mechanism), s"add-listener-group-$securityProtocol-$mechanism")
+ }
+ else
+ verifyListener(securityProtocol, None, s"add-listener-group-$securityProtocol")
+ }
+
private def verifyRemoveListener(listenerName: String, securityProtocol: SecurityProtocol,
saslMechanisms: Seq[String]): Unit = {
val saslMechanism = if (saslMechanisms.isEmpty) "" else saslMechanisms.head
@@ -1006,7 +1023,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
}
- private def describeConfig(adminClient: AdminClient): Config = {
+ private def describeConfig(adminClient: AdminClient, servers: Seq[KafkaServer] = this.servers): Config = {
val configResources = servers.map { server =>
new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
}
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 0ee7365..82c14ee 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -110,7 +110,47 @@ class MetadataCacheTest {
}
@Test
- def getTopicMetadataPartitionLeaderNotAvailable() {
+ def getTopicMetadataPartitionLeaderNotAvailable(): Unit = {
+ val securityProtocol = SecurityProtocol.PLAINTEXT
+ val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+ val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, securityProtocol, listenerName)).asJava, null))
+ verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, listenerName,
+ leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false)
+ verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, listenerName,
+ leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = true)
+ }
+
+ @Test
+ def getTopicMetadataPartitionListenerNotAvailableOnLeader(): Unit = {
+ val plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+ val sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
+ val broker0Endpoints = Seq(
+ new EndPoint("host0", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName),
+ new EndPoint("host0", 9093, SecurityProtocol.SSL, sslListenerName))
+ val broker1Endpoints = Seq(new EndPoint("host1", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName))
+ val brokers = Set(new Broker(0, broker0Endpoints.asJava, null), new Broker(1, broker1Endpoints.asJava, null))
+ verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, sslListenerName,
+ leader = 1, Errors.LISTENER_NOT_FOUND, errorUnavailableListeners = true)
+ }
+
+ @Test
+ def getTopicMetadataPartitionListenerNotAvailableOnLeaderOldMetadataVersion(): Unit = {
+ val plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+ val sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
+ val broker0Endpoints = Seq(
+ new EndPoint("host0", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName),
+ new EndPoint("host0", 9093, SecurityProtocol.SSL, sslListenerName))
+ val broker1Endpoints = Seq(new EndPoint("host1", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName))
+ val brokers = Set(new Broker(0, broker0Endpoints.asJava, null), new Broker(1, broker1Endpoints.asJava, null))
+ verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, sslListenerName,
+ leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false)
+ }
+
+ private def verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers: Set[Broker],
+ listenerName: ListenerName,
+ leader: Int,
+ expectedError: Errors,
+ errorUnavailableListeners: Boolean): Unit = {
val topic = "topic"
val cache = new MetadataCache(1)
@@ -118,11 +158,7 @@ class MetadataCacheTest {
val zkVersion = 3
val controllerId = 2
val controllerEpoch = 1
- val securityProtocol = SecurityProtocol.PLAINTEXT
- val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
- val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, securityProtocol, listenerName)).asJava, null))
- val leader = 1
val leaderEpoch = 1
val partitionStates = Map(
new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, asList(0), zkVersion, asList(0), asList()))
@@ -132,7 +168,7 @@ class MetadataCacheTest {
partitionStates.asJava, brokers.asJava).build()
cache.updateCache(15, updateMetadataRequest)
- val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName)
+ val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableListeners = errorUnavailableListeners)
assertEquals(1, topicMetadatas.size)
val topicMetadata = topicMetadatas.head
@@ -143,7 +179,7 @@ class MetadataCacheTest {
val partitionMetadata = partitionMetadatas.get(0)
assertEquals(0, partitionMetadata.partition)
- assertEquals(Errors.LEADER_NOT_AVAILABLE, partitionMetadata.error)
+ assertEquals(expectedError, partitionMetadata.error)
assertTrue(partitionMetadata.isr.isEmpty)
assertEquals(1, partitionMetadata.replicas.size)
assertEquals(0, partitionMetadata.replicas.get(0).id)