You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/24 00:50:32 UTC

[incubator-pulsar] branch master updated: Add v2 support to the websockets endpoints. (#1429)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d11ae4d  Add v2 support to the websockets endpoints. (#1429)
d11ae4d is described below

commit d11ae4d64451eeeef1125d7d6d32d8c2c0ab1445
Author: cckellogg <cc...@gmail.com>
AuthorDate: Fri Mar 23 17:50:30 2018 -0700

    Add v2 support to the websockets endpoints. (#1429)
    
    * Add v2 support to the websockets endpoints.
    
    * Add servlet mounts for v2 websockets.
    
    * Add default receiver queue size.
---
 .../org/apache/pulsar/broker/PulsarService.java    | 19 ++++++--
 .../pulsar/websocket/AbstractWebSocketHandler.java | 31 ++++++++++---
 .../apache/pulsar/websocket/ConsumerHandler.java   | 18 +++++---
 .../apache/pulsar/websocket/ProducerHandler.java   |  5 +--
 .../org/apache/pulsar/websocket/ReaderHandler.java | 52 ++++++++++++----------
 .../pulsar/websocket/WebSocketConsumerServlet.java |  1 +
 .../pulsar/websocket/WebSocketProducerServlet.java |  1 +
 .../pulsar/websocket/WebSocketReaderServlet.java   |  1 +
 8 files changed, 87 insertions(+), 41 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 51014a7..98a4e08 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -80,6 +80,7 @@ import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
 import org.apache.zookeeper.ZooKeeper;
 import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -310,12 +311,24 @@ public class PulsarService implements AutoCloseable {
                         new ClusterData(webServiceAddress, webServiceAddressTls, brokerServiceUrl, brokerServiceUrlTls),
                         config);
                 this.webSocketService.start();
+
+                final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService);
                 this.webService.addServlet(WebSocketProducerServlet.SERVLET_PATH,
-                        new ServletHolder(new WebSocketProducerServlet(webSocketService)), true, attributeMap);
+                        new ServletHolder(producerWebSocketServlet), true, attributeMap);
+                this.webService.addServlet(WebSocketProducerServlet.SERVLET_PATH_V2,
+                        new ServletHolder(producerWebSocketServlet), true, attributeMap);
+
+                final WebSocketServlet consumerWebSocketServlet = new WebSocketConsumerServlet(webSocketService);
                 this.webService.addServlet(WebSocketConsumerServlet.SERVLET_PATH,
-                        new ServletHolder(new WebSocketConsumerServlet(webSocketService)), true, attributeMap);
+                        new ServletHolder(consumerWebSocketServlet), true, attributeMap);
+                this.webService.addServlet(WebSocketConsumerServlet.SERVLET_PATH_V2,
+                        new ServletHolder(consumerWebSocketServlet), true, attributeMap);
+
+                final WebSocketServlet readerWebSocketServlet = new WebSocketReaderServlet(webSocketService);
                 this.webService.addServlet(WebSocketReaderServlet.SERVLET_PATH,
-                        new ServletHolder(new WebSocketReaderServlet(webSocketService)), true, attributeMap);
+                        new ServletHolder(readerWebSocketServlet), true, attributeMap);
+                this.webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
+                        new ServletHolder(readerWebSocketServlet), true, attributeMap);
             }
 
             if (LOG.isDebugEnabled()) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index 97f73d2..7e06775 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -32,6 +32,7 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.WebSocketAdapter;
@@ -46,9 +47,10 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
     protected final WebSocketService service;
     protected final HttpServletRequest request;
 
-    protected final String topic;
+    protected final TopicName topic;
     protected final Map<String, String> queryParams;
 
+
     public AbstractWebSocketHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
         this.service = service;
         this.request = request;
@@ -149,22 +151,39 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
         return null;
     }
 
