You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/13 15:36:42 UTC

[kafka] branch 2.2 updated: MINOR: Use request version to ensure metadata reflects subscription changes (#6718)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 220af14  MINOR: Use request version to ensure metadata reflects subscription changes (#6718)
220af14 is described below

commit 220af14ac30a34ccbe8563e73fb5c01b4f642e5e
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon May 13 08:36:21 2019 -0700

    MINOR: Use request version to ensure metadata reflects subscription changes (#6718)
    
    This is a backport to 2.2 of the fix merged for KAFKA-7831. When the subscription changes while a metadata request is in flight, we may incorrectly interpret the response as applying to the changed subscription. The fix is to track a separate request version in `Metadata`, so that each response is matched against the topic set that was actually requested; a response to an outdated request leaves the update flag set so that a new request is sent.
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 .../java/org/apache/kafka/clients/Metadata.java    | 62 +++++++++++++++++-----
 .../org/apache/kafka/clients/NetworkClient.java    | 36 ++++++-------
 .../consumer/internals/ConsumerNetworkClient.java  |  4 +-
 .../kafka/common/requests/MetadataRequest.java     |  2 +
 .../org/apache/kafka/clients/MetadataTest.java     | 31 +++++++++++
 5 files changed, 102 insertions(+), 33 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 4d1efdf..b59e0bd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,7 +67,8 @@ public class Metadata implements Closeable {
 
     private final long refreshBackoffMs;
     private final long metadataExpireMs;
-    private int version;
+    private int updateVersion;  // bumped on every metadata response
+    private int requestVersion; // bumped on every new topic addition
     private long lastRefreshMs;
     private long lastSuccessfulRefreshMs;
     private AuthenticationException authenticationException;
@@ -109,7 +111,8 @@ public class Metadata implements Closeable {
         this.topicExpiryEnabled = topicExpiryEnabled;
         this.lastRefreshMs = 0L;
         this.lastSuccessfulRefreshMs = 0L;
-        this.version = 0;
+        this.requestVersion = 0;
+        this.updateVersion = 0;
         this.needUpdate = false;
         this.topics = new HashMap<>();
         this.listeners = new ArrayList<>();
@@ -165,7 +168,7 @@ public class Metadata implements Closeable {
      */
     public synchronized int requestUpdate() {
         this.needUpdate = true;
-        return this.version;
+        return this.updateVersion;
     }
 
     /**
@@ -253,7 +256,7 @@ public class Metadata implements Closeable {
 
         long begin = System.currentTimeMillis();
         long remainingWaitMs = maxWaitMs;
-        while ((this.version <= lastVersion) && !isClosed()) {
+        while ((this.updateVersion <= lastVersion) && !isClosed()) {
             AuthenticationException ex = getAndClearAuthenticationException();
             if (ex != null)
                 throw ex;
@@ -311,26 +314,37 @@ public class Metadata implements Closeable {
         this.needUpdate = true;
         this.lastRefreshMs = now;
         this.lastSuccessfulRefreshMs = now;
-        this.version += 1;
+        this.updateVersion += 1;
         this.cache = MetadataCache.bootstrap(addresses);
     }
 
     /**
+     * Update metadata assuming the current request version. This is mainly for convenience in testing.
+     */
+    public synchronized void update(MetadataResponse response, long now) {
+        this.update(this.requestVersion, response, now);
+    }
+
+    /**
      * Updates the cluster metadata. If topic expiry is enabled, expiry time
      * is set for topics if required and expired topics are removed from the metadata.
      *
      * @param metadataResponse metadata response received from the broker
      * @param now current time in milliseconds
      */
-    public synchronized void update(MetadataResponse metadataResponse, long now) {
+    public synchronized void update(int requestVersion, MetadataResponse metadataResponse, long now) {
         Objects.requireNonNull(metadataResponse, "Metadata response cannot be null");
         if (isClosed())
             throw new IllegalStateException("Update requested after metadata close");
 
-        this.needUpdate = false;
+        if (requestVersion == this.requestVersion)
+            this.needUpdate = false;
+        else
+            requestUpdate();
+
         this.lastRefreshMs = now;
         this.lastSuccessfulRefreshMs = now;
-        this.version += 1;
+        this.updateVersion += 1;
 
         if (topicExpiryEnabled) {
             // Handle expiry of topics from the metadata refresh set.
@@ -367,7 +381,7 @@ public class Metadata implements Closeable {
         clusterResourceListeners.onUpdate(clusterForListeners.clusterResource());
 
         notifyAll();
-        log.debug("Updated cluster metadata version {} to {}", this.version, this.cache);
+        log.debug("Updated cluster metadata version {} to {}", this.updateVersion, this.cache);
     }
 
     /**
@@ -450,10 +464,10 @@ public class Metadata implements Closeable {
     }
 
     /**
-     * @return The current metadata version
+     * @return The current metadata update version
      */
-    public synchronized int version() {
-        return this.version;
+    public synchronized int updateVersion() {
+        return this.updateVersion;
     }
 
     /**
@@ -532,10 +546,32 @@ public class Metadata implements Closeable {
         void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics);
     }
 
-    private synchronized void requestUpdateForNewTopics() {
+    // Visible for testing
+    synchronized void requestUpdateForNewTopics() {
         // Override the timestamp of last refresh to let immediate update.
         this.lastRefreshMs = 0;
+        this.requestVersion++;
         requestUpdate();
     }
 
+    public synchronized MetadataRequestAndVersion newMetadataRequestAndVersion() {
+        final MetadataRequest.Builder metadataRequestBuilder;
+        if (needMetadataForAllTopics)
+            metadataRequestBuilder = MetadataRequest.Builder.allTopics();
+        else
+            metadataRequestBuilder = new MetadataRequest.Builder(new ArrayList<>(this.topics.keySet()),
+                    allowAutoTopicCreation());
+        return new MetadataRequestAndVersion(metadataRequestBuilder, requestVersion);
+    }
+
+    public static class MetadataRequestAndVersion {
+        public final MetadataRequest.Builder requestBuilder;
+        public final int requestVersion;
+
+        private MetadataRequestAndVersion(MetadataRequest.Builder requestBuilder, int requestVersion) {
+            this.requestBuilder = requestBuilder;
+            this.requestVersion = requestVersion;
+        }
+    }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 44446b3..707845e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -937,12 +937,13 @@ public class NetworkClient implements KafkaClient {
         /* the current cluster metadata */
         private final Metadata metadata;
 
-        /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
-        private boolean metadataFetchInProgress;
+        // Defined if there is a request in progress, null otherwise
+        private Integer inProgressRequestVersion;
+
 
         DefaultMetadataUpdater(Metadata metadata) {
             this.metadata = metadata;
-            this.metadataFetchInProgress = false;
+            this.inProgressRequestVersion = null;
         }
 
         @Override
@@ -952,14 +953,18 @@ public class NetworkClient implements KafkaClient {
 
         @Override
         public boolean isUpdateDue(long now) {
-            return !this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0;
+            return !this.hasFetchInProgress() && this.metadata.timeToNextUpdate(now) == 0;
+        }
+
+        private boolean hasFetchInProgress() {
+            return inProgressRequestVersion != null;
         }
 
         @Override
         public long maybeUpdate(long now) {
             // should we update our metadata?
             long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
-            long waitForMetadataFetch = this.metadataFetchInProgress ? defaultRequestTimeoutMs : 0;
+            long waitForMetadataFetch = this.hasFetchInProgress() ? defaultRequestTimeoutMs : 0;
 
             long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
 
@@ -992,19 +997,20 @@ public class NetworkClient implements KafkaClient {
                     log.warn("Bootstrap broker {} disconnected", node);
             }
 
-            metadataFetchInProgress = false;
+            inProgressRequestVersion = null;
         }
 
         @Override
         public void handleAuthenticationFailure(AuthenticationException exception) {
-            metadataFetchInProgress = false;
             if (metadata.updateRequested())
                 metadata.failedUpdate(time.milliseconds(), exception);
+            inProgressRequestVersion = null;
         }
 
         @Override
         public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
-            this.metadataFetchInProgress = false;
+            int requestVersion = inProgressRequestVersion;
+            inProgressRequestVersion = null;
 
             // If any partition has leader with missing listeners, log a few for diagnosing broker configuration
             // issues. This could be a transient issue if listeners were added dynamically to brokers.
@@ -1030,7 +1036,7 @@ public class NetworkClient implements KafkaClient {
                 log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
                 this.metadata.failedUpdate(now, null);
             } else {
-                this.metadata.update(response, now);
+                this.metadata.update(requestVersion, response, now);
             }
         }
 
@@ -1063,15 +1069,9 @@ public class NetworkClient implements KafkaClient {
             String nodeConnectionId = node.idString();
 
             if (canSendRequest(nodeConnectionId, now)) {
-                this.metadataFetchInProgress = true;
-                MetadataRequest.Builder metadataRequest;
-                if (metadata.needMetadataForAllTopics())
-                    metadataRequest = MetadataRequest.Builder.allTopics();
-                else
-                    metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()),
-                            metadata.allowAutoTopicCreation());
-
-
+                Metadata.MetadataRequestAndVersion metadataRequestAndVersion = metadata.newMetadataRequestAndVersion();
+                inProgressRequestVersion = metadataRequestAndVersion.requestVersion;
+                MetadataRequest.Builder metadataRequest = metadataRequestAndVersion.requestBuilder;
                 log.debug("Sending metadata request {} to node {}", metadataRequest, node);
                 sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
                 return defaultRequestTimeoutMs;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 924b3ef..94af3d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -162,8 +162,8 @@ public class ConsumerNetworkClient implements Closeable {
             AuthenticationException ex = this.metadata.getAndClearAuthenticationException();
             if (ex != null)
                 throw ex;
-        } while (this.metadata.version() == version && timer.notExpired());
-        return this.metadata.version() > version;
+        } while (this.metadata.updateVersion() == version && timer.notExpired());
+        return this.metadata.updateVersion() > version;
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 89a6e69..b27495f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -123,6 +123,8 @@ public class MetadataRequest extends AbstractRequest {
             } else {
                 bld.append(Utils.join(topics, ","));
             }
+            bld.append(", allowAutoCreate=");
+            bld.append(allowAutoTopicCreation);
             bld.append(")");
             return bld.toString();
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 5ce1417..b9fffde 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.MockClusterResourceListener;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -637,6 +639,35 @@ public class MetadataTest {
         assertEquals(fromMetadataEmpty, fromClusterEmpty);
     }
 
+    @Test
+    public void testRequestVersion() {
+        Time time = new MockTime();
+
+        metadata.requestUpdate();
+        Metadata.MetadataRequestAndVersion versionAndBuilder = metadata.newMetadataRequestAndVersion();
+        metadata.update(versionAndBuilder.requestVersion,
+                TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)), time.milliseconds());
+        assertFalse(metadata.updateRequested());
+
+        // bump the request version for new topics added to the metadata
+        metadata.requestUpdateForNewTopics();
+
+        // simulating a bump while a metadata request is in flight
+        versionAndBuilder = metadata.newMetadataRequestAndVersion();
+        metadata.requestUpdateForNewTopics();
+        metadata.update(versionAndBuilder.requestVersion,
+                TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)), time.milliseconds());
+
+        // metadata update is still needed
+        assertTrue(metadata.updateRequested());
+
+        // the next update will resolve it
+        versionAndBuilder = metadata.newMetadataRequestAndVersion();
+        metadata.update(versionAndBuilder.requestVersion,
+                TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)), time.milliseconds());
+        assertFalse(metadata.updateRequested());
+    }
+
     private void clearBackgroundError() {
         backgroundError.set(null);
     }