You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/06 07:59:33 UTC
[pulsar] branch master updated: Remove duplicate filter instances in Broker, Proxy and Function worker web server (#15637)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 94392a41fc8 Remove duplicate filter instances in Broker, Proxy and Function worker web server (#15637)
94392a41fc8 is described below
commit 94392a41fc8a6f7b431785bdc9701250b43c8464
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Mon Jun 6 10:59:25 2022 +0300
Remove duplicate filter instances in Broker, Proxy and Function worker web server (#15637)
- There were filter instances for each context path which made
maxConcurrentHttpRequests and httpRequestsMaxPerSecond not work as expected.
- Fixes the backpressure solution that is dependent on maxConcurrentHttpRequests and httpRequestsMaxPerSecond
working properly
- Fix invalid Jersey api usage
---
.../java/org/apache/pulsar/broker/web/Filters.java | 60 ----------
.../org/apache/pulsar/broker/PulsarService.java | 24 ++--
.../org/apache/pulsar/broker/web/WebService.java | 127 ++++++++++++++-------
.../pulsar/functions/worker/rest/WorkerServer.java | 79 ++++++++-----
.../pulsar/proxy/server/ProxyServiceStarter.java | 6 +-
.../org/apache/pulsar/proxy/server/WebServer.java | 64 +++++++----
.../server/UnauthedAdminProxyHandlerTest.java | 4 +-
.../pulsar/websocket/service/ProxyServer.java | 24 ++--
.../websocket/service/WebSocketServiceStarter.java | 10 +-
9 files changed, 216 insertions(+), 182 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/Filters.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/Filters.java
deleted file mode 100644
index 3b6bb721bcc..00000000000
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/Filters.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.web;
-
-import java.util.EnumSet;
-import java.util.Map;
-import javax.servlet.DispatcherType;
-import javax.servlet.Filter;
-import org.eclipse.jetty.servlet.FilterHolder;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-
-public class Filters {
- private static final String MATCH_ALL = "/*";
-
- /**
- * Adds a filter instance to the servlet context handler.
- * The filter will be used for all requests.
- *
- * @param context servlet context handler instance
- * @param filter filter instance
- */
- public static void addFilter(ServletContextHandler context, Filter filter) {
- addFilterHolder(context, new FilterHolder(filter));
- }
-
- private static void addFilterHolder(ServletContextHandler context, FilterHolder filter) {
- context.addFilter(filter,
- MATCH_ALL, EnumSet.allOf(DispatcherType.class));
- }
-
- /**
- * Adds a filter to the servlet context handler which gets instantiated and configured when the server starts.
- *
- * @param context servlet context handler instance
- * @param filter filter class
- * @param initParams initialization parameters used for configuring the filter instance
- */
- public static void addFilterClass(ServletContextHandler context, Class<? extends Filter> filter,
- Map<String, String> initParams) {
- FilterHolder holder = new FilterHolder(filter);
- holder.setInitParameters(initParams);
- addFilterHolder(context, holder);
- }
-}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 9cc2878df0a..22368aab521 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -91,6 +91,7 @@ import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.lookup.v1.TopicLookup;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
@@ -98,6 +99,7 @@ import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.rest.Topics;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.GracefulExecutorServicesShutdown;
import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
@@ -882,21 +884,21 @@ public class PulsarService implements AutoCloseable, ShutdownService {
// Ensure the VIP status is only visible when the broker is fully initialized
return state == State.Started;
});
+
// Add admin rest resources
- webService.addRestResources("/",
- VipStatus.class.getPackage().getName(), false, vipAttributeMap);
- webService.addRestResources("/",
- "org.apache.pulsar.broker.web", false, attributeMap);
+ webService.addRestResource("/",
+ false, vipAttributeMap, VipStatus.class);
webService.addRestResources("/admin",
- "org.apache.pulsar.broker.admin.v1", true, attributeMap);
+ true, attributeMap, "org.apache.pulsar.broker.admin.v1");
webService.addRestResources("/admin/v2",
- "org.apache.pulsar.broker.admin.v2", true, attributeMap);
+ true, attributeMap, "org.apache.pulsar.broker.admin.v2");
webService.addRestResources("/admin/v3",
- "org.apache.pulsar.broker.admin.v3", true, attributeMap);
- webService.addRestResources("/lookup",
- "org.apache.pulsar.broker.lookup", true, attributeMap);
- webService.addRestResources("/topics",
- "org.apache.pulsar.broker.rest", true, attributeMap);
+ true, attributeMap, "org.apache.pulsar.broker.admin.v3");
+ webService.addRestResource("/lookup",
+ true, attributeMap, TopicLookup.class,
+ org.apache.pulsar.broker.lookup.v2.TopicLookup.class);
+ webService.addRestResource("/topics",
+ true, attributeMap, Topics.class);
// Add metrics servlet
webService.addServlet("/metrics",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 8c965b8e496..0117ae31f28 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -18,16 +18,15 @@
*/
package org.apache.pulsar.broker.web;
-import static org.apache.pulsar.broker.web.Filters.addFilter;
-import static org.apache.pulsar.broker.web.Filters.addFilterClass;
import com.google.common.collect.Lists;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.jetty.JettyStatisticsCollector;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import javax.servlet.DispatcherType;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -43,6 +42,7 @@ import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
+import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.QoSFilter;
@@ -72,6 +72,7 @@ public class WebService implements AutoCloseable {
private final ServerConnector httpConnector;
private final ServerConnector httpsConnector;
+ private final FilterInitializer filterInitializer;
private JettyStatisticsCollector jettyStatisticsCollector;
public WebService(PulsarService pulsar) throws PulsarServerException {
@@ -144,12 +145,30 @@ public class WebService implements AutoCloseable {
// Limit number of concurrent HTTP connections to avoid getting out of file descriptors
connectors.forEach(c -> c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize()));
server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
+
+ filterInitializer = new FilterInitializer(pulsar);
+ }
+
+ public void addRestResources(String basePath, boolean requiresAuthentication, Map<String, Object> attributeMap,
+ String... javaPackages) {
+ ResourceConfig config = new ResourceConfig();
+ for (String javaPackage : javaPackages) {
+ config.packages(false, javaPackage);
+ }
+ addResourceServlet(basePath, requiresAuthentication, attributeMap, config);
}
- public void addRestResources(String basePath, String javaPackages, boolean requiresAuthentication,
- Map<String, Object> attributeMap) {
+ public void addRestResource(String basePath, boolean requiresAuthentication, Map<String, Object> attributeMap,
+ Class<?>... resourceClasses) {
ResourceConfig config = new ResourceConfig();
- config.packages("jersey.config.server.provider.packages", javaPackages);
+ for (Class<?> resourceClass : resourceClasses) {
+ config.register(resourceClass);
+ }
+ addResourceServlet(basePath, requiresAuthentication, attributeMap, config);
+ }
+
+ private void addResourceServlet(String basePath, boolean requiresAuthentication, Map<String, Object> attributeMap,
+ ResourceConfig config) {
config.register(JsonMapperProvider.class);
config.register(MultiPartFeature.class);
ServletHolder servletHolder = new ServletHolder(new ServletContainer(config));
@@ -157,53 +176,75 @@ public class WebService implements AutoCloseable {
addServlet(basePath, servletHolder, requiresAuthentication, attributeMap);
}
- public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication,
- Map<String, Object> attributeMap) {
- ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
- context.setContextPath(path);
- context.addServlet(servletHolder, MATCH_ALL);
- if (attributeMap != null) {
- attributeMap.forEach((key, value) -> {
- context.setAttribute(key, value);
- });
- }
+ private static class FilterInitializer {
+ private final List<FilterHolder> filterHolders = new ArrayList<>();
+ private final FilterHolder authenticationFilterHolder;
+ FilterInitializer(PulsarService pulsarService) {
+ ServiceConfiguration config = pulsarService.getConfiguration();
+ if (config.getMaxConcurrentHttpRequests() > 0) {
+ FilterHolder filterHolder = new FilterHolder(QoSFilter.class);
+ filterHolder.setInitParameter("maxRequests", String.valueOf(config.getMaxConcurrentHttpRequests()));
+ filterHolders.add(filterHolder);
+ }
- ServiceConfiguration config = pulsar.getConfig();
+ if (config.isHttpRequestsLimitEnabled()) {
+ filterHolders.add(new FilterHolder(
+ new RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
+ }
- if (config.getMaxConcurrentHttpRequests() > 0) {
- addFilterClass(context, QoSFilter.class, Collections.singletonMap("maxRequests",
- String.valueOf(config.getMaxConcurrentHttpRequests())));
- }
+ if (!config.getBrokerInterceptors().isEmpty()
+ || !config.isDisableBrokerInterceptors()) {
+ ExceptionHandler handler = new ExceptionHandler();
+ // Enable PreInterceptFilter only when interceptors are enabled
+ filterHolders.add(
+ new FilterHolder(new PreInterceptFilter(pulsarService.getBrokerInterceptor(), handler)));
+ filterHolders.add(new FilterHolder(new ProcessHandlerFilter(pulsarService)));
+ }
- if (pulsar.getConfiguration().isHttpRequestsLimitEnabled()) {
- addFilter(context,
- new RateLimitingFilter(pulsar.getConfiguration().getHttpRequestsMaxPerSecond()));
- }
+ if (config.isAuthenticationEnabled()) {
+ authenticationFilterHolder = new FilterHolder(new AuthenticationFilter(
+ pulsarService.getBrokerService().getAuthenticationService()));
+ filterHolders.add(authenticationFilterHolder);
+ } else {
+ authenticationFilterHolder = null;
+ }
- if (!config.getBrokerInterceptors().isEmpty()
- || !config.isDisableBrokerInterceptors()) {
- ExceptionHandler handler = new ExceptionHandler();
- // Enable PreInterceptFilter only when interceptors are enabled
- addFilter(context, new PreInterceptFilter(pulsar.getBrokerInterceptor(), handler));
- addFilter(context, new ProcessHandlerFilter(pulsar));
- }
+ if (config.isDisableHttpDebugMethods()) {
+ filterHolders.add(new FilterHolder(new DisableDebugHttpMethodFilter(config)));
+ }
- if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) {
- addFilter(context, new AuthenticationFilter(
- pulsar.getBrokerService().getAuthenticationService()));
- }
+ if (config.getHttpMaxRequestSize() > 0) {
+ filterHolders.add(new FilterHolder(
+ new MaxRequestSizeFilter(
+ config.getHttpMaxRequestSize())));
+ }
- if (config.isDisableHttpDebugMethods()) {
- addFilter(context, new DisableDebugHttpMethodFilter(config));
+ filterHolders.add(new FilterHolder(new ResponseHandlerFilter(pulsarService)));
}
- if (config.getHttpMaxRequestSize() > 0) {
- addFilter(context,
- new MaxRequestSizeFilter(
- config.getHttpMaxRequestSize()));
+ public void addFilters(ServletContextHandler context, boolean requiresAuthentication) {
+ for (FilterHolder filterHolder : filterHolders) {
+ if (requiresAuthentication || filterHolder != authenticationFilterHolder) {
+ context.addFilter(filterHolder,
+ MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+ }
+ }
}
- addFilter(context, new ResponseHandlerFilter(pulsar));
+ }
+
+ public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication,
+ Map<String, Object> attributeMap) {
+ ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+ // Notice: each context path should be unique, but there's nothing here to verify that
+ context.setContextPath(path);
+ context.addServlet(servletHolder, MATCH_ALL);
+ if (attributeMap != null) {
+ attributeMap.forEach((key, value) -> {
+ context.setAttribute(key, value);
+ });
+ }
+ filterInitializer.addFilters(context, requiresAuthentication);
handlers.add(context);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index a2cc2c9b8cc..e913db0ae79 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -18,16 +18,15 @@
*/
package org.apache.pulsar.functions.worker.rest;
-import static org.apache.pulsar.broker.web.Filters.addFilter;
import io.prometheus.client.jetty.JettyStatisticsCollector;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
+import javax.servlet.DispatcherType;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.web.AuthenticationFilter;
-import org.apache.pulsar.broker.web.Filters;
import org.apache.pulsar.broker.web.JettyRequestLogFactory;
import org.apache.pulsar.broker.web.RateLimitingFilter;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
@@ -45,6 +44,7 @@ import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
+import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.QoSFilter;
@@ -65,12 +65,15 @@ public class WorkerServer {
private ServerConnector httpConnector;
private ServerConnector httpsConnector;
+ private final FilterInitializer filterInitializer;
+
public WorkerServer(WorkerService workerService, AuthenticationService authenticationService) {
this.workerConfig = workerService.getWorkerConfig();
this.workerService = workerService;
this.authenticationService = authenticationService;
this.webServerExecutor = new WebExecutorThreadPool(this.workerConfig.getNumHttpServerThreads(), "function-web",
this.workerConfig.getHttpServerThreadPoolQueueSize());
+ this.filterInitializer = new FilterInitializer(workerConfig, authenticationService);
init();
}
@@ -95,15 +98,15 @@ public class WorkerServer {
List<Handler> handlers = new ArrayList<>(4);
handlers.add(newServletContextHandler("/admin",
- new ResourceConfig(Resources.getApiV2Resources()), workerService, authenticationService));
+ new ResourceConfig(Resources.getApiV2Resources()), workerService, filterInitializer));
handlers.add(newServletContextHandler("/admin/v2",
- new ResourceConfig(Resources.getApiV2Resources()), workerService, authenticationService));
+ new ResourceConfig(Resources.getApiV2Resources()), workerService, filterInitializer));
handlers.add(newServletContextHandler("/admin/v3",
- new ResourceConfig(Resources.getApiV3Resources()), workerService, authenticationService));
+ new ResourceConfig(Resources.getApiV3Resources()), workerService, filterInitializer));
// don't require auth for metrics or config routes
handlers.add(newServletContextHandler("/",
new ResourceConfig(Resources.getRootResources()), workerService,
- workerConfig.isAuthenticateMetricsEndpoint(), authenticationService));
+ workerConfig.isAuthenticateMetricsEndpoint(), filterInitializer));
RequestLogHandler requestLogHandler = new RequestLogHandler();
requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger());
@@ -171,18 +174,52 @@ public class WorkerServer {
server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
}
- public static ServletContextHandler newServletContextHandler(String contextPath,
+ private static class FilterInitializer {
+ private final List<FilterHolder> filterHolders = new ArrayList<>();
+ private final FilterHolder authenticationFilterHolder;
+
+ FilterInitializer(WorkerConfig config, AuthenticationService authenticationService) {
+ if (config.getMaxConcurrentHttpRequests() > 0) {
+ FilterHolder filterHolder = new FilterHolder(QoSFilter.class);
+ filterHolder.setInitParameter("maxRequests", String.valueOf(config.getMaxConcurrentHttpRequests()));
+ filterHolders.add(filterHolder);
+ }
+
+ if (config.isHttpRequestsLimitEnabled()) {
+ filterHolders.add(new FilterHolder(
+ new RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
+ }
+
+ if (config.isAuthenticationEnabled()) {
+ authenticationFilterHolder = new FilterHolder(new AuthenticationFilter(authenticationService));
+ filterHolders.add(authenticationFilterHolder);
+ } else {
+ authenticationFilterHolder = null;
+ }
+ }
+
+ public void addFilters(ServletContextHandler context, boolean requiresAuthentication) {
+ for (FilterHolder filterHolder : filterHolders) {
+ if (requiresAuthentication || filterHolder != authenticationFilterHolder) {
+ context.addFilter(filterHolder,
+ MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+ }
+ }
+ }
+ }
+
+ static ServletContextHandler newServletContextHandler(String contextPath,
ResourceConfig config,
WorkerService workerService,
- AuthenticationService authenticationService) {
- return newServletContextHandler(contextPath, config, workerService, true, authenticationService);
+ FilterInitializer filterInitializer) {
+ return newServletContextHandler(contextPath, config, workerService, true, filterInitializer);
}
- public static ServletContextHandler newServletContextHandler(String contextPath,
+ static ServletContextHandler newServletContextHandler(String contextPath,
ResourceConfig config,
WorkerService workerService,
boolean requireAuthentication,
- AuthenticationService authenticationService) {
+ FilterInitializer filterInitializer) {
final ServletContextHandler contextHandler =
new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
@@ -195,27 +232,11 @@ public class WorkerServer {
new ServletHolder(new ServletContainer(config));
contextHandler.addServlet(apiServlet, MATCH_ALL);
- addQosFilterIfNeeded(contextHandler, workerService.getWorkerConfig());
-
- if (workerService.getWorkerConfig().isHttpRequestsLimitEnabled()) {
- addFilter(contextHandler,
- new RateLimitingFilter(workerService.getWorkerConfig().getHttpRequestsMaxPerSecond()));
- }
-
- if (workerService.getWorkerConfig().isAuthenticationEnabled() && requireAuthentication) {
- addFilter(contextHandler, new AuthenticationFilter(authenticationService));
- }
+ filterInitializer.addFilters(contextHandler, requireAuthentication);
return contextHandler;
}
- private static void addQosFilterIfNeeded(ServletContextHandler context, WorkerConfig workerConfig) {
- if (workerConfig.getMaxConcurrentHttpRequests() > 0) {
- Filters.addFilterClass(context, QoSFilter.class, Collections.singletonMap("maxRequests",
- String.valueOf(workerConfig.getMaxConcurrentHttpRequests())));
- }
- }
-
public void stop() {
if (this.server != null) {
try {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 6031850fc02..47ac70cc0cd 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -260,10 +260,8 @@ public class ProxyServiceStarter {
Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
}
}
- server.addRestResources("/", VipStatus.class.getPackage().getName(),
- VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath());
- server.addRestResources("/proxy-stats", ProxyStats.class.getPackage().getName(),
- ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service);
+ server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(), VipStatus.class);
+ server.addRestResource("/proxy-stats", ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service, ProxyStats.class);
AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider);
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index b2c3e88e1b0..193d0c7c93f 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -18,16 +18,16 @@
*/
package org.apache.pulsar.proxy.server;
-import static org.apache.pulsar.broker.web.Filters.addFilter;
-import static org.apache.pulsar.broker.web.Filters.addFilterClass;
import io.prometheus.client.jetty.JettyStatisticsCollector;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
+import javax.servlet.DispatcherType;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.web.AuthenticationFilter;
@@ -48,6 +48,7 @@ import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
+import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.QoSFilter;
@@ -76,6 +77,8 @@ public class WebServer {
private ServerConnector connector;
private ServerConnector connectorTls;
+ private final FilterInitializer filterInitializer;
+
public WebServer(ProxyConfiguration config, AuthenticationService authenticationService) {
this.webServiceExecutor = new WebExecutorThreadPool(config.getHttpNumThreads(), "pulsar-external-web",
config.getHttpServerThreadPoolQueueSize());
@@ -140,12 +143,48 @@ public class WebServer {
// Limit number of concurrent HTTP connections to avoid getting out of file descriptors
connectors.stream().forEach(c -> c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize()));
server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
+
+ filterInitializer = new FilterInitializer(config, authenticationService);
}
public URI getServiceUri() {
return serviceURI;
}
+ private static class FilterInitializer {
+ private final List<FilterHolder> filterHolders = new ArrayList<>();
+ private final FilterHolder authenticationFilterHolder;
+
+ FilterInitializer(ProxyConfiguration config, AuthenticationService authenticationService) {
+ if (config.getMaxConcurrentHttpRequests() > 0) {
+ FilterHolder filterHolder = new FilterHolder(QoSFilter.class);
+ filterHolder.setInitParameter("maxRequests", String.valueOf(config.getMaxConcurrentHttpRequests()));
+ filterHolders.add(filterHolder);
+ }
+
+ if (config.isHttpRequestsLimitEnabled()) {
+ filterHolders.add(new FilterHolder(
+ new RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
+ }
+
+ if (config.isAuthenticationEnabled()) {
+ authenticationFilterHolder = new FilterHolder(new AuthenticationFilter(authenticationService));
+ filterHolders.add(authenticationFilterHolder);
+ } else {
+ authenticationFilterHolder = null;
+ }
+ }
+
+ public void addFilters(ServletContextHandler context, boolean requiresAuthentication) {
+ for (FilterHolder filterHolder : filterHolders) {
+ if (requiresAuthentication || filterHolder != authenticationFilterHolder) {
+ context.addFilter(filterHolder,
+ MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+ }
+ }
+ }
+ }
+
public void addServlet(String basePath, ServletHolder servletHolder) {
addServlet(basePath, servletHolder, Collections.emptyList());
}
@@ -170,29 +209,14 @@ public class WebServer {
context.setAttribute(attribute.getLeft(), attribute.getRight());
}
- addQosFilterIfNeeded(context);
-
- if (config.isHttpRequestsLimitEnabled()) {
- addFilter(context, new RateLimitingFilter(config.getHttpRequestsMaxPerSecond()));
- }
-
- if (config.isAuthenticationEnabled() && requireAuthentication) {
- addFilter(context, new AuthenticationFilter(authenticationService));
- }
+ filterInitializer.addFilters(context, requireAuthentication);
handlers.add(context);
}
- private void addQosFilterIfNeeded(ServletContextHandler context) {
- if (config.getMaxConcurrentHttpRequests() > 0) {
- addFilterClass(context, QoSFilter.class, Collections.singletonMap("maxRequests",
- String.valueOf(config.getMaxConcurrentHttpRequests())));
- }
- }
-
- public void addRestResources(String basePath, String javaPackages, String attribute, Object attributeValue) {
+ public void addRestResource(String basePath, String attribute, Object attributeValue, Class<?> resourceClass) {
ResourceConfig config = new ResourceConfig();
- config.packages("jersey.config.server.provider.packages", javaPackages);
+ config.register(resourceClass);
config.register(JsonMapperProvider.class);
ServletHolder servletHolder = new ServletHolder(new ServletContainer(config));
servletHolder.setAsyncSupported(true);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index 972655fe9d4..03052a28553 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -87,8 +87,8 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
webServer.addServlet("/admin", servletHolder);
webServer.addServlet("/lookup", servletHolder);
- webServer.addRestResources("/", VipStatus.class.getPackage().getName(),
- VipStatus.ATTRIBUTE_STATUS_FILE_PATH, proxyConfig.getStatusFilePath());
+ webServer.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, proxyConfig.getStatusFilePath(),
+ VipStatus.class);
// start web-service
webServer.start();
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
index 9ab73b4db06..7c0ad6d813a 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
@@ -18,14 +18,14 @@
*/
package org.apache.pulsar.websocket.service;
-import static org.apache.pulsar.broker.web.Filters.addFilterClass;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
+import javax.servlet.DispatcherType;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
@@ -43,6 +43,7 @@ import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
+import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.QoSFilter;
@@ -58,6 +59,7 @@ public class ProxyServer {
private final List<Handler> handlers = new ArrayList<>();
private final WebSocketProxyConfiguration conf;
private final WebExecutorThreadPool executorService;
+ private final FilterHolder qualityOfServiceFilterHolder;
private ServerConnector connector;
private ServerConnector connectorTls;
@@ -121,6 +123,14 @@ public class ProxyServer {
// file descriptors
connectors.stream().forEach(c -> c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize()));
server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
+
+ if (config.getMaxConcurrentHttpRequests() > 0) {
+ qualityOfServiceFilterHolder = new FilterHolder(QoSFilter.class);
+ qualityOfServiceFilterHolder.setInitParameter("maxRequests",
+ String.valueOf(config.getMaxConcurrentHttpRequests()));
+ } else {
+ qualityOfServiceFilterHolder = null;
+ }
}
public void addWebSocketServlet(String basePath, Servlet socketServlet)
@@ -133,9 +143,9 @@ public class ProxyServer {
handlers.add(context);
}
- public void addRestResources(String basePath, String javaPackages, String attribute, Object attributeValue) {
+ public void addRestResource(String basePath, String attribute, Object attributeValue, Class<?> resourceClass) {
ResourceConfig config = new ResourceConfig();
- config.packages("jersey.config.server.provider.packages", javaPackages);
+ config.register(resourceClass);
config.register(JsonMapperProvider.class);
ServletHolder servletHolder = new ServletHolder(new ServletContainer(config));
servletHolder.setAsyncSupported(true);
@@ -148,9 +158,9 @@ public class ProxyServer {
}
private void addQosFilterIfNeeded(ServletContextHandler context) {
- if (conf.getMaxConcurrentHttpRequests() > 0) {
- addFilterClass(context, QoSFilter.class, Collections.singletonMap("maxRequests",
- String.valueOf(conf.getMaxConcurrentHttpRequests())));
+ if (qualityOfServiceFilterHolder != null) {
+ context.addFilter(qualityOfServiceFilterHolder,
+ MATCH_ALL, EnumSet.allOf(DispatcherType.class));
}
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java
index 63aff925ddc..f2a0baa31c5 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java
@@ -101,12 +101,10 @@ public class WebSocketServiceStarter {
proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
new WebSocketPingPongServlet(service));
- proxyServer.addRestResources(ADMIN_PATH_V1, WebSocketProxyStatsV1.class.getPackage().getName(),
- ATTRIBUTE_PROXY_SERVICE_NAME, service);
- proxyServer.addRestResources(ADMIN_PATH_V2, WebSocketProxyStatsV2.class.getPackage().getName(),
- ATTRIBUTE_PROXY_SERVICE_NAME, service);
- proxyServer.addRestResources("/", VipStatus.class.getPackage().getName(),
- VipStatus.ATTRIBUTE_STATUS_FILE_PATH, service.getConfig().getStatusFilePath());
+ proxyServer.addRestResource(ADMIN_PATH_V1, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV1.class);
+ proxyServer.addRestResource(ADMIN_PATH_V2, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV2.class);
+ proxyServer.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, service.getConfig().getStatusFilePath(),
+ VipStatus.class);
proxyServer.start();
service.start();
}