You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/21 03:35:53 UTC
[pulsar] branch branch-2.6 updated: Support enable WebSocket on
Pulsar Proxy. (#8613)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 4316d96 Support enable WebSocket on Pulsar Proxy. (#8613)
4316d96 is described below
commit 4316d96bffae471f2c963dd5b05cc401f4cdea03
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat Nov 21 09:09:44 2020 +0800
Support enable WebSocket on Pulsar Proxy. (#8613)
### Motivation
Support enable WebSocket on Pulsar Proxy.
### Verifying this change
Integration tests added.
(cherry picked from commit 19767c714206aaf99e607301ba66b667579b6f34)
---
.github/workflows/ci-integration-messaging.yaml | 20 +++-
conf/proxy.conf | 8 ++
pulsar-proxy/pom.xml | 6 +
.../pulsar/proxy/server/ProxyConfiguration.java | 16 +++
.../pulsar/proxy/server/ProxyServiceStarter.java | 31 ++++-
tests/integration/pom.xml | 5 +
.../pulsar/tests/integration/proxy/TestProxy.java | 13 +-
.../integration/proxy/TestProxyWithWebSocket.java | 133 +++++++++++++++++++++
.../integration/topologies/PulsarCluster.java | 3 +
.../integration/topologies/PulsarClusterSpec.java | 5 +
.../src/test/resources/pulsar-proxy-websocket.xml | 28 +++++
.../src/test/resources/pulsar-proxy.xml | 28 +++++
12 files changed, 282 insertions(+), 14 deletions(-)
diff --git a/.github/workflows/ci-integration-messaging.yaml b/.github/workflows/ci-integration-messaging.yaml
index 3bf49ea..27353ea 100644
--- a/.github/workflows/ci-integration-messaging.yaml
+++ b/.github/workflows/ci-integration-messaging.yaml
@@ -61,10 +61,26 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: mvn clean install -DskipTests
+ - name: build pulsar image
+ if: steps.docs.outputs.changed_only == 'no'
+ run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
+
+ - name: build pulsar-all image
+ if: steps.docs.outputs.changed_only == 'no'
+ run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
+
- name: build artifacts and docker image
if: steps.docs.outputs.changed_only == 'no'
- run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests
+ run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
- - name: run integration tests
+ - name: run integration messaging tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-messaging.xml -DintegrationTests -DredirectTestOutputToFile=false
+
+ - name: run integration proxy tests
+ if: steps.docs.outputs.changed_only == 'no'
+ run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-proxy.xml -DintegrationTests -DredirectTestOutputToFile=false
+
+ - name: run integration proxy with WebSocket tests
+ if: steps.docs.outputs.changed_only == 'no'
+ run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-proxy-websocket.xml -DintegrationTests -DredirectTestOutputToFile=false
diff --git a/conf/proxy.conf b/conf/proxy.conf
index cbffe0d..a257184 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -209,6 +209,14 @@ tokenAudienceClaim=
# The token audience stands for this broker. The field `tokenAudienceClaim` of a valid token, need contains this.
tokenAudience=
+### --- WebSocket config variables --- ###
+
+# Enable or disable the WebSocket servlet.
+webSocketServiceEnabled=false
+
+# Name of the cluster to which this broker belongs to
+clusterName=
+
### --- Deprecated config variables --- ###
# Deprecated. Use configurationStoreServers
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index 6580691..3a48f5d 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -56,6 +56,12 @@
</dependency>
<dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-websocket</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index e42ec93..810aa1b 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -70,6 +70,8 @@ public class ProxyConfiguration implements PulsarConfiguration {
private static final String CATEGORY_SASL_AUTH = "SASL Authentication Provider";
@Category
private static final String CATEGORY_PLUGIN = "proxy plugin";
+ @Category
+ private static final String CATEGORY_WEBSOCKET = "WebSocket";
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
@@ -522,6 +524,20 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
}
)
+
+ /***** --- WebSocket --- ****/
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Enable or disable the WebSocket servlet"
+ )
+ private boolean webSocketServiceEnabled = false;
+
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Name of the cluster to which this broker belongs to"
+ )
+ private String clusterName;
+
private Properties properties = new Properties();
public Properties getProperties() {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 3a30b47..c1178b0 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -29,8 +29,13 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.proxy.server.plugin.servlet.ProxyAdditionalServletWithClassLoader;
+import org.apache.pulsar.websocket.WebSocketConsumerServlet;
+import org.apache.pulsar.websocket.WebSocketProducerServlet;
+import org.apache.pulsar.websocket.WebSocketReaderServlet;
+import org.apache.pulsar.websocket.WebSocketService;
import org.eclipse.jetty.proxy.ProxyServlet;
import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -184,7 +189,7 @@ public class ProxyServiceStarter {
public static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
ProxyService service,
- BrokerDiscoveryProvider discoveryProvider) {
+ BrokerDiscoveryProvider discoveryProvider) throws Exception {
server.addServlet("/metrics", new ServletHolder(MetricsServlet.class), Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
server.addRestResources("/", VipStatus.class.getPackage().getName(),
VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath());
@@ -215,6 +220,30 @@ public class ProxyServiceStarter {
log.info("proxy add additional servlet basePath {} ", servletWithClassLoader.getBasePath());
}
}
+
+ if (config.isWebSocketServiceEnabled()) {
+ // add WebSocket servlet
+ // Use local broker address to avoid different IP address when using a VIP for service discovery
+ WebSocketService webSocketService = new WebSocketService(null, PulsarConfigurationLoader.convertFrom(config));
+ webSocketService.start();
+ final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService);
+ server.addServlet(WebSocketProducerServlet.SERVLET_PATH,
+ new ServletHolder(producerWebSocketServlet));
+ server.addServlet(WebSocketProducerServlet.SERVLET_PATH_V2,
+ new ServletHolder(producerWebSocketServlet));
+
+ final WebSocketServlet consumerWebSocketServlet = new WebSocketConsumerServlet(webSocketService);
+ server.addServlet(WebSocketConsumerServlet.SERVLET_PATH,
+ new ServletHolder(consumerWebSocketServlet));
+ server.addServlet(WebSocketConsumerServlet.SERVLET_PATH_V2,
+ new ServletHolder(consumerWebSocketServlet));
+
+ final WebSocketServlet readerWebSocketServlet = new WebSocketReaderServlet(webSocketService);
+ server.addServlet(WebSocketReaderServlet.SERVLET_PATH,
+ new ServletHolder(readerWebSocketServlet));
+ server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
+ new ServletHolder(readerWebSocketServlet));
+ }
}
private static final Logger log = LoggerFactory.getLogger(ProxyServiceStarter.class);
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 20dce5b..6daedaf 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -170,6 +170,11 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/proxy/TestProxy.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/proxy/TestProxy.java
index 10af298..e0f95a6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/proxy/TestProxy.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/proxy/TestProxy.java
@@ -31,7 +31,6 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.tests.integration.containers.ProxyContainer;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.testng.annotations.Test;
@@ -44,19 +43,11 @@ import org.slf4j.LoggerFactory;
*/
public class TestProxy extends PulsarTestSuite {
private final static Logger log = LoggerFactory.getLogger(TestProxy.class);
- private ProxyContainer proxyViaURL;
@Override
protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
String clusterName,
PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
- proxyViaURL = new ProxyContainer(clusterName, "proxy-via-url")
- .withEnv("brokerServiceURL", "pulsar://pulsar-broker-0:6650")
- .withEnv("brokerWebServiceURL", "http://pulsar-broker-0:8080")
- .withEnv("clusterName", clusterName);
-
- specBuilder.externalService("proxy-via-url", proxyViaURL);
-
return super.beforeSetupCluster(clusterName, specBuilder);
}
@@ -107,7 +98,7 @@ public class TestProxy extends PulsarTestSuite {
@Test
public void testProxyWithNoServiceDiscoveryProxyConnectsViaURL() throws Exception {
- testProxy(proxyViaURL.getPlainTextServiceUrl(), proxyViaURL.getHttpServiceUrl());
+ testProxy(pulsarCluster.getProxy().getPlainTextServiceUrl(), pulsarCluster.getProxy().getHttpServiceUrl());
}
@Test
@@ -119,7 +110,7 @@ public class TestProxy extends PulsarTestSuite {
@Cleanup
PulsarAdmin admin = PulsarAdmin.builder()
- .serviceHttpUrl(pulsarCluster.getPlainTextServiceUrl())
+ .serviceHttpUrl(pulsarCluster.getProxy().getHttpServiceUrl())
.build();
admin.tenants().createTenant(tenant,
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/proxy/TestProxyWithWebSocket.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/proxy/TestProxyWithWebSocket.java
new file mode 100644
index 0000000..5847901
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/proxy/TestProxyWithWebSocket.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.proxy;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.tests.integration.containers.CSContainer;
+import org.apache.pulsar.tests.integration.containers.ProxyContainer;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.awaitility.Awaitility;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.pulsar.tests.integration.containers.PulsarContainer.CS_PORT;
+
+/**
+ * Test cases for proxy.
+ */
+public class TestProxyWithWebSocket extends PulsarTestSuite {
+ private final static Logger log = LoggerFactory.getLogger(TestProxyWithWebSocket.class);
+
+ @Override
+ protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+ String clusterName,
+ PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+ Map<String, String> envs = new HashMap<>();
+ envs.put("webSocketServiceEnabled", "true");
+ specBuilder.proxyEnvs(envs);
+ return super.beforeSetupCluster(clusterName, specBuilder);
+ }
+
+ @Test
+ public void testWebSocket() throws Exception {
+
+ final String tenant = "proxy-test-" + randomName(10);
+ final String namespace = tenant + "/ns1";
+
+ @Cleanup
+ PulsarAdmin admin = PulsarAdmin.builder()
+ .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+ .build();
+
+ admin.tenants().createTenant(tenant,
+ new TenantInfo(Collections.emptySet(), Collections.singleton(pulsarCluster.getClusterName())));
+
+ admin.namespaces().createNamespace(namespace, Collections.singleton(pulsarCluster.getClusterName()));
+
+ HttpClient httpClient = new HttpClient();
+ WebSocketClient webSocketClient = new WebSocketClient(httpClient);
+ webSocketClient.start();
+ MyWebSocket myWebSocket = new MyWebSocket();
+ String webSocketUri = pulsarCluster.getProxy().getHttpServiceUrl().replaceFirst("http", "ws")
+ + "/ws/v2/producer/persistent/" + namespace + "/my-topic";
+ Future<Session> sessionFuture = webSocketClient.connect(myWebSocket,
+ URI.create(webSocketUri));
+ sessionFuture.get().getRemote().sendString("{\n" +
+ " \"payload\": \"SGVsbG8gV29ybGQ=\",\n" +
+ " \"properties\": {\"key1\": \"value1\", \"key2\": \"value2\"},\n" +
+ " \"context\": \"1\"\n" +
+ "}");
+
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
+ String response = myWebSocket.getResponse();
+ Assert.assertNotNull(response);
+ Assert.assertTrue(response.contains("ok"));
+ });
+ }
+
+ @WebSocket
+ public static class MyWebSocket implements WebSocketListener {
+ Queue<String> incomingMessages = new ArrayBlockingQueue<>(10);
+ @Override
+ public void onWebSocketBinary(byte[] bytes, int i, int i1) {
+ }
+
+ @Override
+ public void onWebSocketText(String s) {
+ incomingMessages.add(s);
+ }
+
+ @Override
+ public void onWebSocketClose(int i, String s) {
+ }
+
+ @Override
+ public void onWebSocketConnect(Session session) {
+
+ }
+
+ @Override
+ public void onWebSocketError(Throwable throwable) {
+
+ }
+
+ public String getResponse() {
+ return incomingMessages.poll();
+ }
+ }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index d655545..8f40793 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -132,6 +132,9 @@ public class PulsarCluster {
.withEnv("zookeeperServers", ZKContainer.NAME)
.withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
.withEnv("clusterName", clusterName);
+ if (spec.proxyEnvs != null) {
+ spec.proxyEnvs.forEach(this.proxyContainer::withEnv);
+ }
// create bookies
bookieContainers.putAll(
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index e1e26ef..63530dd 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -126,4 +126,9 @@ public class PulsarClusterSpec {
*/
@Default
String pulsarTestImage = PulsarContainer.DEFAULT_IMAGE_NAME;
+
+ /**
+ * Specify envs for proxy.
+ */
+ Map<String, String> proxyEnvs;
}
diff --git a/tests/integration/src/test/resources/pulsar-proxy-websocket.xml b/tests/integration/src/test/resources/pulsar-proxy-websocket.xml
new file mode 100644
index 0000000..6e32ac9
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar-proxy-websocket.xml
@@ -0,0 +1,28 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd" >
+<suite name="Pulsar (Proxy with WebSocket) Integration Tests" verbose="2" annotations="JDK">
+ <test name="messaging-test-suite" preserve-order="true">
+ <classes>
+ <class name="org.apache.pulsar.tests.integration.proxy.TestProxyWithWebSocket" />
+ </classes>
+ </test>
+</suite>
\ No newline at end of file
diff --git a/tests/integration/src/test/resources/pulsar-proxy.xml b/tests/integration/src/test/resources/pulsar-proxy.xml
new file mode 100644
index 0000000..ae6f138
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar-proxy.xml
@@ -0,0 +1,28 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd" >
+<suite name="Pulsar (Proxy) Integration Tests" verbose="2" annotations="JDK">
+ <test name="messaging-test-suite" preserve-order="true">
+ <classes>
+ <class name="org.apache.pulsar.tests.integration.proxy.TestProxy" />
+ </classes>
+ </test>
+</suite>
\ No newline at end of file