You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/12/24 17:57:17 UTC
[pulsar] branch master updated: [websocket] WebSocket proxy should
return status code depending on type of PulsarClientException (#9031)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8c45b82 [websocket] WebSocket proxy should return status code depending on type of PulsarClientException (#9031)
8c45b82 is described below
commit 8c45b821448a67a1f2528c08347a57ff4c71d8bd
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Fri Dec 25 02:57:01 2020 +0900
[websocket] WebSocket proxy should return status code depending on type of PulsarClientException (#9031)
### Motivation
If the WebSocket proxy fails to create a producer or consumer, it should return the suitable HTTP status code to the client depending on the type of `PulsarClientException` that occurred. However, there are currently only a few exceptions where the WebSocket proxy returns a status code other than 500.
https://github.com/apache/pulsar/blob/7be1b8dfcfaa4298c541c4eb134f1e752fa6a7d8/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java#L119-L129
### Modifications
Moved the `getErrorCode()` method from `ProducerHandler` and `ConsumerHandler` to `AbstractWebSocketHandler` and increased the types of `PulsarClientException` to handle.
---
.../websocket/proxy/ProxyPublishConsumeTest.java | 120 ++++++++++++++++++++-
.../pulsar/websocket/AbstractWebSocketHandler.java | 48 ++++++++-
.../apache/pulsar/websocket/ConsumerHandler.java | 20 ----
.../apache/pulsar/websocket/ProducerHandler.java | 24 -----
4 files changed, 165 insertions(+), 47 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 9e934b2..e4fd5a1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -48,8 +48,12 @@ import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.service.ProxyServer;
@@ -384,6 +388,120 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
}
}
+ @Test(timeOut = 10000)
+ public void topicDoesNotExistTest() throws Exception {
+ final String namespace = "my-property/ns-topic-creation-not-allowed";
+ admin.namespaces().createNamespace(namespace);
+ admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
+ admin.namespaces().setAutoTopicCreation(namespace,
+ new AutoTopicCreationOverride(false, TopicType.NON_PARTITIONED.toString(), null));
+
+ final String topic = namespace + "/my-topic";
+ final String subscription = "my-sub";
+ final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ + "/ws/v2/producer/persistent/" + topic;
+ final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ + "/ws/v2/consumer/persistent/" + topic + "/" + subscription;
+
+ URI produceUri = URI.create(producerUri);
+ URI consumeUri = URI.create(consumerUri);
+
+ WebSocketClient produceClient = new WebSocketClient();
+ WebSocketClient consumeClient = new WebSocketClient();
+
+ SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+ SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+
+ try {
+ produceClient.start();
+ ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+ Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
+ producerFuture.get();
+ fail("should fail: topic does not exist");
+ } catch (Exception e) {
+ // Expected
+ assertTrue(e.getCause() instanceof UpgradeException);
+ assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), HttpServletResponse.SC_NOT_FOUND);
+ } finally {
+ stopWebSocketClient(produceClient);
+ }
+
+ try {
+ consumeClient.start();
+ ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+ Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
+ consumerFuture.get();
+ fail("should fail: topic does not exist");
+ } catch (Exception e) {
+ // Expected
+ assertTrue(e.getCause() instanceof UpgradeException);
+ assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), HttpServletResponse.SC_NOT_FOUND);
+ } finally {
+ stopWebSocketClient(consumeClient);
+ }
+
+ admin.namespaces().deleteNamespace(namespace);
+ }
+
+ @Test(timeOut = 10000)
+ public void producerFencedTest() throws Exception {
+ final String topic = "my-property/my-ns/producer-fenced-test";
+ Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://" + topic)
+ .accessMode(ProducerAccessMode.Exclusive).create();
+
+ final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ + "/ws/v2/producer/persistent/" + topic;
+ URI produceUri = URI.create(producerUri);
+
+ WebSocketClient produceClient = new WebSocketClient();
+ SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+ try {
+ produceClient.start();
+ ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+ Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
+ producerFuture.get();
+ fail("should fail: producer fenced");
+ } catch (Exception e) {
+ // Expected
+ assertTrue(e.getCause() instanceof UpgradeException);
+ assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), HttpServletResponse.SC_CONFLICT);
+ } finally {
+ stopWebSocketClient(produceClient);
+ producer.close();
+ }
+ }
+
+ @Test(timeOut = 10000)
+ public void topicTerminatedTest() throws Exception {
+ final String topic = "my-property/my-ns/topic-terminated-test";
+ admin.topics().createNonPartitionedTopic("persistent://" + topic);
+ admin.topics().terminateTopic("persistent://" + topic);
+
+ final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ + "/ws/v2/producer/persistent/" + topic;
+ URI produceUri = URI.create(producerUri);
+
+ WebSocketClient produceClient = new WebSocketClient();
+ SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+ try {
+ produceClient.start();
+ ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+ Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
+ producerFuture.get();
+ fail("should fail: topic terminated");
+ } catch (Exception e) {
+ // Expected
+ assertTrue(e.getCause() instanceof UpgradeException);
+ assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(),
+ HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+ } finally {
+ stopWebSocketClient(produceClient);
+ admin.topics().delete("persistent://" + topic);
+ }
+ }
+
/**
* It verifies proxy topic-stats and proxy-metrics api
*
@@ -648,4 +766,4 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
}
private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class);
-}
\ No newline at end of file
+}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index 6002f63..e28d753 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -22,6 +22,19 @@ import com.google.common.base.Splitter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.client.api.PulsarClientException.AuthenticationException;
+import org.apache.pulsar.client.api.PulsarClientException.AuthorizationException;
+import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
+import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
+import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededError;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededException;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerFencedException;
+import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
+import org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException;
+import org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
+import org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Codec;
@@ -31,7 +44,6 @@ import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.naming.AuthenticationException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.Closeable;
@@ -70,7 +82,7 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
log.info("[{}:{}] Authenticated WebSocket client {} on topic {}", request.getRemoteAddr(),
request.getRemotePort(), authRole, topic);
- } catch (AuthenticationException e) {
+ } catch (javax.naming.AuthenticationException e) {
log.warn("[{}:{}] Failed to authenticated WebSocket client {} on topic {}: {}", request.getRemoteAddr(),
request.getRemotePort(), authRole, topic, e.getMessage());
try {
@@ -107,6 +119,38 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
return true;
}
+ protected static int getErrorCode(Exception e) {
+ if (e instanceof IllegalArgumentException) {
+ return HttpServletResponse.SC_BAD_REQUEST;
+ } else if (e instanceof AuthenticationException) {
+ return HttpServletResponse.SC_UNAUTHORIZED;
+ } else if (e instanceof AuthorizationException) {
+ return HttpServletResponse.SC_FORBIDDEN;
+ } else if (e instanceof NotFoundException || e instanceof TopicDoesNotExistException) {
+ return HttpServletResponse.SC_NOT_FOUND;
+ } else if (e instanceof ProducerBusyException || e instanceof ConsumerBusyException
+ || e instanceof ProducerFencedException || e instanceof IncompatibleSchemaException) {
+ return HttpServletResponse.SC_CONFLICT;
+ } else if (e instanceof TooManyRequestsException) {
+ return 429; // Too Many Requests
+ } else if (e instanceof ProducerBlockedQuotaExceededError || e instanceof ProducerBlockedQuotaExceededException
+ || e instanceof TopicTerminatedException) {
+ return HttpServletResponse.SC_SERVICE_UNAVAILABLE;
+ } else if (e instanceof TimeoutException) {
+ return HttpServletResponse.SC_GATEWAY_TIMEOUT;
+ } else {
+ return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
+ }
+ }
+
+ protected static String getErrorMessage(Exception e) {
+ if (e instanceof IllegalArgumentException) {
+ return "Invalid query params: " + e.getMessage();
+ } else {
+ return "Failed to create producer/consumer: " + e.getMessage();
+ }
+ }
+
@Override
public void onWebSocketConnect(Session session) {
super.onWebSocketConnect(session);
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 5faf6d2..6c21a1b 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
@@ -43,7 +42,6 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
-import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
@@ -131,24 +129,6 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
}
}
- private static int getErrorCode(Exception e) {
- if (e instanceof IllegalArgumentException) {
- return HttpServletResponse.SC_BAD_REQUEST;
- } else if (e instanceof ConsumerBusyException) {
- return HttpServletResponse.SC_CONFLICT;
- } else {
- return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
- }
- }
-
- private static String getErrorMessage(Exception e) {
- if (e instanceof IllegalArgumentException) {
- return "Invalid query params: " + e.getMessage();
- } else {
- return "Failed to subscribe: " + e.getMessage();
- }
- }
-
private void receiveMessage() {
if (log.isDebugEnabled()) {
log.debug("[{}:{}] [{}] [{}] Receive next message", request.getRemoteAddr(), request.getRemotePort(), topic, subscription);
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 2d9c281..9552d42 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -38,7 +38,6 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.CompressionType;
@@ -47,9 +46,6 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededError;
-import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededException;
-import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.util.DateFormatter;
@@ -116,26 +112,6 @@ public class ProducerHandler extends AbstractWebSocketHandler {
}
}
- private static int getErrorCode(Exception e) {
- if (e instanceof IllegalArgumentException) {
- return HttpServletResponse.SC_BAD_REQUEST;
- } else if (e instanceof ProducerBusyException) {
- return HttpServletResponse.SC_CONFLICT;
- } else if (e instanceof ProducerBlockedQuotaExceededError || e instanceof ProducerBlockedQuotaExceededException) {
- return HttpServletResponse.SC_SERVICE_UNAVAILABLE;
- } else {
- return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
- }
- }
-
- private static String getErrorMessage(Exception e) {
- if (e instanceof IllegalArgumentException) {
- return "Invalid query params: " + e.getMessage();
- } else {
- return "Failed to create producer: " + e.getMessage();
- }
- }
-
@Override
public void close() throws IOException {
if (producer != null) {