You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/23 22:54:10 UTC
[incubator-pulsar] branch master updated: Change HTTP status code which WebSocket proxy returns to producer whe… (#1242)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new adbf818 Change HTTP status code which WebSocket proxy returns to producer whe… (#1242)
adbf818 is described below
commit adbf818626988f4cd3cf008462d173b440d4c1ba
Author: massakam <ma...@yahoo-corp.jp>
AuthorDate: Sat Feb 24 07:54:07 2018 +0900
Change HTTP status code which WebSocket proxy returns to producer whe… (#1242)
* Change HTTP status code which WebSocket proxy returns to producer when backlog exceeds threshold
* Fix WebSocket proxy to return exception message to client
* Add some tests for WebSocket
---
.../websocket/proxy/ProxyPublishConsumeTest.java | 206 +++++++++++++++++----
.../pulsar/websocket/AbstractWebSocketHandler.java | 5 +-
.../apache/pulsar/websocket/ConsumerHandler.java | 39 ++--
.../apache/pulsar/websocket/ProducerHandler.java | 6 +-
.../org/apache/pulsar/websocket/ReaderHandler.java | 5 +-
5 files changed, 207 insertions(+), 54 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index c25c947..3c40173 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation;
@@ -39,6 +40,7 @@ import javax.ws.rs.core.Response;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.service.ProxyServer;
@@ -70,8 +72,12 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
private ProxyServer proxyServer;
private WebSocketService service;
+ private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
+
@BeforeMethod
public void setup() throws Exception {
+ conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+
super.internalSetup();
super.producerBaseSetup();
@@ -89,6 +95,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
@AfterMethod
protected void cleanup() throws Exception {
+ super.resetConfig();
super.internalCleanup();
service.close();
proxyServer.stop();
@@ -97,7 +104,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
@Test(timeOut = 10000)
public void socketTest() throws Exception {
- String consumerUri = "ws://localhost:" + port
+ final String consumerUri = "ws://localhost:" + port
+ "/ws/consumer/persistent/my-property/use/my-ns/my-topic1/my-sub1?subscriptionType=Failover";
String readerUri = "ws://localhost:" + port + "/ws/reader/persistent/my-property/use/my-ns/my-topic1";
String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/my-property/use/my-ns/my-topic1/";
@@ -167,32 +174,16 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
}
Assert.assertEquals(produceSocket.getBuffer(), readSocket.getBuffer());
} finally {
- ExecutorService executor = newFixedThreadPool(1);
- try {
- executor.submit(() -> {
- try {
- consumeClient1.stop();
- consumeClient2.stop();
- readClient.stop();
- produceClient.stop();
- log.info("proxy clients are stopped successfully");
- } catch (Exception e) {
- log.error(e.getMessage());
- }
- }).get(2, TimeUnit.SECONDS);
- } catch (Exception e) {
- log.error("failed to close clients ", e);
- }
- executor.shutdownNow();
+ stopWebSocketClient(consumeClient1, consumeClient2, readClient, produceClient);
}
}
@Test(timeOut = 10000)
- public void badConsumerTest() throws Exception {
+ public void emptySubcriptionConsumerTest() throws Exception {
// Empty subcription name
- String consumerUri = "ws://localhost:" + port
- + "/ws/consumer/persistent/my-property/use/my-ns/my-topic1/?subscriptionType=Exclusive";
+ final String consumerUri = "ws://localhost:" + port
+ + "/ws/consumer/persistent/my-property/use/my-ns/my-topic2/?subscriptionType=Exclusive";
URI consumeUri = URI.create(consumerUri);
WebSocketClient consumeClient1 = new WebSocketClient();
@@ -207,21 +198,148 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
} catch (Exception e) {
// Expected
Assert.assertTrue(e.getCause() instanceof UpgradeException);
+ Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(),
+ HttpServletResponse.SC_BAD_REQUEST);
} finally {
- ExecutorService executor = newFixedThreadPool(1);
+ stopWebSocketClient(consumeClient1);
+ }
+ }
+
+ @Test(timeOut = 10000)
+ public void conflictingConsumerTest() throws Exception {
+ final String consumerUri = "ws://localhost:" + port
+ + "/ws/consumer/persistent/my-property/use/my-ns/my-topic3/sub1?subscriptionType=Exclusive";
+ URI consumeUri = URI.create(consumerUri);
+
+ WebSocketClient consumeClient1 = new WebSocketClient();
+ WebSocketClient consumeClient2 = new WebSocketClient();
+ SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
+ SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
+
+ try {
+ consumeClient1.start();
+ ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
+ Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
+ consumerFuture1.get();
+
try {
- executor.submit(() -> {
- try {
- consumeClient1.stop();
- log.info("proxy clients are stopped successfully");
- } catch (Exception e) {
- log.error(e.getMessage());
- }
- }).get(2, TimeUnit.SECONDS);
+ consumeClient2.start();
+ ClientUpgradeRequest consumeRequest2 = new ClientUpgradeRequest();
+ Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2);
+ consumerFuture2.get();
+ Assert.fail("should fail: conflicting subscription name");
} catch (Exception e) {
- log.error("failed to close clients ", e);
+ // Expected
+ Assert.assertTrue(e.getCause() instanceof UpgradeException);
+ Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(),
+ HttpServletResponse.SC_CONFLICT);
+ } finally {
+ stopWebSocketClient(consumeClient2);
}
- executor.shutdownNow();
+ } finally {
+ stopWebSocketClient(consumeClient1);
+ }
+ }
+
+ @Test(timeOut = 10000)
+ public void conflictingProducerTest() throws Exception {
+ final String producerUri = "ws://localhost:" + port
+ + "/ws/producer/persistent/my-property/use/my-ns/my-topic4?producerName=my-producer";
+ URI produceUri = URI.create(producerUri);
+
+ WebSocketClient produceClient1 = new WebSocketClient();
+ WebSocketClient produceClient2 = new WebSocketClient();
+ SimpleProducerSocket produceSocket1 = new SimpleProducerSocket();
+ SimpleProducerSocket produceSocket2 = new SimpleProducerSocket();
+
+ try {
+ produceClient1.start();
+ ClientUpgradeRequest produceRequest1 = new ClientUpgradeRequest();
+ Future<Session> producerFuture1 = produceClient1.connect(produceSocket1, produceUri, produceRequest1);
+ producerFuture1.get();
+
+ try {
+ produceClient2.start();
+ ClientUpgradeRequest produceRequest2 = new ClientUpgradeRequest();
+ Future<Session> producerFuture2 = produceClient2.connect(produceSocket2, produceUri, produceRequest2);
+ producerFuture2.get();
+ Assert.fail("should fail: conflicting producer name");
+ } catch (Exception e) {
+ // Expected
+ Assert.assertTrue(e.getCause() instanceof UpgradeException);
+ Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(),
+ HttpServletResponse.SC_CONFLICT);
+ } finally {
+ stopWebSocketClient(produceClient2);
+ }
+ } finally {
+ stopWebSocketClient(produceClient1);
+ }
+ }
+
+ @Test(timeOut = 30000)
+ public void producerBacklogQuotaExceededTest() throws Exception {
+ admin.namespaces().createNamespace("my-property/use/ns-ws-quota");
+ admin.namespaces().setBacklogQuota("my-property/use/ns-ws-quota",
+ new BacklogQuota(10, BacklogQuota.RetentionPolicy.producer_request_hold));
+
+ final String topic = "my-property/use/ns-ws-quota/my-topic5";
+ final String subscription = "my-sub";
+ final String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/" + topic + "/" + subscription;
+ final String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/" + topic;
+
+ URI consumeUri = URI.create(consumerUri);
+ URI produceUri = URI.create(producerUri);
+
+ WebSocketClient consumeClient = new WebSocketClient();
+ WebSocketClient produceClient1 = new WebSocketClient();
+ WebSocketClient produceClient2 = new WebSocketClient();
+
+ SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+ SimpleProducerSocket produceSocket1 = new SimpleProducerSocket();
+ SimpleProducerSocket produceSocket2 = new SimpleProducerSocket();
+
+ // Create subscription
+ try {
+ consumeClient.start();
+ ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+ Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
+ consumerFuture.get();
+ } finally {
+ stopWebSocketClient(consumeClient);
+ }
+
+ // Fill the backlog
+ try {
+ produceClient1.start();
+ ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+ Future<Session> producerFuture = produceClient1.connect(produceSocket1, produceUri, produceRequest);
+ producerFuture.get();
+ produceSocket1.sendMessage(100);
+ } finally {
+ stopWebSocketClient(produceClient1);
+ }
+
+ Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
+
+ // New producer fails to connect
+ try {
+ produceClient2.start();
+ ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+ Future<Session> producerFuture = produceClient2.connect(produceSocket2, produceUri, produceRequest);
+ producerFuture.get();
+ Assert.fail("should fail: backlog quota exceeded");
+ } catch (Exception e) {
+ // Expected
+ Assert.assertTrue(e.getCause() instanceof UpgradeException);
+ Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(),
+ HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+ } finally {
+ stopWebSocketClient(produceClient2);
+ admin.persistentTopics().skipAllMessages("persistent://" + topic, subscription);
+ admin.persistentTopics().delete("persistent://" + topic);
+ admin.namespaces().removeBacklogQuota("my-property/use/ns-ws-quota");
+ admin.namespaces().deleteNamespace("my-property/use/ns-ws-quota");
}
}
@@ -232,7 +350,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
*/
@Test(timeOut = 10000)
public void testProxyStats() throws Exception {
- final String topic = "my-property/use/my-ns/my-topic2";
+ final String topic = "my-property/use/my-ns/my-topic6";
final String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/" + topic
+ "/my-sub?subscriptionType=Failover";
final String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/" + topic + "/";
@@ -299,9 +417,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
verifyTopicStat(client, baseUrl, topic);
} finally {
- consumeClient1.stop();
- produceClient.stop();
- log.info("proxy clients are stopped successfully");
+ stopWebSocketClient(consumeClient1, produceClient);
}
}
@@ -361,5 +477,23 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
Assert.assertNotNull(producerStats.remoteConnection);
}
+ private void stopWebSocketClient(WebSocketClient... clients) {
+ ExecutorService executor = newFixedThreadPool(1);
+ try {
+ executor.submit(() -> {
+ try {
+ for (WebSocketClient client : clients) {
+ client.stop();
+ }
+ log.info("proxy clients are stopped successfully");
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ }
+ }).get(2, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.error("failed to close proxy clients", e);
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class);
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index 91e62b7..ad6f910 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -48,7 +48,6 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
protected final String topic;
protected final Map<String, String> queryParams;
- protected final boolean authResult;
public AbstractWebSocketHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
this.service = service;
@@ -59,11 +58,9 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
request.getParameterMap().forEach((key, values) -> {
queryParams.put(key, values[0]);
});
-
- authResult = checkAuth(response);
}
- private boolean checkAuth(ServletUpgradeResponse response) {
+ protected boolean checkAuth(ServletUpgradeResponse response) {
String authRole = "<none>";
AuthenticationDataSource authenticationData = new AuthenticationDataHttps(request);
if (service.isAuthenticationEnabled()) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index b392e0c..6edcf2a 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.util.DateFormatter;
@@ -64,7 +65,7 @@ import com.google.common.base.Splitter;
*/
public class ConsumerHandler extends AbstractWebSocketHandler {
- private final String subscription;
+ private String subscription = null;
private final ConsumerConfiguration conf;
private Consumer consumer;
@@ -80,18 +81,19 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
public ConsumerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
super(service, request, response);
- this.subscription = extractSubscription(request);
this.conf = getConsumerConfiguration();
this.maxPendingMessages = (conf.getReceiverQueueSize() == 0) ? 1 : conf.getReceiverQueueSize();
this.numMsgsDelivered = new LongAdder();
this.numBytesDelivered = new LongAdder();
this.numMsgsAcked = new LongAdder();
- if (!authResult) {
- return;
- }
-
try {
+ // checkAuth() should be called after assigning a value to this.subscription
+ this.subscription = extractSubscription(request);
+ if (!checkAuth(response)) {
+ return;
+ }
+
this.consumer = service.getPulsarClient().subscribe(topic, subscription, conf);
if (!this.service.addConsumer(this)) {
log.warn("[{}:{}] Failed to add consumer handler for topic {}", request.getRemoteAddr(),
@@ -100,12 +102,9 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
} catch (Exception e) {
log.warn("[{}:{}] Failed in creating subscription {} on topic {}", request.getRemoteAddr(),
request.getRemotePort(), subscription, topic, e);
- boolean configError = e instanceof IllegalArgumentException;
- int errorCode = configError ? HttpServletResponse.SC_BAD_REQUEST
- : HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
- String errorMsg = configError ? "Invalid query-param " + e.getMessage() : "Failed to subscribe";
+
try {
- response.sendError(errorCode, errorMsg);
+ response.sendError(getErrorCode(e), getErrorMessage(e));
} catch (IOException e1) {
log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(),
e1.getMessage(), e1);
@@ -113,6 +112,24 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
}
}
+ private static int getErrorCode(Exception e) {
+ if (e instanceof IllegalArgumentException) {
+ return HttpServletResponse.SC_BAD_REQUEST;
+ } else if (e instanceof ConsumerBusyException) {
+ return HttpServletResponse.SC_CONFLICT;
+ } else {
+ return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
+ }
+ }
+
+ private static String getErrorMessage(Exception e) {
+ if (e instanceof IllegalArgumentException) {
+ return "Invalid query params: " + e.getMessage();
+ } else {
+ return "Failed to subscribe: " + e.getMessage();
+ }
+ }
+
private void receiveMessage() {
if (log.isDebugEnabled()) {
log.debug("[{}:{}] [{}] [{}] Receive next message", request.getRemoteAddr(), request.getRemotePort(), topic, subscription);
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index cb2e288..7df14a5 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -41,6 +41,8 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.ProducerConfiguration.HashingScheme;
import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededError;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededException;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -85,7 +87,7 @@ public class ProducerHandler extends AbstractWebSocketHandler {
this.numMsgsFailed = new LongAdder();
this.publishLatencyStatsUSec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
- if (!authResult) {
+ if (!checkAuth(response)) {
return;
}
@@ -114,6 +116,8 @@ public class ProducerHandler extends AbstractWebSocketHandler {
return HttpServletResponse.SC_BAD_REQUEST;
} else if (e instanceof ProducerBusyException) {
return HttpServletResponse.SC_CONFLICT;
+ } else if (e instanceof ProducerBlockedQuotaExceededError || e instanceof ProducerBlockedQuotaExceededException) {
+ return HttpServletResponse.SC_SERVICE_UNAVAILABLE;
} else {
return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index 4d6c271..d643df2 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -82,7 +82,7 @@ public class ReaderHandler extends AbstractWebSocketHandler {
this.numMsgsDelivered = new LongAdder();
this.numBytesDelivered = new LongAdder();
- if (!authResult) {
+ if (!checkAuth(response)) {
return;
}
@@ -97,7 +97,8 @@ public class ReaderHandler extends AbstractWebSocketHandler {
log.warn("[{}:{}] Failed in creating reader {} on topic {}", request.getRemoteAddr(),
request.getRemotePort(), subscription, topic, e);
try {
- response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to create reader");
+ response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+ "Failed to create reader: " + e.getMessage());
} catch (IOException e1) {
log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(),
e1.getMessage(), e1);
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.