You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/07/17 22:48:00 UTC

[jira] [Commented] (KAFKA-5098) KafkaProducer.send() blocks and generates TimeoutException if topic name has illegal char

    [ https://issues.apache.org/jira/browse/KAFKA-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16547183#comment-16547183 ] 

ASF GitHub Bot commented on KAFKA-5098:
---------------------------------------

jjkoshy closed pull request #5247: KAFKA-5098: KafkaProducer.send() dose not block if topic name has ill…
URL: https://github.com/apache/kafka/pull/5247
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index b1da9de8ac1..91b15875cd0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -353,6 +353,7 @@ private synchronized void requestUpdateForNewTopics() {
 
     private Cluster getClusterForCurrentTopics(Cluster cluster) {
         Set<String> unauthorizedTopics = new HashSet<>();
+        Set<String> invalidTopics = new HashSet<>();
         Collection<PartitionInfo> partitionInfos = new ArrayList<>();
         List<Node> nodes = Collections.emptyList();
         Set<String> internalTopics = Collections.emptySet();
@@ -364,6 +365,9 @@ private Cluster getClusterForCurrentTopics(Cluster cluster) {
             unauthorizedTopics.addAll(cluster.unauthorizedTopics());
             unauthorizedTopics.retainAll(this.topics.keySet());
 
+            invalidTopics.addAll(cluster.invalidTopics());
+            invalidTopics.addAll(this.cluster.invalidTopics());
+
             for (String topic : this.topics.keySet()) {
                 List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
                 if (!partitionInfoList.isEmpty()) {
@@ -373,6 +377,6 @@ private Cluster getClusterForCurrentTopics(Cluster cluster) {
             nodes = cluster.nodes();
             controller  = cluster.controller();
         }
-        return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, internalTopics, controller);
+        return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, invalidTopics, internalTopics, controller);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f722408fdbf..fb37feea9f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1148,6 +1148,8 @@ public void assign(Collection<TopicPartition> partitions) {
      * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
      *             partitions to consume from
      * @throws java.lang.ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds.
+     * @throws org.apache.kafka.common.errors.InvalidTopicException if the current subscription contains any invalid
+     *             topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
      */
     @Override
     public ConsumerRecords<K, V> poll(final Duration timeout) {
@@ -1595,7 +1597,7 @@ public long position(TopicPartition partition) {
 
     /**
      * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
-     * This method may issue a remote call to the server if there is no current position 
+     * This method may issue a remote call to the server if there is no current position
      * for the given partition.
      * <p>
      * This call will block until the position can be determined, an unrecoverable error is
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 060e404ef0c..ea6d47249eb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -30,6 +30,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -203,6 +204,10 @@ public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
                 if (!cluster.unauthorizedTopics().isEmpty())
                     throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
 
+                // if we encounter any invalid topics, raise an exception to the user
+                if (!cluster.invalidTopics().isEmpty())
+                    throw new InvalidTopicException(cluster.invalidTopics());
+
                 if (subscriptions.hasPatternSubscription())
                     updatePatternSubscription(cluster);
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a5af5b60093..3a6717b7676 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -41,6 +41,7 @@
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
@@ -898,8 +899,13 @@ private void setReadOnly(Headers headers) {
      */
     private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
         // add topic to metadata topic list if it is not there already and reset expiry
-        metadata.add(topic);
         Cluster cluster = metadata.fetch();
+
+        if (cluster.invalidTopics().contains(topic))
+            throw new InvalidTopicException(topic);
+
+        metadata.add(topic);
+
         Integer partitionsCount = cluster.partitionCountForTopic(topic);
         // Return cached metadata if we have it, and if the record's partition is either undefined
         // or within the known partition range
@@ -930,6 +936,8 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
                 throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
             if (cluster.unauthorizedTopics().contains(topic))
                 throw new TopicAuthorizationException(topic);
+            if (cluster.invalidTopics().contains(topic))
+                throw new InvalidTopicException(topic);
             remainingWaitMs = maxWaitMs - elapsed;
             partitionsCount = cluster.partitionCountForTopic(topic);
         } while (partitionsCount == null);
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index ccbaa306d48..33d37494bf5 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -36,6 +36,7 @@
     private final boolean isBootstrapConfigured;
     private final List<Node> nodes;
     private final Set<String> unauthorizedTopics;
+    private final Set<String> invalidTopics;
     private final Set<String> internalTopics;
     private final Node controller;
     private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
@@ -55,7 +56,7 @@ public Cluster(String clusterId,
                    Collection<PartitionInfo> partitions,
                    Set<String> unauthorizedTopics,
                    Set<String> internalTopics) {
-        this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics, null);
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet(), internalTopics, null);
     }
 
     /**
@@ -69,7 +70,22 @@ public Cluster(String clusterId,
                    Set<String> unauthorizedTopics,
                    Set<String> internalTopics,
                    Node controller) {
-        this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics, controller);
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet(), internalTopics, controller);
+    }
+
+    /**
+     * Create a new cluster with the given id, nodes and partitions
+     * @param nodes The nodes in the cluster
+     * @param partitions Information about a subset of the topic-partitions this cluster hosts
+     */
+    public Cluster(String clusterId,
+        Collection<Node> nodes,
+        Collection<PartitionInfo> partitions,
+        Set<String> unauthorizedTopics,
+        Set<String> invalidTopics,
+        Set<String> internalTopics,
+        Node controller) {
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller);
     }
 
     private Cluster(String clusterId,
@@ -77,6 +93,7 @@ private Cluster(String clusterId,
                     Collection<Node> nodes,
                     Collection<PartitionInfo> partitions,
                     Set<String> unauthorizedTopics,
+                    Set<String> invalidTopics,
                     Set<String> internalTopics,
                     Node controller) {
         this.isBootstrapConfigured = isBootstrapConfigured;
@@ -131,6 +148,7 @@ private Cluster(String clusterId,
             this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
 
         this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics);
+        this.invalidTopics = Collections.unmodifiableSet(invalidTopics);
         this.internalTopics = Collections.unmodifiableSet(internalTopics);
         this.controller = controller;
     }
@@ -153,7 +171,8 @@ public static Cluster bootstrap(List<InetSocketAddress> addresses) {
         int nodeId = -1;
         for (InetSocketAddress address : addresses)
             nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
-        return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet(), null);
+        return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0),
+                            Collections.<String>emptySet(), Collections.<String>emptySet(), Collections.<String>emptySet(), null);
     }
 
     /**
@@ -163,7 +182,8 @@ public Cluster withPartitions(Map<TopicPartition, PartitionInfo> partitions) {
         Map<TopicPartition, PartitionInfo> combinedPartitions = new HashMap<>(this.partitionsByTopicPartition);
         combinedPartitions.putAll(partitions);
         return new Cluster(clusterResource.clusterId(), this.nodes, combinedPartitions.values(),
-                new HashSet<>(this.unauthorizedTopics), new HashSet<>(this.internalTopics), this.controller);
+                new HashSet<>(this.unauthorizedTopics), new HashSet<>(this.invalidTopics),
+                new HashSet<>(this.internalTopics), this.controller);
     }
 
     /**
@@ -172,7 +192,7 @@ public Cluster withPartitions(Map<TopicPartition, PartitionInfo> partitions) {
     public List<Node> nodes() {
         return this.nodes;
     }
-    
+
     /**
      * Get the node by the node id (or null if no such node exists)
      * @param id The id of the node
@@ -256,6 +276,10 @@ public Integer partitionCountForTopic(String topic) {
         return unauthorizedTopics;
     }
 
+    public Set<String> invalidTopics() {
+        return invalidTopics;
+    }
+
     public Set<String> internalTopics() {
         return internalTopics;
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
index f79e9a7b067..729ebba8a6f 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.common.errors;
 
+import java.util.HashSet;
+import java.util.Set;
+
+
 /**
  * The client has attempted to perform an operation on an invalid topic.
  * For example the topic name is too long, contains invalid characters etc.
@@ -24,23 +28,36 @@
  * @see UnknownTopicOrPartitionException
  */
 public class InvalidTopicException extends ApiException {
-
     private static final long serialVersionUID = 1L;
 
+    private final Set<String> invalidTopics;
+
     public InvalidTopicException() {
         super();
+        invalidTopics = new HashSet<>();
     }
 
     public InvalidTopicException(String message, Throwable cause) {
         super(message, cause);
+        invalidTopics = new HashSet<>();
     }
 
     public InvalidTopicException(String message) {
         super(message);
+        invalidTopics = new HashSet<>();
     }
 
     public InvalidTopicException(Throwable cause) {
         super(cause);
+        invalidTopics = new HashSet<>();
     }
 
+    public InvalidTopicException(Set<String> invalidTopics) {
+        super("Invalid topics: " + invalidTopics);
+        this.invalidTopics = invalidTopics;
+    }
+
+    public Set<String> invalidTopics() {
+        return invalidTopics;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 28a412df800..09a04e55603 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -366,7 +366,7 @@ public Cluster cluster() {
         }
 
         return new Cluster(this.clusterId, this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
-                internalTopics, this.controller);
+                topicsByError(Errors.INVALID_TOPIC_EXCEPTION), internalTopics, this.controller);
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 836307902f4..70794714026 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -93,7 +93,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -197,19 +196,6 @@ public void testCloseAdminClient() {
         }
     }
 
-    private static void assertFutureError(Future<?> future, Class<? extends Throwable> exceptionClass)
-            throws InterruptedException {
-        try {
-            future.get();
-            fail("Expected a " + exceptionClass.getSimpleName() + " exception, but got success.");
-        } catch (ExecutionException ee) {
-            Throwable cause = ee.getCause();
-            assertEquals("Expected a " + exceptionClass.getSimpleName() + " exception, but got " +
-                            cause.getClass().getSimpleName(),
-                    exceptionClass, cause.getClass());
-        }
-    }
-
     /**
      * Test that the client properly times out when we don't receive any metadata.
      */
@@ -228,7 +214,7 @@ public void testTimeoutWithoutMetadata() throws Exception {
             KafkaFuture<Void> future = env.adminClient().createTopics(
                     Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
                     new CreateTopicsOptions().timeoutMs(1000)).all();
-            assertFutureError(future, TimeoutException.class);
+            TestUtils.assertFutureError(future, TimeoutException.class);
         }
     }
 
