You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/02 09:24:04 UTC
[pulsar] branch branch-2.7 updated: [WebSocket] Fix the initial
sequence id error (#8724) (#8743)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 2fa9b1f [WebSocket] Fix the initial sequence id error (#8724) (#8743)
2fa9b1f is described below
commit 2fa9b1f5f1d35fe30e980cdab606e14b80a398de
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Dec 2 17:23:48 2020 +0800
[WebSocket] Fix the initial sequence id error (#8724) (#8743)
### Motivation
The WebSocket `ProducerHandler` parse a wrong string so the client's query param `initialSequenceId` will always fail.
### Modifications
- Parse the real `initialSequenceId` value.
- Add unit tests to ensure that all params from the WebSocket url can be converted to the right config.
### Verifying this change
- [x] Make sure that the change passes the CI checks.
(cherry picked from commit be39f502de0f5055c6db1b9d3688abd25d9ccc0f)
Co-authored-by: Yunze Xu <xy...@163.com>
---
.../pulsar/websocket/AbstractWebSocketHandler.java | 2 +-
.../apache/pulsar/websocket/ConsumerHandler.java | 2 +-
.../apache/pulsar/websocket/ProducerHandler.java | 4 +-
.../websocket/AbstractWebSocketHandlerTest.java | 188 +++++++++++++++++++++
4 files changed, 192 insertions(+), 4 deletions(-)
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index d29447e..6002f63 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -64,7 +64,6 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
protected boolean checkAuth(ServletUpgradeResponse response) {
String authRole = "<none>";
- AuthenticationDataSource authenticationData = new AuthenticationDataHttps(request);
if (service.isAuthenticationEnabled()) {
try {
authRole = service.getAuthenticationService().authenticateHttpRequest(request);
@@ -85,6 +84,7 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
}
if (service.isAuthorizationEnabled()) {
+ AuthenticationDataSource authenticationData = new AuthenticationDataHttps(request);
try {
if (!isAuthorized(authRole, authenticationData)) {
log.warn("[{}:{}] WebSocket Client [{}] is not authorized on topic {}", request.getRemoteAddr(),
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index c4b6707..5faf6d2 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -330,7 +330,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
numBytesDelivered.add(msgSize);
}
- private ConsumerBuilder<byte[]> getConsumerConfiguration(PulsarClient client) {
+ protected ConsumerBuilder<byte[]> getConsumerConfiguration(PulsarClient client) {
ConsumerBuilder<byte[]> builder = client.newConsumer();
if (queryParams.containsKey("ackTimeoutMillis")) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 6ecf07e..5a6a41d 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -285,7 +285,7 @@ public class ProducerHandler extends AbstractWebSocketHandler {
MSG_PUBLISHED_COUNTER_UPDATER.getAndIncrement(this);
}
- private ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client) {
+ protected ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client) {
ProducerBuilder<byte[]> builder = client.newProducer()
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition);
@@ -298,7 +298,7 @@ public class ProducerHandler extends AbstractWebSocketHandler {
}
if (queryParams.containsKey("initialSequenceId")) {
- builder.initialSequenceId(Long.parseLong("initialSequenceId"));
+ builder.initialSequenceId(Long.parseLong(queryParams.get("initialSequenceId")));
}
if (queryParams.containsKey("hashingScheme")) {
diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
index afdd7b9..87ae7e8 100644
--- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
+++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
@@ -18,7 +18,19 @@
*/
package org.apache.pulsar.websocket;
+import lombok.Getter;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.junit.Assert;
@@ -26,12 +38,20 @@ import org.junit.Test;
import org.mockito.Mock;
import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
public class AbstractWebSocketHandlerTest {
@@ -171,4 +191,172 @@ public class AbstractWebSocketHandlerTest {
}
+ class MockedServletUpgradeResponse extends ServletUpgradeResponse {
+
+ @Getter
+ private int statusCode;
+ @Getter
+ private String message;
+
+ public MockedServletUpgradeResponse(HttpServletResponse response) {
+ super(response);
+ }
+
+ public void sendError(int statusCode, String message) {
+ this.statusCode = statusCode;
+ this.message = message;
+ }
+ }
+
+ PulsarClient newPulsarClient() throws PulsarClientException {
+ return PulsarClient.builder()
+ .serviceUrl("pulsar://localhost:6650")
+ .operationTimeout(1, TimeUnit.SECONDS)
+ .build();
+ }
+
+ class MockedProducerHandler extends ProducerHandler {
+
+ public MockedProducerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
+ super(service, request, response);
+ }
+
+ public ProducerConfigurationData getConf() throws PulsarClientException {
+ return ((ProducerBuilderImpl<byte[]>) getProducerBuilder(newPulsarClient())).getConf();
+ }
+
+ public void clearQueryParams() {
+ queryParams.clear();
+ }
+
+ public void putQueryParam(String key, String value) {
+ queryParams.put(key, value);
+ }
+ }
+
+ @Test
+ public void producerBuilderTest() throws IOException {
+ String producerV2 = "/ws/v2/producer/persistent/my-property/my-ns/my-topic";
+ // the params are all different with the default value
+ Map<String, String[]> queryParams = new HashMap<String, String>(){{
+ put("producerName", "my-producer");
+ put("initialSequenceId", "1");
+ put("hashingScheme", "Murmur3_32Hash");
+ put("sendTimeoutMillis", "30001");
+ put("batchingEnabled", "false");
+ put("batchingMaxMessages", "1001");
+ put("maxPendingMessages", "1001");
+ put("batchingMaxPublishDelay", "2");
+ put("messageRoutingMode", "RoundRobinPartition");
+ put("compressionType", "LZ4");
+ }}.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> new String[]{ entry.getValue() }));
+
+ httpServletRequest = mock(HttpServletRequest.class);
+ when(httpServletRequest.getRequestURI()).thenReturn(producerV2);
+ when(httpServletRequest.getParameterMap()).thenReturn(queryParams);
+
+ WebSocketService service = mock(WebSocketService.class);
+ when(service.isAuthenticationEnabled()).thenReturn(false);
+ when(service.isAuthorizationEnabled()).thenReturn(false);
+ when(service.getPulsarClient()).thenReturn(newPulsarClient());
+
+ MockedServletUpgradeResponse response = new MockedServletUpgradeResponse(null);
+
+ MockedProducerHandler producerHandler = new MockedProducerHandler(service, httpServletRequest, response);
+ assertEquals(response.getStatusCode(), 500);
+ assertTrue(response.getMessage().contains("Connection refused"));
+
+ ProducerConfigurationData conf = producerHandler.getConf();
+ assertEquals(conf.getProducerName(), "my-producer");
+ assertEquals(conf.getInitialSequenceId().longValue(), 1L);
+ assertEquals(conf.getHashingScheme(), HashingScheme.Murmur3_32Hash);
+ assertEquals(conf.getSendTimeoutMs(), 30001);
+ assertFalse(conf.isBatchingEnabled() );
+ assertEquals(conf.getBatchingMaxMessages(), 1001);
+ assertEquals(conf.getMaxPendingMessages(), 1001);
+ assertEquals(conf.getMessageRoutingMode(), MessageRoutingMode.RoundRobinPartition);
+ assertEquals(conf.getCompressionType(), CompressionType.LZ4);
+
+ producerHandler.clearQueryParams();
+ conf = producerHandler.getConf();
+ // The default message routing mode is SinglePartition, which is different with ProducerBuilder
+ assertEquals(conf.getMessageRoutingMode(), MessageRoutingMode.SinglePartition);
+
+ producerHandler.putQueryParam("messageRoutingMode", "CustomPartition");
+ conf = producerHandler.getConf();
+ // ProducerHandler doesn't support CustomPartition
+ assertEquals(conf.getMessageRoutingMode(), MessageRoutingMode.SinglePartition);
+ }
+
+ class MockedConsumerHandler extends ConsumerHandler {
+
+ public MockedConsumerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
+ super(service, request, response);
+ }
+
+ public ConsumerConfigurationData<byte[]> getConf() throws PulsarClientException {
+ return ((ConsumerBuilderImpl<byte[]>) getConsumerConfiguration(newPulsarClient())).getConf();
+ }
+
+ public void clearQueryParams() {
+ queryParams.clear();
+ }
+
+ public void putQueryParam(String key, String value) {
+ queryParams.put(key, value);
+ }
+ }
+
+ @Test
+ public void consumerBuilderTest() throws IOException {
+ String consumerV2 = "/ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription";
+ // the params are all different with the default value
+ Map<String, String[]> queryParams = new HashMap<String, String>(){{
+ put("ackTimeoutMillis", "1001");
+ put("subscriptionType", "Key_Shared");
+ put("subscriptionMode", "NonDurable");
+ put("receiverQueueSize", "999");
+ put("consumerName", "my-consumer");
+ put("priorityLevel", "1");
+ put("maxRedeliverCount", "5");
+ }}.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> new String[]{ entry.getValue() }));
+
+ httpServletRequest = mock(HttpServletRequest.class);
+ when(httpServletRequest.getRequestURI()).thenReturn(consumerV2);
+ when(httpServletRequest.getParameterMap()).thenReturn(queryParams);
+
+ WebSocketService service = mock(WebSocketService.class);
+ when(service.isAuthenticationEnabled()).thenReturn(false);
+ when(service.isAuthorizationEnabled()).thenReturn(false);
+ when(service.getPulsarClient()).thenReturn(newPulsarClient());
+
+ MockedServletUpgradeResponse response = new MockedServletUpgradeResponse(null);
+
+ MockedConsumerHandler consumerHandler = new MockedConsumerHandler(service, httpServletRequest, response);
+ assertEquals(response.getStatusCode(), 500);
+ assertTrue(response.getMessage().contains("Connection refused"));
+ assertEquals(consumerHandler.getSubscriptionMode(), SubscriptionMode.NonDurable);
+ assertEquals(consumerHandler.getSubscriptionType(), SubscriptionType.Key_Shared);
+
+ ConsumerConfigurationData<byte[]> conf = consumerHandler.getConf();
+ assertEquals(conf.getAckTimeoutMillis(), 1001);
+ assertEquals(conf.getSubscriptionType(), SubscriptionType.Key_Shared);
+ assertEquals(conf.getSubscriptionMode(), SubscriptionMode.NonDurable);
+ assertEquals(conf.getReceiverQueueSize(), 999);
+ assertEquals(conf.getConsumerName(), "my-consumer");
+ assertEquals(conf.getPriorityLevel(), 1);
+ assertEquals(conf.getDeadLetterPolicy().getDeadLetterTopic(),
+ "persistent://my-property/my-ns/my-topic-my-subscription-DLQ");
+ assertEquals(conf.getDeadLetterPolicy().getMaxRedeliverCount(), 5);
+
+ consumerHandler.clearQueryParams();
+ consumerHandler.putQueryParam("receiverQueueSize", "1001");
+ consumerHandler.putQueryParam("deadLetterTopic", "dead-letter-topic");
+
+ conf = consumerHandler.getConf();
+ // receive queue size is the minimum value of default value (1000) and user defined value(1001)
+ assertEquals(conf.getReceiverQueueSize(), 1000);
+ assertEquals(conf.getDeadLetterPolicy().getDeadLetterTopic(), "dead-letter-topic");
+ assertEquals(conf.getDeadLetterPolicy().getMaxRedeliverCount(), 0);
+ }
}