You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/18 03:12:27 UTC

[pulsar] branch branch-2.8 updated: [feature][pulsar-broker]: Add additional servlet support to broker (#11498)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 5c3ddc2  [feature][pulsar-broker]: Add additional servlet support to broker (#11498)
5c3ddc2 is described below

commit 5c3ddc26d6e07eb0473b11b5ecc8318c1efe414b
Author: Kai Wang <wa...@gmail.com>
AuthorDate: Tue Aug 3 00:25:22 2021 +0800

    [feature][pulsar-broker]: Add additional servlet support to broker (#11498)
    
    Currently the broker does not support additional servlet, but some plugins need to add custom web routes on the broker.
    
    Add additional servlet support to broker
    
    (cherry picked from commit 0a680695fb6b709f06faa4b780eddc4ba2e50784)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  14 ++
 .../broker/web/plugin/servlet/package-info.java    |   2 +-
 .../org/apache/pulsar/broker/PulsarService.java    | 185 ++++++++++++-------
 .../AdditionalServletWithPulsarService.java        |   9 +-
 .../broker/web/plugin/servlet/package-info.java    |   3 +-
 .../pulsar/broker/BrokerAdditionalServletTest.java | 200 +++++++++++++++++++++
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   5 +
 .../AdditionalServletWithPulsarServiceTest.java    |  85 +++++++++
 .../MockAdditionalServletWithClassLoader.java      |  35 +++-
 9 files changed, 466 insertions(+), 72 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index d8fcbd9..a223b58 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -93,6 +93,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private static final String CATEGORY_TRANSACTION = "Transaction";
     @Category
     private static final String CATEGORY_PACKAGES_MANAGEMENT = "Packages Management";
+    @Category
+    private static final String CATEGORY_PLUGIN = "Broker Plugin";
 
     /***** --- pulsar configuration --- ****/
     @FieldContext(
@@ -2230,6 +2232,18 @@ public class ServiceConfiguration implements PulsarConfiguration {
 
     /* packages management service configurations (end) */
 
+    @FieldContext(
+            category = CATEGORY_PLUGIN,
+            doc = "The directory to locate broker additional servlet"
+    )
+    private String additionalServletDirectory = "./brokerAdditionalServlet";
+
+    @FieldContext(
+            category = CATEGORY_PLUGIN,
+            doc = "List of broker additional servlet to load, which is a list of broker additional servlet names"
+    )
+    private Set<String> additionalServlets = Sets.newTreeSet();
+
     /**
      * @deprecated See {@link #getConfigurationStoreServers}
      */
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/package-info.java
index 2807dc4..8c3b3b2 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/package-info.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/package-info.java
@@ -18,6 +18,6 @@
  */
 
 /**
- * Pulsar proxy servlet plugin.
+ * Pulsar additional servlet plugin.
  */
 package org.apache.pulsar.broker.web.plugin.servlet;
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 89a071b..f7a1337 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -35,8 +35,10 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -59,6 +61,8 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import javax.servlet.ServletException;
+import javax.websocket.DeploymentException;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
@@ -113,10 +117,15 @@ import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStor
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.broker.validator.MultipleListenerValidator;
 import org.apache.pulsar.broker.web.WebService;
+import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlet;
+import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
+import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithPulsarService;
+import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -238,6 +247,7 @@ public class PulsarService implements AutoCloseable {
     private HashedWheelTimer transactionTimer;
 
     private BrokerInterceptor brokerInterceptor;
+    private AdditionalServlets brokerAdditionalServlets;
 
     // packages management service
     private PackagesManagement packagesManagement;
@@ -376,6 +386,11 @@ public class PulsarService implements AutoCloseable {
                 this.webSocketService.close();
             }
 
+            if (brokerAdditionalServlets != null) {
+                brokerAdditionalServlets.close();
+                brokerAdditionalServlets = null;
+            }
+
             GracefulExecutorServicesShutdown executorServicesShutdown =
                     GracefulExecutorServicesShutdown
                             .initiate()
@@ -656,30 +671,11 @@ public class PulsarService implements AutoCloseable {
             this.brokerInterceptor.initialize(this);
             brokerService.start();
 
+            // Load additional servlets
+            this.brokerAdditionalServlets = AdditionalServlets.load(config);
+
             this.webService = new WebService(this);
-            Map<String, Object> attributeMap = Maps.newHashMap();
-            attributeMap.put(WebService.ATTRIBUTE_PULSAR_NAME, this);
-            Map<String, Object> vipAttributeMap = Maps.newHashMap();
-            vipAttributeMap.put(VipStatus.ATTRIBUTE_STATUS_FILE_PATH, this.config.getStatusFilePath());
-            vipAttributeMap.put(VipStatus.ATTRIBUTE_IS_READY_PROBE, new Supplier<Boolean>() {
-                @Override
-                public Boolean get() {
-                    // Ensure the VIP status is only visible when the broker is fully initialized
-                    return state == State.Started;
-                }
-            });
-            this.webService.addRestResources("/",
-                    VipStatus.class.getPackage().getName(), false, vipAttributeMap);
-            this.webService.addRestResources("/",
-                    "org.apache.pulsar.broker.web", false, attributeMap);
-            this.webService.addRestResources("/admin",
-                    "org.apache.pulsar.broker.admin.v1", true, attributeMap);
-            this.webService.addRestResources("/admin/v2",
-                    "org.apache.pulsar.broker.admin.v2", true, attributeMap);
-            this.webService.addRestResources("/admin/v3",
-                    "org.apache.pulsar.broker.admin.v3", true, attributeMap);
-            this.webService.addRestResources("/lookup",
-                    "org.apache.pulsar.broker.lookup", true, attributeMap);
+
             this.metricsServlet = new PrometheusMetricsServlet(
                     this, config.isExposeTopicLevelMetricsInPrometheus(),
                     config.isExposeConsumerLevelMetricsInPrometheus(),
@@ -689,46 +685,8 @@ public class PulsarService implements AutoCloseable {
                 this.pendingMetricsProviders = null;
             }
 
-            this.webService.addServlet("/metrics",
-                    new ServletHolder(metricsServlet),
-                    false, attributeMap);
-
-            if (config.isWebSocketServiceEnabled()) {
-                // Use local broker address to avoid different IP address when using a VIP for service discovery
-                this.webSocketService = new WebSocketService(null, config);
-                this.webSocketService.start();
-
-                final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService);
-                this.webService.addServlet(WebSocketProducerServlet.SERVLET_PATH,
-                        new ServletHolder(producerWebSocketServlet), true, attributeMap);
-                this.webService.addServlet(WebSocketProducerServlet.SERVLET_PATH_V2,
-                        new ServletHolder(producerWebSocketServlet), true, attributeMap);
-
-                final WebSocketServlet consumerWebSocketServlet = new WebSocketConsumerServlet(webSocketService);
-                this.webService.addServlet(WebSocketConsumerServlet.SERVLET_PATH,
-                        new ServletHolder(consumerWebSocketServlet), true, attributeMap);
-                this.webService.addServlet(WebSocketConsumerServlet.SERVLET_PATH_V2,
-                        new ServletHolder(consumerWebSocketServlet), true, attributeMap);
-
-                final WebSocketServlet readerWebSocketServlet = new WebSocketReaderServlet(webSocketService);
-                this.webService.addServlet(WebSocketReaderServlet.SERVLET_PATH,
-                        new ServletHolder(readerWebSocketServlet), true, attributeMap);
-                this.webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
-                        new ServletHolder(readerWebSocketServlet), true, attributeMap);
-
-                final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService);
-                this.webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
-                        new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
-                this.webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
-                        new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
-            }
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Attempting to add static directory");
-            }
-            this.webService.addStaticResources("/static", "/static");
-
-            webService.start();
+            this.addWebServerHandlers(webService, metricsServlet, this.config);
+            this.webService.start();
 
             // Refresh addresses, since the port might have been dynamically assigned
             this.webServiceAddress = webAddress(config);
@@ -832,6 +790,107 @@ public class PulsarService implements AutoCloseable {
         }
     }
 
+    private void addWebServerHandlers(WebService webService,
+                                      PrometheusMetricsServlet metricsServlet,
+                                      ServiceConfiguration config)
+            throws PulsarServerException, PulsarClientException, MalformedURLException, ServletException,
+            DeploymentException {
+        Map<String, Object> attributeMap = Maps.newHashMap();
+        attributeMap.put(WebService.ATTRIBUTE_PULSAR_NAME, this);
+
+        Map<String, Object> vipAttributeMap = Maps.newHashMap();
+        vipAttributeMap.put(VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath());
+        vipAttributeMap.put(VipStatus.ATTRIBUTE_IS_READY_PROBE, (Supplier<Boolean>) () -> {
+            // Ensure the VIP status is only visible when the broker is fully initialized
+            return state == State.Started;
+        });
+        // Add admin rest resources
+        webService.addRestResources("/",
+                VipStatus.class.getPackage().getName(), false, vipAttributeMap);
+        webService.addRestResources("/",
+                "org.apache.pulsar.broker.web", false, attributeMap);
+        webService.addRestResources("/admin",
+                "org.apache.pulsar.broker.admin.v1", true, attributeMap);
+        webService.addRestResources("/admin/v2",
+                "org.apache.pulsar.broker.admin.v2", true, attributeMap);
+        webService.addRestResources("/admin/v3",
+                "org.apache.pulsar.broker.admin.v3", true, attributeMap);
+        webService.addRestResources("/lookup",
+                "org.apache.pulsar.broker.lookup", true, attributeMap);
+
+        // Add metrics servlet
+        webService.addServlet("/metrics",
+                new ServletHolder(metricsServlet),
+                false, attributeMap);
+
+        // Add websocket service
+        addWebSocketServiceHandler(webService, attributeMap, config);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Attempting to add static directory");
+        }
+        // Add static resources
+        webService.addStaticResources("/static", "/static");
+
+        // Add broker additional servlets
+        addBrokerAdditionalServlets(webService, attributeMap, config);
+    }
+
+    private void addBrokerAdditionalServlets(WebService webService,
+                                             Map<String, Object> attributeMap,
+                                             ServiceConfiguration config) {
+        if (this.getBrokerAdditionalServlets() != null) {
+            Collection<AdditionalServletWithClassLoader> additionalServletCollection =
+                    this.getBrokerAdditionalServlets().getServlets().values();
+            for (AdditionalServletWithClassLoader servletWithClassLoader : additionalServletCollection) {
+                servletWithClassLoader.loadConfig(config);
+                AdditionalServlet additionalServlet = servletWithClassLoader.getServlet();
+                if (additionalServlet instanceof AdditionalServletWithPulsarService) {
+                    ((AdditionalServletWithPulsarService) additionalServlet).setPulsarService(this);
+                }
+                webService.addServlet(servletWithClassLoader.getBasePath(), servletWithClassLoader.getServletHolder(),
+                        config.isAuthenticationEnabled(), attributeMap);
+                LOG.info("Broker add additional servlet basePath {} ", servletWithClassLoader.getBasePath());
+            }
+        }
+    }
+
+    private void addWebSocketServiceHandler(WebService webService,
+                                            Map<String, Object> attributeMap,
+                                            ServiceConfiguration config)
+            throws PulsarServerException, PulsarClientException, MalformedURLException, ServletException,
+            DeploymentException {
+        if (config.isWebSocketServiceEnabled()) {
+            // Use local broker address to avoid different IP address when using a VIP for service discovery
+            this.webSocketService = new WebSocketService(null, config);
+            this.webSocketService.start();
+
+            final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService);
+            webService.addServlet(WebSocketProducerServlet.SERVLET_PATH,
+                    new ServletHolder(producerWebSocketServlet), true, attributeMap);
+            webService.addServlet(WebSocketProducerServlet.SERVLET_PATH_V2,
+                    new ServletHolder(producerWebSocketServlet), true, attributeMap);
+
+            final WebSocketServlet consumerWebSocketServlet = new WebSocketConsumerServlet(webSocketService);
+            webService.addServlet(WebSocketConsumerServlet.SERVLET_PATH,
+                    new ServletHolder(consumerWebSocketServlet), true, attributeMap);
+            webService.addServlet(WebSocketConsumerServlet.SERVLET_PATH_V2,
+                    new ServletHolder(consumerWebSocketServlet), true, attributeMap);
+
+            final WebSocketServlet readerWebSocketServlet = new WebSocketReaderServlet(webSocketService);
+            webService.addServlet(WebSocketReaderServlet.SERVLET_PATH,
+                    new ServletHolder(readerWebSocketServlet), true, attributeMap);
+            webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
+                    new ServletHolder(readerWebSocketServlet), true, attributeMap);
+
+            final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService);
+            webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
+                    new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
+            webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
+                    new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
+        }
+    }
+
     private void handleDeleteCluster(Notification notification) {
         if (notification.getPath().startsWith(ClusterResources.CLUSTERS_ROOT)
                 && notification.getType() == NotificationType.Deleted) {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithPulsarService.java
similarity index 70%
copy from pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/package-info.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithPulsarService.java
index 2807dc4..23037dd 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/package-info.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithPulsarService.java
@@ -16,8 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.pulsar.broker.web.plugin.servlet;
+
+import org.apache.pulsar.broker.PulsarService;
 
 /**
- * Pulsar proxy servlet plugin.
+ * The additional servlet with pulsarService interface for support additional servlet to get pulsarService.
  */
-package org.apache.pulsar.broker.web.plugin.servlet;
\ No newline at end of file
+public interface AdditionalServletWithPulsarService extends AdditionalServlet {
+    void setPulsarService(PulsarService pulsarService);
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/package-info.java
similarity index 90%
copy from pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/package-info.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/package-info.java
index 2807dc4..7da276a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/package-info.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/package-info.java
@@ -16,8 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 /**
- * Pulsar proxy servlet plugin.
+ * Pulsar broker servlet plugin.
  */
 package org.apache.pulsar.broker.web.plugin.servlet;
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerAdditionalServletTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerAdditionalServletTest.java
new file mode 100644
index 0000000..4b4c8db
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerAdditionalServletTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.servlet.Servlet;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.OkHttpClient;
+import okhttp3.Response;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlet;
+import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
+import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithPulsarService;
+import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
+import org.apache.pulsar.common.configuration.PulsarConfiguration;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class BrokerAdditionalServletTest extends MockedPulsarServiceBaseTest {
+
+    private final String BASE_PATH = "/additional/servlet";
+    private final String WITH_PULSAR_SERVICE_BASE_PATH = "/additional/servlet/with/pulsar/service";
+    private final String QUERY_PARAM = "param";
+
+    @Override
+    @BeforeClass
+    protected void setup() throws Exception {
+        internalSetup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        internalCleanup();
+    }
+
+    @Override
+    protected void beforePulsarStartMocks(PulsarService pulsar) throws Exception {
+        mockAdditionalServlet(pulsar);
+    }
+
+    private void mockAdditionalServlet(PulsarService pulsar) {
+        Servlet servlet = new OrdinaryServlet();
+
+        AdditionalServlet brokerAdditionalServlet = Mockito.mock(AdditionalServlet.class);
+        Mockito.when(brokerAdditionalServlet.getBasePath()).thenReturn(BASE_PATH);
+        Mockito.when(brokerAdditionalServlet.getServletHolder()).thenReturn(new ServletHolder(servlet));
+
+        AdditionalServletWithPulsarService brokerAdditionalServletWithPulsarService =
+                new AdditionalServletWithPulsarService() {
+                    private PulsarService pulsarService;
+                    @Override
+                    public void setPulsarService(PulsarService pulsarService) {
+                        this.pulsarService = pulsarService;
+                    }
+
+                    @Override
+                    public void loadConfig(PulsarConfiguration pulsarConfiguration) {
+                        // No-op
+                    }
+
+                    @Override
+                    public String getBasePath() {
+                        return WITH_PULSAR_SERVICE_BASE_PATH;
+                    }
+
+                    @Override
+                    public ServletHolder getServletHolder() {
+                        return new ServletHolder(new WithPulsarServiceServlet(pulsarService));
+                    }
+
+                    @Override
+                    public void close() {
+                        // No-op
+                    }
+                };
+
+
+        AdditionalServlets brokerAdditionalServlets = Mockito.mock(AdditionalServlets.class);
+        Map<String, AdditionalServletWithClassLoader> map = new HashMap<>();
+        map.put("broker-additional-servlet", new AdditionalServletWithClassLoader(brokerAdditionalServlet, null));
+        map.put("broker-additional-servlet-with-pulsar-service", new AdditionalServletWithClassLoader(brokerAdditionalServletWithPulsarService, null));
+        Mockito.when(brokerAdditionalServlets.getServlets()).thenReturn(map);
+
+        Mockito.when(pulsar.getBrokerAdditionalServlets()).thenReturn(brokerAdditionalServlets);
+    }
+
+    @Test
+    public void test() throws IOException {
+        int httpPort = pulsar.getWebService().getListenPortHTTP().get();
+        log.info("pulsar webService httpPort {}", httpPort);
+        String paramValue = "value - " + RandomUtils.nextInt();
+        String response = httpGet("http://localhost:" + httpPort + BASE_PATH + "?" + QUERY_PARAM + "=" + paramValue);
+        Assert.assertEquals(response, paramValue);
+
+        String WithPulsarServiceParamValue = PulsarService.class.getName();
+        String withPulsarServiceResponse = httpGet("http://localhost:" + httpPort + WITH_PULSAR_SERVICE_BASE_PATH);
+        Assert.assertEquals(WithPulsarServiceParamValue, withPulsarServiceResponse);
+    }
+
+
+    private class OrdinaryServlet implements Servlet {
+        @Override
+        public void init(ServletConfig servletConfig) throws ServletException {
+            log.info("[init]");
+        }
+
+        @Override
+        public ServletConfig getServletConfig() {
+            log.info("[getServletConfig]");
+            return null;
+        }
+
+        @Override
+        public void service(ServletRequest servletRequest, ServletResponse servletResponse) throws ServletException,
+                IOException {
+            log.info("[service] path: {}", ((Request) servletRequest).getOriginalURI());
+            String value = servletRequest.getParameterMap().get(QUERY_PARAM)[0];
+            ServletOutputStream servletOutputStream = servletResponse.getOutputStream();
+            servletResponse.setContentLength(value.getBytes().length);
+            servletOutputStream.write(value.getBytes());
+            servletOutputStream.flush();
+        }
+
+        @Override
+        public String getServletInfo() {
+            log.info("[getServletInfo]");
+            return null;
+        }
+
+        @Override
+        public void destroy() {
+            log.info("[destroy]");
+        }
+    }
+
+
+    private class WithPulsarServiceServlet extends OrdinaryServlet {
+        private final PulsarService pulsarService;
+
+        public WithPulsarServiceServlet(PulsarService pulsar) {
+            this.pulsarService = pulsar;
+        }
+
+        @Override
+        public void service(ServletRequest servletRequest, ServletResponse servletResponse) throws ServletException,
+                IOException {
+            log.info("[service] path: {}", ((Request) servletRequest).getOriginalURI());
+            String value = pulsarService == null ? "null" : PulsarService.class.getName();
+            ServletOutputStream servletOutputStream = servletResponse.getOutputStream();
+            servletResponse.setContentLength(value.getBytes().length);
+            servletOutputStream.write(value.getBytes());
+            servletOutputStream.flush();
+        }
+    }
+
+    String httpGet(String url) throws IOException {
+        OkHttpClient client = new OkHttpClient();
+        okhttp3.Request request = new okhttp3.Request.Builder()
+                .get()
+                .url(url)
+                .build();
+
+        try (Response response = client.newCall(request).execute()) {
+            return response.body().string();
+        }
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index aa10db0..ddb76e8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -238,6 +238,10 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
 
     protected abstract void cleanup() throws Exception;
 
+    protected void beforePulsarStartMocks(PulsarService pulsar) throws Exception {
+        // No-op
+    }
+
     protected void restartBroker() throws Exception {
         stopBroker();
         startBroker();
@@ -284,6 +288,7 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
         conf.setBrokerShutdownTimeoutMs(0L);
         PulsarService pulsar = spy(new PulsarService(conf));
         setupBrokerMocks(pulsar);
+        beforePulsarStartMocks(pulsar);
         pulsar.start();
         log.info("Pulsar started. brokerServiceUrl: {} webServiceAddress: {}", pulsar.getBrokerServiceUrl(),
                 pulsar.getWebServiceAddress());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithPulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithPulsarServiceTest.java
new file mode 100644
index 0000000..479c943
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletWithPulsarServiceTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web.plugin.servlet;
+
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.AssertJUnit.assertSame;
+import static org.testng.AssertJUnit.assertTrue;
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.Set;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.IObjectFactory;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+@PrepareForTest({
+        NarClassLoader.class
+})
+@PowerMockIgnore({"org.apache.logging.log4j.*"})
+public class AdditionalServletWithPulsarServiceTest {
+
+    // Necessary to make PowerMockito.mockStatic work with TestNG.
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    @Test
+    public void testLoadAdditionalServlet() throws Exception {
+        AdditionalServletDefinition def = new AdditionalServletDefinition();
+        def.setAdditionalServletClass(MockAdditionalServletWithClassLoader.class.getName());
+        def.setDescription("test-additional-servlet");
+
+        String archivePath = "/path/to/additional/servlet/nar";
+
+        AdditionalServletMetadata metadata = new AdditionalServletMetadata();
+        metadata.setDefinition(def);
+        metadata.setArchivePath(Paths.get(archivePath));
+
+        NarClassLoader mockLoader = mock(NarClassLoader.class);
+        when(mockLoader.getServiceDefinition(eq(AdditionalServletUtils.ADDITIONAL_SERVLET_FILE)))
+                .thenReturn(ObjectMapperFactory.getThreadLocalYaml().writeValueAsString(def));
+        Class additionalServletClass = MockAdditionalServletWithClassLoader.class;
+        when(mockLoader.loadClass(eq(MockAdditionalServletWithClassLoader.class.getName())))
+                .thenReturn(additionalServletClass);
+
+        PowerMockito.mockStatic(NarClassLoader.class);
+        PowerMockito.when(NarClassLoader.getFromArchive(
+                any(File.class),
+                any(Set.class),
+                any(ClassLoader.class),
+                any(String.class)
+        )).thenReturn(mockLoader);
+
+        AdditionalServletWithClassLoader returnedASWithCL = AdditionalServletUtils.load(metadata, "");
+        AdditionalServlet returnedPh = returnedASWithCL.getServlet();
+
+        assertSame(mockLoader, returnedASWithCL.getClassLoader());
+        assertTrue(returnedPh instanceof MockAdditionalServletWithClassLoader);
+    }
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/package-info.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/plugin/servlet/MockAdditionalServletWithClassLoader.java
similarity index 51%
copy from pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/package-info.java
copy to pulsar-broker/src/test/java/org/apache/pulsar/broker/web/plugin/servlet/MockAdditionalServletWithClassLoader.java
index 2807dc4..6ff7b85 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/package-info.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/plugin/servlet/MockAdditionalServletWithClassLoader.java
@@ -16,8 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.pulsar.broker.web.plugin.servlet;
 
-/**
- * Pulsar proxy servlet plugin.
- */
-package org.apache.pulsar.broker.web.plugin.servlet;
\ No newline at end of file
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.common.configuration.PulsarConfiguration;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+public class MockAdditionalServletWithClassLoader implements AdditionalServletWithPulsarService{
+    @Override
+    public void loadConfig(PulsarConfiguration pulsarConfiguration) {
+        // No-op
+    }
+
+    @Override
+    public String getBasePath() {
+        return null;
+    }
+
+    @Override
+    public ServletHolder getServletHolder() {
+        return null;
+    }
+
+    @Override
+    public void close() {
+        // No-op
+    }
+
+    @Override
+    public void setPulsarService(PulsarService pulsarService) {
+        // No-op
+    }
+}