-    private String extractTopicName(HttpServletRequest request) {
+    private TopicName extractTopicName(HttpServletRequest request) {
         String uri = request.getRequestURI();
         List<String> parts = Splitter.on("/").splitToList(uri);
 
-        // Format must be like :
+        // V1 Format must be like :
         // /ws/producer/persistent/my-property/my-cluster/my-ns/my-topic
         // or
         // /ws/consumer/persistent/my-property/my-cluster/my-ns/my-topic/my-subscription
         // or
         // /ws/reader/persistent/my-property/my-cluster/my-ns/my-topic
+
+        // V2 Format must be like :
+        // /ws/v2/producer/persistent/my-property/my-ns/my-topic
+        // or
+        // /ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription
+        // or
+        // /ws/v2/reader/persistent/my-property/my-ns/my-topic
+
         checkArgument(parts.size() >= 8, "Invalid topic name format");
         checkArgument(parts.get(1).equals("ws"));
-        checkArgument(parts.get(3).equals("persistent") || parts.get(3).equals("non-persistent"));
 
-        TopicName topicName = TopicName.get(parts.get(3), parts.get(4), parts.get(5), parts.get(6), parts.get(7));
-        return topicName.toString();
+        final boolean isV2Format = parts.get(2).equals("v2");
+        final int domainIndex = isV2Format ? 4 : 3;
+        checkArgument(parts.get(domainIndex).equals("persistent") ||
+                parts.get(domainIndex).equals("non-persistent"));
+
+
+        final String domain = parts.get(domainIndex);
+        final NamespaceName namespace = isV2Format ? NamespaceName.get(parts.get(5), parts.get(6)) :
+                NamespaceName.get( parts.get(4), parts.get(5), parts.get(6));
+        final String name = parts.get(7);
+
+        return TopicName.get(domain, namespace, name);
     }
 
     protected abstract Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception;
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 8b0f573..aea88f3 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -35,10 +35,9 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ConsumerAck;
@@ -91,7 +90,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
                 return;
             }
 