@@ -306,7 +292,7 @@ public void testPropagatedMetadataFetchException() throws Exception {
             KafkaFuture<Void> future = env.adminClient().createTopics(
                 Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
                 new CreateTopicsOptions().timeoutMs(1000)).all();
-            assertFutureError(future, SaslAuthenticationException.class);
+            TestUtils.assertFutureError(future, SaslAuthenticationException.class);
         }
     }
 
@@ -401,14 +387,14 @@ public void testInvalidTopicNames() throws Exception {
             List<String> sillyTopicNames = asList("", null);
             Map<String, KafkaFuture<Void>> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).values();
             for (String sillyTopicName : sillyTopicNames) {
-                assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
+                TestUtils.assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
             }
             assertEquals(0, env.kafkaClient().inFlightRequestCount());
 
             Map<String, KafkaFuture<TopicDescription>> describeFutures =
                     env.adminClient().describeTopics(sillyTopicNames).values();
             for (String sillyTopicName : sillyTopicNames) {
-                assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
+                TestUtils.assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
             }
             assertEquals(0, env.kafkaClient().inFlightRequestCount());
 
@@ -419,7 +405,7 @@ public void testInvalidTopicNames() throws Exception {
 
             Map<String, KafkaFuture<Void>> createFutures = env.adminClient().createTopics(newTopics).values();
             for (String sillyTopicName : sillyTopicNames) {
-                assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class);
+                TestUtils.assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class);
             }
             assertEquals(0, env.kafkaClient().inFlightRequestCount());
         }
