You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/31 17:37:02 UTC

[incubator-pulsar] branch master updated: Fix REST APIs provided by Pulsar proxy (#1862)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 13ad7a6  Fix REST APIs provided by Pulsar proxy (#1862)
13ad7a6 is described below

commit 13ad7a60321468ec5290f013cb672c73f87a6b6c
Author: massakam <ma...@yahoo-corp.jp>
AuthorDate: Fri Jun 1 02:36:52 2018 +0900

    Fix REST APIs provided by Pulsar proxy (#1862)
    
    * Fix REST APIs provided by Pulsar proxy
    
    * Rename status file for test
    
    * Exclude status file from license check
    
    * Add license header to status file
    
    * Proxying HTTP request in the same way as Pulsar wire protocol
---
 .../pulsar/proxy/server/AdminProxyHandler.java     | 63 +++++++++++++++++++++-
 .../pulsar/proxy/server/ProxyConfiguration.java    | 20 +++++++
 .../pulsar/proxy/server/ProxyServiceStarter.java   | 15 ++++--
 .../org/apache/pulsar/proxy/server/WebServer.java  |  9 ++--
 .../proxy/server/AuthedAdminProxyHandlerTest.java  | 30 +++++++----
 .../server/UnauthedAdminProxyHandlerTest.java      | 55 ++++++++++++++-----
 pulsar-proxy/src/test/resources/vip_status.html    | 20 +++++++
 7 files changed, 177 insertions(+), 35 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index dce9126..deb17d8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -18,29 +18,41 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
 import java.io.IOException;
+import java.net.URI;
 import java.security.cert.X509Certificate;
 import java.util.Objects;
+
 import javax.net.ssl.SSLContext;
 import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.proxy.AsyncProxyServlet;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class AdminProxyHandler extends AsyncProxyServlet.Transparent {
+class AdminProxyHandler extends AsyncProxyServlet {
     private static final Logger LOG = LoggerFactory.getLogger(AdminProxyHandler.class);
 
     private final ProxyConfiguration config;
+    private final BrokerDiscoveryProvider discoveryProvider;
+    private final String brokerWebServiceUrl;
 
-    AdminProxyHandler(ProxyConfiguration config) {
+    AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider) {
         this.config = config;
+        this.discoveryProvider = discoveryProvider;
+        this.brokerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getBrokerWebServiceURLTLS()
+                : config.getBrokerWebServiceURL();
     }
 
     @Override
@@ -103,4 +115,51 @@ class AdminProxyHandler extends AsyncProxyServlet.Transparent {
         // return an unauthenticated client, every request will fail.
         return new HttpClient();
     }
+
+    @Override
+    protected String rewriteTarget(HttpServletRequest request) {
+        StringBuilder url = new StringBuilder();
+
+        if (isBlank(brokerWebServiceUrl)) {
+            try {
+                ServiceLookupData availableBroker = discoveryProvider.nextBroker();
+
+                if (config.isTlsEnabledWithBroker()) {
+                    url.append(availableBroker.getWebServiceUrlTls());
+                } else {
+                    url.append(availableBroker.getWebServiceUrl());
+                }
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("[{}:{}] Selected active broker is {}", request.getRemoteAddr(), request.getRemotePort(),
+                            url.toString());
+                }
+            } catch (Exception e) {
+                LOG.warn("[{}:{}] Failed to get next active broker {}", request.getRemoteAddr(),
+                        request.getRemotePort(), e.getMessage(), e);
+                return null;
+            }
+        } else {
+            url.append(brokerWebServiceUrl);
+        }
+
+        if (url.lastIndexOf("/") == url.length() - 1) {
+            url.deleteCharAt(url.lastIndexOf("/"));
+        }
+        url.append(request.getRequestURI());
+
+        String query = request.getQueryString();
+        if (query != null) {
+            url.append("?").append(query);
+        }
+
+        URI rewrittenUrl = URI.create(url.toString()).normalize();
+
+        if (!validateDestination(rewrittenUrl.getHost(), rewrittenUrl.getPort())) {
+            return null;
+        }
+
+        return rewrittenUrl.toString();
+    }
+
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index dcd9580..4d02786 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -44,6 +44,10 @@ public class ProxyConfiguration implements PulsarConfiguration {
     private String brokerServiceURL;
     private String brokerServiceURLTLS;
 
+    // These settings are unnecessary if `zookeeperServers` is specified
+    private String brokerWebServiceURL;
+    private String brokerWebServiceURLTLS;
+
     // Port to use to server binary-proto request
     private int servicePort = 6650;
     // Port to use to server binary-proto-tls request
@@ -138,6 +142,22 @@ public class ProxyConfiguration implements PulsarConfiguration {
         this.brokerServiceURL = discoveryServiceURL;
     }
 
+    public String getBrokerWebServiceURL() {
+        return brokerWebServiceURL;
+    }
+
+    public void setBrokerWebServiceURL(String brokerWebServiceURL) {
+        this.brokerWebServiceURL = brokerWebServiceURL;
+    }
+
+    public String getBrokerWebServiceURLTLS() {
+        return brokerWebServiceURLTLS;
+    }
+
+    public void setBrokerWebServiceURLTLS(String brokerWebServiceURLTLS) {
+        this.brokerWebServiceURLTLS = brokerWebServiceURLTLS;
+    }
+
     public String getZookeeperServers() {
         return zookeeperServers;
     }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 77fcd77..75adb16 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -105,7 +105,13 @@ public class ProxyServiceStarter {
         if ((isBlank(config.getBrokerServiceURL()) && isBlank(config.getBrokerServiceURLTLS()))
                 || config.isAuthorizationEnabled()) {
             checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
-            checkArgument(!isEmpty(config.getConfigurationStoreServers()), "configurationStoreServers must be provided");
+            checkArgument(!isEmpty(config.getConfigurationStoreServers()),
+                    "configurationStoreServers must be provided");
+        }
+
+        if ((!config.isTlsEnabledWithBroker() && isBlank(config.getBrokerWebServiceURL()))
+                || (config.isTlsEnabledWithBroker() && isBlank(config.getBrokerWebServiceURLTLS()))) {
+            checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
         }
 
         java.security.Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
@@ -132,12 +138,11 @@ public class ProxyServiceStarter {
         server.addRestResources("/", VipStatus.class.getPackage().getName(),
                 VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath());
 
-        AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config);
+        AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, proxyService.getDiscoveryProvider());
         ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
         servletHolder.setInitParameter("preserveHost", "true");
-        servletHolder.setInitParameter("proxyTo", config.getBrokerServiceURL());
-        server.addServlet("/admin/*", servletHolder);
-        server.addServlet("/lookup/*", servletHolder);
+        server.addServlet("/admin", servletHolder);
+        server.addServlet("/lookup", servletHolder);
 
         // start web-service
         server.start();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index f7a4f28..e840b42 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -100,13 +100,14 @@ public class WebServer {
         return this.server.getURI();
     }
 
-    public void addServlet(String path, ServletHolder servletHolder) {
-        addServlet(path, servletHolder, Collections.emptyList());
+    public void addServlet(String basePath, ServletHolder servletHolder) {
+        addServlet(basePath, servletHolder, Collections.emptyList());
     }
 
-    public void addServlet(String path, ServletHolder servletHolder, List<Pair<String, Object>> attributes) {
+    public void addServlet(String basePath, ServletHolder servletHolder, List<Pair<String, Object>> attributes) {
         ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
-        context.addServlet(servletHolder, path);
+        context.setContextPath(basePath);
+        context.addServlet(servletHolder, "/*");
         for (Pair<String, Object> attribute : attributes) {
             context.setAttribute(attribute.getLeft(), attribute.getRight());
         }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
index ec217cf..4d32a58 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
@@ -19,18 +19,23 @@
 package org.apache.pulsar.proxy.server;
 
 import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import javax.servlet.http.HttpServletRequest;
+
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -50,6 +55,7 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
     private final String configClusterName = "test";
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
     private WebServer webServer;
+    private BrokerDiscoveryProvider discoveryProvider;
     private AdminProxyWrapper adminProxyHandler;
 
     @BeforeMethod
@@ -109,12 +115,15 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
 
         webServer = new WebServer(proxyConfig);
 
-        adminProxyHandler = new AdminProxyWrapper(proxyConfig);
+        discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, mockZooKeeperClientFactory));
+        LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null);
+        doReturn(report).when(discoveryProvider).nextBroker();
+
+        adminProxyHandler = new AdminProxyWrapper(proxyConfig, discoveryProvider);
         ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
         servletHolder.setInitParameter("preserveHost", "true");
-        servletHolder.setInitParameter("proxyTo", brokerUrlTls.toString());
-        webServer.addServlet("/admin/*", servletHolder);
-        webServer.addServlet("/lookup/*", servletHolder);
+        webServer.addServlet("/admin", servletHolder);
+        webServer.addServlet("/lookup", servletHolder);
 
         // start web-service
         webServer.start();
@@ -142,20 +151,21 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
 
         List<String> activeBrokers = admin.brokers().getActiveBrokers(configClusterName);
         Assert.assertEquals(activeBrokers.size(), 1);
-        Assert.assertTrue(adminProxyHandler.rewriteCalled);
+        Assert.assertEquals(adminProxyHandler.rewrittenUrl, String.format("%s/admin/v2/brokers/%s",
+                brokerUrlTls.toString(), configClusterName));
     }
 
     static class AdminProxyWrapper extends AdminProxyHandler {
-        boolean rewriteCalled = false;
+        String rewrittenUrl;
 
-        AdminProxyWrapper(ProxyConfiguration config) {
-            super(config);
+        AdminProxyWrapper(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider) {
+            super(config, discoveryProvider);
         }
 
         @Override
         protected String rewriteTarget(HttpServletRequest clientRequest) {
-            rewriteCalled = true;
-            return super.rewriteTarget(clientRequest);
+            rewrittenUrl = super.rewriteTarget(clientRequest);
+            return rewrittenUrl;
         }
 
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index 2c90d10..5e099ce 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -18,15 +18,26 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+
 import javax.servlet.http.HttpServletRequest;
 import javax.validation.constraints.AssertTrue;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.configuration.VipStatus;
 import org.eclipse.jetty.servlet.ServletHolder;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.logging.LoggingFeature;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -34,9 +45,11 @@ import org.testng.annotations.Test;
 
 public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
     private final String DUMMY_VALUE = "DUMMY_VALUE";
+    private final String STATUS_FILE_PATH = "./src/test/resources/vip_status.html";
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
-    private AdminProxyWrapper adminProxyHandler;
     private WebServer webServer;
+    private BrokerDiscoveryProvider discoveryProvider;
+    private AdminProxyWrapper adminProxyHandler;
 
     @Override
     @BeforeClass
@@ -54,19 +67,22 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
         // start proxy service
         proxyConfig.setServicePort(PortManager.nextFreePort());
         proxyConfig.setWebServicePort(PortManager.nextFreePort());
-        proxyConfig.setBrokerServiceURL(brokerUrl.toString());
-
+        proxyConfig.setBrokerWebServiceURL(brokerUrl.toString());
+        proxyConfig.setStatusFilePath(STATUS_FILE_PATH);
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
 
         webServer = new WebServer(proxyConfig);
 
-        adminProxyHandler = new AdminProxyWrapper(proxyConfig);
+        discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, mockZooKeeperClientFactory));
+        adminProxyHandler = new AdminProxyWrapper(proxyConfig, discoveryProvider);
         ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
         servletHolder.setInitParameter("preserveHost", "true");
-        servletHolder.setInitParameter("proxyTo", brokerUrl.toString());
-        webServer.addServlet("/admin/*", servletHolder);
-        webServer.addServlet("/lookup/*", servletHolder);
+        webServer.addServlet("/admin", servletHolder);
+        webServer.addServlet("/lookup", servletHolder);
+
+        webServer.addRestResources("/", VipStatus.class.getPackage().getName(),
+                VipStatus.ATTRIBUTE_STATUS_FILE_PATH, proxyConfig.getStatusFilePath());
 
         // start web-service
         webServer.start();
@@ -87,22 +103,33 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
             .build();
         List<String> activeBrokers = admin.brokers().getActiveBrokers(configClusterName);
         Assert.assertEquals(activeBrokers.size(), 1);
-        Assert.assertTrue(adminProxyHandler.rewriteCalled);
+        Assert.assertEquals(adminProxyHandler.rewrittenUrl, String.format("%s/admin/v2/brokers/%s",
+                brokerUrl.toString(), configClusterName));
+    }
+
+    @Test
+    public void testVipStatus() throws Exception {
+        Client client = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class));
+        WebTarget webTarget = client.target("http://127.0.0.1:" + proxyConfig.getWebServicePort())
+                .path("/status.html");
+        String response = webTarget.request().get(String.class);
+        Assert.assertEquals(response, "OK");
+        client.close();
     }
 
     static class AdminProxyWrapper extends AdminProxyHandler {
-        boolean rewriteCalled = false;
+        String rewrittenUrl;
 
-        AdminProxyWrapper(ProxyConfiguration config) {
-            super(config);
+        AdminProxyWrapper(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider) {
+            super(config, discoveryProvider);
         }
 
         @Override
         protected String rewriteTarget(HttpServletRequest clientRequest) {
-            rewriteCalled = true;
-            return super.rewriteTarget(clientRequest);
+            rewrittenUrl = super.rewriteTarget(clientRequest);
+            return rewrittenUrl;
         }
 
     }
 
-}
\ No newline at end of file
+}
diff --git a/pulsar-proxy/src/test/resources/vip_status.html b/pulsar-proxy/src/test/resources/vip_status.html
new file mode 100644
index 0000000..a0eb9fb
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/vip_status.html
@@ -0,0 +1,20 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.