You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/07 00:15:42 UTC
[incubator-pulsar] branch master updated: Validate topic name on
broker side (#1178)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 28ce0ad Validate topic name on broker side (#1178)
28ce0ad is described below
commit 28ce0adaef76c52341231104daac4df3e2c55f53
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Feb 6 16:15:40 2018 -0800
Validate topic name on broker side (#1178)
* Validate topic name on broker side
* Refactored to put them in a single validation method
* Addressed comment
---
.../apache/pulsar/broker/service/ServerCnx.java | 143 ++++++++++++++-------
.../pulsar/broker/service/ServerCnxTest.java | 93 +++++++++++---
.../broker/service/utils/ClientChannelHelper.java | 6 +
.../apache/pulsar/common/api/proto/PulsarApi.java | 3 +
pulsar-common/src/main/proto/PulsarApi.proto | 1 +
5 files changed, 180 insertions(+), 66 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 3a6d337..0f0cd6d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -80,6 +80,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
+import com.google.protobuf.GeneratedMessageLite;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
@@ -106,7 +107,7 @@ public class ServerCnx extends PulsarHandler {
private final int MaxNonPersistentPendingMessages;
private String originalPrincipal = null;
private Set<String> proxyRoles = Sets.newHashSet();
-
+
enum State {
Start, Connected, Failed
}
@@ -198,7 +199,7 @@ public class ServerCnx extends PulsarHandler {
return true;
}
-
+
// ////
// // Incoming commands handling
// ////
@@ -206,9 +207,14 @@ public class ServerCnx extends PulsarHandler {
@Override
protected void handleLookup(CommandLookupTopic lookup) {
final long requestId = lookup.getRequestId();
- final String topicName = lookup.getTopic();
+
if (log.isDebugEnabled()) {
- log.debug("[{}] Received Lookup from {} for {}", topicName, remoteAddress, requestId);
+ log.debug("[{}] Received Lookup from {} for {}", lookup.getTopic(), remoteAddress, requestId);
+ }
+
+ DestinationName topicName = validateTopicName(lookup.getTopic(), requestId, lookup);
+ if (topicName == null) {
+ return;
}
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
@@ -218,21 +224,21 @@ public class ServerCnx extends PulsarHandler {
if (!validateOriginalPrincipal(originalPrincipal,
newLookupErrorResponse(ServerError.AuthorizationError,
"Valid Proxy Client role should be provided for lookup ", requestId),
- topicName, "Valid Proxy Client role should be provided for lookup ")) {
+ topicName.toString(), "Valid Proxy Client role should be provided for lookup ")) {
lookupSemaphore.release();
return;
}
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationManager()
- .canLookupAsync(DestinationName.get(topicName), authRole);
+ .canLookupAsync(topicName, authRole);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
-
+
isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
if (isProxyAuthorized) {
- lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topicName),
+ lookupDestinationAsync(getBrokerService().pulsar(), topicName,
lookup.getAuthoritative(), originalPrincipal != null ? originalPrincipal : authRole,
lookup.getRequestId()).handle((lookupResponse, ex) -> {
if (ex == null) {
@@ -273,10 +279,16 @@ public class ServerCnx extends PulsarHandler {
@Override
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
final long requestId = partitionMetadata.getRequestId();
- final String topicName = partitionMetadata.getTopic();
if (log.isDebugEnabled()) {
- log.debug("[{}] Received PartitionMetadataLookup from {} for {}", topicName, remoteAddress, requestId);
+ log.debug("[{}] Received PartitionMetadataLookup from {} for {}", partitionMetadata.getTopic(),
+ remoteAddress, requestId);
}
+
+ DestinationName topicName = validateTopicName(partitionMetadata.getTopic(), requestId, partitionMetadata);
+ if (topicName == null) {
+ return;
+ }
+
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
final String originalPrincipal = partitionMetadata.hasOriginalPrincipal()
@@ -284,22 +296,23 @@ public class ServerCnx extends PulsarHandler {
if (!validateOriginalPrincipal(originalPrincipal,
Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
"Valid Proxy Client role should be provided for getPartitionMetadataRequest ", requestId),
- topicName, "Valid Proxy Client role should be provided for getPartitionMetadataRequest ")) {
+ topicName.toString(),
+ "Valid Proxy Client role should be provided for getPartitionMetadataRequest ")) {
lookupSemaphore.release();
return;
}
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationManager()
- .canLookupAsync(DestinationName.get(topicName), authRole);
+ .canLookupAsync(topicName, authRole);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
if (isProxyAuthorized) {
- getPartitionedTopicMetadata(getBrokerService().pulsar(),
- originalPrincipal != null ? originalPrincipal : authRole,
- DestinationName.get(topicName)).handle((metadata, ex) -> {
+ getPartitionedTopicMetadata(getBrokerService().pulsar(),
+ originalPrincipal != null ? originalPrincipal : authRole, topicName)
+ .handle((metadata, ex) -> {
if (ex == null) {
int partitions = metadata.partitions;
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
@@ -446,19 +459,38 @@ public class ServerCnx extends PulsarHandler {
@Override
protected void handleSubscribe(final CommandSubscribe subscribe) {
checkArgument(state == State.Connected);
- final String topicName = subscribe.getTopic();
final long requestId = subscribe.getRequestId();
final long consumerId = subscribe.getConsumerId();
+
+ DestinationName topicName = validateTopicName(subscribe.getTopic(), requestId, subscribe);
+ if (topicName == null) {
+ return;
+ }
+
if (!validateOriginalPrincipal(originalPrincipal,
Commands.newError(requestId, ServerError.AuthorizationError,
"Valid Proxy Client role should be provided while subscribing "),
- topicName, "Valid Proxy Client role should be provided while subscribing ")) {
+ topicName.toString(), "Valid Proxy Client role should be provided while subscribing ")) {
return;
}
+
+ final String subscriptionName = subscribe.getSubscription();
+ final SubType subType = subscribe.getSubType();
+ final String consumerName = subscribe.getConsumerName();
+ final boolean isDurable = subscribe.getDurable();
+ final MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl(
+ subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
+ subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
+ : null;
+
+ final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
+ final boolean readCompacted = subscribe.getReadCompacted();
+ final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
+
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- isProxyAuthorizedFuture = service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topicName),
- authRole, subscribe.getSubscription());
+ isProxyAuthorizedFuture = service.getAuthorizationManager().canConsumeAsync(topicName, authRole,
+ subscribe.getSubscription());
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
@@ -466,26 +498,12 @@ public class ServerCnx extends PulsarHandler {
if (isProxyAuthorized) {
CompletableFuture<Boolean> authorizationFuture;
if (service.isAuthorizationEnabled()) {
- authorizationFuture = service.getAuthorizationManager().canConsumeAsync(
- DestinationName.get(subscribe.getTopic()),
- originalPrincipal != null ? originalPrincipal : authRole, subscribe.getSubscription());
+ authorizationFuture = service.getAuthorizationManager().canConsumeAsync(topicName,
+ originalPrincipal != null ? originalPrincipal : authRole, subscriptionName);
} else {
authorizationFuture = CompletableFuture.completedFuture(true);
}
- final String subscriptionName = subscribe.getSubscription();
- final SubType subType = subscribe.getSubType();
- final String consumerName = subscribe.getConsumerName();
- final boolean isDurable = subscribe.getDurable();
- final MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl(
- subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
- subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
- : null;
-
- final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
- final boolean readCompacted = subscribe.getReadCompacted();
- final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
-
authorizationFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
if (log.isDebugEnabled()) {
@@ -527,7 +545,7 @@ public class ServerCnx extends PulsarHandler {
}
}
- service.getTopic(topicName)
+ service.getTopic(topicName.toString())
.thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata, readCompacted))
@@ -605,21 +623,29 @@ public class ServerCnx extends PulsarHandler {
@Override
protected void handleProducer(final CommandProducer cmdProducer) {
checkArgument(state == State.Connected);
- final String topicName = cmdProducer.getTopic();
final long producerId = cmdProducer.getProducerId();
final long requestId = cmdProducer.getRequestId();
+ // Use producer name provided by client if present
+ final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
+ : service.generateUniqueProducerName();
+ final boolean isEncrypted = cmdProducer.getEncrypted();
+ final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
+
+ DestinationName topicName = validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer);
+ if (topicName == null) {
+ return;
+ }
if (!validateOriginalPrincipal(originalPrincipal,
Commands.newError(requestId, ServerError.AuthorizationError,
"Valid Proxy Client role should be provided while creating producer "),
- topicName, "Valid Proxy Client role should be provided while creating producer ")) {
+ topicName.toString(), "Valid Proxy Client role should be provided while creating producer ")) {
return;
}
-
+
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
- isProxyAuthorizedFuture = service.getAuthorizationManager().canProduceAsync(DestinationName.get(topicName),
- authRole);
+ isProxyAuthorizedFuture = service.getAuthorizationManager().canProduceAsync(topicName, authRole);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
@@ -627,17 +653,11 @@ public class ServerCnx extends PulsarHandler {
if (isProxyAuthorized) {
CompletableFuture<Boolean> authorizationFuture;
if (service.isAuthorizationEnabled()) {
- authorizationFuture = service.getAuthorizationManager().canProduceAsync(
- DestinationName.get(cmdProducer.getTopic().toString()),
+ authorizationFuture = service.getAuthorizationManager().canProduceAsync(topicName,
originalPrincipal != null ? originalPrincipal : authRole);
} else {
authorizationFuture = CompletableFuture.completedFuture(true);
}
- // Use producer name provided by client if present
- final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
- : service.generateUniqueProducerName();
- final boolean isEncrypted = cmdProducer.getEncrypted();
- final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
authorizationFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
@@ -675,7 +695,7 @@ public class ServerCnx extends PulsarHandler {
log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);
- service.getTopic(topicName).thenAccept((Topic topic) -> {
+ service.getTopic(topicName.toString()).thenAccept((Topic topic) -> {
// Before creating producer, check if backlog quota exceeded
// on topic
if (topic.isBacklogQuotaExceeded(producerName)) {
@@ -704,7 +724,7 @@ public class ServerCnx extends PulsarHandler {
return;
}
- disableTcpNoDelayIfNeeded(topicName, producerName);
+ disableTcpNoDelayIfNeeded(topicName.toString(), producerName);
Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole,
isEncrypted, metadata);
@@ -1130,6 +1150,29 @@ public class ServerCnx extends PulsarHandler {
}
}
+ private DestinationName validateTopicName(String topic, long requestId, GeneratedMessageLite requestCommand) {
+ try {
+ return DestinationName.get(topic);
+ } catch (Throwable t) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to parse topic name '{}'", remoteAddress, topic, t);
+ }
+
+ if (requestCommand instanceof CommandLookupTopic) {
+ ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.InvalidTopicName,
+ "Invalid topic name: " + t.getMessage(), requestId));
+ } else if (requestCommand instanceof CommandPartitionedTopicMetadata) {
+ ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.InvalidTopicName,
+ "Invalid topic name: " + t.getMessage(), requestId));
+ } else {
+ ctx.writeAndFlush(Commands.newError(requestId, ServerError.InvalidTopicName,
+ "Invalid topic name: " + t.getMessage()));
+ }
+
+ return null;
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(ServerCnx.class);
/**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index b604cfd..4b895d4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -45,8 +45,6 @@ import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
-import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -67,29 +65,28 @@ import org.apache.pulsar.broker.authorization.AuthorizationManager;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.ServerCnx;
-import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.ServerCnx.State;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.Commands.ChecksumType;
+import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducerSuccess;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendError;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -105,6 +102,9 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
@@ -114,19 +114,19 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
*/
@Test
public class ServerCnxTest {
- private EmbeddedChannel channel;
+ protected EmbeddedChannel channel;
private ServiceConfiguration svcConfig;
private ServerCnx serverCnx;
- private BrokerService brokerService;
+ protected BrokerService brokerService;
private ManagedLedgerFactory mlFactoryMock;
private ClientChannelHelper clientChannelHelper;
private PulsarService pulsar;
private ConfigurationCacheService configCacheService;
- private NamespaceService namespaceService;
+ protected NamespaceService namespaceService;
private final int currentProtocolVersion = ProtocolVersion.values()[ProtocolVersion.values().length - 1]
.getNumber();
- private final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
+ protected final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
private final String failTopicName = "persistent://prop/use/ns-abc/failTopic";
private final String nonOwnedTopicName = "persistent://prop/use/ns-abc/success-not-owned-topic";
private final String encryptionRequiredTopicName = "persistent://prop/use/ns-abc/successEncryptionRequiredTopic";
@@ -1334,7 +1334,7 @@ public class ServerCnxTest {
channel.finish();
}
- private void resetChannel() throws Exception {
+ protected void resetChannel() throws Exception {
int MaxMessageSize = 5 * 1024 * 1024;
if (channel != null && channel.isActive()) {
serverCnx.close();
@@ -1345,7 +1345,7 @@ public class ServerCnxTest {
channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4), serverCnx);
}
- private void setChannelConnected() throws Exception {
+ protected void setChannelConnected() throws Exception {
Field channelState = ServerCnx.class.getDeclaredField("state");
channelState.setAccessible(true);
channelState.set(serverCnx, State.Connected);
@@ -1358,7 +1358,7 @@ public class ServerCnxTest {
versionField.set(cnx, version);
}
- private Object getResponse() throws Exception {
+ protected Object getResponse() throws Exception {
// Wait at most for 10s to get a response
final long sleepTimeMs = 10;
final long iterations = TimeUnit.SECONDS.toMillis(10) / sleepTimeMs;
@@ -1460,5 +1460,66 @@ public class ServerCnxTest {
doReturn(successSubName).when(cursorMock).getName();
}
+ @Test(timeOut = 30000)
+ public void testInvalidTopicOnLookup() throws Exception {
+ resetChannel();
+ setChannelConnected();
+
+ String invalidTopicName = "xx/ass/aa/aaa";
+
+ resetChannel();
+ setChannelConnected();
+
+
+ channel.writeInbound(Commands.newLookup(invalidTopicName, true, 1));
+ Object obj = getResponse();
+ assertEquals(obj.getClass(), CommandLookupTopicResponse.class);
+ CommandLookupTopicResponse res = (CommandLookupTopicResponse) obj;
+ assertEquals(res.getError(), ServerError.InvalidTopicName);
+
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void testInvalidTopicOnProducer() throws Exception {
+ resetChannel();
+ setChannelConnected();
+
+ String invalidTopicName = "xx/ass/aa/aaa";
+
+ resetChannel();
+ setChannelConnected();
+
+ ByteBuf clientCommand = Commands.newProducer(invalidTopicName, 1 /* producer id */, 1 /* request id */,
+ "prod-name", Collections.emptyMap());
+ channel.writeInbound(clientCommand);
+ Object obj = getResponse();
+ assertEquals(obj.getClass(), CommandError.class);
+ CommandError res = (CommandError) obj;
+ assertEquals(res.getError(), ServerError.InvalidTopicName);
+
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void testInvalidTopicOnSubscribe() throws Exception {
+ resetChannel();
+ setChannelConnected();
+
+ String invalidTopicName = "xx/ass/aa/aaa";
+
+ resetChannel();
+ setChannelConnected();
+
+ channel.writeInbound(Commands.newSubscribe(invalidTopicName, "test-subscription", 1, 1, SubType.Exclusive, 0,
+ "consumerName"));
+ Object obj = getResponse();
+ assertEquals(obj.getClass(), CommandError.class);
+ CommandError res = (CommandError) obj;
+ assertEquals(res.getError(), ServerError.InvalidTopicName);
+
+ channel.finish();
+ }
+
private static final Logger log = LoggerFactory.getLogger(ServerCnxTest.class);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index 717cd4a..3699f78 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducerSuccess;
@@ -146,6 +147,11 @@ public class ClientChannelHelper {
protected void handleProducerSuccess(CommandProducerSuccess success) {
queue.offer(CommandProducerSuccess.newBuilder(success).build());
}
+
+ @Override
+ protected void handleLookupResponse(CommandLookupTopicResponse connection) {
+ queue.offer(CommandLookupTopicResponse.newBuilder(connection).build());
+ }
};
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 98c851b..16a3cf0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -71,6 +71,7 @@ public final class PulsarApi {
TooManyRequests(14, 14),
TopicTerminatedError(15, 15),
ProducerBusy(16, 16),
+ InvalidTopicName(17, 17),
;
public static final int UnknownError_VALUE = 0;
@@ -90,6 +91,7 @@ public final class PulsarApi {
public static final int TooManyRequests_VALUE = 14;
public static final int TopicTerminatedError_VALUE = 15;
public static final int ProducerBusy_VALUE = 16;
+ public static final int InvalidTopicName_VALUE = 17;
public final int getNumber() { return value; }
@@ -113,6 +115,7 @@ public final class PulsarApi {
case 14: return TooManyRequests;
case 15: return TopicTerminatedError;
case 16: return ProducerBusy;
+ case 17: return InvalidTopicName;
default: return null;
}
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 2fac3de..3b87273 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -111,6 +111,7 @@ enum ServerError {
TopicTerminatedError = 15; // The topic has been terminated
ProducerBusy = 16; // Producer with same name is already connected
+ InvalidTopicName = 17; // The topic name is not valid
}
enum AuthMethod {
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.