You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/24 00:50:32 UTC
[incubator-pulsar] branch master updated: Add v2 support to the websockets endpoints. (#1429)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d11ae4d Add v2 support to the websockets endpoints. (#1429)
d11ae4d is described below
commit d11ae4d64451eeeef1125d7d6d32d8c2c0ab1445
Author: cckellogg <cc...@gmail.com>
AuthorDate: Fri Mar 23 17:50:30 2018 -0700
Add v2 support to the websockets endpoints. (#1429)
* Add v2 support to the websockets endpoints.
* Add servlet mounts for v2 websockets.
* Add default receiver queue size.
---
.../org/apache/pulsar/broker/PulsarService.java | 19 ++++++--
.../pulsar/websocket/AbstractWebSocketHandler.java | 31 ++++++++++---
.../apache/pulsar/websocket/ConsumerHandler.java | 18 +++++---
.../apache/pulsar/websocket/ProducerHandler.java | 5 +--
.../org/apache/pulsar/websocket/ReaderHandler.java | 52 ++++++++++++----------
.../pulsar/websocket/WebSocketConsumerServlet.java | 1 +
.../pulsar/websocket/WebSocketProducerServlet.java | 1 +
.../pulsar/websocket/WebSocketReaderServlet.java | 1 +
8 files changed, 87 insertions(+), 41 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 51014a7..98a4e08 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -80,6 +80,7 @@ import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
import org.apache.zookeeper.ZooKeeper;
import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -310,12 +311,24 @@ public class PulsarService implements AutoCloseable {
new ClusterData(webServiceAddress, webServiceAddressTls, brokerServiceUrl, brokerServiceUrlTls),
config);
this.webSocketService.start();
+
+ final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService);
this.webService.addServlet(WebSocketProducerServlet.SERVLET_PATH,
- new ServletHolder(new WebSocketProducerServlet(webSocketService)), true, attributeMap);
+ new ServletHolder(producerWebSocketServlet), true, attributeMap);
+ this.webService.addServlet(WebSocketProducerServlet.SERVLET_PATH_V2,
+ new ServletHolder(producerWebSocketServlet), true, attributeMap);
+
+ final WebSocketServlet consumerWebSocketServlet = new WebSocketConsumerServlet(webSocketService);
this.webService.addServlet(WebSocketConsumerServlet.SERVLET_PATH,
- new ServletHolder(new WebSocketConsumerServlet(webSocketService)), true, attributeMap);
+ new ServletHolder(consumerWebSocketServlet), true, attributeMap);
+ this.webService.addServlet(WebSocketConsumerServlet.SERVLET_PATH_V2,
+ new ServletHolder(consumerWebSocketServlet), true, attributeMap);
+
+ final WebSocketServlet readerWebSocketServlet = new WebSocketReaderServlet(webSocketService);
this.webService.addServlet(WebSocketReaderServlet.SERVLET_PATH,
- new ServletHolder(new WebSocketReaderServlet(webSocketService)), true, attributeMap);
+ new ServletHolder(readerWebSocketServlet), true, attributeMap);
+ this.webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
+ new ServletHolder(readerWebSocketServlet), true, attributeMap);
}
if (LOG.isDebugEnabled()) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index 97f73d2..7e06775 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -32,6 +32,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
@@ -46,9 +47,10 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
protected final WebSocketService service;
protected final HttpServletRequest request;
- protected final String topic;
+ protected final TopicName topic;
protected final Map<String, String> queryParams;
+
public AbstractWebSocketHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
this.service = service;
this.request = request;
@@ -149,22 +151,39 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
return null;
}
- private String extractTopicName(HttpServletRequest request) {
+ private TopicName extractTopicName(HttpServletRequest request) {
String uri = request.getRequestURI();
List<String> parts = Splitter.on("/").splitToList(uri);
- // Format must be like :
+ // V1 Format must be like :
// /ws/producer/persistent/my-property/my-cluster/my-ns/my-topic
// or
// /ws/consumer/persistent/my-property/my-cluster/my-ns/my-topic/my-subscription
// or
// /ws/reader/persistent/my-property/my-cluster/my-ns/my-topic
+
+ // V2 Format must be like :
+ // /ws/v2/producer/persistent/my-property/my-ns/my-topic
+ // or
+ // /ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription
+ // or
+ // /ws/v2/reader/persistent/my-property/my-ns/my-topic
+
checkArgument(parts.size() >= 8, "Invalid topic name format");
checkArgument(parts.get(1).equals("ws"));
- checkArgument(parts.get(3).equals("persistent") || parts.get(3).equals("non-persistent"));
- TopicName topicName = TopicName.get(parts.get(3), parts.get(4), parts.get(5), parts.get(6), parts.get(7));
- return topicName.toString();
+ final boolean isV2Format = parts.get(2).equals("v2");
+ final int domainIndex = isV2Format ? 4 : 3;
+ checkArgument(parts.get(domainIndex).equals("persistent") ||
+ parts.get(domainIndex).equals("non-persistent"));
+
+
+ final String domain = parts.get(domainIndex);
+ final NamespaceName namespace = isV2Format ? NamespaceName.get(parts.get(5), parts.get(6)) :
+ NamespaceName.get( parts.get(4), parts.get(5), parts.get(6));
+ final String name = parts.get(7);
+
+ return TopicName.get(domain, namespace, name);
}
protected abstract Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception;
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 8b0f573..aea88f3 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -35,10 +35,9 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerAck;
@@ -91,7 +90,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
return;
}
- this.consumer = service.getPulsarClient().subscribe(topic, subscription, conf);
+ this.consumer = service.getPulsarClient().subscribe(topic.toString(), subscription, conf);
if (!this.service.addConsumer(this)) {
log.warn("[{}:{}] Failed to add consumer handler for topic {}", request.getRemoteAddr(),
request.getRemotePort(), topic);
@@ -298,7 +297,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
@Override
protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
- return service.getAuthorizationService().canConsume(TopicName.get(topic), authRole, authenticationData,
+ return service.getAuthorizationService().canConsume(topic, authRole, authenticationData,
this.subscription);
}
@@ -306,11 +305,18 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
String uri = request.getRequestURI();
List<String> parts = Splitter.on("/").splitToList(uri);
- // Format must be like :
+ // v1 Format must be like :
// /ws/consumer/persistent/my-property/my-cluster/my-ns/my-topic/my-subscription
+
+ // v2 Format must be like :
+ // /ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription
checkArgument(parts.size() == 9, "Invalid topic name format");
checkArgument(parts.get(1).equals("ws"));
- checkArgument(parts.get(3).equals("persistent")|| parts.get(3).equals("non-persistent"));
+
+ final boolean isV2Format = parts.get(2).equals("v2");
+ final int domainIndex = isV2Format ? 4 : 3;
+ checkArgument(parts.get(domainIndex).equals("persistent") ||
+ parts.get(domainIndex).equals("non-persistent"));
checkArgument(parts.get(8).length() > 0, "Empty subscription name");
return parts.get(8);
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 151bdbf..25fed37 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -44,7 +44,6 @@ import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededError;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededException;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerAck;
import org.apache.pulsar.websocket.data.ProducerMessage;
@@ -93,7 +92,7 @@ public class ProducerHandler extends AbstractWebSocketHandler {
try {
ProducerConfiguration conf = getProducerConfiguration();
- this.producer = service.getPulsarClient().createProducer(topic, conf);
+ this.producer = service.getPulsarClient().createProducer(topic.toString(), conf);
if (!this.service.addProducer(this)) {
log.warn("[{}:{}] Failed to add producer handler for topic {}", request.getRemoteAddr(),
request.getRemotePort(), topic);
@@ -226,7 +225,7 @@ public class ProducerHandler extends AbstractWebSocketHandler {
@Override
protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
- return service.getAuthorizationService().canProduce(TopicName.get(topic), authRole, authenticationData);
+ return service.getAuthorizationService().canProduce(topic, authRole, authenticationData);
}
private void sendAckResponse(ProducerAck response) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index 1eddd21..2efb585 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -32,11 +32,10 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderConfiguration;
+import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerMessage;
@@ -55,8 +54,10 @@ import org.slf4j.LoggerFactory;
*
*/
public class ReaderHandler extends AbstractWebSocketHandler {
- private String subscription;
- private final ReaderConfiguration conf;
+
+ private static final int DEFAULT_RECEIVER_QUEUE_SIZE = 1000;
+
+ private String subscription = "";
private Reader<byte[]> reader;
private final int maxPendingMessages;
@@ -70,9 +71,10 @@ public class ReaderHandler extends AbstractWebSocketHandler {
public ReaderHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
super(service, request, response);
- this.subscription = "";
- this.conf = getReaderConfiguration();
- this.maxPendingMessages = (conf.getReceiverQueueSize() == 0) ? 1 : conf.getReceiverQueueSize();
+
+ final int receiverQueueSize = getReceiverQueueSize();
+
+ this.maxPendingMessages = (receiverQueueSize == 0) ? 1 : receiverQueueSize;
this.numMsgsDelivered = new LongAdder();
this.numBytesDelivered = new LongAdder();
@@ -81,7 +83,16 @@ public class ReaderHandler extends AbstractWebSocketHandler {
}
try {
- this.reader = service.getPulsarClient().createReader(topic, getMessageId(), conf);
+ ReaderBuilder<byte[]> builder = service.getPulsarClient().newReader()
+ .topic(topic.toString())
+ .startMessageId(getMessageId())
+ .receiverQueueSize(receiverQueueSize);
+ if (queryParams.containsKey("readerName")) {
+ builder.readerName(queryParams.get("readerName"));
+ }
+
+ this.reader = builder.create();
+
this.subscription = ((ReaderImpl)this.reader).getConsumer().getSubscription();
if (!this.service.addReader(this)) {
log.warn("[{}:{}] Failed to add reader handler for topic {}", request.getRemoteAddr(),
@@ -199,7 +210,7 @@ public class ReaderHandler extends AbstractWebSocketHandler {
}
public Consumer getConsumer() {
- return ((ReaderImpl)reader).getConsumer();
+ return reader != null ? ((ReaderImpl)reader).getConsumer() : null;
}
public String getSubscription() {
@@ -228,25 +239,20 @@ public class ReaderHandler extends AbstractWebSocketHandler {
numBytesDelivered.add(msgSize);
}
- private ReaderConfiguration getReaderConfiguration() {
- ReaderConfiguration conf = new ReaderConfiguration();
-
- if (queryParams.containsKey("readerName")) {
- conf.setReaderName(queryParams.get("readerName"));
- }
-
- if (queryParams.containsKey("receiverQueueSize")) {
- conf.setReceiverQueueSize(Math.min(Integer.parseInt(queryParams.get("receiverQueueSize")), 1000));
- }
- return conf;
- }
-
@Override
protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
- return service.getAuthorizationService().canConsume(TopicName.get(topic), authRole, authenticationData,
+ return service.getAuthorizationService().canConsume(topic, authRole, authenticationData,
this.subscription);
}
+ private int getReceiverQueueSize() {
+ int size = DEFAULT_RECEIVER_QUEUE_SIZE;
+ if (queryParams.containsKey("receiverQueueSize")) {
+ size = Math.min(Integer.parseInt(queryParams.get("receiverQueueSize")), DEFAULT_RECEIVER_QUEUE_SIZE);
+ }
+ return size;
+ }
+
private MessageId getMessageId() throws IOException {
MessageId messageId = MessageId.latest;
if (isNotBlank(queryParams.get("messageId"))) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketConsumerServlet.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketConsumerServlet.java
index 73c70dd..6821236 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketConsumerServlet.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketConsumerServlet.java
@@ -25,6 +25,7 @@ public class WebSocketConsumerServlet extends WebSocketServlet {
private static final long serialVersionUID = 1L;
public static final String SERVLET_PATH = "/ws/consumer";
+ public static final String SERVLET_PATH_V2 = "/ws/v2/consumer";
WebSocketService service;
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketProducerServlet.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketProducerServlet.java
index 2dfc377..c533166 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketProducerServlet.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketProducerServlet.java
@@ -25,6 +25,7 @@ public class WebSocketProducerServlet extends WebSocketServlet {
private static final long serialVersionUID = 1L;
public static final String SERVLET_PATH = "/ws/producer";
+ public static final String SERVLET_PATH_V2 = "/ws/v2/producer";
private final WebSocketService service;
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketReaderServlet.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketReaderServlet.java
index aab0c62..6d80a03 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketReaderServlet.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketReaderServlet.java
@@ -25,6 +25,7 @@ public class WebSocketReaderServlet extends WebSocketServlet {
private static final long serialVersionUID = 1L;
public static final String SERVLET_PATH = "/ws/reader";
+ public static final String SERVLET_PATH_V2 = "/ws/v2/reader";
WebSocketService service;
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.