@@ -564,7 +550,7 @@ public void testDescribeAcls() throws Exception {
             // Test a call where we get back an error.
             env.kafkaClient().prepareResponse(new DescribeAclsResponse(0,
                 new ApiError(Errors.SECURITY_DISABLED, "Security is disabled"), Collections.<AclBinding>emptySet()));
-            assertFutureError(env.adminClient().describeAcls(FILTER2).values(), SecurityDisabledException.class);
+            TestUtils.assertFutureError(env.adminClient().describeAcls(FILTER2).values(), SecurityDisabledException.class);
         }
     }
 
@@ -590,9 +576,9 @@ public void testCreateAcls() throws Exception {
             ));
             results = env.adminClient().createAcls(asList(ACL1, ACL2));
             assertCollectionIs(results.values().keySet(), ACL1, ACL2);
-            assertFutureError(results.values().get(ACL1), SecurityDisabledException.class);
+            TestUtils.assertFutureError(results.values().get(ACL1), SecurityDisabledException.class);
             results.values().get(ACL2).get();
-            assertFutureError(results.all(), SecurityDisabledException.class);
+            TestUtils.assertFutureError(results.all(), SecurityDisabledException.class);
         }
     }
 
@@ -614,8 +600,8 @@ public void testDeleteAcls() throws Exception {
             assertEquals(ACL1, filter1Results.values().get(0).binding());
             assertEquals(null, filter1Results.values().get(1).exception());
             assertEquals(ACL2, filter1Results.values().get(1).binding());
-            assertFutureError(filterResults.get(FILTER2), SecurityDisabledException.class);
-            assertFutureError(results.all(), SecurityDisabledException.class);
+            TestUtils.assertFutureError(filterResults.get(FILTER2), SecurityDisabledException.class);
+            TestUtils.assertFutureError(results.all(), SecurityDisabledException.class);
 
             // Test a call where one deletion result has an error.
             env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
@@ -624,7 +610,7 @@ public void testDeleteAcls() throws Exception {
                     new AclFilterResponse(Collections.<AclDeletionResult>emptySet()))));
             results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
             assertTrue(results.values().get(FILTER2).get().values().isEmpty());
