You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/12/24 17:57:17 UTC

[pulsar] branch master updated: [websocket] WebSocket proxy should return status code depending on type of PulsarClientException (#9031)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c45b82  [websocket] WebSocket proxy should return status code depending on type of PulsarClientException (#9031)
8c45b82 is described below

commit 8c45b821448a67a1f2528c08347a57ff4c71d8bd
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Fri Dec 25 02:57:01 2020 +0900

    [websocket] WebSocket proxy should return status code depending on type of PulsarClientException (#9031)
    
    ### Motivation
    
    If the WebSocket proxy fails to create a producer or consumer, it should return the suitable HTTP status code to the client depending on the type of `PulsarClientException` that occurred. However, there are currently only a few exceptions where the WebSocket proxy returns a status code other than 500.
    https://github.com/apache/pulsar/blob/7be1b8dfcfaa4298c541c4eb134f1e752fa6a7d8/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java#L119-L129
    
    ### Modifications
    
    Moved the `getErrorCode()` method from `ProducerHandler` and `ConsumerHandler` to `AbstractWebSocketHandler` and increased the types of `PulsarClientException` to handle.
---
 .../websocket/proxy/ProxyPublishConsumeTest.java   | 120 ++++++++++++++++++++-
 .../pulsar/websocket/AbstractWebSocketHandler.java |  48 ++++++++-
 .../apache/pulsar/websocket/ConsumerHandler.java   |  20 ----
 .../apache/pulsar/websocket/ProducerHandler.java   |  24 -----
 4 files changed, 165 insertions(+), 47 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 9e934b2..e4fd5a1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -48,8 +48,12 @@ import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.websocket.WebSocketService;
 import org.apache.pulsar.websocket.service.ProxyServer;
@@ -384,6 +388,120 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
         }
     }
 
