You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/29 11:38:39 UTC

[pulsar] branch master updated: Protocol Handlers and Proxy Extensions: Fix bootstrap and add tests (#12917)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d3d580f  Protocol Handlers and Proxy Extensions: Fix bootstrap and add tests (#12917)
d3d580f is described below

commit d3d580f3a23f58f3612cbceeb817d357efefd30d
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Mon Nov 29 12:35:46 2021 +0100

    Protocol Handlers and Proxy Extensions: Fix bootstrap and add tests (#12917)
---
 .../pulsar/broker/protocol/ProtocolHandlers.java   |   7 +
 .../pulsar/broker/service/BrokerService.java       |  11 +-
 ...impleProtocolHandlerSeparateThreadPoolTest.java |  25 +++
 .../SimpleProtocolHandlerSharedThreadPoolTest.java |  25 +++
 .../protocol/SimpleProtocolHandlerTestsBase.java   | 184 ++++++++++++++++++++
 .../pulsar/proxy/extensions/ProxyExtensions.java   |   8 +
 .../apache/pulsar/proxy/server/ProxyService.java   |  12 +-
 ...SimpleProxyExtensionSeparateThreadPoolTest.java |  25 +++
 .../SimpleProxyExtensionSharedThreadPoolTest.java  |  25 +++
 .../extensions/SimpleProxyExtensionTestBase.java   | 192 +++++++++++++++++++++
 10 files changed, 512 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java
index e41caf1..330b32f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java
@@ -25,9 +25,12 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerService;
@@ -38,6 +41,9 @@ import org.apache.pulsar.broker.service.BrokerService;
 @Slf4j
 public class ProtocolHandlers implements AutoCloseable {
 
+    @Getter
+    private final Map<SocketAddress, String> endpoints = new ConcurrentHashMap<>();
+
     /**
      * Load the protocol handlers for the given <tt>protocol</tt> list.
      *
@@ -132,6 +138,7 @@ public class ProtocolHandlers implements AutoCloseable {
                         + " already occupied by other messaging protocols");
                 }
                 channelInitializers.put(handler.getKey(), initializers);
+                endpoints.put(address, handler.getKey());
             });
         }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e6a2574..9dbd6ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -385,16 +385,25 @@ public class BrokerService implements Closeable {
     private void startProtocolHandler(String protocol,
                                       SocketAddress address,
                                       ChannelInitializer<SocketChannel> initializer) throws IOException {
-        ServerBootstrap bootstrap = defaultServerBootstrap.clone();
+
         ServiceConfiguration configuration = pulsar.getConfiguration();
         boolean useSeparateThreadPool = configuration.isUseSeparateThreadPoolForProtocolHandlers();
+        ServerBootstrap bootstrap;
         if (useSeparateThreadPool) {
+            bootstrap = new ServerBootstrap();
+            bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
+            bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
+            bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
+                    new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));
+            EventLoopUtil.enableTriggeredMode(bootstrap);
             DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory("pulsar-ph-" + protocol);
             EventLoopGroup dedicatedWorkerGroup =
                     EventLoopUtil.newEventLoopGroup(configuration.getNumIOThreads(), false, defaultThreadFactory);
             bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup));
             protocolHandlersWorkerGroups.add(dedicatedWorkerGroup);
             bootstrap.group(this.acceptorGroup, dedicatedWorkerGroup);
+        } else {
+            bootstrap = defaultServerBootstrap.clone();
         }
         bootstrap.childHandler(initializer);
         try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerSeparateThreadPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerSeparateThreadPoolTest.java
new file mode 100644
index 0000000..df803f8
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerSeparateThreadPoolTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.protocol;
+
+public class SimpleProtocolHandlerSeparateThreadPoolTest extends SimpleProtocolHandlerTestsBase {
+    public SimpleProtocolHandlerSeparateThreadPoolTest() {
+        super(true);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerSharedThreadPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerSharedThreadPoolTest.java
new file mode 100644
index 0000000..d48ab61
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerSharedThreadPoolTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.protocol;
+
+public class SimpleProtocolHandlerSharedThreadPoolTest extends SimpleProtocolHandlerTestsBase {
+    public SimpleProtocolHandlerSharedThreadPoolTest() {
+        super(false);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
new file mode 100644
index 0000000..5036424
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.protocol;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.*;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.NetUtil;
+import io.netty.util.concurrent.NonStickyEventExecutorGroup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.util.PortManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.functions.worker.PulsarFunctionTestTemporaryDirectory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.testng.Assert.assertEquals;
+
+@Slf4j
+@Test(groups = "broker")
+public abstract class SimpleProtocolHandlerTestsBase extends BrokerTestBase {
+
+    public static final class MyProtocolHandler implements ProtocolHandler {
+
+        private ServiceConfiguration conf;
+
+        @Override
+        public String protocolName() {
+            return "test";
+        }
+
+        @Override
+        public boolean accept(String protocol) {
+            return "test".equals(protocol);
+        }
+
+        @Override
+        public void initialize(ServiceConfiguration conf) throws Exception {
+            this.conf = conf;
+        }
+
+        @Override
+        public String getProtocolDataToAdvertise() {
+            return "test";
+        }
+
+        @Override
+        public void start(BrokerService service) {
+
+        }
+
+        @Override
+        public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
+            return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), PortManager.nextFreePort()),
+                    new ChannelInitializer<SocketChannel>() {
+                @Override
+                protected void initChannel(SocketChannel socketChannel) throws Exception {
+                    socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
+                        @Override
+                        public void channelActive(final ChannelHandlerContext ctx) {
+                            final ByteBuf resp = ctx.alloc().buffer();
+                            resp.writeBytes("ok".getBytes(StandardCharsets.UTF_8));
+
+                            final ChannelFuture f = ctx.writeAndFlush(resp);
+                            f.addListener((ChannelFutureListener) future -> ctx.close());
+                        }
+                        @Override
+                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+                            log.error("error", cause);
+                            ctx.close();
+                        }
+                    });
+                }
+            });
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
+
+    private File tempDirectory;
+    private boolean useSeparateThreadPool;
+
+    public SimpleProtocolHandlerTestsBase(boolean useSeparateThreadPool) {
+        this.useSeparateThreadPool = useSeparateThreadPool;
+    }
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        tempDirectory = Files.createTempDirectory("SimpleProtocolHandlerTest").toFile();
+        conf.setUseSeparateThreadPoolForProtocolHandlers(useSeparateThreadPool);
+        conf.setProtocolHandlerDirectory(tempDirectory.getAbsolutePath());
+        conf.setMessagingProtocols(Collections.singleton("test"));
+        buildMockNarFile(tempDirectory);
+        super.baseSetup();
+    }
+
+    @Test
+    public void testBootstrapProtocolHandler() throws Exception {
+        SocketAddress address =
+                pulsar.getProtocolHandlers()
+                      .getEndpoints()
+                        .entrySet()
+                        .stream()
+                        .filter(e -> e.getValue().equals("test"))
+                        .map(Map.Entry::getKey)
+                        .findAny()
+                        .get();
+        try (Socket socket =  new Socket();) {
+            socket.connect(address);
+            String res = IOUtils.toString(socket.getInputStream(), StandardCharsets.UTF_8);
+            assertEquals(res, "ok");
+        }
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+
+        if (tempDirectory != null) {
+            FileUtils.deleteDirectory(tempDirectory);
+        }
+    }
+
+    private static void buildMockNarFile(File tempDirectory) throws Exception {
+        File file = new File(tempDirectory, "temp.nar");
+        try (ZipOutputStream zipfile = new ZipOutputStream(new FileOutputStream(file))) {
+
+            zipfile.putNextEntry(new ZipEntry("META-INF/"));
+            zipfile.putNextEntry(new ZipEntry("META-INF/services/"));
+            zipfile.putNextEntry(new ZipEntry("META-INF/bundled-dependencies/"));
+
+            ZipEntry manifest = new ZipEntry("META-INF/services/"
+                    + ProtocolHandlerUtils.PULSAR_PROTOCOL_HANDLER_DEFINITION_FILE);
+            zipfile.putNextEntry(manifest);
+            String yaml = "name: test\n" +
+                    "description: this is a test\n" +
+                    "handlerClass: " + MyProtocolHandler.class.getName() + "\n";
+            zipfile.write(yaml.getBytes(StandardCharsets.UTF_8));
+            zipfile.closeEntry();
+        }
+    }
+
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java
index ead2fa2..ba3c383 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java
@@ -21,8 +21,12 @@ package org.apache.pulsar.proxy.extensions;
 import com.google.common.collect.ImmutableMap;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
+
+import java.net.SocketAddress;
 import java.util.HashMap;
 import java.util.HashSet;
+
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.proxy.server.ProxyConfiguration;
 import org.apache.pulsar.proxy.server.ProxyService;
@@ -31,6 +35,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * A collection of loaded extensions.
@@ -38,6 +43,8 @@ import java.util.Set;
 @Slf4j
 public class ProxyExtensions implements AutoCloseable {
 
+    @Getter
+    private final Map<SocketAddress, String> endpoints = new ConcurrentHashMap<>();
     /**
      * Load the extensions for the given <tt>extensions</tt> list.
      *
@@ -123,6 +130,7 @@ public class ProxyExtensions implements AutoCloseable {
                         + "` attempts to use " + address + " for its listening port. But it is"
                         + " already occupied by other messaging extensions");
                 }
+                endpoints.put(address, extension.getKey());
                 channelInitializers.put(extension.getKey(), initializers);
             });
         }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 4af2307..778f42b 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -83,6 +83,7 @@ public class ProxyService implements Closeable {
     private MetadataStoreExtended localMetadataStore;
     private MetadataStoreExtended configMetadataStore;
     private PulsarResources pulsarResources;
+    @Getter
     private ProxyExtensions proxyExtensions = null;
 
     private final EventLoopGroup acceptorGroup;
@@ -265,15 +266,24 @@ public class ProxyService implements Closeable {
                                      SocketAddress address,
                                      ChannelInitializer<SocketChannel> initializer,
                                      ServerBootstrap serverBootstrap) throws IOException {
-        ServerBootstrap bootstrap = serverBootstrap.clone();
+        ServerBootstrap bootstrap;
         boolean useSeparateThreadPool = proxyConfig.isUseSeparateThreadPoolForProxyExtensions();
         if (useSeparateThreadPool) {
+            bootstrap = new ServerBootstrap();
+            bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
+            bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
+            bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
+                    new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));
+
+            EventLoopUtil.enableTriggeredMode(bootstrap);
             DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory("pulsar-ext-" + extensionName);
             EventLoopGroup dedicatedWorkerGroup =
                     EventLoopUtil.newEventLoopGroup(numThreads, false, defaultThreadFactory);
             extensionsWorkerGroups.add(dedicatedWorkerGroup);
             bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup));
             bootstrap.group(this.acceptorGroup, dedicatedWorkerGroup);
+        } else {
+            bootstrap = serverBootstrap.clone();
         }
         bootstrap.childHandler(initializer);
         try {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionSeparateThreadPoolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionSeparateThreadPoolTest.java
new file mode 100644
index 0000000..2c2281f
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionSeparateThreadPoolTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.extensions;
+
+public class SimpleProxyExtensionSeparateThreadPoolTest extends SimpleProxyExtensionTestBase {
+    public SimpleProxyExtensionSeparateThreadPoolTest() {
+        super(true);
+    }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionSharedThreadPoolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionSharedThreadPoolTest.java
new file mode 100644
index 0000000..35fb728
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionSharedThreadPoolTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.extensions;
+
+public class SimpleProxyExtensionSharedThreadPoolTest extends SimpleProxyExtensionTestBase {
+    public SimpleProxyExtensionSharedThreadPoolTest() {
+        super(false);
+    }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
new file mode 100644
index 0000000..0ae6405
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.extensions;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.*;
+import io.netty.channel.socket.SocketChannel;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.util.PortManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.proxy.server.ProxyConfiguration;
+import org.apache.pulsar.proxy.server.ProxyService;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.mockito.Mockito.doReturn;
+import static org.testng.Assert.assertEquals;
+
+@Slf4j
+@Test(groups = "proxy")
+public abstract class SimpleProxyExtensionTestBase extends MockedPulsarServiceBaseTest {
+
+    public static final class MyProxyExtension implements ProxyExtension {
+
+        private ProxyConfiguration conf;
+
+        @Override
+        public String extensionName() {
+            return "test";
+        }
+
+        @Override
+        public boolean accept(String protocol) {
+            return "test".equals(protocol);
+        }
+
+        @Override
+        public void initialize(ProxyConfiguration conf) throws Exception {
+            this.conf = conf;
+        }
+
+        @Override
+        public void start(ProxyService service) {
+        }
+
+        @Override
+        public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
+            return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), PortManager.nextFreePort()),
+                    new ChannelInitializer<SocketChannel>() {
+                @Override
+                protected void initChannel(SocketChannel socketChannel) throws Exception {
+                    socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
+                        @Override
+                        public void channelActive(final ChannelHandlerContext ctx) {
+                            final ByteBuf resp = ctx.alloc().buffer();
+                            resp.writeBytes("ok".getBytes(StandardCharsets.UTF_8));
+
+                            final ChannelFuture f = ctx.writeAndFlush(resp);
+                            f.addListener((ChannelFutureListener) future -> ctx.close());
+                        }
+                        @Override
+                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+                            log.error("error", cause);
+                            ctx.close();
+                        }
+                    });
+                }
+            });
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
+
+    private File tempDirectory;
+    private ProxyService proxyService;
+    private boolean useSeparateThreadPoolForProxyExtensions;
+    private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    public SimpleProxyExtensionTestBase(boolean useSeparateThreadPoolForProxyExtensions) {
+        this.useSeparateThreadPoolForProxyExtensions = useSeparateThreadPoolForProxyExtensions;
+    }
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        tempDirectory = Files.createTempDirectory("SimpleProxyExtensionTest").toFile();
+
+        super.internalSetup();
+        proxyConfig.setUseSeparateThreadPoolForProxyExtensions(useSeparateThreadPoolForProxyExtensions);
+        proxyConfig.setProxyExtensionsDirectory(tempDirectory.getAbsolutePath());
+        proxyConfig.setProxyExtensions(Collections.singleton("test"));
+        buildMockNarFile(tempDirectory);
+        proxyConfig.setServicePort(Optional.ofNullable(0));
+        proxyConfig.setZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
+                PulsarConfigurationLoader.convertFrom(proxyConfig))));
+        doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+
+        proxyService.start();
+    }
+
+    @Test
+    public void testBootstrapProtocolHandler() throws Exception {
+        SocketAddress address =
+                proxyService.getProxyExtensions()
+                      .getEndpoints()
+                        .entrySet()
+                        .stream()
+                        .filter(e -> e.getValue().equals("test"))
+                        .map(Map.Entry::getKey)
+                        .findAny()
+                        .get();
+        try (Socket socket =  new Socket();) {
+            socket.connect(address);
+            String res = IOUtils.toString(socket.getInputStream(), StandardCharsets.UTF_8);
+            assertEquals(res, "ok");
+        }
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        proxyService.close();
+
+        if (tempDirectory != null) {
+            FileUtils.deleteDirectory(tempDirectory);
+        }
+    }
+
+    private static void buildMockNarFile(File tempDirectory) throws Exception {
+        File file = new File(tempDirectory, "temp.nar");
+        try (ZipOutputStream zipfile = new ZipOutputStream(new FileOutputStream(file))) {
+
+            zipfile.putNextEntry(new ZipEntry("META-INF/"));
+            zipfile.putNextEntry(new ZipEntry("META-INF/services/"));
+            zipfile.putNextEntry(new ZipEntry("META-INF/bundled-dependencies/"));
+
+            ZipEntry manifest = new ZipEntry("META-INF/services/"
+                    + ProxyExtensionsUtils.PROXY_EXTENSION_DEFINITION_FILE);
+            zipfile.putNextEntry(manifest);
+            String yaml = "name: test\n" +
+                    "description: this is a test\n" +
+                    "extensionClass: " + MyProxyExtension.class.getName() + "\n";
+            zipfile.write(yaml.getBytes(StandardCharsets.UTF_8));
+            zipfile.closeEntry();
+        }
+    }
+
+}