You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2018/07/17 22:45:52 UTC
[kafka] branch trunk updated: KAFKA-5098;
KafkaProducer should reject sends to invalid topics
This is an automated email from the ASF dual-hosted git repository.
jjkoshy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8c47a3e KAFKA-5098; KafkaProducer should reject sends to invalid topics
8c47a3e is described below
commit 8c47a3e52f3d6871a708b1d2c8b5a7e30a2a5b99
Author: Ahmed Al Mehdi <aa...@aalmehdi-ld1.linkedin.biz>
AuthorDate: Tue Jul 17 15:45:29 2018 -0700
KAFKA-5098; KafkaProducer should reject sends to invalid topics
…egal char and generates InvalidTopicException
If config parameter max.block.ms config parameter is set to a non-zero value,
KafkaProducer.send() blocks for the max.block.ms time if topic name has illegal
char or is invalid.
Wrote a unit test that verifies the appropriate exception is returned when
performing a get on the returned future by KafkaProducer.send().
Author: Ahmed Al Mehdi <aa...@aalmehdi-ld1.linkedin.biz>
Reviewers: Ismael Juma <is...@juma.me.uk>, Joel Koshy <jj...@gmail.com>, Manikumar Reddy O <ma...@gmail.com>
Closes #5247 from ahmedha/KAFKA-5098
---
.../java/org/apache/kafka/clients/Metadata.java | 6 ++-
.../kafka/clients/consumer/KafkaConsumer.java | 4 +-
.../consumer/internals/ConsumerCoordinator.java | 5 +++
.../kafka/clients/producer/KafkaProducer.java | 10 ++++-
.../main/java/org/apache/kafka/common/Cluster.java | 34 +++++++++++++---
.../kafka/common/errors/InvalidTopicException.java | 19 ++++++++-
.../kafka/common/requests/MetadataResponse.java | 2 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 44 ++++++++-------------
.../kafka/clients/consumer/KafkaConsumerTest.java | 38 +++++++++++++++++-
.../kafka/clients/producer/KafkaProducerTest.java | 45 ++++++++++++++++++++++
.../test/java/org/apache/kafka/test/TestUtils.java | 15 ++++++++
11 files changed, 182 insertions(+), 40 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index b1da9de..91b1587 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -353,6 +353,7 @@ public final class Metadata {
private Cluster getClusterForCurrentTopics(Cluster cluster) {
Set<String> unauthorizedTopics = new HashSet<>();
+ Set<String> invalidTopics = new HashSet<>();
Collection<PartitionInfo> partitionInfos = new ArrayList<>();
List<Node> nodes = Collections.emptyList();
Set<String> internalTopics = Collections.emptySet();
@@ -364,6 +365,9 @@ public final class Metadata {
unauthorizedTopics.addAll(cluster.unauthorizedTopics());
unauthorizedTopics.retainAll(this.topics.keySet());
+ invalidTopics.addAll(cluster.invalidTopics());
+ invalidTopics.addAll(this.cluster.invalidTopics());
+
for (String topic : this.topics.keySet()) {
List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
if (!partitionInfoList.isEmpty()) {
@@ -373,6 +377,6 @@ public final class Metadata {
nodes = cluster.nodes();
controller = cluster.controller();
}
- return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, internalTopics, controller);
+ return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, invalidTopics, internalTopics, controller);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f722408..fb37fee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1148,6 +1148,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
* partitions to consume from
* @throws java.lang.ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds.
+ * @throws org.apache.kafka.common.errors.InvalidTopicException if the current subscription contains any invalid
+ * topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
*/
@Override
public ConsumerRecords<K, V> poll(final Duration timeout) {
@@ -1595,7 +1597,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
/**
* Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
- * This method may issue a remote call to the server if there is no current position
+ * This method may issue a remote call to the server if there is no current position
* for the given partition.
* <p>
* This call will block until the position can be determined, an unrecoverable error is
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 060e404..ea6d472 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -203,6 +204,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (!cluster.unauthorizedTopics().isEmpty())
throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
+ // if we encounter any invalid topics, raise an exception to the user
+ if (!cluster.invalidTopics().isEmpty())
+ throw new InvalidTopicException(cluster.invalidTopics());
+
if (subscriptions.hasPatternSubscription())
updatePatternSubscription(cluster);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a5af5b6..3a6717b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
@@ -898,8 +899,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
*/
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
// add topic to metadata topic list if it is not there already and reset expiry
- metadata.add(topic);
Cluster cluster = metadata.fetch();
+
+ if (cluster.invalidTopics().contains(topic))
+ throw new InvalidTopicException(topic);
+
+ metadata.add(topic);
+
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// Return cached metadata if we have it, and if the record's partition is either undefined
// or within the known partition range
@@ -930,6 +936,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
if (cluster.unauthorizedTopics().contains(topic))
throw new TopicAuthorizationException(topic);
+ if (cluster.invalidTopics().contains(topic))
+ throw new InvalidTopicException(topic);
remainingWaitMs = maxWaitMs - elapsed;
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null);
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index ccbaa30..33d3749 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -36,6 +36,7 @@ public final class Cluster {
private final boolean isBootstrapConfigured;
private final List<Node> nodes;
private final Set<String> unauthorizedTopics;
+ private final Set<String> invalidTopics;
private final Set<String> internalTopics;
private final Node controller;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
@@ -55,7 +56,7 @@ public final class Cluster {
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics,
Set<String> internalTopics) {
- this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics, null);
+ this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet(), internalTopics, null);
}
/**
@@ -69,7 +70,22 @@ public final class Cluster {
Set<String> unauthorizedTopics,
Set<String> internalTopics,
Node controller) {
- this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics, controller);
+ this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet(), internalTopics, controller);
+ }
+
+ /**
+ * Create a new cluster with the given id, nodes and partitions
+ * @param nodes The nodes in the cluster
+ * @param partitions Information about a subset of the topic-partitions this cluster hosts
+ */
+ public Cluster(String clusterId,
+ Collection<Node> nodes,
+ Collection<PartitionInfo> partitions,
+ Set<String> unauthorizedTopics,
+ Set<String> invalidTopics,
+ Set<String> internalTopics,
+ Node controller) {
+ this(clusterId, false, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller);
}
private Cluster(String clusterId,
@@ -77,6 +93,7 @@ public final class Cluster {
Collection<Node> nodes,
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics,
+ Set<String> invalidTopics,
Set<String> internalTopics,
Node controller) {
this.isBootstrapConfigured = isBootstrapConfigured;
@@ -131,6 +148,7 @@ public final class Cluster {
this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics);
+ this.invalidTopics = Collections.unmodifiableSet(invalidTopics);
this.internalTopics = Collections.unmodifiableSet(internalTopics);
this.controller = controller;
}
@@ -153,7 +171,8 @@ public final class Cluster {
int nodeId = -1;
for (InetSocketAddress address : addresses)
nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
- return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet(), null);
+ return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0),
+ Collections.<String>emptySet(), Collections.<String>emptySet(), Collections.<String>emptySet(), null);
}
/**
@@ -163,7 +182,8 @@ public final class Cluster {
Map<TopicPartition, PartitionInfo> combinedPartitions = new HashMap<>(this.partitionsByTopicPartition);
combinedPartitions.putAll(partitions);
return new Cluster(clusterResource.clusterId(), this.nodes, combinedPartitions.values(),
- new HashSet<>(this.unauthorizedTopics), new HashSet<>(this.internalTopics), this.controller);
+ new HashSet<>(this.unauthorizedTopics), new HashSet<>(this.invalidTopics),
+ new HashSet<>(this.internalTopics), this.controller);
}
/**
@@ -172,7 +192,7 @@ public final class Cluster {
public List<Node> nodes() {
return this.nodes;
}
-
+
/**
* Get the node by the node id (or null if no such node exists)
* @param id The id of the node
@@ -256,6 +276,10 @@ public final class Cluster {
return unauthorizedTopics;
}
+ public Set<String> invalidTopics() {
+ return invalidTopics;
+ }
+
public Set<String> internalTopics() {
return internalTopics;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
index f79e9a7..729ebba 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
@@ -16,6 +16,10 @@
*/
package org.apache.kafka.common.errors;
+import java.util.HashSet;
+import java.util.Set;
+
+
/**
* The client has attempted to perform an operation on an invalid topic.
* For example the topic name is too long, contains invalid characters etc.
@@ -24,23 +28,36 @@ package org.apache.kafka.common.errors;
* @see UnknownTopicOrPartitionException
*/
public class InvalidTopicException extends ApiException {
-
private static final long serialVersionUID = 1L;
+ private final Set<String> invalidTopics;
+
public InvalidTopicException() {
super();
+ invalidTopics = new HashSet<>();
}
public InvalidTopicException(String message, Throwable cause) {
super(message, cause);
+ invalidTopics = new HashSet<>();
}
public InvalidTopicException(String message) {
super(message);
+ invalidTopics = new HashSet<>();
}
public InvalidTopicException(Throwable cause) {
super(cause);
+ invalidTopics = new HashSet<>();
}
+ public InvalidTopicException(Set<String> invalidTopics) {
+ super("Invalid topics: " + invalidTopics);
+ this.invalidTopics = invalidTopics;
+ }
+
+ public Set<String> invalidTopics() {
+ return invalidTopics;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 28a412d..09a04e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -366,7 +366,7 @@ public class MetadataResponse extends AbstractResponse {
}
return new Cluster(this.clusterId, this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
- internalTopics, this.controller);
+ topicsByError(Errors.INVALID_TOPIC_EXCEPTION), internalTopics, this.controller);
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 8363079..7079471 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -93,7 +93,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -197,19 +196,6 @@ public class KafkaAdminClientTest {
}
}
- private static void assertFutureError(Future<?> future, Class<? extends Throwable> exceptionClass)
- throws InterruptedException {
- try {
- future.get();
- fail("Expected a " + exceptionClass.getSimpleName() + " exception, but got success.");
- } catch (ExecutionException ee) {
- Throwable cause = ee.getCause();
- assertEquals("Expected a " + exceptionClass.getSimpleName() + " exception, but got " +
- cause.getClass().getSimpleName(),
- exceptionClass, cause.getClass());
- }
- }
-
/**
* Test that the client properly times out when we don't receive any metadata.
*/
@@ -228,7 +214,7 @@ public class KafkaAdminClientTest {
KafkaFuture<Void> future = env.adminClient().createTopics(
Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
new CreateTopicsOptions().timeoutMs(1000)).all();
- assertFutureError(future, TimeoutException.class);
+ TestUtils.assertFutureError(future, TimeoutException.class);
}
}
@@ -306,7 +292,7 @@ public class KafkaAdminClientTest {
KafkaFuture<Void> future = env.adminClient().createTopics(
Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
new CreateTopicsOptions().timeoutMs(1000)).all();
- assertFutureError(future, SaslAuthenticationException.class);
+ TestUtils.assertFutureError(future, SaslAuthenticationException.class);
}
}
@@ -401,14 +387,14 @@ public class KafkaAdminClientTest {
List<String> sillyTopicNames = asList("", null);
Map<String, KafkaFuture<Void>> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).values();
for (String sillyTopicName : sillyTopicNames) {
- assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
+ TestUtils.assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
}
assertEquals(0, env.kafkaClient().inFlightRequestCount());
Map<String, KafkaFuture<TopicDescription>> describeFutures =
env.adminClient().describeTopics(sillyTopicNames).values();
for (String sillyTopicName : sillyTopicNames) {
- assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
+ TestUtils.assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
}
assertEquals(0, env.kafkaClient().inFlightRequestCount());
@@ -419,7 +405,7 @@ public class KafkaAdminClientTest {
Map<String, KafkaFuture<Void>> createFutures = env.adminClient().createTopics(newTopics).values();
for (String sillyTopicName : sillyTopicNames) {
- assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class);
+ TestUtils.assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class);
}
assertEquals(0, env.kafkaClient().inFlightRequestCount());
}
@@ -564,7 +550,7 @@ public class KafkaAdminClientTest {
// Test a call where we get back an error.
env.kafkaClient().prepareResponse(new DescribeAclsResponse(0,
new ApiError(Errors.SECURITY_DISABLED, "Security is disabled"), Collections.<AclBinding>emptySet()));
- assertFutureError(env.adminClient().describeAcls(FILTER2).values(), SecurityDisabledException.class);
+ TestUtils.assertFutureError(env.adminClient().describeAcls(FILTER2).values(), SecurityDisabledException.class);
}
}
@@ -590,9 +576,9 @@ public class KafkaAdminClientTest {
));
results = env.adminClient().createAcls(asList(ACL1, ACL2));
assertCollectionIs(results.values().keySet(), ACL1, ACL2);
- assertFutureError(results.values().get(ACL1), SecurityDisabledException.class);
+ TestUtils.assertFutureError(results.values().get(ACL1), SecurityDisabledException.class);
results.values().get(ACL2).get();
- assertFutureError(results.all(), SecurityDisabledException.class);
+ TestUtils.assertFutureError(results.all(), SecurityDisabledException.class);
}
}
@@ -614,8 +600,8 @@ public class KafkaAdminClientTest {
assertEquals(ACL1, filter1Results.values().get(0).binding());
assertEquals(null, filter1Results.values().get(1).exception());
assertEquals(ACL2, filter1Results.values().get(1).binding());
- assertFutureError(filterResults.get(FILTER2), SecurityDisabledException.class);
- assertFutureError(results.all(), SecurityDisabledException.class);
+ TestUtils.assertFutureError(filterResults.get(FILTER2), SecurityDisabledException.class);
+ TestUtils.assertFutureError(results.all(), SecurityDisabledException.class);
// Test a call where one deletion result has an error.
env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
@@ -624,7 +610,7 @@ public class KafkaAdminClientTest {
new AclFilterResponse(Collections.<AclDeletionResult>emptySet()))));
results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
assertTrue(results.values().get(FILTER2).get().values().isEmpty());
- assertFutureError(results.all(), SecurityDisabledException.class);
+ TestUtils.assertFutureError(results.all(), SecurityDisabledException.class);
// Test a call where there are no errors.
env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
@@ -673,7 +659,7 @@ public class KafkaAdminClientTest {
return result.listings().isDone();
}
}, "Timed out waiting for listTopics to complete");
- assertFutureError(result.listings(), TimeoutException.class);
+ TestUtils.assertFutureError(result.listings(), TimeoutException.class);
log.info("Verified the error result of AdminClient#listTopics");
// The next request should succeed.
@@ -904,7 +890,7 @@ public class KafkaAdminClientTest {
node2);
final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
- assertFutureError(result.all(), CoordinatorNotAvailableException.class);
+ TestUtils.assertFutureError(result.all(), CoordinatorNotAvailableException.class);
Collection<ConsumerGroupListing> listings = result.valid().get();
assertEquals(2, listings.size());
for (ConsumerGroupListing listing : listings) {
@@ -947,7 +933,7 @@ public class KafkaAdminClientTest {
Collections.emptyList()));
final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
- assertFutureError(result.all(), KafkaException.class);
+ TestUtils.assertFutureError(result.all(), KafkaException.class);
}
}
@@ -1091,7 +1077,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode()));
final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
- assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);
+ TestUtils.assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index c7cfeb0..c83fe06 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer;
+import java.util.ArrayList;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
@@ -32,9 +33,11 @@ import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
@@ -1759,7 +1762,7 @@ public class KafkaConsumerTest {
int maxPollRecords = Integer.MAX_VALUE;
boolean checkCrcs = true;
int rebalanceTimeoutMs = 60000;
-
+
Deserializer<String> keyDeserializer = new StringDeserializer();
Deserializer<String> valueDeserializer = new StringDeserializer();
@@ -1854,4 +1857,37 @@ public class KafkaConsumerTest {
consumer.close(1, TimeUnit.SECONDS);
EasyMock.verify(consumer);
}
+
+ @Test(expected = InvalidTopicException.class)
+ public void testSubscriptionOnInvalidTopic() throws Exception {
+ Time time = new MockTime();
+ Cluster cluster = TestUtils.singletonCluster();
+ Node node = cluster.nodes().get(0);
+
+ Metadata metadata = createMetadata();
+ metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+ MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
+ PartitionAssignor assignor = new RoundRobinAssignor();
+
+ String invalidTopicName = "topic abc"; // Invalid topic name due to space
+
+ Set<String> invalidTopic = new HashSet<String>();
+ invalidTopic.add(invalidTopicName);
+ Cluster metadataUpdateResponseCluster = new Cluster(cluster.clusterResource().clusterId(),
+ cluster.nodes(),
+ new ArrayList<PartitionInfo>(0),
+ Collections.<String>emptySet(),
+ invalidTopic,
+ cluster.internalTopics(),
+ cluster.controller());
+ client.prepareMetadataUpdate(metadataUpdateResponseCluster, Collections.<String>emptySet());
+
+
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
+ consumer.subscribe(singleton(invalidTopicName), getConsumerRebalanceListener(consumer));
+
+ consumer.poll(Duration.ZERO);
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 8bfc5e7..bf03e46 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.clients.producer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
@@ -27,6 +30,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -609,4 +613,45 @@ public class KafkaProducerTest {
producer.close(0, TimeUnit.MILLISECONDS);
}
}
+
+ @Test
+ public void testSendToInvalidTopic() throws Exception {
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
+
+ Time time = new MockTime();
+ Cluster cluster = TestUtils.singletonCluster();
+ Node node = cluster.nodes().get(0);
+
+ Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+ metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+ MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
+
+ Producer<String, String> producer = new KafkaProducer<>(new ProducerConfig(
+ ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
+ new StringSerializer(), new StringSerializer(), metadata, client);
+
+ String invalidTopicName = "topic abc"; // Invalid topic name due to space
+ ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+ Set<String> invalidTopic = new HashSet<String>();
+ invalidTopic.add(invalidTopicName);
+ Cluster metaDataUpdateResponseCluster = new Cluster(cluster.clusterResource().clusterId(),
+ cluster.nodes(),
+ new ArrayList<PartitionInfo>(0),
+ Collections.<String>emptySet(),
+ invalidTopic,
+ cluster.internalTopics(),
+ cluster.controller());
+ client.prepareMetadataUpdate(metaDataUpdateResponseCluster, Collections.<String>emptySet());
+
+ Future<RecordMetadata> future = producer.send(record);
+
+ assertEquals("Cluster has incorrect invalid topic list.", metaDataUpdateResponseCluster.invalidTopics(), metadata.fetch().invalidTopics());
+ TestUtils.assertFutureError(future, InvalidTopicException.class);
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index ef9e541..3ab2bce 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -45,6 +45,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
@@ -341,4 +343,17 @@ public class TestUtils {
buffer.rewind();
return buffer;
}
+
+ public static void assertFutureError(Future<?> future, Class<? extends Throwable> exceptionClass)
+ throws InterruptedException {
+ try {
+ future.get();
+ fail("Expected a " + exceptionClass.getSimpleName() + " exception, but got success.");
+ } catch (ExecutionException ee) {
+ Throwable cause = ee.getCause();
+ assertEquals("Expected a " + exceptionClass.getSimpleName() + " exception, but got " +
+ cause.getClass().getSimpleName(),
+ exceptionClass, cause.getClass());
+ }
+ }
}