You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/08 16:31:18 UTC

[kafka] branch trunk updated: KAFKA-7320; Add consumer configuration to disable auto topic creation [KIP-361] (#5542)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e6cff21  KAFKA-7320; Add consumer configuration to disable auto topic creation [KIP-361] (#5542)
e6cff21 is described below

commit e6cff21fd8c5add0eb7e55417a91f0530a7d3a32
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Wed May 8 09:31:05 2019 -0700

    KAFKA-7320; Add consumer configuration to disable auto topic creation [KIP-361] (#5542)
    
    Implements KIP-361 to provide a consumer configuration to specify whether subscribing to or assigning a non-existent topic would result in it being automatically created or not.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/clients/ManualMetadataUpdater.java       |  10 +-
 .../java/org/apache/kafka/clients/Metadata.java    |  45 +++------
 .../org/apache/kafka/clients/MetadataUpdater.java  |   8 +-
 .../org/apache/kafka/clients/NetworkClient.java    |  14 ++-
 .../admin/internals/AdminMetadataManager.java      |   3 +-
 .../kafka/clients/consumer/ConsumerConfig.java     |  14 ++-
 .../kafka/clients/consumer/KafkaConsumer.java      |   3 +-
 .../consumer/internals/ConsumerMetadata.java       |  10 +-
 .../producer/internals/ProducerMetadata.java       |   7 +-
 .../apache/kafka/clients/NetworkClientTest.java    |  40 +++++++-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   4 +-
 .../internals/AbstractCoordinatorTest.java         |   2 +-
 .../internals/ConsumerCoordinatorTest.java         |   4 +-
 .../consumer/internals/ConsumerMetadataTest.java   |   4 +-
 .../internals/ConsumerNetworkClientTest.java       |  15 ++-
 .../clients/consumer/internals/FetcherTest.java    |   2 +-
 .../internals/OffsetForLeaderEpochClientTest.java  |   2 +-
 .../kafka/api/ConsumerTopicCreationTest.scala      | 107 +++++++++++++++++++++
 18 files changed, 228 insertions(+), 66 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
index ec007a6..7fb0224 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
@@ -16,8 +16,8 @@
  */
 package org.apache.kafka.clients;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.slf4j.Logger;
@@ -74,10 +74,10 @@ public class ManualMetadataUpdater implements MetadataUpdater {
     }
 
     @Override
-    public void handleAuthenticationFailure(AuthenticationException exception) {
-        // We don't fail the broker on authentication failures, but there is sufficient information in the broker logs
-        // to identify the failure.
-        log.debug("An authentication error occurred in broker-to-broker communication.", exception);
+    public void handleFatalException(KafkaException exception) {
+        // We don't fail the broker on failures, but there should be sufficient information in the logs indicating the reason
+        // for failure.
+        log.debug("An error occurred in broker-to-broker communication.", exception);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index ef01b4b..ae75045 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -66,8 +65,8 @@ public class Metadata implements Closeable {
     private int requestVersion; // bumped on every new topic addition
     private long lastRefreshMs;
     private long lastSuccessfulRefreshMs;
-    private AuthenticationException authenticationException;
-    private KafkaException metadataException;
+    private KafkaException fatalException;
+    private KafkaException recoverableException;
     private MetadataCache cache = MetadataCache.empty();
     private boolean needUpdate;
     private final ClusterResourceListeners clusterResourceListeners;
@@ -202,25 +201,13 @@ public class Metadata implements Closeable {
     }
 
     /**
-     * If any non-retriable authentication exceptions were encountered during
-     * metadata update, clear and return the exception.
+     * If any non-retriable exceptions were encountered during metadata update, clear and return the exception.
      */
-    public synchronized AuthenticationException getAndClearAuthenticationException() {
-        if (authenticationException != null) {
-            AuthenticationException exception = authenticationException;
-            authenticationException = null;
-            return exception;
-        } else
-            return null;
-    }
-
-    synchronized KafkaException getAndClearMetadataException() {
-        if (this.metadataException != null) {
-            KafkaException metadataException = this.metadataException;
-            this.metadataException = null;
-            return metadataException;
-        } else
-            return null;
+    public synchronized KafkaException getAndClearMetadataException() {
+        KafkaException metadataException = Optional.ofNullable(fatalException).orElse(recoverableException);
+        fatalException = null;
+        recoverableException = null;
+        return metadataException;
     }
 
     public synchronized void bootstrap(List<InetSocketAddress> addresses, long now) {
@@ -281,7 +268,7 @@ public class Metadata implements Closeable {
 
     private void maybeSetMetadataError(Cluster cluster) {
         // if we encounter any invalid topics, cache the exception to later throw to the user
-        metadataException = null;
+        recoverableException = null;
         checkInvalidTopics(cluster);
         checkUnauthorizedTopics(cluster);
     }
@@ -289,14 +276,16 @@ public class Metadata implements Closeable {
     private void checkInvalidTopics(Cluster cluster) {
         if (!cluster.invalidTopics().isEmpty()) {
             log.error("Metadata response reported invalid topics {}", cluster.invalidTopics());
-            metadataException = new InvalidTopicException(cluster.invalidTopics());
+            // We may be able to recover from this exception if metadata for this topic is no longer needed
+            recoverableException = new InvalidTopicException(cluster.invalidTopics());
         }
     }
 
     private void checkUnauthorizedTopics(Cluster cluster) {
         if (!cluster.unauthorizedTopics().isEmpty()) {
             log.error("Topic authorization failed for topics {}", cluster.unauthorizedTopics());
-            metadataException = new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
+            // We may be able to recover from this exception if metadata for this topic is no longer needed
+            recoverableException = new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
         }
     }
 
@@ -368,10 +357,6 @@ public class Metadata implements Closeable {
     }
 
     public synchronized void maybeThrowException() {
-        AuthenticationException authenticationException = getAndClearAuthenticationException();
-        if (authenticationException != null)
-            throw authenticationException;
-
         KafkaException metadataException = getAndClearMetadataException();
         if (metadataException != null)
             throw metadataException;
@@ -381,9 +366,9 @@ public class Metadata implements Closeable {
      * Record an attempt to update the metadata that failed. We need to keep track of this
      * to avoid retrying immediately.
      */
-    public synchronized void failedUpdate(long now, AuthenticationException authenticationException) {
+    public synchronized void failedUpdate(long now, KafkaException fatalException) {
         this.lastRefreshMs = now;
-        this.authenticationException = authenticationException;
+        this.fatalException = fatalException;
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index de765db..e2261d5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -16,8 +16,8 @@
  */
 package org.apache.kafka.clients;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 
@@ -64,11 +64,11 @@ public interface MetadataUpdater extends Closeable {
     void handleDisconnection(String destination);
 
     /**
-     * Handle authentication failure. Propagate the authentication exception if awaiting metadata.
+     * Handle failure. Propagate the exception if awaiting metadata.
      *
-     * @param exception authentication exception from broker
+     * @param fatalException exception corresponding to the failure
      */
-    void handleAuthenticationFailure(AuthenticationException exception);
+    void handleFatalException(KafkaException fatalException);
 
     /**
      * Handle responses for metadata requests.
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 4f69256..48693f3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
@@ -433,8 +434,8 @@ public class NetworkClient implements KafkaClient {
         doSend(request, false, now);
     }
 
-    private void sendInternalMetadataRequest(MetadataRequest.Builder builder,
-                                             String nodeConnectionId, long now) {
+    // package-private for testing
+    void sendInternalMetadataRequest(MetadataRequest.Builder builder, String nodeConnectionId, long now) {
         ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);
         doSend(clientRequest, true, now);
     }
@@ -480,6 +481,9 @@ public class NetworkClient implements KafkaClient {
                     clientRequest.callback(), clientRequest.destination(), now, now,
                     false, unsupportedVersionException, null, null);
             abortedSends.add(clientResponse);
+
+            if (isInternalRequest && clientRequest.apiKey() == ApiKeys.METADATA)
+                metadataUpdater.handleFatalException(unsupportedVersionException);
         }
     }
 
@@ -715,7 +719,7 @@ public class NetworkClient implements KafkaClient {
             case AUTHENTICATION_FAILED:
                 AuthenticationException exception = disconnectState.exception();
                 connectionStates.authenticationFailed(nodeId, now, exception);
-                metadataUpdater.handleAuthenticationFailure(exception);
+                metadataUpdater.handleFatalException(exception);
                 log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId,
                     disconnectState.remoteAddress(), exception.getMessage());
                 break;
@@ -1005,9 +1009,9 @@ public class NetworkClient implements KafkaClient {
         }
 
         @Override
-        public void handleAuthenticationFailure(AuthenticationException exception) {
+        public void handleFatalException(KafkaException fatalException) {
             if (metadata.updateRequested())
-                metadata.failedUpdate(time.milliseconds(), exception);
+                metadata.failedUpdate(time.milliseconds(), fatalException);
             inProgressRequestVersion = null;
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index 3d9e5ca..b7080ac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin.internals;
 
 import org.apache.kafka.clients.MetadataUpdater;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.requests.MetadataResponse;
@@ -104,7 +105,7 @@ public class AdminMetadataManager {
         }
 
         @Override
-        public void handleAuthenticationFailure(AuthenticationException e) {
+        public void handleFatalException(KafkaException e) {
             updateFailed(e);
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index e285ade..c9b5004 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -261,6 +261,14 @@ public class ConsumerConfig extends AbstractConfig {
 
     public static final String DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT);
 
+    /** <code>allow.auto.create.topics</code> */
+    public static final String ALLOW_AUTO_CREATE_TOPICS_CONFIG = "allow.auto.create.topics";
+    private static final String ALLOW_AUTO_CREATE_TOPICS_DOC = "Allow automatic topic creation on the broker when" +
+            " subscribing to or assigning a topic. A topic being subscribed to will be automatically created only if the" +
+            " broker allows for it using `auto.create.topics.enable` broker configuration. This configuration must" +
+            " be set to `false` when using brokers older than 0.11.0";
+    public static final boolean DEFAULT_ALLOW_AUTO_CREATE_TOPICS = true;
+    
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                         Type.LIST,
@@ -464,6 +472,11 @@ public class ConsumerConfig extends AbstractConfig {
                                         in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)),
                                         Importance.MEDIUM,
                                         ISOLATION_LEVEL_DOC)
+                                .define(ALLOW_AUTO_CREATE_TOPICS_CONFIG,
+                                        Type.BOOLEAN,
+                                        DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
+                                        Importance.MEDIUM,
+                                        ALLOW_AUTO_CREATE_TOPICS_DOC)
                                 // security support
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
@@ -472,7 +485,6 @@ public class ConsumerConfig extends AbstractConfig {
                                         CommonClientConfigs.SECURITY_PROTOCOL_DOC)
                                 .withClientSslSupport()
                                 .withClientSaslSupport();
-
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 5c8c1dc..3bfd5ac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -727,6 +727,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.metadata = new ConsumerMetadata(retryBackoffMs,
                     config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
                     !config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
+                    config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
                     subscriptions, logContext, clusterResourceListeners);
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                     config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
@@ -1830,7 +1831,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
             Timer timer = time.timer(timeout);
             Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
-                    new MetadataRequest.Builder(Collections.singletonList(topic), true), timer);
+                    new MetadataRequest.Builder(Collections.singletonList(topic), metadata.allowAutoTopicCreation()), timer);
             return topicMetadata.get(topic);
         } finally {
             release();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
index c87849d..fbdf1c6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
@@ -28,21 +28,28 @@ import java.util.Set;
 
 public class ConsumerMetadata extends Metadata {
     private final boolean includeInternalTopics;
+    private final boolean allowAutoTopicCreation;
     private final SubscriptionState subscription;
     private final Set<String> transientTopics;
 
     public ConsumerMetadata(long refreshBackoffMs,
                             long metadataExpireMs,
                             boolean includeInternalTopics,
+                            boolean allowAutoTopicCreation,
                             SubscriptionState subscription,
                             LogContext logContext,
                             ClusterResourceListeners clusterResourceListeners) {
         super(refreshBackoffMs, metadataExpireMs, logContext, clusterResourceListeners);
         this.includeInternalTopics = includeInternalTopics;
+        this.allowAutoTopicCreation = allowAutoTopicCreation;
         this.subscription = subscription;
         this.transientTopics = new HashSet<>();
     }
 
+    public boolean allowAutoTopicCreation() {
+        return allowAutoTopicCreation;
+    }
+
     @Override
     public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
         if (subscription.hasPatternSubscription())
@@ -50,7 +57,7 @@ public class ConsumerMetadata extends Metadata {
         List<String> topics = new ArrayList<>();
         topics.addAll(subscription.groupSubscription());
         topics.addAll(transientTopics);
-        return new MetadataRequest.Builder(topics, true);
+        return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
     }
 
     synchronized void addTransientTopics(Set<String> topics) {
@@ -73,5 +80,4 @@ public class ConsumerMetadata extends Metadata {
 
         return subscription.matchesSubscribedPattern(topic);
     }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
index 90e7970..295036b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
@@ -111,9 +110,9 @@ public class ProducerMetadata extends Metadata {
     }
 
     @Override
-    public synchronized void failedUpdate(long now, AuthenticationException authenticationException) {
-        super.failedUpdate(now, authenticationException);
-        if (authenticationException != null)
+    public synchronized void failedUpdate(long now, KafkaException fatalException) {
+        super.failedUpdate(now, fatalException);
+        if (fatalException != null)
             notifyAll();
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 4908eb3..a31c38a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.clients;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.CommonFields;
@@ -58,25 +60,26 @@ public class NetworkClientTest {
     protected final long reconnectBackoffMsTest = 10 * 1000;
     protected final long reconnectBackoffMaxMsTest = 10 * 10000;
 
+    private final TestMetadataUpdater metadataUpdater = new TestMetadataUpdater(Collections.singletonList(node));
     private final NetworkClient client = createNetworkClient(reconnectBackoffMaxMsTest);
     private final NetworkClient clientWithNoExponentialBackoff = createNetworkClient(reconnectBackoffMsTest);
     private final NetworkClient clientWithStaticNodes = createNetworkClientWithStaticNodes();
     private final NetworkClient clientWithNoVersionDiscovery = createNetworkClientWithNoVersionDiscovery();
 
     private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) {
-        return new NetworkClient(selector, new ManualMetadataUpdater(Collections.singletonList(node)), "mock", Integer.MAX_VALUE,
+        return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
                 defaultRequestTimeoutMs, ClientDnsLookup.DEFAULT, time, true, new ApiVersions(), new LogContext());
     }
 
     private NetworkClient createNetworkClientWithStaticNodes() {
-        return new NetworkClient(selector, new ManualMetadataUpdater(Collections.singletonList(node)),
+        return new NetworkClient(selector, metadataUpdater,
                 "mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
                 ClientDnsLookup.DEFAULT, time, true, new ApiVersions(), new LogContext());
     }
 
     private NetworkClient createNetworkClientWithNoVersionDiscovery() {
-        return new NetworkClient(selector, new ManualMetadataUpdater(Collections.singletonList(node)), "mock", Integer.MAX_VALUE,
+        return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
                 64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
                 ClientDnsLookup.DEFAULT, time, false, new ApiVersions(), new LogContext());
@@ -140,6 +143,16 @@ public class NetworkClientTest {
         assertFalse("Connection should not be ready after close", client.isReady(node, 0));
     }
 
+    @Test
+    public void testUnsupportedVersionDuringInternalMetadataRequest() {
+        List<String> topics = Arrays.asList("topic_1");
+
+        // disabling auto topic creation for versions less than 4 is not supported
+        MetadataRequest.Builder builder = new MetadataRequest.Builder(topics, false, (short) 3);
+        client.sendInternalMetadataRequest(builder, node.idString(), time.milliseconds());
+        assertEquals(UnsupportedVersionException.class, metadataUpdater.getAndClearFailure().getClass());
+    }
+
     private void checkSimpleRequestResponse(NetworkClient networkClient) {
         awaitReady(networkClient, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
         ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
@@ -583,4 +596,25 @@ public class NetworkClientTest {
             this.response = response;
         }
     }
+
+    // ManualMetadataUpdater with ability to keep track of failures
+    private static class TestMetadataUpdater extends ManualMetadataUpdater {
+        KafkaException failure;
+
+        public TestMetadataUpdater(List<Node> nodes) {
+            super(nodes);
+        }
+
+        @Override
+        public void handleFatalException(KafkaException exception) {
+            failure = exception;
+            super.handleFatalException(exception);
+        }
+
+        public KafkaException getAndClearFailure() {
+            KafkaException failure = this.failure;
+            this.failure = null;
+            return failure;
+        }
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 524ee25..3a4076b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1656,8 +1656,8 @@ public class KafkaConsumerTest {
     }
 
     private ConsumerMetadata createMetadata(SubscriptionState subscription) {
-        return new ConsumerMetadata(0, Long.MAX_VALUE, false, subscription,
-                new LogContext(), new ClusterResourceListeners());
+        return new ConsumerMetadata(0, Long.MAX_VALUE, false, false,
+                                    subscription, new LogContext(), new ClusterResourceListeners());
     }
 
     private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 93c074b..4ce0386 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -96,7 +96,7 @@ public class AbstractCoordinatorTest {
         LogContext logContext = new LogContext();
         this.mockTime = new MockTime();
         ConsumerMetadata metadata = new ConsumerMetadata(retryBackoffMs, 60 * 60 * 1000L,
-                false, new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST),
+                false, false, new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST),
                 logContext, new ClusterResourceListeners());
 
         this.mockClient = new MockClient(mockTime, metadata);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index a83df5e..4f6e0f1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -139,7 +139,7 @@ public class ConsumerCoordinatorTest {
         LogContext logContext = new LogContext();
         this.subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST);
         this.metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false,
-                subscriptions, logContext, new ClusterResourceListeners());
+                false, subscriptions, logContext, new ClusterResourceListeners());
         this.client = new MockClient(time, metadata);
         this.client.updateMetadata(metadataResponse);
         this.consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time, 100,
@@ -1181,7 +1181,7 @@ public class ConsumerCoordinatorTest {
 
     private void testInternalTopicInclusion(boolean includeInternalTopics) {
         metadata = new ConsumerMetadata(0, Long.MAX_VALUE, includeInternalTopics,
-                subscriptions, new LogContext(), new ClusterResourceListeners());
+                false, subscriptions, new LogContext(), new ClusterResourceListeners());
         client = new MockClient(time, metadata);
         coordinator = buildCoordinator(new Metrics(), assignors, false, Optional.empty());
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index d97887a..86740f5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -159,8 +159,8 @@ public class ConsumerMetadataTest {
     private ConsumerMetadata newConsumerMetadata(boolean includeInternalTopics) {
         long refreshBackoffMs = 50;
         long expireMs = 50000;
-        return new ConsumerMetadata(refreshBackoffMs, expireMs, includeInternalTopics, subscription, new LogContext(),
-                new ClusterResourceListeners());
+        return new ConsumerMetadata(refreshBackoffMs, expireMs, includeInternalTopics, false,
+                subscription, new LogContext(), new ClusterResourceListeners());
     }
 
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 14c2cba..1b7f8fb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
@@ -238,7 +239,7 @@ public class ConsumerNetworkClientTest {
             fail("Expected authentication error thrown");
         } catch (AuthenticationException e) {
             // After the exception is raised, it should have been cleared
-            assertNull(metadata.getAndClearAuthenticationException());
+            assertNull(metadata.getAndClearMetadataException());
         }
     }
 
@@ -259,6 +260,18 @@ public class ConsumerNetworkClientTest {
     }
 
     @Test
+    public void testMetadataFailurePropagated() {
+        KafkaException expectedFailure = new KafkaException();
+        metadata.failedUpdate(time.milliseconds(), expectedFailure); // register a failed metadata update
+        try {
+            consumerClient.poll(time.timer(Duration.ZERO)); // zero-length timer: poll is not given time to wait
+            fail("Expected poll to throw exception");
+        } catch (Exception thrown) {
+            assertEquals(expectedFailure, thrown); // poll must raise the failure registered above
+        }
+    }
+
+    @Test
     public void testFutureCompletionOutsidePoll() throws Exception {
         // Tests the scenario in which the request that is being awaited in one thread
         // is received and completed in another thread.
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 6a0a4f3..30c9a04 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -3395,7 +3395,7 @@ public class FetcherTest {
         LogContext logContext = new LogContext();
         time = new MockTime(1);
         subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
-        metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false,
+        metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false, false,
                 subscriptions, logContext, new ClusterResourceListeners());
         client = new MockClient(time, metadata);
         metrics = new Metrics(metricConfig, time);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
index ee00e48..55b8754 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
@@ -157,7 +157,7 @@ public class OffsetForLeaderEpochClientTest {
         LogContext logContext = new LogContext();
         time = new MockTime(1);
         subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
-        metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false,
+        metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false, false,
                 subscriptions, logContext, new ClusterResourceListeners());
         client = new MockClient(time, metadata);
         consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time,
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
new file mode 100644
index 0000000..11fbefd
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.api
+
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import java.lang.{Boolean => JBoolean}
+import java.time.Duration
+import java.util
+
+import scala.collection.JavaConverters._
+import kafka.api.IntegrationTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.junit.{After, Test}
+
+/**
+ * Parameterized integration test that runs every combination of the broker's
+ * `KafkaConfig.AutoCreateTopicsEnableProp` and the consumer's `ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG`,
+ * asserting that the subscribed topic exists afterwards only when both are enabled.
+ */
+@RunWith(value = classOf[Parameterized])
+class ConsumerTopicCreationTest(brokerAutoTopicCreationEnable: JBoolean, consumerAllowAutoCreateTopics: JBoolean) extends IntegrationTestHarness {
+  override protected def brokerCount: Int = 1
+
+  val topic = "topic"
+  val part = 0
+  val tp = new TopicPartition(topic, part)
+  val producerClientId = "ConsumerTestProducer"
+  val consumerClientId = "ConsumerTestConsumer"
+  var adminClient: AdminClient = null
+
+  // broker side: disable controlled shutdown to speed up shutdown, and apply the parameterized auto-create flag
+  serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false")
+  serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, brokerAutoTopicCreationEnable.toString)
+
+  // client side: fixed client ids and consumer settings, plus the parameterized consumer auto-create flag
+  producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
+  Seq(
+    ConsumerConfig.CLIENT_ID_CONFIG -> consumerClientId,
+    ConsumerConfig.GROUP_ID_CONFIG -> "my-test",
+    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
+    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
+    ConsumerConfig.METADATA_MAX_AGE_CONFIG -> "100",
+    ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG -> consumerAllowAutoCreateTopics.toString
+  ).foreach { case (key, value) => consumerConfig.setProperty(key, value) }
+
+  @After
+  override def tearDown(): Unit = {
+    Option(adminClient).foreach(Utils.closeQuietly(_, "AdminClient"))
+    super.tearDown()
+  }
+
+  @Test
+  def testAutoTopicCreation(): Unit = {
+    val consumer = createConsumer()
+    adminClient = AdminClient.create(createConfig())
+    consumer.subscribe(util.Collections.singletonList(topic))
+    consumer.poll(Duration.ofMillis(100))
+
+    val existingTopics = adminClient.listTopics.names.get
+    val expectCreated: Boolean = brokerAutoTopicCreationEnable && consumerAllowAutoCreateTopics
+    assert(existingTopics.contains(topic) == expectCreated)
+  }
+
+  /** Builds the admin client configuration: bootstrap servers, request timeout, and the harness security settings. */
+  def createConfig(): util.Map[String, Object] = {
+    val adminProps = new util.HashMap[String, Object]
+    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    adminProps.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000")
+    val securityProps: util.Map[Object, Object] = TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+    for ((key, value) <- securityProps.asScala) adminProps.put(key.asInstanceOf[String], value)
+    adminProps
+  }
+}
+
+object ConsumerTopicCreationTest {
+  /** Parameter rows for the runner: all four (broker flag, consumer flag) pairs, with the broker flag varying slowest. */
+  @Parameters(name = "brokerTopicCreation={0}, consumerTopicCreation={1}")
+  def parameters: java.util.Collection[Array[Object]] = {
+    val flags = Seq(JBoolean.TRUE, JBoolean.FALSE)
+    val rows = new java.util.ArrayList[Array[Object]]()
+    for (brokerFlag <- flags; consumerFlag <- flags) rows.add(Array[Object](brokerFlag, consumerFlag))
+    rows
+  }
+}