You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/06 07:59:33 UTC

[pulsar] branch master updated: Remove duplicate filter instances in Broker, Proxy and Function worker web server (#15637)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 94392a41fc8 Remove duplicate filter instances in Broker, Proxy and Function worker web server (#15637)
94392a41fc8 is described below

commit 94392a41fc8a6f7b431785bdc9701250b43c8464
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Mon Jun 6 10:59:25 2022 +0300

    Remove duplicate filter instances in Broker, Proxy and Function worker web server (#15637)
    
    - There were filter instances for each context path which made
      maxConcurrentHttpRequests and httpRequestsMaxPerSecond not work as expected.
    - Fixes the backpressure solution that is dependent on maxConcurrentHttpRequests and httpRequestsMaxPerSecond
      working properly
    - Fix invalid Jersey api usage
---
 .../java/org/apache/pulsar/broker/web/Filters.java |  60 ----------
 .../org/apache/pulsar/broker/PulsarService.java    |  24 ++--
 .../org/apache/pulsar/broker/web/WebService.java   | 127 ++++++++++++++-------
 .../pulsar/functions/worker/rest/WorkerServer.java |  79 ++++++++-----
 .../pulsar/proxy/server/ProxyServiceStarter.java   |   6 +-
 .../org/apache/pulsar/proxy/server/WebServer.java  |  64 +++++++----
 .../server/UnauthedAdminProxyHandlerTest.java      |   4 +-
 .../pulsar/websocket/service/ProxyServer.java      |  24 ++--
 .../websocket/service/WebSocketServiceStarter.java |  10 +-
 9 files changed, 216 insertions(+), 182 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/Filters.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/Filters.java
deleted file mode 100644
index 3b6bb721bcc..00000000000
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/Filters.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.web;
-
-import java.util.EnumSet;
-import java.util.Map;
-import javax.servlet.DispatcherType;
-import javax.servlet.Filter;
-import org.eclipse.jetty.servlet.FilterHolder;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-
-public class Filters {
-    private static final String MATCH_ALL = "/*";
-
-    /**
-     * Adds a filter instance to the servlet context handler.
-     * The filter will be used for all requests.
-     *
-     * @param context servlet context handler instance
-     * @param filter filter instance
-     */
-    public static void addFilter(ServletContextHandler context, Filter filter) {
-        addFilterHolder(context, new FilterHolder(filter));
-    }
-
-    private static void addFilterHolder(ServletContextHandler context, FilterHolder filter) {
-        context.addFilter(filter,
-                MATCH_ALL, EnumSet.allOf(DispatcherType.class));
-    }
-
-    /**
-     * Adds a filter to the servlet context handler which gets instantiated and configured when the server starts.
-     *
-     * @param context servlet context handler instance
-     * @param filter filter class
-     * @param initParams initialization parameters used for configuring the filter instance
-     */
-    public static void addFilterClass(ServletContextHandler context, Class<? extends Filter> filter,
-                                      Map<String, String> initParams) {
-        FilterHolder holder = new FilterHolder(filter);
-        holder.setInitParameters(initParams);
-        addFilterHolder(context, holder);
-    }
-}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 9cc2878df0a..22368aab521 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -91,6 +91,7 @@ import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
 import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
 import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.lookup.v1.TopicLookup;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.protocol.ProtocolHandlers;
 import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
@@ -98,6 +99,7 @@ import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager
 import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
 import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.rest.Topics;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.GracefulExecutorServicesShutdown;
 import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
@@ -882,21 +884,21 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             // Ensure the VIP status is only visible when the broker is fully initialized
             return state == State.Started;
         });
+
         // Add admin rest resources
