You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2018/07/17 22:45:52 UTC

[kafka] branch trunk updated: KAFKA-5098; KafkaProducer should reject sends to invalid topics

This is an automated email from the ASF dual-hosted git repository.

jjkoshy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8c47a3e  KAFKA-5098; KafkaProducer should reject sends to invalid topics
8c47a3e is described below

commit 8c47a3e52f3d6871a708b1d2c8b5a7e30a2a5b99
Author: Ahmed Al Mehdi <aa...@aalmehdi-ld1.linkedin.biz>
AuthorDate: Tue Jul 17 15:45:29 2018 -0700

    KAFKA-5098; KafkaProducer should reject sends to invalid topics
    
    …egal char and generates InvalidTopicException
    
    If the max.block.ms config parameter is set to a non-zero value,
    KafkaProducer.send() blocks for the full max.block.ms time if the topic name
    contains an illegal character or is otherwise invalid.
    
    Added a unit test that verifies the appropriate exception is raised when
    calling get() on the future returned by KafkaProducer.send().
    
    Author: Ahmed Al Mehdi <aa...@aalmehdi-ld1.linkedin.biz>
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Joel Koshy <jj...@gmail.com>, Manikumar Reddy O <ma...@gmail.com>
    
    Closes #5247 from ahmedha/KAFKA-5098
