You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/07 00:15:42 UTC

[incubator-pulsar] branch master updated: Validate topic name on broker side (#1178)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 28ce0ad  Validate topic name on broker side (#1178)
28ce0ad is described below

commit 28ce0adaef76c52341231104daac4df3e2c55f53
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Feb 6 16:15:40 2018 -0800

    Validate topic name on broker side (#1178)
    
    * Validate topic name on broker side
    
    * Refactored to put them in a single validation method
    
    * Addressed comment
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 143 ++++++++++++++-------
 .../pulsar/broker/service/ServerCnxTest.java       |  93 +++++++++++---
 .../broker/service/utils/ClientChannelHelper.java  |   6 +
 .../apache/pulsar/common/api/proto/PulsarApi.java  |   3 +
 pulsar-common/src/main/proto/PulsarApi.proto       |   1 +
 5 files changed, 180 insertions(+), 66 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 3a6d337..0f0cd6d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -80,6 +80,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
+import com.google.protobuf.GeneratedMessageLite;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandler;
@@ -106,7 +107,7 @@ public class ServerCnx extends PulsarHandler {
     private final int MaxNonPersistentPendingMessages;
     private String originalPrincipal = null;
     private Set<String> proxyRoles = Sets.newHashSet();
-    
+
     enum State {
         Start, Connected, Failed
     }
@@ -198,7 +199,7 @@ public class ServerCnx extends PulsarHandler {
 
         return true;
     }
-    
+
     // ////
     // // Incoming commands handling
     // ////
@@ -206,9 +207,14 @@ public class ServerCnx extends PulsarHandler {
     @Override
     protected void handleLookup(CommandLookupTopic lookup) {
         final long requestId = lookup.getRequestId();
-        final String topicName = lookup.getTopic();
+
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Received Lookup from {} for {}", topicName, remoteAddress, requestId);
+            log.debug("[{}] Received Lookup from {} for {}", lookup.getTopic(), remoteAddress, requestId);
+        }
+
+        DestinationName topicName = validateTopicName(lookup.getTopic(), requestId, lookup);
+        if (topicName == null) {
+            return;
         }
 
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
@@ -218,21 +224,21 @@ public class ServerCnx extends PulsarHandler {
             if (!validateOriginalPrincipal(originalPrincipal,
                     newLookupErrorResponse(ServerError.AuthorizationError,
                             "Valid Proxy Client role should be provided for lookup ", requestId),
-                    topicName, "Valid Proxy Client role should be provided for lookup ")) {
+                    topicName.toString(), "Valid Proxy Client role should be provided for lookup ")) {
                 lookupSemaphore.release();
                 return;
             }
             CompletableFuture<Boolean> isProxyAuthorizedFuture;
             if (service.isAuthorizationEnabled() && originalPrincipal != null) {
                 isProxyAuthorizedFuture = service.getAuthorizationManager()
-                        .canLookupAsync(DestinationName.get(topicName), authRole);
+                        .canLookupAsync(topicName, authRole);
             } else {
                 isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
             }
-            
+
             isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
                 if (isProxyAuthorized) {
-                    lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topicName),
+                    lookupDestinationAsync(getBrokerService().pulsar(), topicName,
                             lookup.getAuthoritative(), originalPrincipal != null ? originalPrincipal : authRole,
                             lookup.getRequestId()).handle((lookupResponse, ex) -> {
                                 if (ex == null) {
@@ -273,10 +279,16 @@ public class ServerCnx extends PulsarHandler {
     @Override
     protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
         final long requestId = partitionMetadata.getRequestId();
-        final String topicName = partitionMetadata.getTopic();
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Received PartitionMetadataLookup from {} for {}", topicName, remoteAddress, requestId);
+            log.debug("[{}] Received PartitionMetadataLookup from {} for {}", partitionMetadata.getTopic(),
+                    remoteAddress, requestId);
         }
+
+        DestinationName topicName = validateTopicName(partitionMetadata.getTopic(), requestId, partitionMetadata);
+        if (topicName == null) {
+            return;
+        }
+
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
             final String originalPrincipal = partitionMetadata.hasOriginalPrincipal()
@@ -284,22 +296,23 @@ public class ServerCnx extends PulsarHandler {
             if (!validateOriginalPrincipal(originalPrincipal,
                     Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
                             "Valid Proxy Client role should be provided for getPartitionMetadataRequest ", requestId),
-                    topicName, "Valid Proxy Client role should be provided for getPartitionMetadataRequest ")) {
+                    topicName.toString(),
+                    "Valid Proxy Client role should be provided for getPartitionMetadataRequest ")) {
                 lookupSemaphore.release();
                 return;
             }
             CompletableFuture<Boolean> isProxyAuthorizedFuture;
             if (service.isAuthorizationEnabled() && originalPrincipal != null) {
                 isProxyAuthorizedFuture = service.getAuthorizationManager()
-                        .canLookupAsync(DestinationName.get(topicName), authRole);
+                        .canLookupAsync(topicName, authRole);
             } else {
                 isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
             }
             isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
                     if (isProxyAuthorized) {
-                        getPartitionedTopicMetadata(getBrokerService().pulsar(),
-                                originalPrincipal != null ? originalPrincipal : authRole,
-                                DestinationName.get(topicName)).handle((metadata, ex) -> {
+                    getPartitionedTopicMetadata(getBrokerService().pulsar(),
+                            originalPrincipal != null ? originalPrincipal : authRole, topicName)
+                                    .handle((metadata, ex) -> {
                                     if (ex == null) {
                                         int partitions = metadata.partitions;
                                         ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
@@ -446,19 +459,38 @@ public class ServerCnx extends PulsarHandler {
     @Override
     protected void handleSubscribe(final CommandSubscribe subscribe) {
         checkArgument(state == State.Connected);
-        final String topicName = subscribe.getTopic();
         final long requestId = subscribe.getRequestId();
         final long consumerId = subscribe.getConsumerId();
+
+        DestinationName topicName = validateTopicName(subscribe.getTopic(), requestId, subscribe);
+        if (topicName == null) {
+            return;
+        }
+
         if (!validateOriginalPrincipal(originalPrincipal,
                 Commands.newError(requestId, ServerError.AuthorizationError,
                         "Valid Proxy Client role should be provided while subscribing "),
-                topicName, "Valid Proxy Client role should be provided while subscribing ")) {
+                topicName.toString(), "Valid Proxy Client role should be provided while subscribing ")) {
             return;
         }
+
+        final String subscriptionName = subscribe.getSubscription();
+        final SubType subType = subscribe.getSubType();
+        final String consumerName = subscribe.getConsumerName();
+        final boolean isDurable = subscribe.getDurable();
+        final MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl(
+                subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
+                subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
+                : null;
+
+        final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
+        final boolean readCompacted = subscribe.getReadCompacted();
+        final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
+
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
         if (service.isAuthorizationEnabled() && originalPrincipal != null) {
-            isProxyAuthorizedFuture = service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topicName),
-                    authRole, subscribe.getSubscription());
+            isProxyAuthorizedFuture = service.getAuthorizationManager().canConsumeAsync(topicName, authRole,
+                    subscribe.getSubscription());
         } else {
             isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
         }
@@ -466,26 +498,12 @@ public class ServerCnx extends PulsarHandler {
             if (isProxyAuthorized) {
                 CompletableFuture<Boolean> authorizationFuture;
                 if (service.isAuthorizationEnabled()) {
-                    authorizationFuture = service.getAuthorizationManager().canConsumeAsync(
-                            DestinationName.get(subscribe.getTopic()),
-                            originalPrincipal != null ? originalPrincipal : authRole, subscribe.getSubscription());
+                    authorizationFuture = service.getAuthorizationManager().canConsumeAsync(topicName,
+                            originalPrincipal != null ? originalPrincipal : authRole, subscriptionName);
                 } else {
                     authorizationFuture = CompletableFuture.completedFuture(true);
                 }
 
-                final String subscriptionName = subscribe.getSubscription();
-                final SubType subType = subscribe.getSubType();
-                final String consumerName = subscribe.getConsumerName();
-                final boolean isDurable = subscribe.getDurable();
-                final MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl(
-                        subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
-                        subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
-                        : null;
-
-                final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
-                final boolean readCompacted = subscribe.getReadCompacted();
-                final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
-
                 authorizationFuture.thenApply(isAuthorized -> {
                     if (isAuthorized) {
                         if (log.isDebugEnabled()) {
@@ -527,7 +545,7 @@ public class ServerCnx extends PulsarHandler {
                             }
                         }
 
-                        service.getTopic(topicName)
+                        service.getTopic(topicName.toString())
                                 .thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
                                                                       subType, priorityLevel, consumerName, isDurable,
                                                                       startMessageId, metadata, readCompacted))
@@ -605,21 +623,29 @@ public class ServerCnx extends PulsarHandler {
     @Override
     protected void handleProducer(final CommandProducer cmdProducer) {
         checkArgument(state == State.Connected);
-        final String topicName = cmdProducer.getTopic();
         final long producerId = cmdProducer.getProducerId();
         final long requestId = cmdProducer.getRequestId();
+        // Use producer name provided by client if present
+        final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
+                : service.generateUniqueProducerName();
+        final boolean isEncrypted = cmdProducer.getEncrypted();
+        final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
+
+        DestinationName topicName = validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer);
+        if (topicName == null) {
+            return;
+        }
 
         if (!validateOriginalPrincipal(originalPrincipal,
                 Commands.newError(requestId, ServerError.AuthorizationError,
                         "Valid Proxy Client role should be provided while creating producer "),
-                topicName, "Valid Proxy Client role should be provided while creating producer ")) {
+                topicName.toString(), "Valid Proxy Client role should be provided while creating producer ")) {
             return;
         }
-        
+
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
         if (service.isAuthorizationEnabled() && originalPrincipal != null) {
-            isProxyAuthorizedFuture = service.getAuthorizationManager().canProduceAsync(DestinationName.get(topicName),
-                    authRole);
+            isProxyAuthorizedFuture = service.getAuthorizationManager().canProduceAsync(topicName, authRole);
         } else {
             isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
         }
@@ -627,17 +653,11 @@ public class ServerCnx extends PulsarHandler {
             if (isProxyAuthorized) {
                 CompletableFuture<Boolean> authorizationFuture;
                 if (service.isAuthorizationEnabled()) {
-                    authorizationFuture = service.getAuthorizationManager().canProduceAsync(
-                            DestinationName.get(cmdProducer.getTopic().toString()),
+                    authorizationFuture = service.getAuthorizationManager().canProduceAsync(topicName,
                             originalPrincipal != null ? originalPrincipal : authRole);
                 } else {
                     authorizationFuture = CompletableFuture.completedFuture(true);
                 }
-                // Use producer name provided by client if present
-                final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
-                        : service.generateUniqueProducerName();
-                final boolean isEncrypted = cmdProducer.getEncrypted();
-                final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
 
                 authorizationFuture.thenApply(isAuthorized -> {
                     if (isAuthorized) {
@@ -675,7 +695,7 @@ public class ServerCnx extends PulsarHandler {
 
                         log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);
 
-                        service.getTopic(topicName).thenAccept((Topic topic) -> {
+                        service.getTopic(topicName.toString()).thenAccept((Topic topic) -> {
                             // Before creating producer, check if backlog quota exceeded
                             // on topic
                             if (topic.isBacklogQuotaExceeded(producerName)) {
@@ -704,7 +724,7 @@ public class ServerCnx extends PulsarHandler {
                                 return;
                             }
 
-                            disableTcpNoDelayIfNeeded(topicName, producerName);
+                            disableTcpNoDelayIfNeeded(topicName.toString(), producerName);
 
                             Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole,
                                     isEncrypted, metadata);
@@ -1130,6 +1150,29 @@ public class ServerCnx extends PulsarHandler {
         }
     }
 
+    private DestinationName validateTopicName(String topic, long requestId, GeneratedMessageLite requestCommand) {
+        try {
+            return DestinationName.get(topic);
+        } catch (Throwable t) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to parse topic name '{}'", remoteAddress, topic, t);
+            }
+
+            if (requestCommand instanceof CommandLookupTopic) {
+                ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.InvalidTopicName,
+                        "Invalid topic name: " + t.getMessage(), requestId));
+            } else if (requestCommand instanceof CommandPartitionedTopicMetadata) {
+                ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.InvalidTopicName,
+                        "Invalid topic name: " + t.getMessage(), requestId));
+            } else {
+                ctx.writeAndFlush(Commands.newError(requestId, ServerError.InvalidTopicName,
+                        "Invalid topic name: " + t.getMessage()));
+            }
+
+            return null;
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ServerCnx.class);
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index b604cfd..4b895d4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -45,8 +45,6 @@ import java.util.concurrent.TimeUnit;
 
 import javax.naming.AuthenticationException;
 
-import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -67,29 +65,28 @@ import org.apache.pulsar.broker.authorization.AuthorizationManager;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.ServerCnx;
-import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.PulsarHandler;
 import org.apache.pulsar.common.api.Commands.ChecksumType;
+import org.apache.pulsar.common.api.PulsarHandler;
 import org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducerSuccess;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendError;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
 import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -105,6 +102,9 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.embedded.EmbeddedChannel;
@@ -114,19 +114,19 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
  */
 @Test
 public class ServerCnxTest {
-    private EmbeddedChannel channel;
+    protected EmbeddedChannel channel;
     private ServiceConfiguration svcConfig;
     private ServerCnx serverCnx;
-    private BrokerService brokerService;
+    protected BrokerService brokerService;
     private ManagedLedgerFactory mlFactoryMock;
     private ClientChannelHelper clientChannelHelper;
     private PulsarService pulsar;
     private ConfigurationCacheService configCacheService;
-    private NamespaceService namespaceService;
+    protected NamespaceService namespaceService;
     private final int currentProtocolVersion = ProtocolVersion.values()[ProtocolVersion.values().length - 1]
             .getNumber();
 
-    private final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
+    protected final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
     private final String failTopicName = "persistent://prop/use/ns-abc/failTopic";
     private final String nonOwnedTopicName = "persistent://prop/use/ns-abc/success-not-owned-topic";
     private final String encryptionRequiredTopicName = "persistent://prop/use/ns-abc/successEncryptionRequiredTopic";
@@ -1334,7 +1334,7 @@ public class ServerCnxTest {
         channel.finish();
     }
 
-    private void resetChannel() throws Exception {
+    protected void resetChannel() throws Exception {
         int MaxMessageSize = 5 * 1024 * 1024;
         if (channel != null && channel.isActive()) {
             serverCnx.close();
@@ -1345,7 +1345,7 @@ public class ServerCnxTest {
         channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4), serverCnx);
     }
 
-    private void setChannelConnected() throws Exception {
+    protected void setChannelConnected() throws Exception {
         Field channelState = ServerCnx.class.getDeclaredField("state");
         channelState.setAccessible(true);
         channelState.set(serverCnx, State.Connected);
@@ -1358,7 +1358,7 @@ public class ServerCnxTest {
         versionField.set(cnx, version);
     }
 
-    private Object getResponse() throws Exception {
+    protected Object getResponse() throws Exception {
         // Wait at most for 10s to get a response
         final long sleepTimeMs = 10;
         final long iterations = TimeUnit.SECONDS.toMillis(10) / sleepTimeMs;
@@ -1460,5 +1460,66 @@ public class ServerCnxTest {
         doReturn(successSubName).when(cursorMock).getName();
     }
 
+    @Test(timeOut = 30000)
+    public void testInvalidTopicOnLookup() throws Exception {
+        resetChannel();
+        setChannelConnected();
+
+        String invalidTopicName = "xx/ass/aa/aaa";
+
+        resetChannel();
+        setChannelConnected();
+
+
+        channel.writeInbound(Commands.newLookup(invalidTopicName, true, 1));
+        Object obj = getResponse();
+        assertEquals(obj.getClass(), CommandLookupTopicResponse.class);
+        CommandLookupTopicResponse res = (CommandLookupTopicResponse) obj;
+        assertEquals(res.getError(), ServerError.InvalidTopicName);
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void testInvalidTopicOnProducer() throws Exception {
+        resetChannel();
+        setChannelConnected();
+
+        String invalidTopicName = "xx/ass/aa/aaa";
+
+        resetChannel();
+        setChannelConnected();
+
+        ByteBuf clientCommand = Commands.newProducer(invalidTopicName, 1 /* producer id */, 1 /* request id */,
+                "prod-name", Collections.emptyMap());
+        channel.writeInbound(clientCommand);
+        Object obj = getResponse();
+        assertEquals(obj.getClass(), CommandError.class);
+        CommandError res = (CommandError) obj;
+        assertEquals(res.getError(), ServerError.InvalidTopicName);
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void testInvalidTopicOnSubscribe() throws Exception {
+        resetChannel();
+        setChannelConnected();
+
+        String invalidTopicName = "xx/ass/aa/aaa";
+
+        resetChannel();
+        setChannelConnected();
+
+        channel.writeInbound(Commands.newSubscribe(invalidTopicName, "test-subscription", 1, 1, SubType.Exclusive, 0,
+                "consumerName"));
+        Object obj = getResponse();
+        assertEquals(obj.getClass(), CommandError.class);
+        CommandError res = (CommandError) obj;
+        assertEquals(res.getError(), ServerError.InvalidTopicName);
+
+        channel.finish();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ServerCnxTest.class);
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index 717cd4a..3699f78 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducerSuccess;
@@ -146,6 +147,11 @@ public class ClientChannelHelper {
         protected void handleProducerSuccess(CommandProducerSuccess success) {
             queue.offer(CommandProducerSuccess.newBuilder(success).build());
         }
+
+        @Override
+        protected void handleLookupResponse(CommandLookupTopicResponse connection) {
+            queue.offer(CommandLookupTopicResponse.newBuilder(connection).build());
+        }
     };
 
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 98c851b..16a3cf0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -71,6 +71,7 @@ public final class PulsarApi {
     TooManyRequests(14, 14),
     TopicTerminatedError(15, 15),
     ProducerBusy(16, 16),
+    InvalidTopicName(17, 17),
     ;
     
     public static final int UnknownError_VALUE = 0;
@@ -90,6 +91,7 @@ public final class PulsarApi {
     public static final int TooManyRequests_VALUE = 14;
     public static final int TopicTerminatedError_VALUE = 15;
     public static final int ProducerBusy_VALUE = 16;
+    public static final int InvalidTopicName_VALUE = 17;
     
     
     public final int getNumber() { return value; }
@@ -113,6 +115,7 @@ public final class PulsarApi {
         case 14: return TooManyRequests;
         case 15: return TopicTerminatedError;
         case 16: return ProducerBusy;
+        case 17: return InvalidTopicName;
         default: return null;
       }
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 2fac3de..3b87273 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -111,6 +111,7 @@ enum ServerError {
     TopicTerminatedError = 15; // The topic has been terminated
 
     ProducerBusy         = 16; // Producer with same name is already connected
+    InvalidTopicName = 17; // The topic name is not valid
 }
 
 enum AuthMethod {

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.