You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/08/24 18:20:38 UTC
[kafka] branch trunk updated: KAFKA-10312;
Fix error code returned in Metadata response when leader is not
available (#9112)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f19cd6c KAFKA-10312; Fix error code returned in Metadata response when leader is not available (#9112)
f19cd6c is described below
commit f19cd6ca48ccd2971948c9589279bb8ee20e5d88
Author: Raman Verma <rv...@confluent.io>
AuthorDate: Mon Aug 24 11:18:59 2020 -0700
KAFKA-10312; Fix error code returned in Metadata response when leader is not available (#9112)
MetadataCache#getPartitionMetadata returns an error when the topic's leader Id
is present at MetadataCache but listener endpoint is not present for this leader.
For older versions, LEADER_NOT_AVAILABLE is returned while LISTENER_NOT_FOUND is
returned for new metadata versions.
The problem is that getPartitionMetadata was looking up MetadataCache's host brokerId rather
than the topic's leader id while determining what error to return. This
could result in the call returning LISTENER_NOT_FOUND when it should
have returned LEADER_NOT_AVAILABLE. This commit corrects this behavior.
Unit tests were already present to test out the error codes returned
under different situations but they were giving out a false positive.
The test was using same broker id for both the MetadataCache's host as
well as for the topic's leader. Error manifests when the MetadataCache's
host id is changed. Improved the test.
This commit also consolidated couple of related tests to reduce code
duplication.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../main/scala/kafka/server/MetadataCache.scala | 2 +-
.../unit/kafka/server/MetadataCacheTest.scala | 49 +++++++---------------
2 files changed, 15 insertions(+), 36 deletions(-)
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 79d84ea..8460446 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -97,7 +97,7 @@ class MetadataCache(brokerId: Int) extends Logging {
maybeLeader match {
case None =>
- val error = if (!snapshot.aliveBrokers.contains(brokerId)) { // we are already holding the read lock
+ val error = if (!snapshot.aliveBrokers.contains(leaderBrokerId)) { // we are already holding the read lock
debug(s"Error while fetching metadata for $topicPartition: leader not available")
Errors.LEADER_NOT_AVAILABLE
} else {
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index d252936..1a62065 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -154,14 +154,18 @@ class MetadataCacheTest {
.setPort(9092)
.setSecurityProtocol(securityProtocol.id)
.setListener(listenerName.value)).asJava))
- verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, listenerName,
+ val metadataCacheBrokerId = 0
+ // leader is not available. expect LEADER_NOT_AVAILABLE for any metadata version.
+ verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId, brokers, listenerName,
leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false)
- verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, listenerName,
+ verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId, brokers, listenerName,
leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = true)
}
@Test
def getTopicMetadataPartitionListenerNotAvailableOnLeader(): Unit = {
+ // when listener name is not present in the metadata cache for a broker, getTopicMetadata should
+ // return LEADER_NOT_AVAILABLE or LISTENER_NOT_FOUND errors for old and new versions respectively.
val plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
val sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
val broker0Endpoints = Seq(
@@ -187,49 +191,24 @@ class MetadataCacheTest {
new UpdateMetadataBroker()
.setId(1)
.setEndpoints(broker1Endpoints.asJava))
- verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, sslListenerName,
+ val metadataCacheBrokerId = 0
+ // leader available in cache but listener name not present. expect LISTENER_NOT_FOUND error for new metadata version
+ verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId, brokers, sslListenerName,
leader = 1, Errors.LISTENER_NOT_FOUND, errorUnavailableListeners = true)
- }
-
- @Test
- def getTopicMetadataPartitionListenerNotAvailableOnLeaderOldMetadataVersion(): Unit = {
- val plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
- val sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
- val broker0Endpoints = Seq(
- new UpdateMetadataEndpoint()
- .setHost("host0")
- .setPort(9092)
- .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
- .setListener(plaintextListenerName.value),
- new UpdateMetadataEndpoint()
- .setHost("host0")
- .setPort(9093)
- .setSecurityProtocol(SecurityProtocol.SSL.id)
- .setListener(sslListenerName.value))
- val broker1Endpoints = Seq(new UpdateMetadataEndpoint()
- .setHost("host1")
- .setPort(9092)
- .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
- .setListener(plaintextListenerName.value))
- val brokers = Seq(
- new UpdateMetadataBroker()
- .setId(0)
- .setEndpoints(broker0Endpoints.asJava),
- new UpdateMetadataBroker()
- .setId(1)
- .setEndpoints(broker1Endpoints.asJava))
- verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, sslListenerName,
+ // leader available in cache but listener name not present. expect LEADER_NOT_AVAILABLE error for old metadata version
+ verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId, brokers, sslListenerName,
leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false)
}
- private def verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers: Seq[UpdateMetadataBroker],
+ private def verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(metadataCacheBrokerId: Int,
+ brokers: Seq[UpdateMetadataBroker],
listenerName: ListenerName,
leader: Int,
expectedError: Errors,
errorUnavailableListeners: Boolean): Unit = {
val topic = "topic"
- val cache = new MetadataCache(1)
+ val cache = new MetadataCache(metadataCacheBrokerId)
val zkVersion = 3
val controllerId = 2