Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/01/30 00:06:10 UTC

[GitHub] merlimat opened a new issue #1142: Broker should respond with error when invalid topic name is used

URL: https://github.com/apache/incubator-pulsar/issues/1142
 
 
   #### Expected behavior
   
   The broker should respond with an error when an invalid topic name is used, instead of throwing an exception and forcibly closing the connection.
   
   For example, the broker currently logs the following and drops the connection:
   
   ```
   2018-01-29 16:42:18,530 - WARN  - [pulsar-io-71-1:ServerCnx@163] - [/127.0.0.1:63891] Got exception: Invalid destination name: bad-topic-name -- Domain is missing
   java.lang.IllegalArgumentException: Invalid destination name: bad-topic-name -- Domain is missing
       at org.apache.pulsar.common.naming.DestinationName.<init>(DestinationName.java:96)
       at org.apache.pulsar.common.naming.DestinationName.<init>(DestinationName.java:40)
       at org.apache.pulsar.common.naming.DestinationName$1.load(DestinationName.java:62)
       at org.apache.pulsar.common.naming.DestinationName$1.load(DestinationName.java:59)
       at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
       at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
       at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
       at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
       at com.google.common.cache.LocalCache.get(LocalCache.java:4053)
       at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
       at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
       at org.apache.pulsar.common.naming.DestinationName.get(DestinationName.java:74)
       at org.apache.pulsar.broker.service.ServerCnx.handleLookup(ServerCnx.java:180)
       at org.apache.pulsar.common.api.PulsarDecoder.channelRead(PulsarDecoder.java:106)
       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
       at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
       at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
       at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
       at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
       at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
       at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
       at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
       at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
       at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
       at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
       at java.lang.Thread.run(Thread.java:748)
   2018-01-29 16:42:18,533 - INFO  - [pulsar-io-71-1:ServerCnx@128] - Closed connection from /127.0.0.1:63891
   ```
   
   The same handling should apply to topic lookup, partitioned-topic metadata requests, and producer/consumer/reader creation.
   
   A possible fix for the lookup path:
   
   ```diff
   diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
   index bd49fc76..035ab34f 100644
   --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
   +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
   @@ -191,9 +191,22 @@ public class ServerCnx extends PulsarHandler {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received Lookup from {} for {}", topic, remoteAddress, requestId);
            }
   +
   +        DestinationName topicName;
   +        try {
   +            topicName = DestinationName.get(topic);
   +        } catch (Throwable t) {
   +            if (log.isDebugEnabled()) {
   +                log.debug("[{}] Failed to parse topic name '{}'", remoteAddress, topic, t);
   +            }
   +            ctx.writeAndFlush(newLookupErrorResponse(ServerError.TopicNotFound, "Invalid topic name: " + t.getMessage(),
   +                    requestId));
   +            return;
   +        }
   +
            final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
            if (lookupSemaphore.tryAcquire()) {
   -            lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(),
   +            lookupDestinationAsync(getBrokerService().pulsar(), topicName, lookup.getAuthoritative(),
                        getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> {
                            if (ex == null) {
                                ctx.writeAndFlush(lookupResponse);
   
   ```
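
   Since the same guard is needed for lookup, partitioned-topic metadata and producer/consumer/reader creation, it could be factored into one helper. The following is a minimal, self-contained sketch of that idea, not broker code: `TopicNameGuard`, `parseTopic` and `respond` are hypothetical names, replies are plain strings rather than protocol commands, and the `://` check is a deliberate simplification standing in for `DestinationName.get(topic)`.

   ```java
   import java.util.function.Function;

   // Sketch only: TopicNameGuard, parseTopic and respond are hypothetical names,
   // standing in for the broker's DestinationName.get(...) and ServerCnx handlers.
   class TopicNameGuard {

       // Simplified stand-in for DestinationName.get(topic): rejects names with no domain.
       static String parseTopic(String topic) {
           if (topic == null || !topic.contains("://")) {
               throw new IllegalArgumentException(
                       "Invalid destination name: " + topic + " -- Domain is missing");
           }
           return topic;
       }

       // Parses the topic once; on failure returns an error reply instead of letting
       // the exception propagate out of the handler.
       static String respond(String command, String topic, long requestId,
                             Function<String, String> onValid) {
           final String parsed;
           try {
               parsed = parseTopic(topic);
           } catch (IllegalArgumentException e) {
               return command + " error [" + requestId + "]: Invalid topic name: " + e.getMessage();
           }
           return onValid.apply(parsed);
       }

       public static void main(String[] args) {
           // The same guard wraps each command type named in the issue.
           String[] commands = {"Lookup", "PartitionedMetadata", "Producer", "Subscribe"};
           for (String command : commands) {
               System.out.println(respond(command, "bad-topic-name", 1L, t -> command + " ok: " + t));
           }
           // A name that carries a domain passes through to the normal handler.
           System.out.println(respond("Lookup", "persistent://prop/cluster/ns/my-topic", 2L,
                   t -> "Lookup ok: " + t));
       }
   }
   ```

   In the real broker each handler would write its own error response type (as the diff does with `newLookupErrorResponse`), so the helper would more likely return the parsed name or signal failure, leaving the reply construction to the caller.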
   