+    @Test(timeOut = 10000)
+    public void topicDoesNotExistTest() throws Exception {
+        final String namespace = "my-property/ns-topic-creation-not-allowed";
+        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setAutoTopicCreation(namespace,
+                new AutoTopicCreationOverride(false, TopicType.NON_PARTITIONED.toString(), null));
+
+        final String topic = namespace + "/my-topic";
+        final String subscription = "my-sub";
+        final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+                + "/ws/v2/producer/persistent/" + topic;
+        final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+                + "/ws/v2/consumer/persistent/" + topic + "/" + subscription;
+
+        URI produceUri = URI.create(producerUri);
+        URI consumeUri = URI.create(consumerUri);
+
+        WebSocketClient produceClient = new WebSocketClient();
+        WebSocketClient consumeClient = new WebSocketClient();
+
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+        SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+
+        try {
+            produceClient.start();
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
+            producerFuture.get();
+            fail("should fail: topic does not exist");
+        } catch (Exception e) {
+            // Expected
+            assertTrue(e.getCause() instanceof UpgradeException);
+            assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), HttpServletResponse.SC_NOT_FOUND);
+        } finally {
+            stopWebSocketClient(produceClient);
+        }
+
+        try {
+            consumeClient.start();
+            ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+            Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
+            consumerFuture.get();
+            fail("should fail: topic does not exist");
+        } catch (Exception e) {
+            // Expected
+            assertTrue(e.getCause() instanceof UpgradeException);
+            assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), HttpServletResponse.SC_NOT_FOUND);
+        } finally {
+            stopWebSocketClient(consumeClient);
+        }
+
+        admin.namespaces().deleteNamespace(namespace);
+    }
+
+    @Test(timeOut = 10000)
+    public void producerFencedTest() throws Exception {
+        final String topic = "my-property/my-ns/producer-fenced-test";
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://" + topic)
+                .accessMode(ProducerAccessMode.Exclusive).create();
+
+        final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+                + "/ws/v2/producer/persistent/" + topic;
+        URI produceUri = URI.create(producerUri);
+
+        WebSocketClient produceClient = new WebSocketClient();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        try {
+            produceClient.start();
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
+            producerFuture.get();
+            fail("should fail: producer fenced");
+        } catch (Exception e) {
+            // Expected
+            assertTrue(e.getCause() instanceof UpgradeException);
+            assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), HttpServletResponse.SC_CONFLICT);
+        } finally {
+            stopWebSocketClient(produceClient);
+            producer.close();
+        }
+    }
+
+    @Test(timeOut = 10000)
+    public void topicTerminatedTest() throws Exception {
+        final String topic = "my-property/my-ns/topic-terminated-test";
+        admin.topics().createNonPartitionedTopic("persistent://" + topic);
+        admin.topics().terminateTopic("persistent://" + topic);
+
+        final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+                + "/ws/v2/producer/persistent/" + topic;
+        URI produceUri = URI.create(producerUri);
+
+        WebSocketClient produceClient = new WebSocketClient();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        try {
+            produceClient.start();
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
+            producerFuture.get();
+            fail("should fail: topic terminated");
+        } catch (Exception e) {
+            // Expected
+            assertTrue(e.getCause() instanceof UpgradeException);
+            assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(),
+                    HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+        } finally {
+            stopWebSocketClient(produceClient);
+            admin.topics().delete("persistent://" + topic);
+        }
+    }
+
     /**
      * It verifies proxy topic-stats and proxy-metrics api
      *
@@ -648,4 +766,4 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
     }
 
     private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class);
-}
\ No newline at end of file
+}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index 6002f63..e28d753 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -22,6 +22,19 @@ import com.google.common.base.Splitter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.client.api.PulsarClientException.AuthenticationException;
+import org.apache.pulsar.client.api.PulsarClientException.AuthorizationException;
+import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
+import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
+import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededError;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededException;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerFencedException;
+import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
+import org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException;
+import org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
+import org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.Codec;
@@ -31,7 +44,6 @@ import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.naming.AuthenticationException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import java.io.Closeable;
@@ -70,7 +82,7 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
                 log.info("[{}:{}] Authenticated WebSocket client {} on topic {}", request.getRemoteAddr(),
                         request.getRemotePort(), authRole, topic);
 
-            } catch (AuthenticationException e) {
+            } catch (javax.naming.AuthenticationException e) {
                 log.warn("[{}:{}] Failed to authenticated WebSocket client {} on topic {}: {}", request.getRemoteAddr(),
                         request.getRemotePort(), authRole, topic, e.getMessage());
                 try {
@@ -107,6 +119,38 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
         return true;
     }
 
+    protected static int getErrorCode(Exception e) {
+        if (e instanceof IllegalArgumentException) {
+            return HttpServletResponse.SC_BAD_REQUEST;
+        } else if (e instanceof AuthenticationException) {
+            return HttpServletResponse.SC_UNAUTHORIZED;
+        } else if (e instanceof AuthorizationException) {
+            return HttpServletResponse.SC_FORBIDDEN;
+        } else if (e instanceof NotFoundException || e instanceof TopicDoesNotExistException) {
+            return HttpServletResponse.SC_NOT_FOUND;
+        } else if (e instanceof ProducerBusyException || e instanceof ConsumerBusyException
+                || e instanceof ProducerFencedException || e instanceof IncompatibleSchemaException) {
+            return HttpServletResponse.SC_CONFLICT;
+        } else if (e instanceof TooManyRequestsException) {
+            return 429; // Too Many Requests
+        } else if (e instanceof ProducerBlockedQuotaExceededError || e instanceof ProducerBlockedQuotaExceededException
+                || e instanceof TopicTerminatedException) {
+            return HttpServletResponse.SC_SERVICE_UNAVAILABLE;
+        } else if (e instanceof TimeoutException) {
+            return HttpServletResponse.SC_GATEWAY_TIMEOUT;
+        } else {
+            return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
+        }
+    }
+
+    protected static String getErrorMessage(Exception e) {
+        if (e instanceof IllegalArgumentException) {
+            return "Invalid query params: " + e.getMessage();
+        } else {
+            return "Failed to create producer/consumer: " + e.getMessage();
+        }
+    }
+
     @Override
     public void onWebSocketConnect(Session session) {
         super.onWebSocketConnect(session);
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 5faf6d2..6c21a1b 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 
 import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
 
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.Consumer;
@@ -43,7 +42,6 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
-import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
@@ -131,24 +129,6 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
         }
     }
 
-    private static int getErrorCode(Exception e) {
-        if (e instanceof IllegalArgumentException) {
-            return HttpServletResponse.SC_BAD_REQUEST;
-        } else if (e instanceof ConsumerBusyException) {
-            return HttpServletResponse.SC_CONFLICT;
-        } else {
-            return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
-        }
-    }
-
-    private static String getErrorMessage(Exception e) {
-        if (e instanceof IllegalArgumentException) {
-            return "Invalid query params: " + e.getMessage();
-        } else {
-            return "Failed to subscribe: " + e.getMessage();
-        }
-    }
-
     private void receiveMessage() {
         if (log.isDebugEnabled()) {
             log.debug("[{}:{}] [{}] [{}] Receive next message", request.getRemoteAddr(), request.getRemotePort(), topic, subscription);
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 2d9c281..9552d42 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -38,7 +38,6 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 
 import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
 
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.CompressionType;
@@ -47,9 +46,6 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededError;
-import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededException;
-import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.common.util.DateFormatter;
@@ -116,26 +112,6 @@ public class ProducerHandler extends AbstractWebSocketHandler {
         }
     }
 
-    private static int getErrorCode(Exception e) {
-        if (e instanceof IllegalArgumentException) {
-            return HttpServletResponse.SC_BAD_REQUEST;
-        } else if (e instanceof ProducerBusyException) {
-            return HttpServletResponse.SC_CONFLICT;
-        } else if (e instanceof ProducerBlockedQuotaExceededError || e instanceof ProducerBlockedQuotaExceededException) {
-            return HttpServletResponse.SC_SERVICE_UNAVAILABLE;
-        } else {
-            return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
-        }
-    }
-
-    private static String getErrorMessage(Exception e) {
-        if (e instanceof IllegalArgumentException) {
-            return "Invalid query params: " + e.getMessage();
-        } else {
-            return "Failed to create producer: " + e.getMessage();
-        }
-    }
-
     @Override
     public void close() throws IOException {
         if (producer != null) {