-            this.consumer = service.getPulsarClient().subscribe(topic, subscription, conf);
+            this.consumer = service.getPulsarClient().subscribe(topic.toString(), subscription, conf);
             if (!this.service.addConsumer(this)) {
                 log.warn("[{}:{}] Failed to add consumer handler for topic {}", request.getRemoteAddr(),
                         request.getRemotePort(), topic);
@@ -298,7 +297,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
 
     @Override
     protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
-        return service.getAuthorizationService().canConsume(TopicName.get(topic), authRole, authenticationData,
+        return service.getAuthorizationService().canConsume(topic, authRole, authenticationData,
                 this.subscription);
     }
 
@@ -306,11 +305,18 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
         String uri = request.getRequestURI();
         List<String> parts = Splitter.on("/").splitToList(uri);
 
-        // Format must be like :
+        // v1 Format must be like :
         // /ws/consumer/persistent/my-property/my-cluster/my-ns/my-topic/my-subscription
+
+        // v2 Format must be like :
+        // /ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription
         checkArgument(parts.size() == 9, "Invalid topic name format");
         checkArgument(parts.get(1).equals("ws"));
-        checkArgument(parts.get(3).equals("persistent")|| parts.get(3).equals("non-persistent"));
+
+        final boolean isV2Format = parts.get(2).equals("v2");
+        final int domainIndex = isV2Format ? 4 : 3;
+        checkArgument(parts.get(domainIndex).equals("persistent") ||
+                parts.get(domainIndex).equals("non-persistent"));
         checkArgument(parts.get(8).length() > 0, "Empty subscription name");
 
         return parts.get(8);
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 151bdbf..25fed37 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -44,7 +44,6 @@ import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
 import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededError;
 import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededException;
 import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ProducerAck;
 import org.apache.pulsar.websocket.data.ProducerMessage;
@@ -93,7 +92,7 @@ public class ProducerHandler extends AbstractWebSocketHandler {
 
         try {
             ProducerConfiguration conf = getProducerConfiguration();
-            this.producer = service.getPulsarClient().createProducer(topic, conf);
+            this.producer = service.getPulsarClient().createProducer(topic.toString(), conf);
             if (!this.service.addProducer(this)) {
                 log.warn("[{}:{}] Failed to add producer handler for topic {}", request.getRemoteAddr(),
                         request.getRemotePort(), topic);
@@ -226,7 +225,7 @@ public class ProducerHandler extends AbstractWebSocketHandler {
 
     @Override
     protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
-        return service.getAuthorizationService().canProduce(TopicName.get(topic), authRole, authenticationData);
+        return service.getAuthorizationService().canProduce(topic, authRole, authenticationData);
     }
 
     private void sendAckResponse(ProducerAck response) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index 1eddd21..2efb585 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -32,11 +32,10 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderConfiguration;
+import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ReaderImpl;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ConsumerMessage;
@@ -55,8 +54,10 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class ReaderHandler extends AbstractWebSocketHandler {
-    private String subscription;
-    private final ReaderConfiguration conf;
+
+    private static final int DEFAULT_RECEIVER_QUEUE_SIZE = 1000;
+
+    private String subscription = "";
     private Reader<byte[]> reader;
 
     private final int maxPendingMessages;
@@ -70,9 +71,10 @@ public class ReaderHandler extends AbstractWebSocketHandler {
 
     public ReaderHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
         super(service, request, response);
-        this.subscription = "";
-        this.conf = getReaderConfiguration();
-        this.maxPendingMessages = (conf.getReceiverQueueSize() == 0) ? 1 : conf.getReceiverQueueSize();
+
+        final int receiverQueueSize = getReceiverQueueSize();
+
+        this.maxPendingMessages = (receiverQueueSize == 0) ? 1 : receiverQueueSize;
         this.numMsgsDelivered = new LongAdder();
         this.numBytesDelivered = new LongAdder();
 
@@ -81,7 +83,16 @@ public class ReaderHandler extends AbstractWebSocketHandler {
         }
 
         try {
-            this.reader = service.getPulsarClient().createReader(topic, getMessageId(), conf);
+            ReaderBuilder<byte[]> builder = service.getPulsarClient().newReader()
+                    .topic(topic.toString())
+                    .startMessageId(getMessageId())
+                    .receiverQueueSize(receiverQueueSize);
+            if (queryParams.containsKey("readerName")) {
+                builder.readerName(queryParams.get("readerName"));
+            }
+
+            this.reader = builder.create();
+
             this.subscription = ((ReaderImpl)this.reader).getConsumer().getSubscription();
             if (!this.service.addReader(this)) {
                 log.warn("[{}:{}] Failed to add reader handler for topic {}", request.getRemoteAddr(),
@@ -199,7 +210,7 @@ public class ReaderHandler extends AbstractWebSocketHandler {
     }
 
     public Consumer getConsumer() {
-        return ((ReaderImpl)reader).getConsumer();
+        return reader != null ? ((ReaderImpl)reader).getConsumer() : null;
     }
 
     public String getSubscription() {
@@ -228,25 +239,20 @@ public class ReaderHandler extends AbstractWebSocketHandler {
         numBytesDelivered.add(msgSize);
     }
 
-    private ReaderConfiguration getReaderConfiguration() {
-        ReaderConfiguration conf = new ReaderConfiguration();
-
-        if (queryParams.containsKey("readerName")) {
-            conf.setReaderName(queryParams.get("readerName"));
-        }
-
-        if (queryParams.containsKey("receiverQueueSize")) {
-            conf.setReceiverQueueSize(Math.min(Integer.parseInt(queryParams.get("receiverQueueSize")), 1000));
-        }
-        return conf;
-    }
-
     @Override
     protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
-        return service.getAuthorizationService().canConsume(TopicName.get(topic), authRole, authenticationData,
+        return service.getAuthorizationService().canConsume(topic, authRole, authenticationData,
                 this.subscription);
     }
 
+    private int getReceiverQueueSize() {
+        int size = DEFAULT_RECEIVER_QUEUE_SIZE;
+        if (queryParams.containsKey("receiverQueueSize")) {
+            size =  Math.min(Integer.parseInt(queryParams.get("receiverQueueSize")), DEFAULT_RECEIVER_QUEUE_SIZE);
+        }
+        return size;
+    }
+
     private MessageId getMessageId() throws IOException {
         MessageId messageId = MessageId.latest;
         if (isNotBlank(queryParams.get("messageId"))) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketConsumerServlet.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketConsumerServlet.java
index 73c70dd..6821236 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketConsumerServlet.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketConsumerServlet.java
@@ -25,6 +25,7 @@ public class WebSocketConsumerServlet extends WebSocketServlet {
     private static final long serialVersionUID = 1L;
 
     public static final String SERVLET_PATH = "/ws/consumer";
+    public static final String SERVLET_PATH_V2 = "/ws/v2/consumer";
 
     WebSocketService service;
 
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketProducerServlet.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketProducerServlet.java
index 2dfc377..c533166 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketProducerServlet.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketProducerServlet.java
@@ -25,6 +25,7 @@ public class WebSocketProducerServlet extends WebSocketServlet {
     private static final long serialVersionUID = 1L;
 
     public static final String SERVLET_PATH = "/ws/producer";
+    public static final String SERVLET_PATH_V2 = "/ws/v2/producer";
 
     private final WebSocketService service;
 
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketReaderServlet.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketReaderServlet.java
index aab0c62..6d80a03 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketReaderServlet.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketReaderServlet.java
@@ -25,6 +25,7 @@ public class WebSocketReaderServlet extends WebSocketServlet {
     private static final long serialVersionUID = 1L;
 
     public static final String SERVLET_PATH = "/ws/reader";
+    public static final String SERVLET_PATH_V2 = "/ws/v2/reader";
 
     WebSocketService service;
 

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.