---
 .../java/org/apache/kafka/clients/Metadata.java    |  6 ++-
 .../kafka/clients/consumer/KafkaConsumer.java      |  4 +-
 .../consumer/internals/ConsumerCoordinator.java    |  5 +++
 .../kafka/clients/producer/KafkaProducer.java      | 10 ++++-
 .../main/java/org/apache/kafka/common/Cluster.java | 34 +++++++++++++---
 .../kafka/common/errors/InvalidTopicException.java | 19 ++++++++-
 .../kafka/common/requests/MetadataResponse.java    |  2 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 44 ++++++++-------------
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 38 +++++++++++++++++-
 .../kafka/clients/producer/KafkaProducerTest.java  | 45 ++++++++++++++++++++++
 .../test/java/org/apache/kafka/test/TestUtils.java | 15 ++++++++
 11 files changed, 182 insertions(+), 40 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index b1da9de..91b1587 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -353,6 +353,7 @@ public final class Metadata {
 
     private Cluster getClusterForCurrentTopics(Cluster cluster) {
         Set<String> unauthorizedTopics = new HashSet<>();
+        Set<String> invalidTopics = new HashSet<>();
         Collection<PartitionInfo> partitionInfos = new ArrayList<>();
         List<Node> nodes = Collections.emptyList();
         Set<String> internalTopics = Collections.emptySet();
@@ -364,6 +365,9 @@ public final class Metadata {
             unauthorizedTopics.addAll(cluster.unauthorizedTopics());
             unauthorizedTopics.retainAll(this.topics.keySet());
 
+            invalidTopics.addAll(cluster.invalidTopics());
+            invalidTopics.addAll(this.cluster.invalidTopics());
+
             for (String topic : this.topics.keySet()) {
                 List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
                 if (!partitionInfoList.isEmpty()) {
@@ -373,6 +377,6 @@ public final class Metadata {
             nodes = cluster.nodes();
             controller  = cluster.controller();
         }
-        return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, internalTopics, controller);
+        return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, invalidTopics, internalTopics, controller);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f722408..fb37fee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1148,6 +1148,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
      *             partitions to consume from
      * @throws java.lang.ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds.
+     * @throws org.apache.kafka.common.errors.InvalidTopicException if the current subscription contains any invalid
+     *             topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
      */
     @Override
     public ConsumerRecords<K, V> poll(final Duration timeout) {
@@ -1595,7 +1597,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     /**
      * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
-     * This method may issue a remote call to the server if there is no current position 
+     * This method may issue a remote call to the server if there is no current position
      * for the given partition.
      * <p>
      * This call will block until the position can be determined, an unrecoverable error is
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 060e404..ea6d472 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -203,6 +204,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 if (!cluster.unauthorizedTopics().isEmpty())
                     throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
 
+                // if we encounter any invalid topics, raise an exception to the user
+                if (!cluster.invalidTopics().isEmpty())
+                    throw new InvalidTopicException(cluster.invalidTopics());
+
                 if (subscriptions.hasPatternSubscription())
                     updatePatternSubscription(cluster);
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a5af5b6..3a6717b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
@@ -898,8 +899,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
         // add topic to metadata topic list if it is not there already and reset expiry
-        metadata.add(topic);
         Cluster cluster = metadata.fetch();
+
+        if (cluster.invalidTopics().contains(topic))
+            throw new InvalidTopicException(topic);
+
+        metadata.add(topic);
+
         Integer partitionsCount = cluster.partitionCountForTopic(topic);
         // Return cached metadata if we have it, and if the record's partition is either undefined
         // or within the known partition range
@@ -930,6 +936,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
             if (cluster.unauthorizedTopics().contains(topic))
                 throw new TopicAuthorizationException(topic);
+            if (cluster.invalidTopics().contains(topic))
+                throw new InvalidTopicException(topic);
             remainingWaitMs = maxWaitMs - elapsed;
             partitionsCount = cluster.partitionCountForTopic(topic);
         } while (partitionsCount == null);
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index ccbaa30..33d3749 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -36,6 +36,7 @@ public final class Cluster {
     private final boolean isBootstrapConfigured;
     private final List<Node> nodes;
     private final Set<String> unauthorizedTopics;
+    private final Set<String> invalidTopics;
     private final Set<String> internalTopics;
     private final Node controller;
     private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
@@ -55,7 +56,7 @@ public final class Cluster {
                    Collection<PartitionInfo> partitions,
                    Set<String> unauthorizedTopics,
                    Set<String> internalTopics) {
-        this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics, null);
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet(), internalTopics, null);
     }
 
     /**
@@ -69,7 +70,22 @@ public final class Cluster {
                    Set<String> unauthorizedTopics,
                    Set<String> internalTopics,
                    Node controller) {
-        this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics, controller);
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet(), internalTopics, controller);
+    }
+
+    /**
+     * Create a new cluster with the given id, nodes and partitions
+     * @param nodes The nodes in the cluster
+     * @param partitions Information about a subset of the topic-partitions this cluster hosts
+     */
+    public Cluster(String clusterId,
+        Collection<Node> nodes,
+        Collection<PartitionInfo> partitions,
+        Set<String> unauthorizedTopics,
+        Set<String> invalidTopics,
+        Set<String> internalTopics,
+        Node controller) {
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller);
     }
 
     private Cluster(String clusterId,
@@ -77,6 +93,7 @@ public final class Cluster {
                     Collection<Node> nodes,
                     Collection<PartitionInfo> partitions,
                     Set<String> unauthorizedTopics,
+                    Set<String> invalidTopics,
                     Set<String> internalTopics,
                     Node controller) {
         this.isBootstrapConfigured = isBootstrapConfigured;
@@ -131,6 +148,7 @@ public final class Cluster {
             this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
 
         this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics);
+        this.invalidTopics = Collections.unmodifiableSet(invalidTopics);
         this.internalTopics = Collections.unmodifiableSet(internalTopics);
         this.controller = controller;
     }
@@ -153,7 +171,8 @@ public final class Cluster {
         int nodeId = -1;
         for (InetSocketAddress address : addresses)
             nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
-        return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet(), null);
+        return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0),
+                            Collections.<String>emptySet(), Collections.<String>emptySet(), Collections.<String>emptySet(), null);
     }
 
     /**
@@ -163,7 +182,8 @@ public final class Cluster {
         Map<TopicPartition, PartitionInfo> combinedPartitions = new HashMap<>(this.partitionsByTopicPartition);
         combinedPartitions.putAll(partitions);
         return new Cluster(clusterResource.clusterId(), this.nodes, combinedPartitions.values(),
-                new HashSet<>(this.unauthorizedTopics), new HashSet<>(this.internalTopics), this.controller);
+                new HashSet<>(this.unauthorizedTopics), new HashSet<>(this.invalidTopics),
+                new HashSet<>(this.internalTopics), this.controller);
     }
 
     /**
@@ -172,7 +192,7 @@ public final class Cluster {
     public List<Node> nodes() {
         return this.nodes;
     }
-    
+
     /**
      * Get the node by the node id (or null if no such node exists)
      * @param id The id of the node
@@ -256,6 +276,10 @@ public final class Cluster {
         return unauthorizedTopics;
     }
 
+    public Set<String> invalidTopics() {
+        return invalidTopics;
+    }
+
     public Set<String> internalTopics() {
         return internalTopics;
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
index f79e9a7..729ebba 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.common.errors;
 
+import java.util.HashSet;
+import java.util.Set;
+
+
 /**
  * The client has attempted to perform an operation on an invalid topic.
  * For example the topic name is too long, contains invalid characters etc.
@@ -24,23 +28,36 @@ package org.apache.kafka.common.errors;
  * @see UnknownTopicOrPartitionException
  */
 public class InvalidTopicException extends ApiException {
-
     private static final long serialVersionUID = 1L;
 
+    private final Set<String> invalidTopics;
+
     public InvalidTopicException() {
         super();
+        invalidTopics = new HashSet<>();
     }
 
     public InvalidTopicException(String message, Throwable cause) {
         super(message, cause);
+        invalidTopics = new HashSet<>();
     }
 
     public InvalidTopicException(String message) {
         super(message);
+        invalidTopics = new HashSet<>();
     }
 
     public InvalidTopicException(Throwable cause) {
         super(cause);
+        invalidTopics = new HashSet<>();
     }
 
+    public InvalidTopicException(Set<String> invalidTopics) {
+        super("Invalid topics: " + invalidTopics);
+        this.invalidTopics = invalidTopics;
+    }
+
+    public Set<String> invalidTopics() {
+        return invalidTopics;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 28a412d..09a04e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -366,7 +366,7 @@ public class MetadataResponse extends AbstractResponse {
         }
 
         return new Cluster(this.clusterId, this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
-                internalTopics, this.controller);
+                topicsByError(Errors.INVALID_TOPIC_EXCEPTION), internalTopics, this.controller);
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 8363079..7079471 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -93,7 +93,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -197,19 +196,6 @@ public class KafkaAdminClientTest {
         }
     }
 
-    private static void assertFutureError(Future<?> future, Class<? extends Throwable> exceptionClass)
-            throws InterruptedException {
-        try {
-            future.get();
-            fail("Expected a " + exceptionClass.getSimpleName() + " exception, but got success.");
-        } catch (ExecutionException ee) {
-            Throwable cause = ee.getCause();
-            assertEquals("Expected a " + exceptionClass.getSimpleName() + " exception, but got " +
-                            cause.getClass().getSimpleName(),
-                    exceptionClass, cause.getClass());
-        }
-    }
-
     /**
      * Test that the client properly times out when we don't receive any metadata.
      */
@@ -228,7 +214,7 @@ public class KafkaAdminClientTest {
             KafkaFuture<Void> future = env.adminClient().createTopics(
                     Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
                     new CreateTopicsOptions().timeoutMs(1000)).all();
-            assertFutureError(future, TimeoutException.class);
+            TestUtils.assertFutureError(future, TimeoutException.class);
         }
     }
 
