You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/13 15:36:42 UTC
[kafka] branch 2.2 updated: MINOR: Use request version to ensure
metadata reflects subscription changes (#6718)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new 220af14 MINOR: Use request version to ensure metadata reflects subscription changes (#6718)
220af14 is described below
commit 220af14ac30a34ccbe8563e73fb5c01b4f642e5e
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon May 13 08:36:21 2019 -0700
MINOR: Use request version to ensure metadata reflects subscription changes (#6718)
This is a backport to 2.2 of a fix which was merged in KAFKA-7831. When there is a subscription change with a metadata request in flight, we may incorrectly interpret the response as applying to the changed subscription. The fix here is to track a separate request version in `Metadata` so that we do not confuse the topic set that was requested.
Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
.../java/org/apache/kafka/clients/Metadata.java | 62 +++++++++++++++++-----
.../org/apache/kafka/clients/NetworkClient.java | 36 ++++++-------
.../consumer/internals/ConsumerNetworkClient.java | 4 +-
.../kafka/common/requests/MetadataRequest.java | 2 +
.../org/apache/kafka/clients/MetadataTest.java | 31 +++++++++++
5 files changed, 102 insertions(+), 33 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 4d1efdf..b59e0bd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +67,8 @@ public class Metadata implements Closeable {
private final long refreshBackoffMs;
private final long metadataExpireMs;
- private int version;
+ private int updateVersion; // bumped on every metadata response
+ private int requestVersion; // bumped on every new topic addition
private long lastRefreshMs;
private long lastSuccessfulRefreshMs;
private AuthenticationException authenticationException;
@@ -109,7 +111,8 @@ public class Metadata implements Closeable {
this.topicExpiryEnabled = topicExpiryEnabled;
this.lastRefreshMs = 0L;
this.lastSuccessfulRefreshMs = 0L;
- this.version = 0;
+ this.requestVersion = 0;
+ this.updateVersion = 0;
this.needUpdate = false;
this.topics = new HashMap<>();
this.listeners = new ArrayList<>();
@@ -165,7 +168,7 @@ public class Metadata implements Closeable {
*/
public synchronized int requestUpdate() {
this.needUpdate = true;
- return this.version;
+ return this.updateVersion;
}
/**
@@ -253,7 +256,7 @@ public class Metadata implements Closeable {
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
- while ((this.version <= lastVersion) && !isClosed()) {
+ while ((this.updateVersion <= lastVersion) && !isClosed()) {
AuthenticationException ex = getAndClearAuthenticationException();
if (ex != null)
throw ex;
@@ -311,26 +314,37 @@ public class Metadata implements Closeable {
this.needUpdate = true;
this.lastRefreshMs = now;
this.lastSuccessfulRefreshMs = now;
- this.version += 1;
+ this.updateVersion += 1;
this.cache = MetadataCache.bootstrap(addresses);
}
/**
+ * Update metadata assuming the current request version. This is mainly for convenience in testing.
+ */
+ public synchronized void update(MetadataResponse response, long now) {
+ this.update(this.requestVersion, response, now);
+ }
+
+ /**
* Updates the cluster metadata. If topic expiry is enabled, expiry time
* is set for topics if required and expired topics are removed from the metadata.
*
* @param metadataResponse metadata response received from the broker
* @param now current time in milliseconds
*/
- public synchronized void update(MetadataResponse metadataResponse, long now) {
+ public synchronized void update(int requestVersion, MetadataResponse metadataResponse, long now) {
Objects.requireNonNull(metadataResponse, "Metadata response cannot be null");
if (isClosed())
throw new IllegalStateException("Update requested after metadata close");
- this.needUpdate = false;
+ if (requestVersion == this.requestVersion)
+ this.needUpdate = false;
+ else
+ requestUpdate();
+
this.lastRefreshMs = now;
this.lastSuccessfulRefreshMs = now;
- this.version += 1;
+ this.updateVersion += 1;
if (topicExpiryEnabled) {
// Handle expiry of topics from the metadata refresh set.
@@ -367,7 +381,7 @@ public class Metadata implements Closeable {
clusterResourceListeners.onUpdate(clusterForListeners.clusterResource());
notifyAll();
- log.debug("Updated cluster metadata version {} to {}", this.version, this.cache);
+ log.debug("Updated cluster metadata version {} to {}", this.updateVersion, this.cache);
}
/**
@@ -450,10 +464,10 @@ public class Metadata implements Closeable {
}
/**
- * @return The current metadata version
+ * @return The current metadata update version
*/
- public synchronized int version() {
- return this.version;
+ public synchronized int updateVersion() {
+ return this.updateVersion;
}
/**
@@ -532,10 +546,32 @@ public class Metadata implements Closeable {
void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics);
}
- private synchronized void requestUpdateForNewTopics() {
+ // Visible for testing
+ synchronized void requestUpdateForNewTopics() {
// Override the timestamp of last refresh to let immediate update.
this.lastRefreshMs = 0;
+ this.requestVersion++;
requestUpdate();
}
+ public synchronized MetadataRequestAndVersion newMetadataRequestAndVersion() {
+ final MetadataRequest.Builder metadataRequestBuilder;
+ if (needMetadataForAllTopics)
+ metadataRequestBuilder = MetadataRequest.Builder.allTopics();
+ else
+ metadataRequestBuilder = new MetadataRequest.Builder(new ArrayList<>(this.topics.keySet()),
+ allowAutoTopicCreation());
+ return new MetadataRequestAndVersion(metadataRequestBuilder, requestVersion);
+ }
+
+ public static class MetadataRequestAndVersion {
+ public final MetadataRequest.Builder requestBuilder;
+ public final int requestVersion;
+
+ private MetadataRequestAndVersion(MetadataRequest.Builder requestBuilder, int requestVersion) {
+ this.requestBuilder = requestBuilder;
+ this.requestVersion = requestVersion;
+ }
+ }
+
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 44446b3..707845e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -937,12 +937,13 @@ public class NetworkClient implements KafkaClient {
/* the current cluster metadata */
private final Metadata metadata;
- /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
- private boolean metadataFetchInProgress;
+ // Defined if there is a request in progress, null otherwise
+ private Integer inProgressRequestVersion;
+
DefaultMetadataUpdater(Metadata metadata) {
this.metadata = metadata;
- this.metadataFetchInProgress = false;
+ this.inProgressRequestVersion = null;
}
@Override
@@ -952,14 +953,18 @@ public class NetworkClient implements KafkaClient {
@Override
public boolean isUpdateDue(long now) {
- return !this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0;
+ return !this.hasFetchInProgress() && this.metadata.timeToNextUpdate(now) == 0;
+ }
+
+ private boolean hasFetchInProgress() {
+ return inProgressRequestVersion != null;
}
@Override
public long maybeUpdate(long now) {
// should we update our metadata?
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
- long waitForMetadataFetch = this.metadataFetchInProgress ? defaultRequestTimeoutMs : 0;
+ long waitForMetadataFetch = this.hasFetchInProgress() ? defaultRequestTimeoutMs : 0;
long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
@@ -992,19 +997,20 @@ public class NetworkClient implements KafkaClient {
log.warn("Bootstrap broker {} disconnected", node);
}
- metadataFetchInProgress = false;
+ inProgressRequestVersion = null;
}
@Override
public void handleAuthenticationFailure(AuthenticationException exception) {
- metadataFetchInProgress = false;
if (metadata.updateRequested())
metadata.failedUpdate(time.milliseconds(), exception);
+ inProgressRequestVersion = null;
}
@Override
public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
- this.metadataFetchInProgress = false;
+ int requestVersion = inProgressRequestVersion;
+ inProgressRequestVersion = null;
// If any partition has leader with missing listeners, log a few for diagnosing broker configuration
// issues. This could be a transient issue if listeners were added dynamically to brokers.
@@ -1030,7 +1036,7 @@ public class NetworkClient implements KafkaClient {
log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
this.metadata.failedUpdate(now, null);
} else {
- this.metadata.update(response, now);
+ this.metadata.update(requestVersion, response, now);
}
}
@@ -1063,15 +1069,9 @@ public class NetworkClient implements KafkaClient {
String nodeConnectionId = node.idString();
if (canSendRequest(nodeConnectionId, now)) {
- this.metadataFetchInProgress = true;
- MetadataRequest.Builder metadataRequest;
- if (metadata.needMetadataForAllTopics())
- metadataRequest = MetadataRequest.Builder.allTopics();
- else
- metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()),
- metadata.allowAutoTopicCreation());
-
-
+ Metadata.MetadataRequestAndVersion metadataRequestAndVersion = metadata.newMetadataRequestAndVersion();
+ inProgressRequestVersion = metadataRequestAndVersion.requestVersion;
+ MetadataRequest.Builder metadataRequest = metadataRequestAndVersion.requestBuilder;
log.debug("Sending metadata request {} to node {}", metadataRequest, node);
sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
return defaultRequestTimeoutMs;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 924b3ef..94af3d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -162,8 +162,8 @@ public class ConsumerNetworkClient implements Closeable {
AuthenticationException ex = this.metadata.getAndClearAuthenticationException();
if (ex != null)
throw ex;
- } while (this.metadata.version() == version && timer.notExpired());
- return this.metadata.version() > version;
+ } while (this.metadata.updateVersion() == version && timer.notExpired());
+ return this.metadata.updateVersion() > version;
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 89a6e69..b27495f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -123,6 +123,8 @@ public class MetadataRequest extends AbstractRequest {
} else {
bld.append(Utils.join(topics, ","));
}
+ bld.append(", allowAutoCreate=");
+ bld.append(allowAutoTopicCreation);
bld.append(")");
return bld.toString();
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 5ce1417..b9fffde 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.MockClusterResourceListener;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -637,6 +639,35 @@ public class MetadataTest {
assertEquals(fromMetadataEmpty, fromClusterEmpty);
}
+ @Test
+ public void testRequestVersion() {
+ Time time = new MockTime();
+
+ metadata.requestUpdate();
+ Metadata.MetadataRequestAndVersion versionAndBuilder = metadata.newMetadataRequestAndVersion();
+ metadata.update(versionAndBuilder.requestVersion,
+ TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)), time.milliseconds());
+ assertFalse(metadata.updateRequested());
+
+ // bump the request version for new topics added to the metadata
+ metadata.requestUpdateForNewTopics();
+
+ // simulating a bump while a metadata request is in flight
+ versionAndBuilder = metadata.newMetadataRequestAndVersion();
+ metadata.requestUpdateForNewTopics();
+ metadata.update(versionAndBuilder.requestVersion,
+ TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)), time.milliseconds());
+
+ // metadata update is still needed
+ assertTrue(metadata.updateRequested());
+
+ // the next update will resolve it
+ versionAndBuilder = metadata.newMetadataRequestAndVersion();
+ metadata.update(versionAndBuilder.requestVersion,
+ TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)), time.milliseconds());
+ assertFalse(metadata.updateRequested());
+ }
+
private void clearBackgroundError() {
backgroundError.set(null);
}