You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/31 17:37:02 UTC
[incubator-pulsar] branch master updated: Fix REST APIs provided by Pulsar proxy (#1862)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 13ad7a6 Fix REST APIs provided by Pulsar proxy (#1862)
13ad7a6 is described below
commit 13ad7a60321468ec5290f013cb672c73f87a6b6c
Author: massakam <ma...@yahoo-corp.jp>
AuthorDate: Fri Jun 1 02:36:52 2018 +0900
Fix REST APIs provided by Pulsar proxy (#1862)
* Fix REST APIs provided by Pulsar proxy
* Rename status file for test
* Exclude status file from license check
* Add license header to status file
* Proxying HTTP request in the same way as Pulsar wire protocol
---
.../pulsar/proxy/server/AdminProxyHandler.java | 63 +++++++++++++++++++++-
.../pulsar/proxy/server/ProxyConfiguration.java | 20 +++++++
.../pulsar/proxy/server/ProxyServiceStarter.java | 15 ++++--
.../org/apache/pulsar/proxy/server/WebServer.java | 9 ++--
.../proxy/server/AuthedAdminProxyHandlerTest.java | 30 +++++++----
.../server/UnauthedAdminProxyHandlerTest.java | 55 ++++++++++++++-----
pulsar-proxy/src/test/resources/vip_status.html | 20 +++++++
7 files changed, 177 insertions(+), 35 deletions(-)
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index dce9126..deb17d8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -18,29 +18,41 @@
*/
package org.apache.pulsar.proxy.server;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
import java.io.IOException;
+import java.net.URI;
import java.security.cert.X509Certificate;
import java.util.Objects;
+
import javax.net.ssl.SSLContext;
import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.proxy.AsyncProxyServlet;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class AdminProxyHandler extends AsyncProxyServlet.Transparent {
+class AdminProxyHandler extends AsyncProxyServlet {
private static final Logger LOG = LoggerFactory.getLogger(AdminProxyHandler.class);
private final ProxyConfiguration config;
+ private final BrokerDiscoveryProvider discoveryProvider;
+ private final String brokerWebServiceUrl;
- AdminProxyHandler(ProxyConfiguration config) {
+ AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider) {
this.config = config;
+ this.discoveryProvider = discoveryProvider;
+ this.brokerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getBrokerWebServiceURLTLS()
+ : config.getBrokerWebServiceURL();
}
@Override
@@ -103,4 +115,51 @@ class AdminProxyHandler extends AsyncProxyServlet.Transparent {
// return an unauthenticated client, every request will fail.
return new HttpClient();
}
+
+ @Override
+ protected String rewriteTarget(HttpServletRequest request) {
+ StringBuilder url = new StringBuilder();
+
+ if (isBlank(brokerWebServiceUrl)) {
+ try {
+ ServiceLookupData availableBroker = discoveryProvider.nextBroker();
+
+ if (config.isTlsEnabledWithBroker()) {
+ url.append(availableBroker.getWebServiceUrlTls());
+ } else {
+ url.append(availableBroker.getWebServiceUrl());
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}:{}] Selected active broker is {}", request.getRemoteAddr(), request.getRemotePort(),
+ url.toString());
+ }
+ } catch (Exception e) {
+ LOG.warn("[{}:{}] Failed to get next active broker {}", request.getRemoteAddr(),
+ request.getRemotePort(), e.getMessage(), e);
+ return null;
+ }
+ } else {
+ url.append(brokerWebServiceUrl);
+ }
+
+ if (url.lastIndexOf("/") == url.length() - 1) {
+ url.deleteCharAt(url.lastIndexOf("/"));
+ }
+ url.append(request.getRequestURI());
+
+ String query = request.getQueryString();
+ if (query != null) {
+ url.append("?").append(query);
+ }
+
+ URI rewrittenUrl = URI.create(url.toString()).normalize();
+
+ if (!validateDestination(rewrittenUrl.getHost(), rewrittenUrl.getPort())) {
+ return null;
+ }
+
+ return rewrittenUrl.toString();
+ }
+
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index dcd9580..4d02786 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -44,6 +44,10 @@ public class ProxyConfiguration implements PulsarConfiguration {
private String brokerServiceURL;
private String brokerServiceURLTLS;
+ // These settings are unnecessary if `zookeeperServers` is specified
+ private String brokerWebServiceURL;
+ private String brokerWebServiceURLTLS;
+
// Port to use to server binary-proto request
private int servicePort = 6650;
// Port to use to server binary-proto-tls request
@@ -138,6 +142,22 @@ public class ProxyConfiguration implements PulsarConfiguration {
this.brokerServiceURL = discoveryServiceURL;
}
+ public String getBrokerWebServiceURL() {
+ return brokerWebServiceURL;
+ }
+
+ public void setBrokerWebServiceURL(String brokerWebServiceURL) {
+ this.brokerWebServiceURL = brokerWebServiceURL;
+ }
+
+ public String getBrokerWebServiceURLTLS() {
+ return brokerWebServiceURLTLS;
+ }
+
+ public void setBrokerWebServiceURLTLS(String brokerWebServiceURLTLS) {
+ this.brokerWebServiceURLTLS = brokerWebServiceURLTLS;
+ }
+
public String getZookeeperServers() {
return zookeeperServers;
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 77fcd77..75adb16 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -105,7 +105,13 @@ public class ProxyServiceStarter {
if ((isBlank(config.getBrokerServiceURL()) && isBlank(config.getBrokerServiceURLTLS()))
|| config.isAuthorizationEnabled()) {
checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
- checkArgument(!isEmpty(config.getConfigurationStoreServers()), "configurationStoreServers must be provided");
+ checkArgument(!isEmpty(config.getConfigurationStoreServers()),
+ "configurationStoreServers must be provided");
+ }
+
+ if ((!config.isTlsEnabledWithBroker() && isBlank(config.getBrokerWebServiceURL()))
+ || (config.isTlsEnabledWithBroker() && isBlank(config.getBrokerWebServiceURLTLS()))) {
+ checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
}
java.security.Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
@@ -132,12 +138,11 @@ public class ProxyServiceStarter {
server.addRestResources("/", VipStatus.class.getPackage().getName(),
VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath());
- AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config);
+ AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, proxyService.getDiscoveryProvider());
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
servletHolder.setInitParameter("preserveHost", "true");
- servletHolder.setInitParameter("proxyTo", config.getBrokerServiceURL());
- server.addServlet("/admin/*", servletHolder);
- server.addServlet("/lookup/*", servletHolder);
+ server.addServlet("/admin", servletHolder);
+ server.addServlet("/lookup", servletHolder);
// start web-service
server.start();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index f7a4f28..e840b42 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -100,13 +100,14 @@ public class WebServer {
return this.server.getURI();
}
- public void addServlet(String path, ServletHolder servletHolder) {
- addServlet(path, servletHolder, Collections.emptyList());
+ public void addServlet(String basePath, ServletHolder servletHolder) {
+ addServlet(basePath, servletHolder, Collections.emptyList());
}
- public void addServlet(String path, ServletHolder servletHolder, List<Pair<String, Object>> attributes) {
+ public void addServlet(String basePath, ServletHolder servletHolder, List<Pair<String, Object>> attributes) {
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
- context.addServlet(servletHolder, path);
+ context.setContextPath(basePath);
+ context.addServlet(servletHolder, "/*");
for (Pair<String, Object> attribute : attributes) {
context.setAttribute(attribute.getLeft(), attribute.getRight());
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
index ec217cf..4d32a58 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
@@ -19,18 +19,23 @@
package org.apache.pulsar.proxy.server;
import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import javax.servlet.http.HttpServletRequest;
+
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.eclipse.jetty.servlet.ServletHolder;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -50,6 +55,7 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
private final String configClusterName = "test";
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
private WebServer webServer;
+ private BrokerDiscoveryProvider discoveryProvider;
private AdminProxyWrapper adminProxyHandler;
@BeforeMethod
@@ -109,12 +115,15 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
webServer = new WebServer(proxyConfig);
- adminProxyHandler = new AdminProxyWrapper(proxyConfig);
+ discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, mockZooKeeperClientFactory));
+ LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null);
+ doReturn(report).when(discoveryProvider).nextBroker();
+
+ adminProxyHandler = new AdminProxyWrapper(proxyConfig, discoveryProvider);
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
servletHolder.setInitParameter("preserveHost", "true");
- servletHolder.setInitParameter("proxyTo", brokerUrlTls.toString());
- webServer.addServlet("/admin/*", servletHolder);
- webServer.addServlet("/lookup/*", servletHolder);
+ webServer.addServlet("/admin", servletHolder);
+ webServer.addServlet("/lookup", servletHolder);
// start web-service
webServer.start();
@@ -142,20 +151,21 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
List<String> activeBrokers = admin.brokers().getActiveBrokers(configClusterName);
Assert.assertEquals(activeBrokers.size(), 1);
- Assert.assertTrue(adminProxyHandler.rewriteCalled);
+ Assert.assertEquals(adminProxyHandler.rewrittenUrl, String.format("%s/admin/v2/brokers/%s",
+ brokerUrlTls.toString(), configClusterName));
}
static class AdminProxyWrapper extends AdminProxyHandler {
- boolean rewriteCalled = false;
+ String rewrittenUrl;
- AdminProxyWrapper(ProxyConfiguration config) {
- super(config);
+ AdminProxyWrapper(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider) {
+ super(config, discoveryProvider);
}
@Override
protected String rewriteTarget(HttpServletRequest clientRequest) {
- rewriteCalled = true;
- return super.rewriteTarget(clientRequest);
+ rewrittenUrl = super.rewriteTarget(clientRequest);
+ return rewrittenUrl;
}
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index 2c90d10..5e099ce 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -18,15 +18,26 @@
*/
package org.apache.pulsar.proxy.server;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+
import javax.servlet.http.HttpServletRequest;
import javax.validation.constraints.AssertTrue;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.configuration.VipStatus;
import org.eclipse.jetty.servlet.ServletHolder;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.logging.LoggingFeature;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -34,9 +45,11 @@ import org.testng.annotations.Test;
public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
private final String DUMMY_VALUE = "DUMMY_VALUE";
+ private final String STATUS_FILE_PATH = "./src/test/resources/vip_status.html";
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
- private AdminProxyWrapper adminProxyHandler;
private WebServer webServer;
+ private BrokerDiscoveryProvider discoveryProvider;
+ private AdminProxyWrapper adminProxyHandler;
@Override
@BeforeClass
@@ -54,19 +67,22 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
// start proxy service
proxyConfig.setServicePort(PortManager.nextFreePort());
proxyConfig.setWebServicePort(PortManager.nextFreePort());
- proxyConfig.setBrokerServiceURL(brokerUrl.toString());
-
+ proxyConfig.setBrokerWebServiceURL(brokerUrl.toString());
+ proxyConfig.setStatusFilePath(STATUS_FILE_PATH);
proxyConfig.setZookeeperServers(DUMMY_VALUE);
proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
webServer = new WebServer(proxyConfig);
- adminProxyHandler = new AdminProxyWrapper(proxyConfig);
+ discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, mockZooKeeperClientFactory));
+ adminProxyHandler = new AdminProxyWrapper(proxyConfig, discoveryProvider);
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
servletHolder.setInitParameter("preserveHost", "true");
- servletHolder.setInitParameter("proxyTo", brokerUrl.toString());
- webServer.addServlet("/admin/*", servletHolder);
- webServer.addServlet("/lookup/*", servletHolder);
+ webServer.addServlet("/admin", servletHolder);
+ webServer.addServlet("/lookup", servletHolder);
+
+ webServer.addRestResources("/", VipStatus.class.getPackage().getName(),
+ VipStatus.ATTRIBUTE_STATUS_FILE_PATH, proxyConfig.getStatusFilePath());
// start web-service
webServer.start();
@@ -87,22 +103,33 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
.build();
List<String> activeBrokers = admin.brokers().getActiveBrokers(configClusterName);
Assert.assertEquals(activeBrokers.size(), 1);
- Assert.assertTrue(adminProxyHandler.rewriteCalled);
+ Assert.assertEquals(adminProxyHandler.rewrittenUrl, String.format("%s/admin/v2/brokers/%s",
+ brokerUrl.toString(), configClusterName));
+ }
+
+ @Test
+ public void testVipStatus() throws Exception {
+ Client client = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class));
+ WebTarget webTarget = client.target("http://127.0.0.1:" + proxyConfig.getWebServicePort())
+ .path("/status.html");
+ String response = webTarget.request().get(String.class);
+ Assert.assertEquals(response, "OK");
+ client.close();
}
static class AdminProxyWrapper extends AdminProxyHandler {
- boolean rewriteCalled = false;
+ String rewrittenUrl;
- AdminProxyWrapper(ProxyConfiguration config) {
- super(config);
+ AdminProxyWrapper(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider) {
+ super(config, discoveryProvider);
}
@Override
protected String rewriteTarget(HttpServletRequest clientRequest) {
- rewriteCalled = true;
- return super.rewriteTarget(clientRequest);
+ rewrittenUrl = super.rewriteTarget(clientRequest);
+ return rewrittenUrl;
}
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-proxy/src/test/resources/vip_status.html b/pulsar-proxy/src/test/resources/vip_status.html
new file mode 100644
index 0000000..a0eb9fb
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/vip_status.html
@@ -0,0 +1,20 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.