@@ -306,7 +292,7 @@ public class KafkaAdminClientTest {
             KafkaFuture<Void> future = env.adminClient().createTopics(
                 Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
                 new CreateTopicsOptions().timeoutMs(1000)).all();
-            assertFutureError(future, SaslAuthenticationException.class);
+            TestUtils.assertFutureError(future, SaslAuthenticationException.class);
         }
     }
 
@@ -401,14 +387,14 @@ public class KafkaAdminClientTest {
             List<String> sillyTopicNames = asList("", null);
             Map<String, KafkaFuture<Void>> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).values();
             for (String sillyTopicName : sillyTopicNames) {
-                assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
+                TestUtils.assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
             }
             assertEquals(0, env.kafkaClient().inFlightRequestCount());
 
             Map<String, KafkaFuture<TopicDescription>> describeFutures =
                     env.adminClient().describeTopics(sillyTopicNames).values();
             for (String sillyTopicName : sillyTopicNames) {
-                assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
+                TestUtils.assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
             }
             assertEquals(0, env.kafkaClient().inFlightRequestCount());
 
@@ -419,7 +405,7 @@ public class KafkaAdminClientTest {
 
             Map<String, KafkaFuture<Void>> createFutures = env.adminClient().createTopics(newTopics).values();
             for (String sillyTopicName : sillyTopicNames) {
-                assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class);
+                TestUtils.assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class);
             }
             assertEquals(0, env.kafkaClient().inFlightRequestCount());
         }
