You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/02 09:24:04 UTC

[pulsar] branch branch-2.7 updated: [WebSocket] Fix the initial sequence id error (#8724) (#8743)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 2fa9b1f  [WebSocket] Fix the initial sequence id error (#8724) (#8743)
2fa9b1f is described below

commit 2fa9b1f5f1d35fe30e980cdab606e14b80a398de
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Dec 2 17:23:48 2020 +0800

    [WebSocket] Fix the initial sequence id error (#8724) (#8743)
    
    ### Motivation
    
    The WebSocket `ProducerHandler` parse a wrong string so the client's query param `initialSequenceId` will always fail.
    
    ### Modifications
    
    - Parse the real `initialSequenceId` value.
    - Add unit tests to ensure that all params from the WebSocket url can be converted to the right config.
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    (cherry picked from commit be39f502de0f5055c6db1b9d3688abd25d9ccc0f)
    
    Co-authored-by: Yunze Xu <xy...@163.com>
---
 .../pulsar/websocket/AbstractWebSocketHandler.java |   2 +-
 .../apache/pulsar/websocket/ConsumerHandler.java   |   2 +-
 .../apache/pulsar/websocket/ProducerHandler.java   |   4 +-
 .../websocket/AbstractWebSocketHandlerTest.java    | 188 +++++++++++++++++++++
 4 files changed, 192 insertions(+), 4 deletions(-)

diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index d29447e..6002f63 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -64,7 +64,6 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
 
     protected boolean checkAuth(ServletUpgradeResponse response) {
         String authRole = "<none>";
-        AuthenticationDataSource authenticationData = new AuthenticationDataHttps(request);
         if (service.isAuthenticationEnabled()) {
             try {
                 authRole = service.getAuthenticationService().authenticateHttpRequest(request);
@@ -85,6 +84,7 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
         }
 
         if (service.isAuthorizationEnabled()) {
+            AuthenticationDataSource authenticationData = new AuthenticationDataHttps(request);
             try {
                 if (!isAuthorized(authRole, authenticationData)) {
                     log.warn("[{}:{}] WebSocket Client [{}] is not authorized on topic {}", request.getRemoteAddr(),
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index c4b6707..5faf6d2 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -330,7 +330,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
         numBytesDelivered.add(msgSize);
     }
 
-    private ConsumerBuilder<byte[]> getConsumerConfiguration(PulsarClient client) {
+    protected ConsumerBuilder<byte[]> getConsumerConfiguration(PulsarClient client) {
         ConsumerBuilder<byte[]> builder = client.newConsumer();
 
         if (queryParams.containsKey("ackTimeoutMillis")) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 6ecf07e..5a6a41d 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -285,7 +285,7 @@ public class ProducerHandler extends AbstractWebSocketHandler {
         MSG_PUBLISHED_COUNTER_UPDATER.getAndIncrement(this);
     }
 
-    private ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client) {
+    protected ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client) {
         ProducerBuilder<byte[]> builder = client.newProducer()
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition);
@@ -298,7 +298,7 @@ public class ProducerHandler extends AbstractWebSocketHandler {
         }
 
         if (queryParams.containsKey("initialSequenceId")) {
-            builder.initialSequenceId(Long.parseLong("initialSequenceId"));
+            builder.initialSequenceId(Long.parseLong(queryParams.get("initialSequenceId")));
         }
 
         if (queryParams.containsKey("hashingScheme")) {
diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
index afdd7b9..87ae7e8 100644
--- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
+++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
@@ -18,7 +18,19 @@
  */
 package org.apache.pulsar.websocket;
 
+import lombok.Getter;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
 import org.junit.Assert;
@@ -26,12 +38,20 @@ import org.junit.Test;
 import org.mockito.Mock;
 
 import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 
 public class AbstractWebSocketHandlerTest {
@@ -171,4 +191,172 @@ public class AbstractWebSocketHandlerTest {
 
     }
 
+    class MockedServletUpgradeResponse extends ServletUpgradeResponse {
+
+        @Getter
+        private int statusCode;
+        @Getter
+        private String message;
+
+        public MockedServletUpgradeResponse(HttpServletResponse response) {
+            super(response);
+        }
+
+        public void sendError(int statusCode, String message) {
+            this.statusCode = statusCode;
+            this.message = message;
+        }
+    }
+
+    PulsarClient newPulsarClient() throws PulsarClientException {
+        return PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:6650")
+                .operationTimeout(1, TimeUnit.SECONDS)
+                .build();
+    }
+
+    class MockedProducerHandler extends ProducerHandler {
+
+        public MockedProducerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
+            super(service, request, response);
+        }
+
+        public ProducerConfigurationData getConf() throws PulsarClientException {
+            return ((ProducerBuilderImpl<byte[]>) getProducerBuilder(newPulsarClient())).getConf();
+        }
+
+        public void clearQueryParams() {
+            queryParams.clear();
+        }
+
+        public void putQueryParam(String key, String value) {
+            queryParams.put(key, value);
+        }
+    }
+
+    @Test
+    public void producerBuilderTest() throws IOException {
+        String producerV2 = "/ws/v2/producer/persistent/my-property/my-ns/my-topic";
+        // the params are all different with the default value
+        Map<String, String[]> queryParams = new HashMap<String, String>(){{
+            put("producerName", "my-producer");
+            put("initialSequenceId", "1");
+            put("hashingScheme", "Murmur3_32Hash");
+            put("sendTimeoutMillis", "30001");
+            put("batchingEnabled", "false");
+            put("batchingMaxMessages", "1001");
+            put("maxPendingMessages", "1001");
+            put("batchingMaxPublishDelay", "2");
+            put("messageRoutingMode", "RoundRobinPartition");
+            put("compressionType", "LZ4");
+        }}.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> new String[]{ entry.getValue() }));
+
+        httpServletRequest = mock(HttpServletRequest.class);
+        when(httpServletRequest.getRequestURI()).thenReturn(producerV2);
+        when(httpServletRequest.getParameterMap()).thenReturn(queryParams);
+
+        WebSocketService service = mock(WebSocketService.class);
+        when(service.isAuthenticationEnabled()).thenReturn(false);
+        when(service.isAuthorizationEnabled()).thenReturn(false);
+        when(service.getPulsarClient()).thenReturn(newPulsarClient());
+
+        MockedServletUpgradeResponse response = new MockedServletUpgradeResponse(null);
+
+        MockedProducerHandler producerHandler = new MockedProducerHandler(service, httpServletRequest, response);
+        assertEquals(response.getStatusCode(), 500);
+        assertTrue(response.getMessage().contains("Connection refused"));
+
+        ProducerConfigurationData conf = producerHandler.getConf();
+        assertEquals(conf.getProducerName(), "my-producer");
+        assertEquals(conf.getInitialSequenceId().longValue(), 1L);
+        assertEquals(conf.getHashingScheme(), HashingScheme.Murmur3_32Hash);
+        assertEquals(conf.getSendTimeoutMs(), 30001);
+        assertFalse(conf.isBatchingEnabled() );
+        assertEquals(conf.getBatchingMaxMessages(), 1001);
+        assertEquals(conf.getMaxPendingMessages(), 1001);
+        assertEquals(conf.getMessageRoutingMode(), MessageRoutingMode.RoundRobinPartition);
+        assertEquals(conf.getCompressionType(), CompressionType.LZ4);
+
+        producerHandler.clearQueryParams();
+        conf = producerHandler.getConf();
+        // The default message routing mode is SinglePartition, which is different with ProducerBuilder
+        assertEquals(conf.getMessageRoutingMode(), MessageRoutingMode.SinglePartition);
+
+        producerHandler.putQueryParam("messageRoutingMode", "CustomPartition");
+        conf = producerHandler.getConf();
+        // ProducerHandler doesn't support CustomPartition
+        assertEquals(conf.getMessageRoutingMode(), MessageRoutingMode.SinglePartition);
+    }
+
+    class MockedConsumerHandler extends ConsumerHandler {
+
+        public MockedConsumerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
+            super(service, request, response);
+        }
+
+        public ConsumerConfigurationData<byte[]> getConf() throws PulsarClientException {
+            return ((ConsumerBuilderImpl<byte[]>) getConsumerConfiguration(newPulsarClient())).getConf();
+        }
+
+        public void clearQueryParams() {
+            queryParams.clear();
+        }
+
+        public void putQueryParam(String key, String value) {
+            queryParams.put(key, value);
+        }
+    }
+
+    @Test
+    public void consumerBuilderTest() throws IOException {
+        String consumerV2 = "/ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription";
+        // the params are all different with the default value
+        Map<String, String[]> queryParams = new HashMap<String, String>(){{
+            put("ackTimeoutMillis", "1001");
+            put("subscriptionType", "Key_Shared");
+            put("subscriptionMode", "NonDurable");
+            put("receiverQueueSize", "999");
+            put("consumerName", "my-consumer");
+            put("priorityLevel", "1");
+            put("maxRedeliverCount", "5");
+        }}.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> new String[]{ entry.getValue() }));
+
+        httpServletRequest = mock(HttpServletRequest.class);
+        when(httpServletRequest.getRequestURI()).thenReturn(consumerV2);
+        when(httpServletRequest.getParameterMap()).thenReturn(queryParams);
+
+        WebSocketService service = mock(WebSocketService.class);
+        when(service.isAuthenticationEnabled()).thenReturn(false);
+        when(service.isAuthorizationEnabled()).thenReturn(false);
+        when(service.getPulsarClient()).thenReturn(newPulsarClient());
+
+        MockedServletUpgradeResponse response = new MockedServletUpgradeResponse(null);
+
+        MockedConsumerHandler consumerHandler = new MockedConsumerHandler(service, httpServletRequest, response);
+        assertEquals(response.getStatusCode(), 500);
+        assertTrue(response.getMessage().contains("Connection refused"));
+        assertEquals(consumerHandler.getSubscriptionMode(), SubscriptionMode.NonDurable);
+        assertEquals(consumerHandler.getSubscriptionType(), SubscriptionType.Key_Shared);
+
+        ConsumerConfigurationData<byte[]> conf = consumerHandler.getConf();
+        assertEquals(conf.getAckTimeoutMillis(), 1001);
+        assertEquals(conf.getSubscriptionType(), SubscriptionType.Key_Shared);
+        assertEquals(conf.getSubscriptionMode(), SubscriptionMode.NonDurable);
+        assertEquals(conf.getReceiverQueueSize(), 999);
+        assertEquals(conf.getConsumerName(), "my-consumer");
+        assertEquals(conf.getPriorityLevel(), 1);
+        assertEquals(conf.getDeadLetterPolicy().getDeadLetterTopic(),
+                "persistent://my-property/my-ns/my-topic-my-subscription-DLQ");
+        assertEquals(conf.getDeadLetterPolicy().getMaxRedeliverCount(), 5);
+
+        consumerHandler.clearQueryParams();
+        consumerHandler.putQueryParam("receiverQueueSize", "1001");
+        consumerHandler.putQueryParam("deadLetterTopic", "dead-letter-topic");
+
+        conf = consumerHandler.getConf();
+        // receive queue size is the minimum value of default value (1000) and user defined value(1001)
+        assertEquals(conf.getReceiverQueueSize(), 1000);
+        assertEquals(conf.getDeadLetterPolicy().getDeadLetterTopic(), "dead-letter-topic");
+        assertEquals(conf.getDeadLetterPolicy().getMaxRedeliverCount(), 0);
+    }
 }