-            assertFutureError(results.all(), SecurityDisabledException.class);
+            TestUtils.assertFutureError(results.all(), SecurityDisabledException.class);
 
             // Test a call where there are no errors.
             env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
@@ -673,7 +659,7 @@ public boolean conditionMet() {
                     return result.listings().isDone();
                 }
             }, "Timed out waiting for listTopics to complete");
-            assertFutureError(result.listings(), TimeoutException.class);
+            TestUtils.assertFutureError(result.listings(), TimeoutException.class);
             log.info("Verified the error result of AdminClient#listTopics");
 
             // The next request should succeed.
@@ -904,7 +890,7 @@ public void testListConsumerGroups() throws Exception {
                     node2);
 
             final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
-            assertFutureError(result.all(), CoordinatorNotAvailableException.class);
+            TestUtils.assertFutureError(result.all(), CoordinatorNotAvailableException.class);
             Collection<ConsumerGroupListing> listings = result.valid().get();
             assertEquals(2, listings.size());
             for (ConsumerGroupListing listing : listings) {
@@ -947,7 +933,7 @@ public void testListConsumerGroupsMetadataFailure() throws Exception {
                             Collections.emptyList()));
 
             final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
-            assertFutureError(result.all(), KafkaException.class);
+            TestUtils.assertFutureError(result.all(), KafkaException.class);
         }
     }
 
@@ -1091,7 +1077,7 @@ public void testDeleteConsumerGroups() throws Exception {
             env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED,  Node.noNode()));
 
             final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
-            assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);
+            TestUtils.assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);
 
         }
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index c7cfeb05bbd..c83fe06ebd0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import java.util.ArrayList;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
@@ -32,9 +33,11 @@
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -1759,7 +1762,7 @@ private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset,
         int maxPollRecords = Integer.MAX_VALUE;
         boolean checkCrcs = true;
         int rebalanceTimeoutMs = 60000;
-        
+
         Deserializer<String> keyDeserializer = new StringDeserializer();
         Deserializer<String> valueDeserializer = new StringDeserializer();
 
@@ -1854,4 +1857,37 @@ public void testCloseWithTimeUnit() {
         consumer.close(1, TimeUnit.SECONDS);
         EasyMock.verify(consumer);
     }
+
+    @Test(expected = InvalidTopicException.class)
+    public void testSubscriptionOnInvalidTopic() throws Exception {
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster();
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = createMetadata();
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+        PartitionAssignor assignor = new RoundRobinAssignor();
+
+        String invalidTopicName = "topic abc";  // Invalid topic name due to space
+
+        Set<String> invalidTopic = new HashSet<String>();
+        invalidTopic.add(invalidTopicName);
+        Cluster metadataUpdateResponseCluster = new Cluster(cluster.clusterResource().clusterId(),
+                                                            cluster.nodes(),
+                                                            new ArrayList<PartitionInfo>(0),
+                                                            Collections.<String>emptySet(),
+                                                            invalidTopic,
+                                                            cluster.internalTopics(),
+                                                            cluster.controller());
+        client.prepareMetadataUpdate(metadataUpdateResponseCluster, Collections.<String>emptySet());
+
+
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
+        consumer.subscribe(singleton(invalidTopicName), getConsumerRebalanceListener(consumer));
+
+        consumer.poll(Duration.ZERO);
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 8bfc5e7d28a..bf03e46ec08 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.clients.producer;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
@@ -27,6 +30,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -609,4 +613,45 @@ public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
             producer.close(0, TimeUnit.MILLISECONDS);
         }
     }
