You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/21 03:35:53 UTC

[pulsar] branch branch-2.6 updated: Support enable WebSocket on Pulsar Proxy. (#8613)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 4316d96  Support enable WebSocket on Pulsar Proxy. (#8613)
4316d96 is described below

commit 4316d96bffae471f2c963dd5b05cc401f4cdea03
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat Nov 21 09:09:44 2020 +0800

    Support enable WebSocket on Pulsar Proxy. (#8613)
    
    ### Motivation
    
    Support enabling the WebSocket service on Pulsar Proxy.
    
    ### Verifying this change
    
    Integration tests added.
    
    (cherry picked from commit 19767c714206aaf99e607301ba66b667579b6f34)
---
 .github/workflows/ci-integration-messaging.yaml    |  20 +++-
 conf/proxy.conf                                    |   8 ++
 pulsar-proxy/pom.xml                               |   6 +
 .../pulsar/proxy/server/ProxyConfiguration.java    |  16 +++
 .../pulsar/proxy/server/ProxyServiceStarter.java   |  31 ++++-
 tests/integration/pom.xml                          |   5 +
 .../pulsar/tests/integration/proxy/TestProxy.java  |  13 +-
 .../integration/proxy/TestProxyWithWebSocket.java  | 133 +++++++++++++++++++++
 .../integration/topologies/PulsarCluster.java      |   3 +
 .../integration/topologies/PulsarClusterSpec.java  |   5 +
 .../src/test/resources/pulsar-proxy-websocket.xml  |  28 +++++
 .../src/test/resources/pulsar-proxy.xml            |  28 +++++
 12 files changed, 282 insertions(+), 14 deletions(-)

diff --git a/.github/workflows/ci-integration-messaging.yaml b/.github/workflows/ci-integration-messaging.yaml
index 3bf49ea..27353ea 100644
--- a/.github/workflows/ci-integration-messaging.yaml
+++ b/.github/workflows/ci-integration-messaging.yaml
@@ -61,10 +61,26 @@ jobs:
         if: steps.docs.outputs.changed_only == 'no'
         run: mvn clean install -DskipTests
 
+      - name: build pulsar image
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
+
+      - name: build pulsar-all image
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
+
       - name: build artifacts and docker image
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests
+        run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true
 
-      - name: run integration tests
+      - name: run integration messaging tests
         if: steps.docs.outputs.changed_only == 'no'
         run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-messaging.xml -DintegrationTests -DredirectTestOutputToFile=false
+
+      - name: run integration proxy tests
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-proxy.xml -DintegrationTests -DredirectTestOutputToFile=false
+
+      - name: run integration proxy with WebSocket tests
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-proxy-websocket.xml -DintegrationTests -DredirectTestOutputToFile=false
diff --git a/conf/proxy.conf b/conf/proxy.conf
index cbffe0d..a257184 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -209,6 +209,14 @@ tokenAudienceClaim=
 # The token audience stands for this broker. The field `tokenAudienceClaim` of a valid token, need contains this.
 tokenAudience=
 
+### --- WebSocket config variables --- ###
+
+# Enable or disable the WebSocket servlet.
+webSocketServiceEnabled=false
+
+# Name of the cluster to which this proxy belongs
+clusterName=
+
 ### --- Deprecated config variables --- ###
 
 # Deprecated. Use configurationStoreServers
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index 6580691..3a48f5d 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -56,6 +56,12 @@
     </dependency>
 
     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-websocket</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index e42ec93..810aa1b 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -70,6 +70,8 @@ public class ProxyConfiguration implements PulsarConfiguration {
     private static final String CATEGORY_SASL_AUTH = "SASL Authentication Provider";
     @Category
     private static final String CATEGORY_PLUGIN = "proxy plugin";
+    @Category
+    private static final String CATEGORY_WEBSOCKET = "WebSocket";
 
     @FieldContext(
         category = CATEGORY_BROKER_DISCOVERY,
@@ -522,6 +524,20 @@ public class ProxyConfiguration implements PulsarConfiguration {
             )
         }
     )
+
+    /***** --- WebSocket --- ****/
+    @FieldContext(
+            category = CATEGORY_WEBSOCKET,
+            doc = "Enable or disable the WebSocket servlet"
+    )
+    private boolean webSocketServiceEnabled = false;
+
+    @FieldContext(
+            category = CATEGORY_WEBSOCKET,
+            doc = "Name of the cluster to which this broker belongs to"
+    )
+    private String clusterName;
+
     private Properties properties = new Properties();
 
     public Properties getProperties() {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 3a30b47..c1178b0 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -29,8 +29,13 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.proxy.server.plugin.servlet.ProxyAdditionalServletWithClassLoader;
+import org.apache.pulsar.websocket.WebSocketConsumerServlet;
+import org.apache.pulsar.websocket.WebSocketProducerServlet;
+import org.apache.pulsar.websocket.WebSocketReaderServlet;
+import org.apache.pulsar.websocket.WebSocketService;
 import org.eclipse.jetty.proxy.ProxyServlet;
 import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -184,7 +189,7 @@ public class ProxyServiceStarter {
     public static void addWebServerHandlers(WebServer server,
                                      ProxyConfiguration config,
                                      ProxyService service,
-                                     BrokerDiscoveryProvider discoveryProvider) {
+                                     BrokerDiscoveryProvider discoveryProvider) throws Exception {
         server.addServlet("/metrics", new ServletHolder(MetricsServlet.class), Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
         server.addRestResources("/", VipStatus.class.getPackage().getName(),
                 VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath());
@@ -215,6 +220,30 @@ public class ProxyServiceStarter {
                 log.info("proxy add additional servlet basePath {} ", servletWithClassLoader.getBasePath());
             }
         }
+
+        if (config.isWebSocketServiceEnabled()) {
+            // add WebSocket servlet
+            // Use local broker address to avoid different IP address when using a VIP for service discovery
+            WebSocketService webSocketService = new WebSocketService(null, PulsarConfigurationLoader.convertFrom(config));
+            webSocketService.start();
+            final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService);
+            server.addServlet(WebSocketProducerServlet.SERVLET_PATH,
+                    new ServletHolder(producerWebSocketServlet));
+            server.addServlet(WebSocketProducerServlet.SERVLET_PATH_V2,
+                    new ServletHolder(producerWebSocketServlet));
+
+            final WebSocketServlet consumerWebSocketServlet = new WebSocketConsumerServlet(webSocketService);
+            server.addServlet(WebSocketConsumerServlet.SERVLET_PATH,
+                    new ServletHolder(consumerWebSocketServlet));
+            server.addServlet(WebSocketConsumerServlet.SERVLET_PATH_V2,
+                    new ServletHolder(consumerWebSocketServlet));
+
+            final WebSocketServlet readerWebSocketServlet = new WebSocketReaderServlet(webSocketService);
+            server.addServlet(WebSocketReaderServlet.SERVLET_PATH,
+                    new ServletHolder(readerWebSocketServlet));
+            server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
+                    new ServletHolder(readerWebSocketServlet));
+        }
     }
 
     private static final Logger log = LoggerFactory.getLogger(ProxyServiceStarter.class);
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 20dce5b..6daedaf 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -170,6 +170,11 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/proxy/TestProxy.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/proxy/TestProxy.java
index 10af298..e0f95a6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/proxy/TestProxy.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/proxy/TestProxy.java
@@ -31,7 +31,6 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.tests.integration.containers.ProxyContainer;
 import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
 import org.testng.annotations.Test;
@@ -44,19 +43,11 @@ import org.slf4j.LoggerFactory;
  */
 public class TestProxy extends PulsarTestSuite {
     private final static Logger log = LoggerFactory.getLogger(TestProxy.class);
-    private ProxyContainer proxyViaURL;
 
     @Override
     protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
             String clusterName,
             PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
-        proxyViaURL = new ProxyContainer(clusterName, "proxy-via-url")
-            .withEnv("brokerServiceURL", "pulsar://pulsar-broker-0:6650")
-            .withEnv("brokerWebServiceURL", "http://pulsar-broker-0:8080")
-            .withEnv("clusterName", clusterName);
-
-        specBuilder.externalService("proxy-via-url", proxyViaURL);
-
         return super.beforeSetupCluster(clusterName, specBuilder);
     }
 
@@ -107,7 +98,7 @@ public class TestProxy extends PulsarTestSuite {
 
     @Test
     public void testProxyWithNoServiceDiscoveryProxyConnectsViaURL() throws Exception {
-        testProxy(proxyViaURL.getPlainTextServiceUrl(), proxyViaURL.getHttpServiceUrl());
+        testProxy(pulsarCluster.getProxy().getPlainTextServiceUrl(), pulsarCluster.getProxy().getHttpServiceUrl());
     }
 
     @Test
@@ -119,7 +110,7 @@ public class TestProxy extends PulsarTestSuite {
 
         @Cleanup
         PulsarAdmin admin = PulsarAdmin.builder()
-                .serviceHttpUrl(pulsarCluster.getPlainTextServiceUrl())
+                .serviceHttpUrl(pulsarCluster.getProxy().getHttpServiceUrl())
                 .build();
 
         admin.tenants().createTenant(tenant,
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/proxy/TestProxyWithWebSocket.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/proxy/TestProxyWithWebSocket.java
new file mode 100644
index 0000000..5847901
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/proxy/TestProxyWithWebSocket.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.proxy;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.tests.integration.containers.CSContainer;
+import org.apache.pulsar.tests.integration.containers.ProxyContainer;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.awaitility.Awaitility;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.pulsar.tests.integration.containers.PulsarContainer.CS_PORT;
+
+/**
+ * Test cases for the Pulsar proxy with the WebSocket service enabled.
+ */
+public class TestProxyWithWebSocket extends PulsarTestSuite {
+    private final static Logger log = LoggerFactory.getLogger(TestProxyWithWebSocket.class);
+
+    @Override
+    protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+            String clusterName,
+            PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+        Map<String, String> envs = new HashMap<>();
+        envs.put("webSocketServiceEnabled", "true");
+        specBuilder.proxyEnvs(envs);
+        return super.beforeSetupCluster(clusterName, specBuilder);
+    }
+
+    @Test
+    public void testWebSocket() throws Exception {
+
+        final String tenant = "proxy-test-" + randomName(10);
+        final String namespace = tenant + "/ns1";
+
+        @Cleanup
+        PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+                .build();
+
+        admin.tenants().createTenant(tenant,
+                new TenantInfo(Collections.emptySet(), Collections.singleton(pulsarCluster.getClusterName())));
+
+        admin.namespaces().createNamespace(namespace, Collections.singleton(pulsarCluster.getClusterName()));
+
+        HttpClient httpClient = new HttpClient();
+        WebSocketClient webSocketClient = new WebSocketClient(httpClient);
+        webSocketClient.start();
+        MyWebSocket myWebSocket = new MyWebSocket();
+        String webSocketUri = pulsarCluster.getProxy().getHttpServiceUrl().replaceFirst("http", "ws")
+                + "/ws/v2/producer/persistent/" + namespace + "/my-topic";
+        Future<Session> sessionFuture =  webSocketClient.connect(myWebSocket,
+                URI.create(webSocketUri));
+        sessionFuture.get().getRemote().sendString("{\n" +
+                "  \"payload\": \"SGVsbG8gV29ybGQ=\",\n" +
+                "  \"properties\": {\"key1\": \"value1\", \"key2\": \"value2\"},\n" +
+                "  \"context\": \"1\"\n" +
+                "}");
+
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
+            String response = myWebSocket.getResponse();
+            Assert.assertNotNull(response);
+            Assert.assertTrue(response.contains("ok"));
+        });
+    }
+
+    @WebSocket
+    public static class MyWebSocket implements WebSocketListener {
+        Queue<String> incomingMessages = new ArrayBlockingQueue<>(10);
+        @Override
+        public void onWebSocketBinary(byte[] bytes, int i, int i1) {
+        }
+
+        @Override
+        public void onWebSocketText(String s) {
+            incomingMessages.add(s);
+        }
+
+        @Override
+        public void onWebSocketClose(int i, String s) {
+        }
+
+        @Override
+        public void onWebSocketConnect(Session session) {
+
+        }
+
+        @Override
+        public void onWebSocketError(Throwable throwable) {
+
+        }
+
+        public String getResponse() {
+            return incomingMessages.poll();
+        }
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index d655545..8f40793 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -132,6 +132,9 @@ public class PulsarCluster {
             .withEnv("zookeeperServers", ZKContainer.NAME)
             .withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
             .withEnv("clusterName", clusterName);
+        if (spec.proxyEnvs != null) {
+            spec.proxyEnvs.forEach(this.proxyContainer::withEnv);
+        }
 
         // create bookies
         bookieContainers.putAll(
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index e1e26ef..63530dd 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -126,4 +126,9 @@ public class PulsarClusterSpec {
      */
     @Default
     String pulsarTestImage = PulsarContainer.DEFAULT_IMAGE_NAME;
+
+    /**
+     * Extra environment variables to set on the proxy container.
+     */
+    Map<String, String> proxyEnvs;
 }
diff --git a/tests/integration/src/test/resources/pulsar-proxy-websocket.xml b/tests/integration/src/test/resources/pulsar-proxy-websocket.xml
new file mode 100644
index 0000000..6e32ac9
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar-proxy-websocket.xml
@@ -0,0 +1,28 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd" >
+<suite name="Pulsar (Proxy with WebSocket) Integration Tests" verbose="2" annotations="JDK">
+    <test name="messaging-test-suite" preserve-order="true">
+        <classes>
+            <class name="org.apache.pulsar.tests.integration.proxy.TestProxyWithWebSocket" />
+        </classes>
+    </test>
+</suite>
\ No newline at end of file
diff --git a/tests/integration/src/test/resources/pulsar-proxy.xml b/tests/integration/src/test/resources/pulsar-proxy.xml
new file mode 100644
index 0000000..ae6f138
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar-proxy.xml
@@ -0,0 +1,28 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd" >
+<suite name="Pulsar (Proxy) Integration Tests" verbose="2" annotations="JDK">
+    <test name="messaging-test-suite" preserve-order="true">
+        <classes>
+            <class name="org.apache.pulsar.tests.integration.proxy.TestProxy" />
+        </classes>
+    </test>
+</suite>
\ No newline at end of file