Posted to commits@pulsar.apache.org by "Technoboy- (via GitHub)" <gi...@apache.org> on 2023/10/17 10:10:17 UTC

[PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Technoboy- opened a new pull request, #21379:
URL: https://github.com/apache/pulsar/pull/21379

   ### Motivation
   
   Support subscribing to multiple topics, or to a topic pattern, over WebSocket.
   The new sub-path could be either of:
   ```
   /ws/v3/consumer/subscription?topicsPattern="a.*"
   /ws/v3/consumer/subscription?topics="a,b,c"
   ```
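As a rough illustration of the two sub-paths above, the sketch below builds a v3 consumer URI for each form. The host and port (`localhost:8080`), the class name `V3ConsumerUri`, and the choice to URL-encode the query value are assumptions made for this example; the PR itself does not state whether clients are expected to encode the value.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the PR: builds the two v3 consumer URI forms.
public class V3ConsumerUri {

    // Form 1: an explicit, comma-separated topic list in the "topics" parameter.
    static String forTopics(String hostPort, String subscription, String... topics) {
        String joined = String.join(",", topics);
        return "ws://" + hostPort + "/ws/v3/consumer/" + subscription
                + "?topics=" + URLEncoder.encode(joined, StandardCharsets.UTF_8);
    }

    // Form 2: a regular expression in the "topicsPattern" parameter.
    static String forPattern(String hostPort, String subscription, String pattern) {
        return "ws://" + hostPort + "/ws/v3/consumer/" + subscription
                + "?topicsPattern=" + URLEncoder.encode(pattern, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(forTopics("localhost:8080", "my-sub", "a", "b", "c"));
        System.out.println(forPattern("localhost:8080", "my-sub", "a.*"));
    }
}
```

Note that `URLEncoder` percent-encodes the comma but leaves `.` and `*` unchanged, so the pattern `a.*` passes through as-is.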
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1362058397


##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/MultiTopicConsumerHandler.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.base.Splitter;
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.Codec;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+
+/**
+ *
+ * WebSocket end-point url handler to handle incoming receive and acknowledge requests.
+ * <p>
+ * <b>receive:</b> socket-proxy keeps pushing messages to client by writing into session. However, it dispatches N
+ * messages at any point and after that on acknowledgement from client it dispatches further messages. <br/>
+ * <b>acknowledge:</b> it accepts acknowledgement for a given message from client and send it to broker. and for next
+ * action it notifies receive to dispatch further messages to client.
+ * </P>
+ *
+ */
+public class MultiTopicConsumerHandler extends ConsumerHandler {
+
+    public MultiTopicConsumerHandler(WebSocketService service, HttpServletRequest request,
+                                     ServletUpgradeResponse response) {
+        super(service, request, response);
+    }
+
+    @Override
+    protected void extractTopicName(HttpServletRequest request) {
+        String uri = request.getRequestURI();
+        List<String> parts = Splitter.on("/").splitToList(uri);
+
+        // V3 Format must be like :
+        // /ws/v3/consumer/my-subscription?topicsPattern="a.*"  //ws/v3/consumer/my-subscription?topics="a,b,c"
+        checkArgument(parts.size() >= 4, "Invalid topic name format");

Review Comment:
   Should it be `>=5` or `==5`?
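To make the question concrete, here is a small sketch that counts the segments of a v3 request URI. It uses `String.split("/", -1)` as a stand-in for Guava's `Splitter.on("/")`, which likewise keeps empty strings by default; the class name `V3UriParts` and the sample URIs are made up for the example. `HttpServletRequest.getRequestURI()` does not include the query string, so only the path is split.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo, not part of the PR: counts path segments the way the handler would see them.
public class V3UriParts {

    // A negative limit keeps leading and trailing empty strings,
    // mirroring the default behavior of Splitter.on("/").
    static List<String> parts(String requestUri) {
        return Arrays.asList(requestUri.split("/", -1));
    }

    public static void main(String[] args) {
        // Leading "/" yields an empty first element, so this path has 5 parts.
        List<String> full = parts("/ws/v3/consumer/my-subscription");
        System.out.println(full.size() + " " + full);

        // Without a subscription there are only 4 parts, which a ">= 4" check still accepts.
        List<String> missingSub = parts("/ws/v3/consumer");
        System.out.println(missingSub.size() + " " + missingSub);
    }
}
```

With the leading empty element counted, `v3` sits at index 2 and the subscription would sit at index 4, which is why a lower bound of 5 is the natural candidate.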





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1362111709


##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java:
##########
@@ -299,8 +301,7 @@ private void checkResumeReceive() {
 
     private void handleAck(ConsumerCommand command) throws IOException {
         // We should have received an ack
-        TopicMessageId msgId = new TopicMessageIdImpl(topic.toString(),
-                (MessageIdAdv) MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId)));
+        MessageId msgId = MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId));

Review Comment:
   We need to change this because, when subscribing to multiple topics, `topic` is not unique.





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1362123862


##########
pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java:
##########
@@ -1051,5 +1051,70 @@ private void stopWebSocketClient(WebSocketClient... clients) {
         log.info("proxy clients are stopped successfully");
     }
 
+    @Test
+    public void testMultiTopics() throws Exception {
+        final String subscription1 = "my-sub1";
+        final String subscription2 = "my-sub2";
+        final String topic1 = "my-property/my-ns/testMultiTopics" + UUID.randomUUID();
+        final String topic2 = "my-property/my-ns/testMultiTopics" + UUID.randomUUID();
+        final String consumerUri1 = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
+                "/ws/v3/consumer/" + subscription1 + "?topics=" + topic1 + "," + topic2;
+
+        final String consumerUri2 = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
+                "/ws/v3/consumer/" + subscription2 + "?topicsPattern=my-property/my-ns/testMultiTopics.*";
+
+        int messages = 10;
+        WebSocketClient consumerClient1 = new WebSocketClient();
+        WebSocketClient consumerClient2 = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
+        SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
+        Producer<byte[]> producer1 = pulsarClient.newProducer()

Review Comment:
   fixed





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1361918907


##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java:
##########
@@ -67,25 +67,29 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
     protected final WebSocketService service;
     protected final HttpServletRequest request;
 
-    protected final TopicName topic;
+    protected TopicName topic;
     protected final Map<String, String> queryParams;
     private static final String PULSAR_AUTH_METHOD_NAME = "X-Pulsar-Auth-Method-Name";
     protected final ObjectReader consumerCommandReader =
             ObjectMapperFactory.getMapper().reader().forType(ConsumerCommand.class);
 
     private ScheduledFuture<?> pingFuture;
 
+    protected String topicsPattern;
+
+    protected String topics;

Review Comment:
   Also, this value is only used on the consumer side. We should move it to the consumer handler.





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1362118592


##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java:
##########
@@ -67,25 +67,29 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
     protected final WebSocketService service;
     protected final HttpServletRequest request;
 
-    protected final TopicName topic;
+    protected TopicName topic;
     protected final Map<String, String> queryParams;
     private static final String PULSAR_AUTH_METHOD_NAME = "X-Pulsar-Auth-Method-Name";
     protected final ObjectReader consumerCommandReader =
             ObjectMapperFactory.getMapper().reader().forType(ConsumerCommand.class);
 
     private ScheduledFuture<?> pingFuture;
 
+    protected String topicsPattern;

Review Comment:
   fixed



##########
pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java:
##########
@@ -1051,5 +1051,70 @@ private void stopWebSocketClient(WebSocketClient... clients) {
         log.info("proxy clients are stopped successfully");
     }
 
+    @Test
+    public void testMultiTopics() throws Exception {
+        final String subscription1 = "my-sub1";
+        final String subscription2 = "my-sub2";
+        final String topic1 = "my-property/my-ns/testMultiTopics" + UUID.randomUUID();
+        final String topic2 = "my-property/my-ns/testMultiTopics" + UUID.randomUUID();
+        final String consumerUri1 = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
+                "/ws/v3/consumer/" + subscription1 + "?topics=" + topic1 + "," + topic2;
+
+        final String consumerUri2 = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
+                "/ws/v3/consumer/" + subscription2 + "?topicsPattern=my-property/my-ns/testMultiTopics.*";
+
+        int messages = 10;
+        WebSocketClient consumerClient1 = new WebSocketClient();
+        WebSocketClient consumerClient2 = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
+        SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
+        Producer<byte[]> producer1 = pulsarClient.newProducer()
+                .topic(topic1)
+                .batchingMaxMessages(1)
+                .create();
+        Producer<byte[]> producer2 = pulsarClient.newProducer()

Review Comment:
   fixed





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1362010445


##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/MultiTopicConsumerHandler.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.base.Splitter;
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.Codec;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+
+/**
+ *
+ * WebSocket end-point url handler to handle incoming receive and acknowledge requests.
+ * <p>
+ * <b>receive:</b> socket-proxy keeps pushing messages to client by writing into session. However, it dispatches N
+ * messages at any point and after that on acknowledgement from client it dispatches further messages. <br/>
+ * <b>acknowledge:</b> it accepts acknowledgement for a given message from client and send it to broker. and for next
+ * action it notifies receive to dispatch further messages to client.
+ * </P>
+ *
+ */
+public class MultiTopicConsumerHandler extends ConsumerHandler {
+
+    public MultiTopicConsumerHandler(WebSocketService service, HttpServletRequest request,
+                                     ServletUpgradeResponse response) {
+        super(service, request, response);
+    }
+
+    @Override
+    protected void extractTopicName(HttpServletRequest request) {
+        String uri = request.getRequestURI();
+        List<String> parts = Splitter.on("/").splitToList(uri);
+
+        // V3 Format must be like :
+        // /ws/v3/consumer/my-subscription?topicsPattern="a.*"  //ws/v3/consumer/my-subscription?topics="a,b,c"
+        checkArgument(parts.size() >= 4, "Invalid topic name format");

Review Comment:
   Should it be `>= 5`?






Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1361904804


##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/MultiTopicConsumerHandler.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.base.Splitter;
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.Codec;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+
+/**
+ *
+ * WebSocket end-point url handler to handle incoming receive and acknowledge requests.
+ * <p>
+ * <b>receive:</b> socket-proxy keeps pushing messages to client by writing into session. However, it dispatches N
+ * messages at any point and after that on acknowledgement from client it dispatches further messages. <br/>
+ * <b>acknowledge:</b> it accepts acknowledgement for a given message from client and send it to broker. and for next
+ * action it notifies receive to dispatch further messages to client.
+ * </P>
+ *
+ */
+public class MultiTopicConsumerHandler extends ConsumerHandler {
+
+    public MultiTopicConsumerHandler(WebSocketService service, HttpServletRequest request,
+                                     ServletUpgradeResponse response) {
+        super(service, request, response);
+    }
+
+    @Override
+    protected void extractTopicName(HttpServletRequest request) {
+        String uri = request.getRequestURI();
+        List<String> parts = Splitter.on("/").splitToList(uri);
+
+        // V3 Format must be like :
+        // /ws/v3/consumer/my-subscription?topicsPattern="a.*"  //ws/v3/consumer/my-subscription?topics="a,b,c"
+        checkArgument(parts.size() >= 4, "Invalid topic name format");
+        checkArgument(parts.get(2).equals("v3"));
+        checkArgument(queryParams.containsKey("topicsPattern") || queryParams.containsKey("topics"),
+                "Should set topics or topicsPattern");
+        checkArgument(!(queryParams.containsKey("topicsPattern") && queryParams.containsKey("topics")),
+                "Topics must be null when use topicsPattern");
+        topicsPattern = queryParams.get("topicsPattern");
+        topics = queryParams.get("topics");
+        if (topicsPattern != null) {
+            topic = TopicName.get(topicsPattern);
+        } else {
+            //当有多个topics时,

Review Comment:
   Please change this comment to English.
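The two `checkArgument` calls in the hunk above together require that exactly one of `topics` and `topicsPattern` is present. A minimal sketch of that rule using only the standard library follows; the class name `TopicParams`, the single combined check, and the `splitTopics` helper are assumptions for illustration, since the hunk is cut off before it shows how the PR handles the topic list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the PR's code: "exactly one of topics / topicsPattern".
public class TopicParams {

    // Rejects both "neither parameter set" and "both parameters set" in one comparison.
    static void validate(Map<String, String> queryParams) {
        boolean hasPattern = queryParams.containsKey("topicsPattern");
        boolean hasTopics = queryParams.containsKey("topics");
        if (hasPattern == hasTopics) {
            throw new IllegalArgumentException("Set exactly one of topics or topicsPattern");
        }
    }

    // Assumed handling of the list form: split the raw value on commas.
    static List<String> splitTopics(String topics) {
        return Arrays.asList(topics.split(","));
    }

    public static void main(String[] args) {
        Map<String, String> params = Map.of("topics", "a,b,c");
        validate(params);
        System.out.println(splitTopics(params.get("topics")));
    }
}
```

Folding the two conditions into one comparison also allows a single error message that covers both failure cases, whereas the second message in the hunk only describes the "both set" case.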





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1361910279


##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java:
##########
@@ -67,25 +67,29 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
     protected final WebSocketService service;
     protected final HttpServletRequest request;
 
-    protected final TopicName topic;
+    protected TopicName topic;
     protected final Map<String, String> queryParams;
     private static final String PULSAR_AUTH_METHOD_NAME = "X-Pulsar-Auth-Method-Name";
     protected final ObjectReader consumerCommandReader =
             ObjectMapperFactory.getMapper().reader().forType(ConsumerCommand.class);
 
     private ScheduledFuture<?> pingFuture;
 
+    protected String topicsPattern;
+
+    protected String topics;

Review Comment:
   Is it possible to move the `topic` parameter to the `topics` parameter?





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1362003162


##########
pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java:
##########
@@ -139,6 +142,27 @@ public void topicNameUrlEncodingTest() throws Exception {
         assertEquals(topicName.toString(), "persistent://my-property/my-ns/" + readerV2Topic);
     }
 
+    public String extractSubscription(HttpServletRequest request) {
+        String uri = request.getRequestURI();
+        List<String> parts = Splitter.on("/").splitToList(uri);
+
+        // v1 Format must be like :
+        // /ws/consumer/persistent/my-property/my-cluster/my-ns/my-topic/my-subscription
+
+        // v2 Format must be like :
+        // /ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription
+        checkArgument(parts.size() == 9, "Invalid topic name format");

Review Comment:
   `>= 9`?
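For context on the `== 9` versus `>= 9` question, the sketch below counts segments for the v1 and v2 example paths quoted in the hunk, again using `String.split("/", -1)` as a stand-in for Guava's `Splitter.on("/")`. The class name `LegacyUriParts` and the trailing-slash variant are assumptions added for the example.

```java
// Hypothetical demo, not part of the PR: segment counts for the v1 and v2 consumer paths.
public class LegacyUriParts {

    // A negative limit keeps leading and trailing empty strings, like Splitter.on("/").
    static int count(String requestUri) {
        return requestUri.split("/", -1).length;
    }

    public static void main(String[] args) {
        String v1 = "/ws/consumer/persistent/my-property/my-cluster/my-ns/my-topic/my-subscription";
        String v2 = "/ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription";
        System.out.println(count(v1));
        System.out.println(count(v2));
        // A trailing slash adds an empty final element and changes the count.
        System.out.println(count(v2 + "/"));
    }
}
```

Both example paths split into 9 parts, so the two checks differ only for longer inputs, such as a path with a trailing slash.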





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#issuecomment-1784458647

   ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/21379?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#21379](https://app.codecov.io/gh/apache/pulsar/pull/21379?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (6caf1ec) into [master](https://app.codecov.io/gh/apache/pulsar/commit/bd86e4e9d8dc2fe75ac8e29d628ffc5f7f9aab8c?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (bd86e4e) will **increase** coverage by `38.71%`.
   > The diff coverage is `58.82%`.
   
   [![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/21379/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/21379?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #21379       +/-   ##
   =============================================
   + Coverage     34.54%   73.25%   +38.71%     
   - Complexity    12084    32609    +20525     
   =============================================
     Files          1713     1892      +179     
     Lines        130782   140444     +9662     
     Branches      14250    15436     +1186     
   =============================================
   + Hits          45173   102878    +57705     
   + Misses        79577    29475    -50102     
   - Partials       6032     8091     +2059     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pulsar/pull/21379/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [inttests](https://app.codecov.io/gh/apache/pulsar/pull/21379/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `24.14% <23.52%> (?)` | |
   | [systests](https://app.codecov.io/gh/apache/pulsar/pull/21379/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `24.72% <13.23%> (-0.02%)` | :arrow_down: |
   | [unittests](https://app.codecov.io/gh/apache/pulsar/pull/21379/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `72.54% <55.88%> (+40.67%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Files](https://app.codecov.io/gh/apache/pulsar/pull/21379?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...n/java/org/apache/pulsar/broker/PulsarService.java](https://app.codecov.io/gh/apache/pulsar/pull/21379?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9QdWxzYXJTZXJ2aWNlLmphdmE=) | `83.07% <100.00%> (+15.67%)` | :arrow_up: |
   | [...pache/pulsar/proxy/server/ProxyServiceStarter.java](https://app.codecov.io/gh/apache/pulsar/pull/21379?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLXByb3h5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9wdWxzYXIvcHJveHkvc2VydmVyL1Byb3h5U2VydmljZVN0YXJ0ZXIuamF2YQ==) | `66.44% <100.00%> (+34.44%)` | :arrow_up: |
   | [...che/pulsar/websocket/AbstractWebSocketHandler.java](https://app.codecov.io/gh/apache/pulsar/pull/21379?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLXdlYnNvY2tldC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL3dlYnNvY2tldC9BYnN0cmFjdFdlYlNvY2tldEhhbmRsZXIuamF2YQ==) | `57.55% <100.00%> (+57.55%)` | :arrow_up: |
   | [...a/org/apache/pulsar/websocket/ConsumerHandler.java](https://app.codecov.io/gh/apache/pulsar/pull/21379?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLXdlYnNvY2tldC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL3dlYnNvY2tldC9Db25zdW1lckhhbmRsZXIuamF2YQ==) | `63.09% <100.00%> (+63.09%)` | :arrow_up: |
   | [...sar/websocket/service/WebSocketServiceStarter.java](https://app.codecov.io/gh/apache/pulsar/pull/21379?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLXdlYnNvY2tldC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL3dlYnNvY2tldC9zZXJ2aWNlL1dlYlNvY2tldFNlcnZpY2VTdGFydGVyLmphdmE=) | `74.46% <100.00%> (+74.46%)` | :arrow_up: |
   | [.../websocket/WebSocketMultiTopicConsumerServlet.java](https://app.codecov.io/gh/apache/pulsar/pull/21379?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLXdlYnNvY2tldC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL3dlYnNvY2tldC9XZWJTb2NrZXRNdWx0aVRvcGljQ29uc3VtZXJTZXJ2bGV0LmphdmE=) | `88.88% <88.88%> (ø)` | |
   | [...he/pulsar/websocket/MultiTopicConsumerHandler.java](https://app.codecov.io/gh/apache/pulsar/pull/21379?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLXdlYnNvY2tldC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL3dlYnNvY2tldC9NdWx0aVRvcGljQ29uc3VtZXJIYW5kbGVyLmphdmE=) | `38.63% <38.63%> (ø)` | |
   
   ... and [1468 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/21379/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   




Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1362127521


##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java:
##########
@@ -67,25 +67,29 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
     protected final WebSocketService service;
     protected final HttpServletRequest request;
 
-    protected final TopicName topic;
+    protected TopicName topic;
     protected final Map<String, String> queryParams;
     private static final String PULSAR_AUTH_METHOD_NAME = "X-Pulsar-Auth-Method-Name";
     protected final ObjectReader consumerCommandReader =
             ObjectMapperFactory.getMapper().reader().forType(ConsumerCommand.class);
 
     private ScheduledFuture<?> pingFuture;
 
+    protected String topicsPattern;
+
+    protected String topics;

Review Comment:
   > Plus, This value is only used for the consumer side. We should move it to the consumer handler.
   
   Fixed.





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1361915236


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -1086,6 +1087,11 @@ private void addWebSocketServiceHandler(WebService webService,
                     new ServletHolder(readerWebSocketServlet), true, attributeMap);
             webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
                     new ServletHolder(readerWebSocketServlet), true, attributeMap);
+
+            final WebSocketMultiTopicConsumerServlet multiTopicConsumerWebSocketServlet =
+                    new WebSocketMultiTopicConsumerServlet(webSocketService);
+            webService.addServlet(WebSocketMultiTopicConsumerServlet.SERVLET_PATH,

Review Comment:
   Should be V3? Otherwise, will it override the previous servlet?
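
To illustrate the concern about path collisions, here is a minimal sketch using a hypothetical registry. It is not Pulsar's `WebService` or Jetty's behaviour (whether a second registration on the same path replaces the first depends on the container); `ServletPathRegistry` and the path strings are assumptions made for this example only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical registry, not Pulsar's WebService or Jetty: it only shows
// why each servlet needs its own path spec.
final class ServletPathRegistry {
    private final Map<String, String> servletsByPath = new LinkedHashMap<>();

    // Registers a servlet name under a path spec and rejects duplicates
    // instead of silently replacing the earlier registration.
    void addServlet(String pathSpec, String servletName) {
        String previous = servletsByPath.putIfAbsent(pathSpec, servletName);
        if (previous != null) {
            throw new IllegalStateException(pathSpec + " is already mapped to " + previous);
        }
    }

    String lookup(String pathSpec) {
        return servletsByPath.get(pathSpec);
    }

    public static void main(String[] args) {
        ServletPathRegistry registry = new ServletPathRegistry();
        // The path specs below are assumptions made for this sketch.
        registry.addServlet("/ws/v2/consumer", "consumer");
        registry.addServlet("/ws/v3/consumer", "multi-topic-consumer");
        System.out.println(registry.lookup("/ws/v3/consumer"));
    }
}
```

With a dedicated v3 path, both servlets stay reachable; reusing an existing path would trip the duplicate check in this sketch.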





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1362003162


##########
pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java:
##########
@@ -139,6 +142,27 @@ public void topicNameUrlEncodingTest() throws Exception {
         assertEquals(topicName.toString(), "persistent://my-property/my-ns/" + readerV2Topic);
     }
 
+    public String extractSubscription(HttpServletRequest request) {
+        String uri = request.getRequestURI();
+        List<String> parts = Splitter.on("/").splitToList(uri);
+
+        // v1 Format must be like :
+        // /ws/consumer/persistent/my-property/my-cluster/my-ns/my-topic/my-subscription
+
+        // v2 Format must be like :
+        // /ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription
+        checkArgument(parts.size() == 9, "Invalid topic name format");

Review Comment:
   >= 9?
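
For reference, the segment counts behind the `== 9` check can be worked out from the two example paths quoted in the comments of the hunk. This sketch uses `String.split("/", -1)` as a stand-in for Guava's `Splitter.on("/")` (both keep the leading empty segment); the class name is made up for this example.

```java
import java.util.Arrays;
import java.util.List;

final class ConsumerPathParts {
    // Splits a request URI on "/" and keeps empty segments, which mirrors
    // the default behaviour of Guava's Splitter.on("/").
    static List<String> split(String uri) {
        return Arrays.asList(uri.split("/", -1));
    }

    public static void main(String[] args) {
        String v1 = "/ws/consumer/persistent/my-property/my-cluster/my-ns/my-topic/my-subscription";
        String v2 = "/ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription";

        // The leading "/" produces an empty first element, so both formats
        // yield nine parts and the subscription is the last one (index 8).
        System.out.println(split(v1).size() + " " + split(v1).get(8));
        System.out.println(split(v2).size() + " " + split(v2).get(8));
    }
}
```

Both example paths split into exactly nine parts, so `== 9` accepts them as written; `>= 9` would additionally accept paths with extra trailing segments.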





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1361915236


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -1086,6 +1087,11 @@ private void addWebSocketServiceHandler(WebService webService,
                     new ServletHolder(readerWebSocketServlet), true, attributeMap);
             webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
                     new ServletHolder(readerWebSocketServlet), true, attributeMap);
+
+            final WebSocketMultiTopicConsumerServlet multiTopicConsumerWebSocketServlet =
+                    new WebSocketMultiTopicConsumerServlet(webSocketService);
+            webService.addServlet(WebSocketMultiTopicConsumerServlet.SERVLET_PATH,

Review Comment:
   Should be V3? Otherwise, will it override the previous servlet?



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java:
##########
@@ -316,6 +317,11 @@ public static void addWebServerHandlers(WebServer server,
                     new ServletHolder(readerWebSocketServlet));
             server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
                     new ServletHolder(readerWebSocketServlet));
+
+            final WebSocketMultiTopicConsumerServlet multiTopicConsumerWebSocketServlet
+                    = new WebSocketMultiTopicConsumerServlet(webSocketService);
+            server.addServlet(WebSocketMultiTopicConsumerServlet.SERVLET_PATH,

Review Comment:
   Should be V3? Otherwise, will it override the previous servlet?





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1361914985


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java:
##########
@@ -316,6 +317,11 @@ public static void addWebServerHandlers(WebServer server,
                     new ServletHolder(readerWebSocketServlet));
             server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
                     new ServletHolder(readerWebSocketServlet));
+
+            final WebSocketMultiTopicConsumerServlet multiTopicConsumerWebSocketServlet
+                    = new WebSocketMultiTopicConsumerServlet(webSocketService);
+            server.addServlet(WebSocketMultiTopicConsumerServlet.SERVLET_PATH,

Review Comment:
   Should be V3? Otherwise, will it override the previous servlet?





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1361920762


##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java:
##########
@@ -299,8 +301,7 @@ private void checkResumeReceive() {
 
     private void handleAck(ConsumerCommand command) throws IOException {
         // We should have received an ack
-        TopicMessageId msgId = new TopicMessageIdImpl(topic.toString(),
-                (MessageIdAdv) MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId)));
+        MessageId msgId = MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId));

Review Comment:
   We don't need to change this, do we?





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1362058397


##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/MultiTopicConsumerHandler.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.base.Splitter;
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.Codec;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+
+/**
+ *
+ * WebSocket end-point url handler to handle incoming receive and acknowledge requests.
+ * <p>
+ * <b>receive:</b> socket-proxy keeps pushing messages to client by writing into session. However, it dispatches N
+ * messages at any point and after that on acknowledgement from client it dispatches further messages. <br/>
+ * <b>acknowledge:</b> it accepts acknowledgement for a given message from client and send it to broker. and for next
+ * action it notifies receive to dispatch further messages to client.
+ * </P>
+ *
+ */
+public class MultiTopicConsumerHandler extends ConsumerHandler {
+
+    public MultiTopicConsumerHandler(WebSocketService service, HttpServletRequest request,
+                                     ServletUpgradeResponse response) {
+        super(service, request, response);
+    }
+
+    @Override
+    protected void extractTopicName(HttpServletRequest request) {
+        String uri = request.getRequestURI();
+        List<String> parts = Splitter.on("/").splitToList(uri);
+
+        // V3 Format must be like :
+        // /ws/v3/consumer/my-subscription?topicsPattern="a.*"  //ws/v3/consumer/my-subscription?topics="a,b,c"
+        checkArgument(parts.size() >= 4, "Invalid topic name format");

Review Comment:
   Should it be `>=5` or `==5`?
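
A small worked example of why the bound matters, assuming the handler reads the subscription from index 4 (an assumption; the rest of the method is not shown in this hunk). The query string is not part of `getRequestURI()`, so only the path is split. The strict `== 5` check below is one of the two options raised, chosen for illustration and not necessarily what was merged; `V3ConsumerPath` is a hypothetical name.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;

final class V3ConsumerPath {
    // Returns the subscription segment of a v3 consumer path, or throws if
    // the path does not have exactly the five "/"-separated parts
    // ["", "ws", "v3", "consumer", subscription].
    static String subscription(String requestUri) {
        List<String> parts = Arrays.asList(requestUri.split("/", -1));
        if (parts.size() != 5) {
            throw new IllegalArgumentException("Invalid path: " + requestUri);
        }
        return parts.get(4);
    }

    public static void main(String[] args) {
        URI uri = URI.create("ws://localhost:8080/ws/v3/consumer/my-subscription?topics=a,b,c");
        // Only the path is split; the topics arrive separately as the query.
        System.out.println(subscription(uri.getPath()));
        System.out.println(uri.getQuery());
    }
}
```

Note that `/ws/v3/consumer` splits into only four parts, so a `>= 4` check would let it through even though there is no element at index 4.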





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1362013669


##########
pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java:
##########
@@ -139,6 +142,27 @@ public void topicNameUrlEncodingTest() throws Exception {
         assertEquals(topicName.toString(), "persistent://my-property/my-ns/" + readerV2Topic);
     }
 
+    public String extractSubscription(HttpServletRequest request) {
+        String uri = request.getRequestURI();
+        List<String> parts = Splitter.on("/").splitToList(uri);
+
+        // v1 Format must be like :
+        // /ws/consumer/persistent/my-property/my-cluster/my-ns/my-topic/my-subscription
+
+        // v2 Format must be like :
+        // /ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription
+        checkArgument(parts.size() == 9, "Invalid topic name format");

Review Comment:
   Should it be `>=`?





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- merged PR #21379:
URL: https://github.com/apache/pulsar/pull/21379




Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1362003162


##########
pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java:
##########
@@ -139,6 +142,27 @@ public void topicNameUrlEncodingTest() throws Exception {
         assertEquals(topicName.toString(), "persistent://my-property/my-ns/" + readerV2Topic);
     }
 
+    public String extractSubscription(HttpServletRequest request) {
+        String uri = request.getRequestURI();
+        List<String> parts = Splitter.on("/").splitToList(uri);
+
+        // v1 Format must be like :
+        // /ws/consumer/persistent/my-property/my-cluster/my-ns/my-topic/my-subscription
+
+        // v2 Format must be like :
+        // /ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription
+        checkArgument(parts.size() == 9, "Invalid topic name format");

Review Comment:
   `>= 9`?





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#issuecomment-1766353858

   Please help fix the checkstyle issue.




Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1361916712


##########
pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java:
##########
@@ -1051,5 +1051,70 @@ private void stopWebSocketClient(WebSocketClient... clients) {
         log.info("proxy clients are stopped successfully");
     }
 
+    @Test
+    public void testMultiTopics() throws Exception {
+        final String subscription1 = "my-sub1";
+        final String subscription2 = "my-sub2";
+        final String topic1 = "my-property/my-ns/testMultiTopics" + UUID.randomUUID();
+        final String topic2 = "my-property/my-ns/testMultiTopics" + UUID.randomUUID();
+        final String consumerUri1 = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
+                "/ws/v3/consumer/" + subscription1 + "?topics=" + topic1 + "," + topic2;
+
+        final String consumerUri2 = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
+                "/ws/v3/consumer/" + subscription2 + "?topicsPattern=my-property/my-ns/testMultiTopics.*";
+
+        int messages = 10;
+        WebSocketClient consumerClient1 = new WebSocketClient();
+        WebSocketClient consumerClient2 = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
+        SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
+        Producer<byte[]> producer1 = pulsarClient.newProducer()

Review Comment:
   Should we close it?



##########
pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java:
##########
@@ -1051,5 +1051,70 @@ private void stopWebSocketClient(WebSocketClient... clients) {
         log.info("proxy clients are stopped successfully");
     }
 
+    @Test
+    public void testMultiTopics() throws Exception {
+        final String subscription1 = "my-sub1";
+        final String subscription2 = "my-sub2";
+        final String topic1 = "my-property/my-ns/testMultiTopics" + UUID.randomUUID();
+        final String topic2 = "my-property/my-ns/testMultiTopics" + UUID.randomUUID();
+        final String consumerUri1 = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
+                "/ws/v3/consumer/" + subscription1 + "?topics=" + topic1 + "," + topic2;
+
+        final String consumerUri2 = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
+                "/ws/v3/consumer/" + subscription2 + "?topicsPattern=my-property/my-ns/testMultiTopics.*";
+
+        int messages = 10;
+        WebSocketClient consumerClient1 = new WebSocketClient();
+        WebSocketClient consumerClient2 = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
+        SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
+        Producer<byte[]> producer1 = pulsarClient.newProducer()
+                .topic(topic1)
+                .batchingMaxMessages(1)
+                .create();
+        Producer<byte[]> producer2 = pulsarClient.newProducer()

Review Comment:
   Should we close it?
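
One way to address the "close it" comments is try-with-resources, sketched below. `FakeProducer` is a hypothetical stand-in so the example runs without a broker; the real test would apply the same pattern to its own producers, assuming they are closeable.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseProducersSketch {
    // Hypothetical stand-in for a client producer; only close() matters here.
    static final class FakeProducer implements AutoCloseable {
        final String topic;
        int sent;
        boolean closed;

        FakeProducer(String topic) {
            this.topic = topic;
        }

        void send(String message) {
            if (closed) {
                throw new IllegalStateException("producer for " + topic + " is closed");
            }
            sent++;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Publishes to two topics and returns the producers so a caller can
    // check that both were closed when the try block exited.
    static List<FakeProducer> publishAndClose(int messages) {
        List<FakeProducer> used = new ArrayList<>();
        try (FakeProducer producer1 = new FakeProducer("topic1");
             FakeProducer producer2 = new FakeProducer("topic2")) {
            used.add(producer1);
            used.add(producer2);
            for (int i = 0; i < messages; i++) {
                producer1.send("msg-" + i);
                producer2.send("msg-" + i);
            }
        }
        return used;
    }

    public static void main(String[] args) {
        for (FakeProducer p : publishAndClose(10)) {
            System.out.println(p.topic + " sent=" + p.sent + " closed=" + p.closed);
        }
    }
}
```

Resources declared in the `try` header are closed in reverse order even if a send or an assertion inside the block throws, which is the main reason to prefer this over a trailing `close()` call.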





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1361918675


##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java:
##########
@@ -67,25 +67,29 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
     protected final WebSocketService service;
     protected final HttpServletRequest request;
 
-    protected final TopicName topic;
+    protected TopicName topic;
     protected final Map<String, String> queryParams;
     private static final String PULSAR_AUTH_METHOD_NAME = "X-Pulsar-Auth-Method-Name";
     protected final ObjectReader consumerCommandReader =
             ObjectMapperFactory.getMapper().reader().forType(ConsumerCommand.class);
 
     private ScheduledFuture<?> pingFuture;
 
+    protected String topicsPattern;

Review Comment:
   This value is only used for the consumer side. We should move it to the consumer handler.





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1361928559


##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/MultiTopicConsumerHandler.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.base.Splitter;
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.Codec;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+
+/**
+ *
+ * WebSocket end-point url handler to handle incoming receive and acknowledge requests.

Review Comment:
   Please describe how this differs from the consumer handler. We don't need to copy that description verbatim.





Re: [PR] [improve][ws] Support subscribing multi/pattern topic for Websocket [pulsar]

Posted by "mattisonchao (via GitHub)" <gi...@apache.org>.
mattisonchao commented on code in PR #21379:
URL: https://github.com/apache/pulsar/pull/21379#discussion_r1362010445


##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/MultiTopicConsumerHandler.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.base.Splitter;
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.Codec;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+
+/**
+ *
+ * WebSocket end-point url handler to handle incoming receive and acknowledge requests.
+ * <p>
+ * <b>receive:</b> socket-proxy keeps pushing messages to client by writing into session. However, it dispatches N
+ * messages at any point and after that on acknowledgement from client it dispatches further messages. <br/>
+ * <b>acknowledge:</b> it accepts acknowledgement for a given message from client and send it to broker. and for next
+ * action it notifies receive to dispatch further messages to client.
+ * </P>
+ *
+ */
+public class MultiTopicConsumerHandler extends ConsumerHandler {
+
+    public MultiTopicConsumerHandler(WebSocketService service, HttpServletRequest request,
+                                     ServletUpgradeResponse response) {
+        super(service, request, response);
+    }
+
+    @Override
+    protected void extractTopicName(HttpServletRequest request) {
+        String uri = request.getRequestURI();
+        List<String> parts = Splitter.on("/").splitToList(uri);
+
+        // V3 Format must be like :
+        // /ws/v3/consumer/my-subscription?topicsPattern="a.*"  //ws/v3/consumer/my-subscription?topics="a,b,c"
+        checkArgument(parts.size() >= 4, "Invalid topic name format");

Review Comment:
   Should it be `>= 5`?




