You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/04/02 00:54:52 UTC

[pulsar] branch master updated: Issue #6619: WebSocket Topic URL Parser is Incorrect (#6630)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 11eb86c  Issue #6619: WebSocket Topic URL Parser is Incorrect (#6630)
11eb86c is described below

commit 11eb86c3a9abdd5149a0a54582cd85c63bfb5cf3
Author: feynmanlin <31...@qq.com>
AuthorDate: Thu Apr 2 08:54:37 2020 +0800

    Issue #6619: WebSocket Topic URL Parser is Incorrect (#6630)
    
    fix #6619
    
    * delete useless code
    
    * fix #6619
    
    * add license
---
 .../pulsar/websocket/AbstractWebSocketHandler.java |  38 ++++---
 .../websocket/AbstractWebSocketHandlerTest.java    | 111 +++++++++++++++++++++
 2 files changed, 135 insertions(+), 14 deletions(-)

diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index 2cb31af..e9cb3b6 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -18,18 +18,8 @@
  */
 package org.apache.pulsar.websocket;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import javax.naming.AuthenticationException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
+import com.google.common.base.Splitter;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -40,7 +30,16 @@ import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Splitter;
+import javax.naming.AuthenticationException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static com.google.common.base.Preconditions.checkArgument;
 
 public abstract class AbstractWebSocketHandler extends WebSocketAdapter implements Closeable {
 
@@ -181,7 +180,18 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
         final String domain = parts.get(domainIndex);
         final NamespaceName namespace = isV2Format ? NamespaceName.get(parts.get(5), parts.get(6)) :
                 NamespaceName.get( parts.get(4), parts.get(5), parts.get(6));
-        final String name = parts.get(7);
+        // A topic name that contains slashes is split into several parts as well, so those parts must be joined back together
+        int startPosition = 7;
+        boolean isConsumer = "consumer".equals(parts.get(2)) || "consumer".equals(parts.get(3));
+        int endPosition = isConsumer ? parts.size() -1 : parts.size();
+        StringBuilder topicName = new StringBuilder(parts.get(startPosition));
+        while (++startPosition < endPosition) {
+            if(StringUtils.isEmpty(parts.get(startPosition))){
+               continue;
+            }
+            topicName.append("/").append(parts.get(startPosition));
+        }
+        final String name = topicName.toString();
 
         return TopicName.get(domain, namespace, name);
     }
diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
new file mode 100644
index 0000000..e6619e0
--- /dev/null
+++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket;
+
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.common.naming.TopicName;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class AbstractWebSocketHandlerTest {
+    @Mock
+    private HttpServletRequest httpServletRequest;
+    // Checks the topic the handler resolves from v1 and v2 producer/consumer/reader request URIs.
+    @Test
+    public void parseTopicNameTest() {
+        String producerV1 = "/ws/producer/persistent/my-property/my-cluster/my-ns/my-topic";
+        String consumerV1 = "/ws/consumer/persistent/my-property/my-cluster/my-ns/my-topic/my-subscription";
+        String readerV1 = "/ws/reader/persistent/my-property/my-cluster/my-ns/my-topic";
+        // v2 URIs carry no cluster segment; the topic part may contain slashes, and consumer URIs end with the subscription.
+        String producerV2 = "/ws/v2/producer/persistent/my-property/my-ns/my-topic";
+        String consumerV2 = "/ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription";
+        String consumerLongTopicNameV2 = "/ws/v2/consumer/persistent/my-tenant/my-ns/some/topic/with/slashes/my-sub";
+        String readerV2 = "/ws/v2/reader/persistent/my-property/my-ns/my-topic/ / /@!$#^&*( /)1 /_+、`,《》</>";
+        // The request field is assigned explicitly via mock(); it is then re-stubbed with each URI before a new handler is built.
+        httpServletRequest = mock(HttpServletRequest.class);
+
+        when(httpServletRequest.getRequestURI()).thenReturn(producerV1);
+        WebSocketHandlerImpl webSocketHandler = new WebSocketHandlerImpl(null, httpServletRequest, null);
+        TopicName topicName = webSocketHandler.getTopic();
+        Assert.assertEquals("persistent://my-property/my-cluster/my-ns/my-topic", topicName.toString());
+
+        when(httpServletRequest.getRequestURI()).thenReturn(consumerV1);
+        webSocketHandler = new WebSocketHandlerImpl(null, httpServletRequest, null);
+        topicName = webSocketHandler.getTopic();
+        Assert.assertEquals("persistent://my-property/my-cluster/my-ns/my-topic", topicName.toString());
+
+        when(httpServletRequest.getRequestURI()).thenReturn(readerV1);
+        webSocketHandler = new WebSocketHandlerImpl(null, httpServletRequest, null);
+        topicName = webSocketHandler.getTopic();
+        Assert.assertEquals("persistent://my-property/my-cluster/my-ns/my-topic", topicName.toString());
+
+        when(httpServletRequest.getRequestURI()).thenReturn(producerV2);
+        webSocketHandler = new WebSocketHandlerImpl(null, httpServletRequest, null);
+        topicName = webSocketHandler.getTopic();
+        Assert.assertEquals("persistent://my-property/my-ns/my-topic", topicName.toString());
+
+        when(httpServletRequest.getRequestURI()).thenReturn(consumerV2);
+        webSocketHandler = new WebSocketHandlerImpl(null, httpServletRequest, null);
+        topicName = webSocketHandler.getTopic();
+        Assert.assertEquals("persistent://my-property/my-ns/my-topic", topicName.toString());
+
+        when(httpServletRequest.getRequestURI()).thenReturn(consumerLongTopicNameV2);
+        webSocketHandler = new WebSocketHandlerImpl(null, httpServletRequest, null);
+        topicName = webSocketHandler.getTopic();
+        Assert.assertEquals("persistent://my-tenant/my-ns/some/topic/with/slashes", topicName.toString());
+
+        when(httpServletRequest.getRequestURI()).thenReturn(readerV2);
+        webSocketHandler = new WebSocketHandlerImpl(null, httpServletRequest, null);
+        topicName = webSocketHandler.getTopic();
+        Assert.assertEquals("persistent://my-property/my-ns/my-topic/ / /@!$#^&*( /)1 /_+、`,《》</>", topicName.toString());
+
+    }
+
+    class WebSocketHandlerImpl extends AbstractWebSocketHandler {
+        // Concrete stub of the abstract handler; it only adds an accessor for the superclass's topic field.
+        public WebSocketHandlerImpl(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
+            super(service, request, response);
+        }
+
+        @Override
+        protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
+            return null;
+        }
+
+        @Override
+        public void close() throws IOException {
+            // Intentionally empty: this stub has no close logic.
+        }
+
+        public TopicName getTopic() {
+            return super.topic;
+        }
+
+    }
+
+}