@@ -564,7 +550,7 @@ public class KafkaAdminClientTest {
             // Test a call where we get back an error.
             env.kafkaClient().prepareResponse(new DescribeAclsResponse(0,
                 new ApiError(Errors.SECURITY_DISABLED, "Security is disabled"), Collections.<AclBinding>emptySet()));
-            assertFutureError(env.adminClient().describeAcls(FILTER2).values(), SecurityDisabledException.class);
+            TestUtils.assertFutureError(env.adminClient().describeAcls(FILTER2).values(), SecurityDisabledException.class);
         }
     }
 
@@ -590,9 +576,9 @@ public class KafkaAdminClientTest {
             ));
             results = env.adminClient().createAcls(asList(ACL1, ACL2));
             assertCollectionIs(results.values().keySet(), ACL1, ACL2);
-            assertFutureError(results.values().get(ACL1), SecurityDisabledException.class);
+            TestUtils.assertFutureError(results.values().get(ACL1), SecurityDisabledException.class);
             results.values().get(ACL2).get();
-            assertFutureError(results.all(), SecurityDisabledException.class);
+            TestUtils.assertFutureError(results.all(), SecurityDisabledException.class);
         }
     }
 
@@ -614,8 +600,8 @@ public class KafkaAdminClientTest {
             assertEquals(ACL1, filter1Results.values().get(0).binding());
             assertEquals(null, filter1Results.values().get(1).exception());
             assertEquals(ACL2, filter1Results.values().get(1).binding());
-            assertFutureError(filterResults.get(FILTER2), SecurityDisabledException.class);
-            assertFutureError(results.all(), SecurityDisabledException.class);
+            TestUtils.assertFutureError(filterResults.get(FILTER2), SecurityDisabledException.class);
+            TestUtils.assertFutureError(results.all(), SecurityDisabledException.class);
 
             // Test a call where one deletion result has an error.
             env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
@@ -624,7 +610,7 @@ public class KafkaAdminClientTest {
                     new AclFilterResponse(Collections.<AclDeletionResult>emptySet()))));
             results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
             assertTrue(results.values().get(FILTER2).get().values().isEmpty());
-            assertFutureError(results.all(), SecurityDisabledException.class);
+            TestUtils.assertFutureError(results.all(), SecurityDisabledException.class);
 
             // Test a call where there are no errors.
             env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
@@ -673,7 +659,7 @@ public class KafkaAdminClientTest {
                     return result.listings().isDone();
                 }
             }, "Timed out waiting for listTopics to complete");
-            assertFutureError(result.listings(), TimeoutException.class);
+            TestUtils.assertFutureError(result.listings(), TimeoutException.class);
             log.info("Verified the error result of AdminClient#listTopics");
 
             // The next request should succeed.
@@ -904,7 +890,7 @@ public class KafkaAdminClientTest {
                     node2);
 
             final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
-            assertFutureError(result.all(), CoordinatorNotAvailableException.class);
+            TestUtils.assertFutureError(result.all(), CoordinatorNotAvailableException.class);
             Collection<ConsumerGroupListing> listings = result.valid().get();
             assertEquals(2, listings.size());
             for (ConsumerGroupListing listing : listings) {
@@ -947,7 +933,7 @@ public class KafkaAdminClientTest {
                             Collections.emptyList()));
 
             final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
-            assertFutureError(result.all(), KafkaException.class);
+            TestUtils.assertFutureError(result.all(), KafkaException.class);
         }
     }
 