-        webService.addRestResources("/",
-                VipStatus.class.getPackage().getName(), false, vipAttributeMap);
-        webService.addRestResources("/",
-                "org.apache.pulsar.broker.web", false, attributeMap);
+        webService.addRestResource("/",
+                false, vipAttributeMap, VipStatus.class);
         webService.addRestResources("/admin",
-                "org.apache.pulsar.broker.admin.v1", true, attributeMap);
+                true, attributeMap, "org.apache.pulsar.broker.admin.v1");
         webService.addRestResources("/admin/v2",
-                "org.apache.pulsar.broker.admin.v2", true, attributeMap);
+                true, attributeMap, "org.apache.pulsar.broker.admin.v2");
         webService.addRestResources("/admin/v3",
-                "org.apache.pulsar.broker.admin.v3", true, attributeMap);
-        webService.addRestResources("/lookup",
-                "org.apache.pulsar.broker.lookup", true, attributeMap);
-        webService.addRestResources("/topics",
-                            "org.apache.pulsar.broker.rest", true, attributeMap);
+                true, attributeMap, "org.apache.pulsar.broker.admin.v3");
+        webService.addRestResource("/lookup",
+                true, attributeMap, TopicLookup.class,
+                org.apache.pulsar.broker.lookup.v2.TopicLookup.class);
+        webService.addRestResource("/topics",
+                true, attributeMap, Topics.class);
 
         // Add metrics servlet
         webService.addServlet("/metrics",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 8c965b8e496..0117ae31f28 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -18,16 +18,15 @@
  */
 package org.apache.pulsar.broker.web;
 
-import static org.apache.pulsar.broker.web.Filters.addFilter;
-import static org.apache.pulsar.broker.web.Filters.addFilterClass;
 import com.google.common.collect.Lists;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.jetty.JettyStatisticsCollector;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import javax.servlet.DispatcherType;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -43,6 +42,7 @@ import org.eclipse.jetty.server.handler.HandlerCollection;
 import org.eclipse.jetty.server.handler.RequestLogHandler;
 import org.eclipse.jetty.server.handler.ResourceHandler;
 import org.eclipse.jetty.server.handler.StatisticsHandler;
+import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.servlets.QoSFilter;
@@ -72,6 +72,7 @@ public class WebService implements AutoCloseable {
 
     private final ServerConnector httpConnector;
     private final ServerConnector httpsConnector;
+    private final FilterInitializer filterInitializer;
     private JettyStatisticsCollector jettyStatisticsCollector;
 
     public WebService(PulsarService pulsar) throws PulsarServerException {
@@ -144,12 +145,30 @@ public class WebService implements AutoCloseable {
         // Limit number of concurrent HTTP connections to avoid getting out of file descriptors
         connectors.forEach(c -> c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize()));
         server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
+
+        filterInitializer = new FilterInitializer(pulsar);
+    }
+
+    public void addRestResources(String basePath, boolean requiresAuthentication, Map<String, Object> attributeMap,
+                                 String... javaPackages) {
+        ResourceConfig config = new ResourceConfig();
+        for (String javaPackage : javaPackages) {
+            config.packages(false, javaPackage);
+        }
+        addResourceServlet(basePath, requiresAuthentication, attributeMap, config);
     }
 
-    public void addRestResources(String basePath, String javaPackages, boolean requiresAuthentication,
-                                 Map<String, Object> attributeMap) {
+    public void addRestResource(String basePath, boolean requiresAuthentication, Map<String, Object> attributeMap,
+                                Class<?>... resourceClasses) {
         ResourceConfig config = new ResourceConfig();
-        config.packages("jersey.config.server.provider.packages", javaPackages);
+        for (Class<?> resourceClass : resourceClasses) {
+            config.register(resourceClass);
+        }
+        addResourceServlet(basePath, requiresAuthentication, attributeMap, config);
+    }
+
+    private void addResourceServlet(String basePath, boolean requiresAuthentication, Map<String, Object> attributeMap,
+                                    ResourceConfig config) {
         config.register(JsonMapperProvider.class);
         config.register(MultiPartFeature.class);
         ServletHolder servletHolder = new ServletHolder(new ServletContainer(config));
@@ -157,53 +176,75 @@ public class WebService implements AutoCloseable {
         addServlet(basePath, servletHolder, requiresAuthentication, attributeMap);
     }
 
-    public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication,
-                           Map<String, Object> attributeMap) {
-        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
-        context.setContextPath(path);
-        context.addServlet(servletHolder, MATCH_ALL);
-        if (attributeMap != null) {
-            attributeMap.forEach((key, value) -> {
-                context.setAttribute(key, value);
-            });
-        }
+    private static class FilterInitializer {
+        private final List<FilterHolder> filterHolders = new ArrayList<>();
+        private final FilterHolder authenticationFilterHolder;
+        FilterInitializer(PulsarService pulsarService) {
+            ServiceConfiguration config = pulsarService.getConfiguration();
+            if (config.getMaxConcurrentHttpRequests() > 0) {
+                FilterHolder filterHolder = new FilterHolder(QoSFilter.class);
+                filterHolder.setInitParameter("maxRequests", String.valueOf(config.getMaxConcurrentHttpRequests()));
+                filterHolders.add(filterHolder);
+            }
 
-        ServiceConfiguration config = pulsar.getConfig();
+            if (config.isHttpRequestsLimitEnabled()) {
+                filterHolders.add(new FilterHolder(
+                        new RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
+            }
 
-        if (config.getMaxConcurrentHttpRequests() > 0) {
-            addFilterClass(context, QoSFilter.class, Collections.singletonMap("maxRequests",
-                    String.valueOf(config.getMaxConcurrentHttpRequests())));
-        }
+            if (!config.getBrokerInterceptors().isEmpty()
+                    || !config.isDisableBrokerInterceptors()) {
+                ExceptionHandler handler = new ExceptionHandler();
+                // Enable PreInterceptFilter only when interceptors are enabled
+                filterHolders.add(
+                        new FilterHolder(new PreInterceptFilter(pulsarService.getBrokerInterceptor(), handler)));
+                filterHolders.add(new FilterHolder(new ProcessHandlerFilter(pulsarService)));
+            }
 
-        if (pulsar.getConfiguration().isHttpRequestsLimitEnabled()) {
-            addFilter(context,
-                    new RateLimitingFilter(pulsar.getConfiguration().getHttpRequestsMaxPerSecond()));
-        }
+            if (config.isAuthenticationEnabled()) {
+                authenticationFilterHolder = new FilterHolder(new AuthenticationFilter(
+                        pulsarService.getBrokerService().getAuthenticationService()));
+                filterHolders.add(authenticationFilterHolder);
+            } else {
+                authenticationFilterHolder = null;
+            }
 
-        if (!config.getBrokerInterceptors().isEmpty()
-                || !config.isDisableBrokerInterceptors()) {
-            ExceptionHandler handler = new ExceptionHandler();
-            // Enable PreInterceptFilter only when interceptors are enabled
-            addFilter(context, new PreInterceptFilter(pulsar.getBrokerInterceptor(), handler));
-            addFilter(context, new ProcessHandlerFilter(pulsar));
-        }
+            if (config.isDisableHttpDebugMethods()) {
+                filterHolders.add(new FilterHolder(new DisableDebugHttpMethodFilter(config)));
+            }
 
-        if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) {
-            addFilter(context, new AuthenticationFilter(
-                    pulsar.getBrokerService().getAuthenticationService()));
-        }
+            if (config.getHttpMaxRequestSize() > 0) {
+                filterHolders.add(new FilterHolder(
+                        new MaxRequestSizeFilter(
+                                config.getHttpMaxRequestSize())));
+            }
 
-        if (config.isDisableHttpDebugMethods()) {
-            addFilter(context, new DisableDebugHttpMethodFilter(config));
+            filterHolders.add(new FilterHolder(new ResponseHandlerFilter(pulsarService)));
         }
 
-        if (config.getHttpMaxRequestSize() > 0) {
-            addFilter(context,
-                    new MaxRequestSizeFilter(
-                            config.getHttpMaxRequestSize()));
+        public void addFilters(ServletContextHandler context, boolean requiresAuthentication) {
+            for (FilterHolder filterHolder : filterHolders) {
+                if (requiresAuthentication || filterHolder != authenticationFilterHolder) {
+                    context.addFilter(filterHolder,
+                            MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+                }
+            }
         }
 
-        addFilter(context, new ResponseHandlerFilter(pulsar));
+    }
+
+    public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication,
+                           Map<String, Object> attributeMap) {
+        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+        // Notice: each context path should be unique, but there's nothing here to verify that
+        context.setContextPath(path);
+        context.addServlet(servletHolder, MATCH_ALL);
+        if (attributeMap != null) {
+            attributeMap.forEach((key, value) -> {
+                context.setAttribute(key, value);
+            });
+        }
+        filterInitializer.addFilters(context, requiresAuthentication);
         handlers.add(context);
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index a2cc2c9b8cc..e913db0ae79 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -18,16 +18,15 @@
  */
 package org.apache.pulsar.functions.worker.rest;
 
-import static org.apache.pulsar.broker.web.Filters.addFilter;
 import io.prometheus.client.jetty.JettyStatisticsCollector;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
+import javax.servlet.DispatcherType;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.web.AuthenticationFilter;
-import org.apache.pulsar.broker.web.Filters;
 import org.apache.pulsar.broker.web.JettyRequestLogFactory;
 import org.apache.pulsar.broker.web.RateLimitingFilter;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
@@ -45,6 +44,7 @@ import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
 import org.eclipse.jetty.server.handler.RequestLogHandler;
 import org.eclipse.jetty.server.handler.StatisticsHandler;
+import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.servlets.QoSFilter;
@@ -65,12 +65,15 @@ public class WorkerServer {
     private ServerConnector httpConnector;
     private ServerConnector httpsConnector;
 
+    private final FilterInitializer filterInitializer;
+
     public WorkerServer(WorkerService workerService, AuthenticationService authenticationService) {
         this.workerConfig = workerService.getWorkerConfig();
         this.workerService = workerService;
         this.authenticationService = authenticationService;
         this.webServerExecutor = new WebExecutorThreadPool(this.workerConfig.getNumHttpServerThreads(), "function-web",
                 this.workerConfig.getHttpServerThreadPoolQueueSize());
+        this.filterInitializer = new FilterInitializer(workerConfig, authenticationService);
         init();
     }
 
@@ -95,15 +98,15 @@ public class WorkerServer {
 
         List<Handler> handlers = new ArrayList<>(4);
         handlers.add(newServletContextHandler("/admin",
-            new ResourceConfig(Resources.getApiV2Resources()), workerService, authenticationService));
+            new ResourceConfig(Resources.getApiV2Resources()), workerService, filterInitializer));
         handlers.add(newServletContextHandler("/admin/v2",
-            new ResourceConfig(Resources.getApiV2Resources()), workerService, authenticationService));
+            new ResourceConfig(Resources.getApiV2Resources()), workerService, filterInitializer));
         handlers.add(newServletContextHandler("/admin/v3",
-            new ResourceConfig(Resources.getApiV3Resources()), workerService, authenticationService));
+            new ResourceConfig(Resources.getApiV3Resources()), workerService, filterInitializer));
         // don't require auth for metrics or config routes
         handlers.add(newServletContextHandler("/",
             new ResourceConfig(Resources.getRootResources()), workerService,
-            workerConfig.isAuthenticateMetricsEndpoint(), authenticationService));
+            workerConfig.isAuthenticateMetricsEndpoint(), filterInitializer));
 
         RequestLogHandler requestLogHandler = new RequestLogHandler();
         requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger());
@@ -171,18 +174,52 @@ public class WorkerServer {
         server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
     }
 
-    public static ServletContextHandler newServletContextHandler(String contextPath,
+    private static class FilterInitializer {
+        private final List<FilterHolder> filterHolders = new ArrayList<>();
+        private final FilterHolder authenticationFilterHolder;
+
+        FilterInitializer(WorkerConfig config, AuthenticationService authenticationService) {
+            if (config.getMaxConcurrentHttpRequests() > 0) {
+                FilterHolder filterHolder = new FilterHolder(QoSFilter.class);
+                filterHolder.setInitParameter("maxRequests", String.valueOf(config.getMaxConcurrentHttpRequests()));
+                filterHolders.add(filterHolder);
+            }
+
+            if (config.isHttpRequestsLimitEnabled()) {
+                filterHolders.add(new FilterHolder(
+                        new RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
+            }
+
+            if (config.isAuthenticationEnabled()) {
+                authenticationFilterHolder = new FilterHolder(new AuthenticationFilter(authenticationService));
+                filterHolders.add(authenticationFilterHolder);
+            } else {
+                authenticationFilterHolder = null;
+            }
+        }
+
+        public void addFilters(ServletContextHandler context, boolean requiresAuthentication) {
+            for (FilterHolder filterHolder : filterHolders) {
+                if (requiresAuthentication || filterHolder != authenticationFilterHolder) {
+                    context.addFilter(filterHolder,
+                            MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+                }
+            }
+        }
+    }
+
+    static ServletContextHandler newServletContextHandler(String contextPath,
                                                                  ResourceConfig config,
                                                                  WorkerService workerService,
-                                                                 AuthenticationService authenticationService) {
-        return newServletContextHandler(contextPath, config, workerService, true, authenticationService);
+                                                                 FilterInitializer filterInitializer) {
+        return newServletContextHandler(contextPath, config, workerService, true, filterInitializer);
     }
 
-    public static ServletContextHandler newServletContextHandler(String contextPath,
+    static ServletContextHandler newServletContextHandler(String contextPath,
                                                                  ResourceConfig config,
                                                                  WorkerService workerService,
                                                                  boolean requireAuthentication,
-                                                                 AuthenticationService authenticationService) {
+                                                                 FilterInitializer filterInitializer) {
         final ServletContextHandler contextHandler =
                 new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
 
@@ -195,27 +232,11 @@ public class WorkerServer {
                 new ServletHolder(new ServletContainer(config));
         contextHandler.addServlet(apiServlet, MATCH_ALL);
 
-        addQosFilterIfNeeded(contextHandler, workerService.getWorkerConfig());
-
-        if (workerService.getWorkerConfig().isHttpRequestsLimitEnabled()) {
-            addFilter(contextHandler,
-                    new RateLimitingFilter(workerService.getWorkerConfig().getHttpRequestsMaxPerSecond()));
-        }
-
-        if (workerService.getWorkerConfig().isAuthenticationEnabled() && requireAuthentication) {
-            addFilter(contextHandler, new AuthenticationFilter(authenticationService));
-        }
+        filterInitializer.addFilters(contextHandler, requireAuthentication);
 
         return contextHandler;
     }
 
-    private static void addQosFilterIfNeeded(ServletContextHandler context, WorkerConfig workerConfig) {
-        if (workerConfig.getMaxConcurrentHttpRequests() > 0) {
-            Filters.addFilterClass(context, QoSFilter.class, Collections.singletonMap("maxRequests",
-                    String.valueOf(workerConfig.getMaxConcurrentHttpRequests())));
-        }
-    }
-
     public void stop() {
         if (this.server != null) {
             try {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 6031850fc02..47ac70cc0cd 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -260,10 +260,8 @@ public class ProxyServiceStarter {
                         Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
             }
         }
-        server.addRestResources("/", VipStatus.class.getPackage().getName(),
-                VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath());
-        server.addRestResources("/proxy-stats", ProxyStats.class.getPackage().getName(),
-                ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service);
+        server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(), VipStatus.class);
+        server.addRestResource("/proxy-stats", ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service, ProxyStats.class);
 
         AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider);
         ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index b2c3e88e1b0..193d0c7c93f 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -18,16 +18,16 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import static org.apache.pulsar.broker.web.Filters.addFilter;
-import static org.apache.pulsar.broker.web.Filters.addFilterClass;
 import io.prometheus.client.jetty.JettyStatisticsCollector;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
+import javax.servlet.DispatcherType;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.web.AuthenticationFilter;
@@ -48,6 +48,7 @@ import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
 import org.eclipse.jetty.server.handler.RequestLogHandler;
 import org.eclipse.jetty.server.handler.StatisticsHandler;
+import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.servlets.QoSFilter;
@@ -76,6 +77,8 @@ public class WebServer {
     private ServerConnector connector;
     private ServerConnector connectorTls;
 
+    private final FilterInitializer filterInitializer;
+
     public WebServer(ProxyConfiguration config, AuthenticationService authenticationService) {
         this.webServiceExecutor = new WebExecutorThreadPool(config.getHttpNumThreads(), "pulsar-external-web",
                 config.getHttpServerThreadPoolQueueSize());
@@ -140,12 +143,48 @@ public class WebServer {
         // Limit number of concurrent HTTP connections to avoid getting out of file descriptors
         connectors.stream().forEach(c -> c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize()));
         server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
+
+        filterInitializer = new FilterInitializer(config, authenticationService);
     }
 
     public URI getServiceUri() {
         return serviceURI;
     }
 
+    private static class FilterInitializer {
+        private final List<FilterHolder> filterHolders = new ArrayList<>();
+        private final FilterHolder authenticationFilterHolder;
+
+        FilterInitializer(ProxyConfiguration config, AuthenticationService authenticationService) {
+            if (config.getMaxConcurrentHttpRequests() > 0) {
+                FilterHolder filterHolder = new FilterHolder(QoSFilter.class);
+                filterHolder.setInitParameter("maxRequests", String.valueOf(config.getMaxConcurrentHttpRequests()));
+                filterHolders.add(filterHolder);
+            }
+
+            if (config.isHttpRequestsLimitEnabled()) {
+                filterHolders.add(new FilterHolder(
+                        new RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
+            }
+
+            if (config.isAuthenticationEnabled()) {
+                authenticationFilterHolder = new FilterHolder(new AuthenticationFilter(authenticationService));
+                filterHolders.add(authenticationFilterHolder);
+            } else {
+                authenticationFilterHolder = null;
+            }
+        }
+
+        public void addFilters(ServletContextHandler context, boolean requiresAuthentication) {
+            for (FilterHolder filterHolder : filterHolders) {
+                if (requiresAuthentication || filterHolder != authenticationFilterHolder) {
+                    context.addFilter(filterHolder,
+                            MATCH_ALL, EnumSet.allOf(DispatcherType.class));
+                }
+            }
+        }
+    }
+
     public void addServlet(String basePath, ServletHolder servletHolder) {
         addServlet(basePath, servletHolder, Collections.emptyList());
     }
@@ -170,29 +209,14 @@ public class WebServer {
             context.setAttribute(attribute.getLeft(), attribute.getRight());
         }
 
-        addQosFilterIfNeeded(context);
-
-        if (config.isHttpRequestsLimitEnabled()) {
-            addFilter(context, new RateLimitingFilter(config.getHttpRequestsMaxPerSecond()));
-        }
-
-        if (config.isAuthenticationEnabled() && requireAuthentication) {
-            addFilter(context, new AuthenticationFilter(authenticationService));
-        }
+        filterInitializer.addFilters(context, requireAuthentication);
 
         handlers.add(context);
     }
 
-    private void addQosFilterIfNeeded(ServletContextHandler context) {
-        if (config.getMaxConcurrentHttpRequests() > 0) {
-            addFilterClass(context, QoSFilter.class, Collections.singletonMap("maxRequests",
-                    String.valueOf(config.getMaxConcurrentHttpRequests())));
-        }
-    }
-
-    public void addRestResources(String basePath, String javaPackages, String attribute, Object attributeValue) {
+    public void addRestResource(String basePath, String attribute, Object attributeValue, Class<?> resourceClass) {
         ResourceConfig config = new ResourceConfig();
-        config.packages("jersey.config.server.provider.packages", javaPackages);
+        config.register(resourceClass);
         config.register(JsonMapperProvider.class);
         ServletHolder servletHolder = new ServletHolder(new ServletContainer(config));
         servletHolder.setAsyncSupported(true);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index 972655fe9d4..03052a28553 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -87,8 +87,8 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
         webServer.addServlet("/admin", servletHolder);
         webServer.addServlet("/lookup", servletHolder);
 
-        webServer.addRestResources("/", VipStatus.class.getPackage().getName(),
-                VipStatus.ATTRIBUTE_STATUS_FILE_PATH, proxyConfig.getStatusFilePath());
+        webServer.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, proxyConfig.getStatusFilePath(),
+                VipStatus.class);
 
         // start web-service
         webServer.start();
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
index 9ab73b4db06..7c0ad6d813a 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java
@@ -18,14 +18,14 @@
  */
 package org.apache.pulsar.websocket.service;
 
-import static org.apache.pulsar.broker.web.Filters.addFilterClass;
 import java.net.MalformedURLException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import javax.servlet.DispatcherType;
 import javax.servlet.Servlet;
 import javax.servlet.ServletException;
 import javax.websocket.DeploymentException;
@@ -43,6 +43,7 @@ import org.eclipse.jetty.server.handler.ContextHandlerCollection;
 import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
 import org.eclipse.jetty.server.handler.RequestLogHandler;
+import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.servlets.QoSFilter;
@@ -58,6 +59,7 @@ public class ProxyServer {
     private final List<Handler> handlers = new ArrayList<>();
     private final WebSocketProxyConfiguration conf;
     private final WebExecutorThreadPool executorService;
+    private final FilterHolder qualityOfServiceFilterHolder;
 
     private ServerConnector connector;
     private ServerConnector connectorTls;
@@ -121,6 +123,14 @@ public class ProxyServer {
         // file descriptors
         connectors.stream().forEach(c -> c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize()));
         server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
+
+        if (config.getMaxConcurrentHttpRequests() > 0) {
+            qualityOfServiceFilterHolder = new FilterHolder(QoSFilter.class);
+            qualityOfServiceFilterHolder.setInitParameter("maxRequests",
+                    String.valueOf(config.getMaxConcurrentHttpRequests()));
+        } else {
+            qualityOfServiceFilterHolder = null;
+        }
     }
 
     public void addWebSocketServlet(String basePath, Servlet socketServlet)
@@ -133,9 +143,9 @@ public class ProxyServer {
         handlers.add(context);
     }
 
-    public void addRestResources(String basePath, String javaPackages, String attribute, Object attributeValue) {
+    public void addRestResource(String basePath, String attribute, Object attributeValue, Class<?> resourceClass) {
         ResourceConfig config = new ResourceConfig();
-        config.packages("jersey.config.server.provider.packages", javaPackages);
+        config.register(resourceClass);
         config.register(JsonMapperProvider.class);
         ServletHolder servletHolder = new ServletHolder(new ServletContainer(config));
         servletHolder.setAsyncSupported(true);
@@ -148,9 +158,9 @@ public class ProxyServer {
     }
 
     private void addQosFilterIfNeeded(ServletContextHandler context) {
-        if (conf.getMaxConcurrentHttpRequests() > 0) {
-            addFilterClass(context, QoSFilter.class, Collections.singletonMap("maxRequests",
-                    String.valueOf(conf.getMaxConcurrentHttpRequests())));
+        if (qualityOfServiceFilterHolder != null) {
+            context.addFilter(qualityOfServiceFilterHolder,
+                    MATCH_ALL, EnumSet.allOf(DispatcherType.class));
         }
     }
 
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java
index 63aff925ddc..f2a0baa31c5 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java
@@ -101,12 +101,10 @@ public class WebSocketServiceStarter {
         proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
                 new WebSocketPingPongServlet(service));
 
-        proxyServer.addRestResources(ADMIN_PATH_V1, WebSocketProxyStatsV1.class.getPackage().getName(),
-                ATTRIBUTE_PROXY_SERVICE_NAME, service);
-        proxyServer.addRestResources(ADMIN_PATH_V2, WebSocketProxyStatsV2.class.getPackage().getName(),
-                ATTRIBUTE_PROXY_SERVICE_NAME, service);
-        proxyServer.addRestResources("/", VipStatus.class.getPackage().getName(),
-                VipStatus.ATTRIBUTE_STATUS_FILE_PATH, service.getConfig().getStatusFilePath());
+        proxyServer.addRestResource(ADMIN_PATH_V1, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV1.class);
+        proxyServer.addRestResource(ADMIN_PATH_V2, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV2.class);
+        proxyServer.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, service.getConfig().getStatusFilePath(),
+                VipStatus.class);
         proxyServer.start();
         service.start();
     }