You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/08 16:31:18 UTC
[kafka] branch trunk updated: KAFKA-7320; Add consumer configuration to disable auto topic creation [KIP-361] (#5542)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e6cff21 KAFKA-7320; Add consumer configuration to disable auto topic creation [KIP-361] (#5542)
e6cff21 is described below
commit e6cff21fd8c5add0eb7e55417a91f0530a7d3a32
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Wed May 8 09:31:05 2019 -0700
KAFKA-7320; Add consumer configuration to disable auto topic creation [KIP-361] (#5542)
Implements KIP-361 to provide a consumer configuration that specifies whether subscribing to or assigning a non-existent topic results in the topic being created automatically.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../kafka/clients/ManualMetadataUpdater.java | 10 +-
.../java/org/apache/kafka/clients/Metadata.java | 45 +++------
.../org/apache/kafka/clients/MetadataUpdater.java | 8 +-
.../org/apache/kafka/clients/NetworkClient.java | 14 ++-
.../admin/internals/AdminMetadataManager.java | 3 +-
.../kafka/clients/consumer/ConsumerConfig.java | 14 ++-
.../kafka/clients/consumer/KafkaConsumer.java | 3 +-
.../consumer/internals/ConsumerMetadata.java | 10 +-
.../producer/internals/ProducerMetadata.java | 7 +-
.../apache/kafka/clients/NetworkClientTest.java | 40 +++++++-
.../kafka/clients/consumer/KafkaConsumerTest.java | 4 +-
.../internals/AbstractCoordinatorTest.java | 2 +-
.../internals/ConsumerCoordinatorTest.java | 4 +-
.../consumer/internals/ConsumerMetadataTest.java | 4 +-
.../internals/ConsumerNetworkClientTest.java | 15 ++-
.../clients/consumer/internals/FetcherTest.java | 2 +-
.../internals/OffsetForLeaderEpochClientTest.java | 2 +-
.../kafka/api/ConsumerTopicCreationTest.scala | 107 +++++++++++++++++++++
18 files changed, 228 insertions(+), 66 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
index ec007a6..7fb0224 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
@@ -16,8 +16,8 @@
*/
package org.apache.kafka.clients;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.slf4j.Logger;
@@ -74,10 +74,10 @@ public class ManualMetadataUpdater implements MetadataUpdater {
}
@Override
- public void handleAuthenticationFailure(AuthenticationException exception) {
- // We don't fail the broker on authentication failures, but there is sufficient information in the broker logs
- // to identify the failure.
- log.debug("An authentication error occurred in broker-to-broker communication.", exception);
+ public void handleFatalException(KafkaException exception) {
+ // We don't fail the broker on failures, but there should be sufficient information in the logs indicating the reason
+ // for failure.
+ log.debug("An error occurred in broker-to-broker communication.", exception);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index ef01b4b..ae75045 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -66,8 +65,8 @@ public class Metadata implements Closeable {
private int requestVersion; // bumped on every new topic addition
private long lastRefreshMs;
private long lastSuccessfulRefreshMs;
- private AuthenticationException authenticationException;
- private KafkaException metadataException;
+ private KafkaException fatalException;
+ private KafkaException recoverableException;
private MetadataCache cache = MetadataCache.empty();
private boolean needUpdate;
private final ClusterResourceListeners clusterResourceListeners;
@@ -202,25 +201,13 @@ public class Metadata implements Closeable {
}
/**
- * If any non-retriable authentication exceptions were encountered during
- * metadata update, clear and return the exception.
+ * If any non-retriable exceptions were encountered during metadata update, clear and return the exception.
*/
- public synchronized AuthenticationException getAndClearAuthenticationException() {
- if (authenticationException != null) {
- AuthenticationException exception = authenticationException;
- authenticationException = null;
- return exception;
- } else
- return null;
- }
-
- synchronized KafkaException getAndClearMetadataException() {
- if (this.metadataException != null) {
- KafkaException metadataException = this.metadataException;
- this.metadataException = null;
- return metadataException;
- } else
- return null;
+ public synchronized KafkaException getAndClearMetadataException() {
+ KafkaException metadataException = Optional.ofNullable(fatalException).orElse(recoverableException);
+ fatalException = null;
+ recoverableException = null;
+ return metadataException;
}
public synchronized void bootstrap(List<InetSocketAddress> addresses, long now) {
@@ -281,7 +268,7 @@ public class Metadata implements Closeable {
private void maybeSetMetadataError(Cluster cluster) {
// if we encounter any invalid topics, cache the exception to later throw to the user
- metadataException = null;
+ recoverableException = null;
checkInvalidTopics(cluster);
checkUnauthorizedTopics(cluster);
}
@@ -289,14 +276,16 @@ public class Metadata implements Closeable {
private void checkInvalidTopics(Cluster cluster) {
if (!cluster.invalidTopics().isEmpty()) {
log.error("Metadata response reported invalid topics {}", cluster.invalidTopics());
- metadataException = new InvalidTopicException(cluster.invalidTopics());
+ // We may be able to recover from this exception if metadata for this topic is no longer needed
+ recoverableException = new InvalidTopicException(cluster.invalidTopics());
}
}
private void checkUnauthorizedTopics(Cluster cluster) {
if (!cluster.unauthorizedTopics().isEmpty()) {
log.error("Topic authorization failed for topics {}", cluster.unauthorizedTopics());
- metadataException = new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
+ // We may be able to recover from this exception if metadata for this topic is no longer needed
+ recoverableException = new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
}
}
@@ -368,10 +357,6 @@ public class Metadata implements Closeable {
}
public synchronized void maybeThrowException() {
- AuthenticationException authenticationException = getAndClearAuthenticationException();
- if (authenticationException != null)
- throw authenticationException;
-
KafkaException metadataException = getAndClearMetadataException();
if (metadataException != null)
throw metadataException;
@@ -381,9 +366,9 @@ public class Metadata implements Closeable {
* Record an attempt to update the metadata that failed. We need to keep track of this
* to avoid retrying immediately.
*/
- public synchronized void failedUpdate(long now, AuthenticationException authenticationException) {
+ public synchronized void failedUpdate(long now, KafkaException fatalException) {
this.lastRefreshMs = now;
- this.authenticationException = authenticationException;
+ this.fatalException = fatalException;
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index de765db..e2261d5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -16,8 +16,8 @@
*/
package org.apache.kafka.clients;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
@@ -64,11 +64,11 @@ public interface MetadataUpdater extends Closeable {
void handleDisconnection(String destination);
/**
- * Handle authentication failure. Propagate the authentication exception if awaiting metadata.
+ * Handle failure. Propagate the exception if awaiting metadata.
*
- * @param exception authentication exception from broker
+ * @param fatalException exception corresponding to the failure
*/
- void handleAuthenticationFailure(AuthenticationException exception);
+ void handleFatalException(KafkaException fatalException);
/**
* Handle responses for metadata requests.
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 4f69256..48693f3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
@@ -433,8 +434,8 @@ public class NetworkClient implements KafkaClient {
doSend(request, false, now);
}
- private void sendInternalMetadataRequest(MetadataRequest.Builder builder,
- String nodeConnectionId, long now) {
+ // package-private for testing
+ void sendInternalMetadataRequest(MetadataRequest.Builder builder, String nodeConnectionId, long now) {
ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);
doSend(clientRequest, true, now);
}
@@ -480,6 +481,9 @@ public class NetworkClient implements KafkaClient {
clientRequest.callback(), clientRequest.destination(), now, now,
false, unsupportedVersionException, null, null);
abortedSends.add(clientResponse);
+
+ if (isInternalRequest && clientRequest.apiKey() == ApiKeys.METADATA)
+ metadataUpdater.handleFatalException(unsupportedVersionException);
}
}
@@ -715,7 +719,7 @@ public class NetworkClient implements KafkaClient {
case AUTHENTICATION_FAILED:
AuthenticationException exception = disconnectState.exception();
connectionStates.authenticationFailed(nodeId, now, exception);
- metadataUpdater.handleAuthenticationFailure(exception);
+ metadataUpdater.handleFatalException(exception);
log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId,
disconnectState.remoteAddress(), exception.getMessage());
break;
@@ -1005,9 +1009,9 @@ public class NetworkClient implements KafkaClient {
}
@Override
- public void handleAuthenticationFailure(AuthenticationException exception) {
+ public void handleFatalException(KafkaException fatalException) {
if (metadata.updateRequested())
- metadata.failedUpdate(time.milliseconds(), exception);
+ metadata.failedUpdate(time.milliseconds(), fatalException);
inProgressRequestVersion = null;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index 3d9e5ca..b7080ac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.requests.MetadataResponse;
@@ -104,7 +105,7 @@ public class AdminMetadataManager {
}
@Override
- public void handleAuthenticationFailure(AuthenticationException e) {
+ public void handleFatalException(KafkaException e) {
updateFailed(e);
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index e285ade..c9b5004 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -261,6 +261,14 @@ public class ConsumerConfig extends AbstractConfig {
public static final String DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT);
+ /** <code>allow.auto.create.topics</code> */
+ public static final String ALLOW_AUTO_CREATE_TOPICS_CONFIG = "allow.auto.create.topics";
+ private static final String ALLOW_AUTO_CREATE_TOPICS_DOC = "Allow automatic topic creation on the broker when" +
+ " subscribing to or assigning a topic. A topic being subscribed to will be automatically created only if the" +
+ " broker allows for it using `auto.create.topics.enable` broker configuration. This configuration must" +
+ " be set to `false` when using brokers older than 0.11.0";
+ public static final boolean DEFAULT_ALLOW_AUTO_CREATE_TOPICS = true;
+
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
@@ -464,6 +472,11 @@ public class ConsumerConfig extends AbstractConfig {
in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)),
Importance.MEDIUM,
ISOLATION_LEVEL_DOC)
+ .define(ALLOW_AUTO_CREATE_TOPICS_CONFIG,
+ Type.BOOLEAN,
+ DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
+ Importance.MEDIUM,
+ ALLOW_AUTO_CREATE_TOPICS_DOC)
// security support
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
@@ -472,7 +485,6 @@ public class ConsumerConfig extends AbstractConfig {
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
.withClientSaslSupport();
-
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 5c8c1dc..3bfd5ac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -727,6 +727,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.metadata = new ConsumerMetadata(retryBackoffMs,
config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
!config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
+ config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
subscriptions, logContext, clusterResourceListeners);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
@@ -1830,7 +1831,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
Timer timer = time.timer(timeout);
Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
- new MetadataRequest.Builder(Collections.singletonList(topic), true), timer);
+ new MetadataRequest.Builder(Collections.singletonList(topic), metadata.allowAutoTopicCreation()), timer);
return topicMetadata.get(topic);
} finally {
release();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
index c87849d..fbdf1c6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
@@ -28,21 +28,28 @@ import java.util.Set;
public class ConsumerMetadata extends Metadata {
private final boolean includeInternalTopics;
+ private final boolean allowAutoTopicCreation;
private final SubscriptionState subscription;
private final Set<String> transientTopics;
public ConsumerMetadata(long refreshBackoffMs,
long metadataExpireMs,
boolean includeInternalTopics,
+ boolean allowAutoTopicCreation,
SubscriptionState subscription,
LogContext logContext,
ClusterResourceListeners clusterResourceListeners) {
super(refreshBackoffMs, metadataExpireMs, logContext, clusterResourceListeners);
this.includeInternalTopics = includeInternalTopics;
+ this.allowAutoTopicCreation = allowAutoTopicCreation;
this.subscription = subscription;
this.transientTopics = new HashSet<>();
}
+ public boolean allowAutoTopicCreation() {
+ return allowAutoTopicCreation;
+ }
+
@Override
public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
if (subscription.hasPatternSubscription())
@@ -50,7 +57,7 @@ public class ConsumerMetadata extends Metadata {
List<String> topics = new ArrayList<>();
topics.addAll(subscription.groupSubscription());
topics.addAll(transientTopics);
- return new MetadataRequest.Builder(topics, true);
+ return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
}
synchronized void addTransientTopics(Set<String> topics) {
@@ -73,5 +80,4 @@ public class ConsumerMetadata extends Metadata {
return subscription.matchesSubscribedPattern(topic);
}
-
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
index 90e7970..295036b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
@@ -111,9 +110,9 @@ public class ProducerMetadata extends Metadata {
}
@Override
- public synchronized void failedUpdate(long now, AuthenticationException authenticationException) {
- super.failedUpdate(now, authenticationException);
- if (authenticationException != null)
+ public synchronized void failedUpdate(long now, KafkaException fatalException) {
+ super.failedUpdate(now, fatalException);
+ if (fatalException != null)
notifyAll();
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 4908eb3..a31c38a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.clients;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.CommonFields;
@@ -58,25 +60,26 @@ public class NetworkClientTest {
protected final long reconnectBackoffMsTest = 10 * 1000;
protected final long reconnectBackoffMaxMsTest = 10 * 10000;
+ private final TestMetadataUpdater metadataUpdater = new TestMetadataUpdater(Collections.singletonList(node));
private final NetworkClient client = createNetworkClient(reconnectBackoffMaxMsTest);
private final NetworkClient clientWithNoExponentialBackoff = createNetworkClient(reconnectBackoffMsTest);
private final NetworkClient clientWithStaticNodes = createNetworkClientWithStaticNodes();
private final NetworkClient clientWithNoVersionDiscovery = createNetworkClientWithNoVersionDiscovery();
private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) {
- return new NetworkClient(selector, new ManualMetadataUpdater(Collections.singletonList(node)), "mock", Integer.MAX_VALUE,
+ return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, ClientDnsLookup.DEFAULT, time, true, new ApiVersions(), new LogContext());
}
private NetworkClient createNetworkClientWithStaticNodes() {
- return new NetworkClient(selector, new ManualMetadataUpdater(Collections.singletonList(node)),
+ return new NetworkClient(selector, metadataUpdater,
"mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
ClientDnsLookup.DEFAULT, time, true, new ApiVersions(), new LogContext());
}
private NetworkClient createNetworkClientWithNoVersionDiscovery() {
- return new NetworkClient(selector, new ManualMetadataUpdater(Collections.singletonList(node)), "mock", Integer.MAX_VALUE,
+ return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
ClientDnsLookup.DEFAULT, time, false, new ApiVersions(), new LogContext());
@@ -140,6 +143,16 @@ public class NetworkClientTest {
assertFalse("Connection should not be ready after close", client.isReady(node, 0));
}
+ @Test
+ public void testUnsupportedVersionDuringInternalMetadataRequest() {
+ List<String> topics = Arrays.asList("topic_1");
+
+ // disabling auto topic creation for versions less than 4 is not supported
+ MetadataRequest.Builder builder = new MetadataRequest.Builder(topics, false, (short) 3);
+ client.sendInternalMetadataRequest(builder, node.idString(), time.milliseconds());
+ assertEquals(UnsupportedVersionException.class, metadataUpdater.getAndClearFailure().getClass());
+ }
+
private void checkSimpleRequestResponse(NetworkClient networkClient) {
awaitReady(networkClient, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
@@ -583,4 +596,25 @@ public class NetworkClientTest {
this.response = response;
}
}
+
+ // ManualMetadataUpdater with ability to keep track of failures
+ private static class TestMetadataUpdater extends ManualMetadataUpdater {
+ KafkaException failure;
+
+ public TestMetadataUpdater(List<Node> nodes) {
+ super(nodes);
+ }
+
+ @Override
+ public void handleFatalException(KafkaException exception) {
+ failure = exception;
+ super.handleFatalException(exception);
+ }
+
+ public KafkaException getAndClearFailure() {
+ KafkaException failure = this.failure;
+ this.failure = null;
+ return failure;
+ }
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 524ee25..3a4076b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1656,8 +1656,8 @@ public class KafkaConsumerTest {
}
private ConsumerMetadata createMetadata(SubscriptionState subscription) {
- return new ConsumerMetadata(0, Long.MAX_VALUE, false, subscription,
- new LogContext(), new ClusterResourceListeners());
+ return new ConsumerMetadata(0, Long.MAX_VALUE, false, false,
+ subscription, new LogContext(), new ClusterResourceListeners());
}
private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 93c074b..4ce0386 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -96,7 +96,7 @@ public class AbstractCoordinatorTest {
LogContext logContext = new LogContext();
this.mockTime = new MockTime();
ConsumerMetadata metadata = new ConsumerMetadata(retryBackoffMs, 60 * 60 * 1000L,
- false, new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST),
+ false, false, new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST),
logContext, new ClusterResourceListeners());
this.mockClient = new MockClient(mockTime, metadata);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index a83df5e..4f6e0f1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -139,7 +139,7 @@ public class ConsumerCoordinatorTest {
LogContext logContext = new LogContext();
this.subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST);
this.metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false,
- subscriptions, logContext, new ClusterResourceListeners());
+ false, subscriptions, logContext, new ClusterResourceListeners());
this.client = new MockClient(time, metadata);
this.client.updateMetadata(metadataResponse);
this.consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time, 100,
@@ -1181,7 +1181,7 @@ public class ConsumerCoordinatorTest {
private void testInternalTopicInclusion(boolean includeInternalTopics) {
metadata = new ConsumerMetadata(0, Long.MAX_VALUE, includeInternalTopics,
- subscriptions, new LogContext(), new ClusterResourceListeners());
+ false, subscriptions, new LogContext(), new ClusterResourceListeners());
client = new MockClient(time, metadata);
coordinator = buildCoordinator(new Metrics(), assignors, false, Optional.empty());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index d97887a..86740f5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -159,8 +159,8 @@ public class ConsumerMetadataTest {
private ConsumerMetadata newConsumerMetadata(boolean includeInternalTopics) {
long refreshBackoffMs = 50;
long expireMs = 50000;
- return new ConsumerMetadata(refreshBackoffMs, expireMs, includeInternalTopics, subscription, new LogContext(),
- new ClusterResourceListeners());
+ return new ConsumerMetadata(refreshBackoffMs, expireMs, includeInternalTopics, false,
+ subscription, new LogContext(), new ClusterResourceListeners());
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 14c2cba..1b7f8fb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
@@ -238,7 +239,7 @@ public class ConsumerNetworkClientTest {
fail("Expected authentication error thrown");
} catch (AuthenticationException e) {
// After the exception is raised, it should have been cleared
- assertNull(metadata.getAndClearAuthenticationException());
+ assertNull(metadata.getAndClearMetadataException());
}
}
@@ -259,6 +260,18 @@ public class ConsumerNetworkClientTest {
}
@Test
+ public void testMetadataFailurePropagated() {
+ KafkaException metadataException = new KafkaException();
+ metadata.failedUpdate(time.milliseconds(), metadataException);
+ try {
+ consumerClient.poll(time.timer(Duration.ZERO));
+ fail("Expected poll to throw exception");
+ } catch (Exception e) {
+ assertEquals(metadataException, e);
+ }
+ }
+
+ @Test
public void testFutureCompletionOutsidePoll() throws Exception {
// Tests the scenario in which the request that is being awaited in one thread
// is received and completed in another thread.
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 6a0a4f3..30c9a04 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -3395,7 +3395,7 @@ public class FetcherTest {
LogContext logContext = new LogContext();
time = new MockTime(1);
subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
- metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false,
+ metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false, false,
subscriptions, logContext, new ClusterResourceListeners());
client = new MockClient(time, metadata);
metrics = new Metrics(metricConfig, time);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
index ee00e48..55b8754 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java
@@ -157,7 +157,7 @@ public class OffsetForLeaderEpochClientTest {
LogContext logContext = new LogContext();
time = new MockTime(1);
subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
- metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false,
+ metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false, false,
subscriptions, logContext, new ClusterResourceListeners());
client = new MockClient(time, metadata);
consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time,
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
new file mode 100644
index 0000000..11fbefd
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.api
+
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import java.lang.{Boolean => JBoolean}
+import java.time.Duration
+import java.util
+
+import scala.collection.JavaConverters._
+import kafka.api.IntegrationTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.junit.{After, Test}
+
+/**
+ * Parameterized test of how the broker setting (KafkaConfig.AutoCreateTopicsEnableProp) and the consumer setting (ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG) together decide whether a subscribed topic ends up being created.
+ */
+@RunWith(value = classOf[Parameterized]) // constructor arguments come from the companion object's @Parameters method
+class ConsumerTopicCreationTest(brokerAutoTopicCreationEnable: JBoolean, consumerAllowAutoCreateTopics: JBoolean) extends IntegrationTestHarness {
+ override protected def brokerCount: Int = 1
+
+ val topic = "topic"
+ val part = 0
+ val tp = new TopicPartition(topic, part) // NOTE(review): not referenced anywhere else in this class
+ val producerClientId = "ConsumerTestProducer"
+ val consumerClientId = "ConsumerTestConsumer"
+ var adminClient: AdminClient = null // assigned in testAutoTopicCreation, closed in tearDown
+
+ // configure server properties: the broker-side auto-create flag is taken from the first test parameter
+ this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
+ this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, brokerAutoTopicCreationEnable.toString)
+
+ // configure client properties: the consumer-side auto-create flag is taken from the second test parameter
+ this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
+ this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
+ this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
+ this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100") // NOTE(review): presumably milliseconds, to force frequent metadata refreshes; confirm
+ this.consumerConfig.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, consumerAllowAutoCreateTopics.toString)
+
+ @After
+ override def tearDown(): Unit = {
+ if (adminClient != null) // stays null if the test never reached AdminClient.create
+ Utils.closeQuietly(adminClient, "AdminClient")
+ super.tearDown() // then run the harness teardown
+ }
+
+ @Test
+ def testAutoTopicCreation(): Unit = { // subscribe and poll once, then check via the admin client whether the topic is listed
+ val consumer = createConsumer()
+ adminClient = AdminClient.create(createConfig())
+
+ consumer.subscribe(util.Arrays.asList(topic))
+ consumer.poll(Duration.ofMillis(100)) // NOTE(review): a single 100 ms poll is assumed to be enough to trigger the metadata request; confirm, may be timing-sensitive
+
+ val topicCreated = adminClient.listTopics.names.get.contains(topic)
+ if (brokerAutoTopicCreationEnable && consumerAllowAutoCreateTopics) // the topic is expected to exist only when both flags are true
+ assert(topicCreated == true)
+ else
+ assert(topicCreated == false)
+ }
+
+ def createConfig(): util.Map[String, Object] = { // admin client settings: bootstrap servers, request timeout, plus entries from TestUtils.adminClientSecurityConfigs
+ val config = new util.HashMap[String, Object]
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000")
+ val securityProps: util.Map[Object, Object] =
+ TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+ securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) } // copy every security entry into config, casting each key to String
+ config
+ }
+}
+
+object ConsumerTopicCreationTest { // companion object holding the @Parameters provider for the parameterized test class
+ @Parameters(name = "brokerTopicCreation={0}, consumerTopicCreation={1}")
+ def parameters: java.util.Collection[Array[Object]] = { // returns all four (broker flag, consumer flag) boolean combinations
+ val data = new java.util.ArrayList[Array[Object]]()
+ for (brokerAutoTopicCreationEnable <- Array(JBoolean.TRUE, JBoolean.FALSE)) // outer loop: broker-side flag
+ for (consumerAutoCreateTopicsPolicy <- Array(JBoolean.TRUE, JBoolean.FALSE)) // inner loop: consumer-side flag
+ data.add(Array(brokerAutoTopicCreationEnable, consumerAutoCreateTopicsPolicy)) // one row per combination: broker flag first, consumer flag second
+ data
+ }
+}