@@ -1091,7 +1077,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED,  Node.noNode()));
 
             final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
-            assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);
+            TestUtils.assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);
 
         }
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index c7cfeb0..c83fe06 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import java.util.ArrayList;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
@@ -32,9 +33,11 @@ import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -1759,7 +1762,7 @@ public class KafkaConsumerTest {
         int maxPollRecords = Integer.MAX_VALUE;
         boolean checkCrcs = true;
         int rebalanceTimeoutMs = 60000;
-        
+
         Deserializer<String> keyDeserializer = new StringDeserializer();
         Deserializer<String> valueDeserializer = new StringDeserializer();
 
@@ -1854,4 +1857,37 @@ public class KafkaConsumerTest {
         consumer.close(1, TimeUnit.SECONDS);
         EasyMock.verify(consumer);
     }
+
+    @Test(expected = InvalidTopicException.class)
+    public void testSubscriptionOnInvalidTopic() throws Exception {
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster();
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = createMetadata();
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+        PartitionAssignor assignor = new RoundRobinAssignor();
+
+        String invalidTopicName = "topic abc";  // Invalid topic name due to space
+
+        Set<String> invalidTopic = new HashSet<String>();
+        invalidTopic.add(invalidTopicName);
+        Cluster metadataUpdateResponseCluster = new Cluster(cluster.clusterResource().clusterId(),
+                                                            cluster.nodes(),
+                                                            new ArrayList<PartitionInfo>(0),
+                                                            Collections.<String>emptySet(),
+                                                            invalidTopic,
+                                                            cluster.internalTopics(),
+                                                            cluster.controller());
+        client.prepareMetadataUpdate(metadataUpdateResponseCluster, Collections.<String>emptySet());
+
+
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
+        consumer.subscribe(singleton(invalidTopicName), getConsumerRebalanceListener(consumer));
+
+        consumer.poll(Duration.ZERO);
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 8bfc5e7..bf03e46 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.clients.producer;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
@@ -27,6 +30,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -609,4 +613,45 @@ public class KafkaProducerTest {
             producer.close(0, TimeUnit.MILLISECONDS);
         }
     }
+
+    @Test
+    public void testSendToInvalidTopic() throws Exception {
+
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
+
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster();
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+
+        Producer<String, String> producer = new KafkaProducer<>(new ProducerConfig(
+            ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
+            new StringSerializer(), new StringSerializer(), metadata, client);
+
+        String invalidTopicName = "topic abc";          // Invalid topic name due to space
+        ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+        Set<String> invalidTopic = new HashSet<String>();
+        invalidTopic.add(invalidTopicName);
+        Cluster metaDataUpdateResponseCluster = new Cluster(cluster.clusterResource().clusterId(),
+                                                            cluster.nodes(),
+                                                            new ArrayList<PartitionInfo>(0),
+                                                            Collections.<String>emptySet(),
+                                                            invalidTopic,
+                                                            cluster.internalTopics(),
+                                                            cluster.controller());
+        client.prepareMetadataUpdate(metaDataUpdateResponseCluster, Collections.<String>emptySet());
+
+        Future<RecordMetadata> future = producer.send(record);
+
+        assertEquals("Cluster has incorrect invalid topic list.", metaDataUpdateResponseCluster.invalidTopics(), metadata.fetch().invalidTopics());
+        TestUtils.assertFutureError(future, InvalidTopicException.class);
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index ef9e541..3ab2bce 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -45,6 +45,8 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
@@ -341,4 +343,17 @@ public class TestUtils {
         buffer.rewind();
         return buffer;
     }
+
+    public static void assertFutureError(Future<?> future, Class<? extends Throwable> exceptionClass)
+        throws InterruptedException {
+        try {
+            future.get();
+            fail("Expected a " + exceptionClass.getSimpleName() + " exception, but got success.");
+        } catch (ExecutionException ee) {
+            Throwable cause = ee.getCause();
+            assertEquals("Expected a " + exceptionClass.getSimpleName() + " exception, but got " +
+                    cause.getClass().getSimpleName(),
+                exceptionClass, cause.getClass());
+        }
+    }
 }