+
+    @Test
+    public void testSendToInvalidTopic() throws Exception {
+
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
+
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster();
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+
+        Producer<String, String> producer = new KafkaProducer<>(new ProducerConfig(
+            ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
+            new StringSerializer(), new StringSerializer(), metadata, client);
+
+        String invalidTopicName = "topic abc";          // Invalid topic name due to space
+        ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+        Set<String> invalidTopic = new HashSet<String>();
+        invalidTopic.add(invalidTopicName);
+        Cluster metaDataUpdateResponseCluster = new Cluster(cluster.clusterResource().clusterId(),
+                                                            cluster.nodes(),
+                                                            new ArrayList<PartitionInfo>(0),
+                                                            Collections.<String>emptySet(),
+                                                            invalidTopic,
+                                                            cluster.internalTopics(),
+                                                            cluster.controller());
+        client.prepareMetadataUpdate(metaDataUpdateResponseCluster, Collections.<String>emptySet());
+
+        Future<RecordMetadata> future = producer.send(record);
+
+        assertEquals("Cluster has incorrect invalid topic list.", metaDataUpdateResponseCluster.invalidTopics(), metadata.fetch().invalidTopics());
+        TestUtils.assertFutureError(future, InvalidTopicException.class);
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index ef9e5416abe..3ab2bce8f73 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -45,6 +45,8 @@
 import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
@@ -341,4 +343,17 @@ public static ByteBuffer toBuffer(Struct struct) {
         buffer.rewind();
         return buffer;
     }
+
+    public static void assertFutureError(Future<?> future, Class<? extends Throwable> exceptionClass)
+        throws InterruptedException {
+        try {
+            future.get();
+            fail("Expected a " + exceptionClass.getSimpleName() + " exception, but got success.");
+        } catch (ExecutionException ee) {
+            Throwable cause = ee.getCause();
+            assertEquals("Expected a " + exceptionClass.getSimpleName() + " exception, but got " +
+                    cause.getClass().getSimpleName(),
+                exceptionClass, cause.getClass());
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> KafkaProducer.send() blocks and generates TimeoutException if topic name has illegal char
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5098
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5098
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.10.2.0
>         Environment: Java client running against server using wurstmeister/kafka Docker image.
>            Reporter: Jeff Larsen
>            Assignee: Ahmed Al-Mehdi
>            Priority: Major
>             Fix For: 2.1.0
>
>
> The server is running with auto create enabled. If we try to publish to a topic with a forward slash in the name, the call blocks and we get a TimeoutException in the Callback. I would expect it to return immediately with an InvalidTopicException.
> There are other blocking issues that have been reported which may be related to some degree, but this particular cause seems unrelated.
> Sample code:
> {code}
> import org.apache.kafka.clients.producer.*;
> import java.util.*;
> public class KafkaProducerUnexpectedBlockingAndTimeoutException {
>   public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "kafka.example.com:9092");
>     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("max.block.ms", 10000); // 10 seconds should illustrate our point
>     String separator = "/";
>     //String separator = "_";
>     try (Producer<String, String> producer = new KafkaProducer<>(props)) {
>       System.out.println("Calling KafkaProducer.send() at " + new Date());
>       producer.send(
>           new ProducerRecord<String, String>("abc" + separator + "someStreamName",
>               "Not expecting a TimeoutException here"),
>           new Callback() {
>             @Override
>             public void onCompletion(RecordMetadata metadata, Exception e) {
>               if (e != null) {
>                 System.out.println(e.toString());
>               }
>             }
>           });
>       System.out.println("KafkaProducer.send() completed at " + new Date());
>     }
>   }
> }
> {code}
> Switching to the underscore separator in the above example works as expected.
> Mea culpa: We neglected to research allowed chars in a topic name, but the TimeoutException we encountered did not help point us in the right direction.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)