You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/29 11:38:39 UTC
[pulsar] branch master updated: Protocol Handlers and Proxy Extensions: Fix bootstrap and add tests (#12917)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d3d580f Protocol Handlers and Proxy Extensions: Fix bootstrap and add tests (#12917)
d3d580f is described below
commit d3d580f3a23f58f3612cbceeb817d357efefd30d
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Mon Nov 29 12:35:46 2021 +0100
Protocol Handlers and Proxy Extensions: Fix bootstrap and add tests (#12917)
---
.../pulsar/broker/protocol/ProtocolHandlers.java | 7 +
.../pulsar/broker/service/BrokerService.java | 11 +-
...impleProtocolHandlerSeparateThreadPoolTest.java | 25 +++
.../SimpleProtocolHandlerSharedThreadPoolTest.java | 25 +++
.../protocol/SimpleProtocolHandlerTestsBase.java | 184 ++++++++++++++++++++
.../pulsar/proxy/extensions/ProxyExtensions.java | 8 +
.../apache/pulsar/proxy/server/ProxyService.java | 12 +-
...SimpleProxyExtensionSeparateThreadPoolTest.java | 25 +++
.../SimpleProxyExtensionSharedThreadPoolTest.java | 25 +++
.../extensions/SimpleProxyExtensionTestBase.java | 192 +++++++++++++++++++++
10 files changed, 512 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java
index e41caf1..330b32f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/protocol/ProtocolHandlers.java
@@ -25,9 +25,12 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
@@ -38,6 +41,9 @@ import org.apache.pulsar.broker.service.BrokerService;
@Slf4j
public class ProtocolHandlers implements AutoCloseable {
+ @Getter
+ private final Map<SocketAddress, String> endpoints = new ConcurrentHashMap<>();
+
/**
* Load the protocol handlers for the given <tt>protocol</tt> list.
*
@@ -132,6 +138,7 @@ public class ProtocolHandlers implements AutoCloseable {
+ " already occupied by other messaging protocols");
}
channelInitializers.put(handler.getKey(), initializers);
+ endpoints.put(address, handler.getKey());
});
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e6a2574..9dbd6ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -385,16 +385,25 @@ public class BrokerService implements Closeable {
private void startProtocolHandler(String protocol,
SocketAddress address,
ChannelInitializer<SocketChannel> initializer) throws IOException {
- ServerBootstrap bootstrap = defaultServerBootstrap.clone();
+
ServiceConfiguration configuration = pulsar.getConfiguration();
boolean useSeparateThreadPool = configuration.isUseSeparateThreadPoolForProtocolHandlers();
+ ServerBootstrap bootstrap;
if (useSeparateThreadPool) {
+ bootstrap = new ServerBootstrap();
+ bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
+ bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
+ bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
+ new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));
+ EventLoopUtil.enableTriggeredMode(bootstrap);
DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory("pulsar-ph-" + protocol);
EventLoopGroup dedicatedWorkerGroup =
EventLoopUtil.newEventLoopGroup(configuration.getNumIOThreads(), false, defaultThreadFactory);
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup));
protocolHandlersWorkerGroups.add(dedicatedWorkerGroup);
bootstrap.group(this.acceptorGroup, dedicatedWorkerGroup);
+ } else {
+ bootstrap = defaultServerBootstrap.clone();
}
bootstrap.childHandler(initializer);
try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerSeparateThreadPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerSeparateThreadPoolTest.java
new file mode 100644
index 0000000..df803f8
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerSeparateThreadPoolTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.protocol;
+
+public class SimpleProtocolHandlerSeparateThreadPoolTest extends SimpleProtocolHandlerTestsBase {
+ public SimpleProtocolHandlerSeparateThreadPoolTest() {
+ super(true);
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerSharedThreadPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerSharedThreadPoolTest.java
new file mode 100644
index 0000000..d48ab61
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerSharedThreadPoolTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.protocol;
+
+public class SimpleProtocolHandlerSharedThreadPoolTest extends SimpleProtocolHandlerTestsBase {
+ public SimpleProtocolHandlerSharedThreadPoolTest() {
+ super(false);
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
new file mode 100644
index 0000000..5036424
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.protocol;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.*;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.NetUtil;
+import io.netty.util.concurrent.NonStickyEventExecutorGroup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.util.PortManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.functions.worker.PulsarFunctionTestTemporaryDirectory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.testng.Assert.assertEquals;
+
+@Slf4j
+@Test(groups = "broker")
+public abstract class SimpleProtocolHandlerTestsBase extends BrokerTestBase {
+
+ public static final class MyProtocolHandler implements ProtocolHandler {
+
+ private ServiceConfiguration conf;
+
+ @Override
+ public String protocolName() {
+ return "test";
+ }
+
+ @Override
+ public boolean accept(String protocol) {
+ return "test".equals(protocol);
+ }
+
+ @Override
+ public void initialize(ServiceConfiguration conf) throws Exception {
+ this.conf = conf;
+ }
+
+ @Override
+ public String getProtocolDataToAdvertise() {
+ return "test";
+ }
+
+ @Override
+ public void start(BrokerService service) {
+
+ }
+
+ @Override
+ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
+ return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), PortManager.nextFreePort()),
+ new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel socketChannel) throws Exception {
+ socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelActive(final ChannelHandlerContext ctx) {
+ final ByteBuf resp = ctx.alloc().buffer();
+ resp.writeBytes("ok".getBytes(StandardCharsets.UTF_8));
+
+ final ChannelFuture f = ctx.writeAndFlush(resp);
+ f.addListener((ChannelFutureListener) future -> ctx.close());
+ }
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ log.error("error", cause);
+ ctx.close();
+ }
+ });
+ }
+ });
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
+
+ private File tempDirectory;
+ private boolean useSeparateThreadPool;
+
+ public SimpleProtocolHandlerTestsBase(boolean useSeparateThreadPool) {
+ this.useSeparateThreadPool = useSeparateThreadPool;
+ }
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ tempDirectory = Files.createTempDirectory("SimpleProtocolHandlerTest").toFile();
+ conf.setUseSeparateThreadPoolForProtocolHandlers(useSeparateThreadPool);
+ conf.setProtocolHandlerDirectory(tempDirectory.getAbsolutePath());
+ conf.setMessagingProtocols(Collections.singleton("test"));
+ buildMockNarFile(tempDirectory);
+ super.baseSetup();
+ }
+
+ @Test
+ public void testBootstrapProtocolHandler() throws Exception {
+ SocketAddress address =
+ pulsar.getProtocolHandlers()
+ .getEndpoints()
+ .entrySet()
+ .stream()
+ .filter(e -> e.getValue().equals("test"))
+ .map(Map.Entry::getKey)
+ .findAny()
+ .get();
+ try (Socket socket = new Socket();) {
+ socket.connect(address);
+ String res = IOUtils.toString(socket.getInputStream(), StandardCharsets.UTF_8);
+ assertEquals(res, "ok");
+ }
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+
+ if (tempDirectory != null) {
+ FileUtils.deleteDirectory(tempDirectory);
+ }
+ }
+
+ private static void buildMockNarFile(File tempDirectory) throws Exception {
+ File file = new File(tempDirectory, "temp.nar");
+ try (ZipOutputStream zipfile = new ZipOutputStream(new FileOutputStream(file))) {
+
+ zipfile.putNextEntry(new ZipEntry("META-INF/"));
+ zipfile.putNextEntry(new ZipEntry("META-INF/services/"));
+ zipfile.putNextEntry(new ZipEntry("META-INF/bundled-dependencies/"));
+
+ ZipEntry manifest = new ZipEntry("META-INF/services/"
+ + ProtocolHandlerUtils.PULSAR_PROTOCOL_HANDLER_DEFINITION_FILE);
+ zipfile.putNextEntry(manifest);
+ String yaml = "name: test\n" +
+ "description: this is a test\n" +
+ "handlerClass: " + MyProtocolHandler.class.getName() + "\n";
+ zipfile.write(yaml.getBytes(StandardCharsets.UTF_8));
+ zipfile.closeEntry();
+ }
+ }
+
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java
index ead2fa2..ba3c383 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java
@@ -21,8 +21,12 @@ package org.apache.pulsar.proxy.extensions;
import com.google.common.collect.ImmutableMap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
+
+import java.net.SocketAddress;
import java.util.HashMap;
import java.util.HashSet;
+
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
@@ -31,6 +35,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
/**
* A collection of loaded extensions.
@@ -38,6 +43,8 @@ import java.util.Set;
@Slf4j
public class ProxyExtensions implements AutoCloseable {
+ @Getter
+ private final Map<SocketAddress, String> endpoints = new ConcurrentHashMap<>();
/**
* Load the extensions for the given <tt>extensions</tt> list.
*
@@ -123,6 +130,7 @@ public class ProxyExtensions implements AutoCloseable {
+ "` attempts to use " + address + " for its listening port. But it is"
+ " already occupied by other messaging extensions");
}
+ endpoints.put(address, extension.getKey());
channelInitializers.put(extension.getKey(), initializers);
});
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 4af2307..778f42b 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -83,6 +83,7 @@ public class ProxyService implements Closeable {
private MetadataStoreExtended localMetadataStore;
private MetadataStoreExtended configMetadataStore;
private PulsarResources pulsarResources;
+ @Getter
private ProxyExtensions proxyExtensions = null;
private final EventLoopGroup acceptorGroup;
@@ -265,15 +266,24 @@ public class ProxyService implements Closeable {
SocketAddress address,
ChannelInitializer<SocketChannel> initializer,
ServerBootstrap serverBootstrap) throws IOException {
- ServerBootstrap bootstrap = serverBootstrap.clone();
+ ServerBootstrap bootstrap;
boolean useSeparateThreadPool = proxyConfig.isUseSeparateThreadPoolForProxyExtensions();
if (useSeparateThreadPool) {
+ bootstrap = new ServerBootstrap();
+ bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
+ bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
+ bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
+ new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));
+
+ EventLoopUtil.enableTriggeredMode(bootstrap);
DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory("pulsar-ext-" + extensionName);
EventLoopGroup dedicatedWorkerGroup =
EventLoopUtil.newEventLoopGroup(numThreads, false, defaultThreadFactory);
extensionsWorkerGroups.add(dedicatedWorkerGroup);
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup));
bootstrap.group(this.acceptorGroup, dedicatedWorkerGroup);
+ } else {
+ bootstrap = serverBootstrap.clone();
}
bootstrap.childHandler(initializer);
try {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionSeparateThreadPoolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionSeparateThreadPoolTest.java
new file mode 100644
index 0000000..2c2281f
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionSeparateThreadPoolTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.extensions;
+
+public class SimpleProxyExtensionSeparateThreadPoolTest extends SimpleProxyExtensionTestBase {
+ public SimpleProxyExtensionSeparateThreadPoolTest() {
+ super(true);
+ }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionSharedThreadPoolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionSharedThreadPoolTest.java
new file mode 100644
index 0000000..35fb728
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionSharedThreadPoolTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.extensions;
+
+public class SimpleProxyExtensionSharedThreadPoolTest extends SimpleProxyExtensionTestBase {
+ public SimpleProxyExtensionSharedThreadPoolTest() {
+ super(false);
+ }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
new file mode 100644
index 0000000..0ae6405
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.extensions;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.*;
+import io.netty.channel.socket.SocketChannel;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.util.PortManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.proxy.server.ProxyConfiguration;
+import org.apache.pulsar.proxy.server.ProxyService;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.mockito.Mockito.doReturn;
+import static org.testng.Assert.assertEquals;
+
+@Slf4j
+@Test(groups = "proxy")
+public abstract class SimpleProxyExtensionTestBase extends MockedPulsarServiceBaseTest {
+
+ public static final class MyProxyExtension implements ProxyExtension {
+
+ private ProxyConfiguration conf;
+
+ @Override
+ public String extensionName() {
+ return "test";
+ }
+
+ @Override
+ public boolean accept(String protocol) {
+ return "test".equals(protocol);
+ }
+
+ @Override
+ public void initialize(ProxyConfiguration conf) throws Exception {
+ this.conf = conf;
+ }
+
+ @Override
+ public void start(ProxyService service) {
+ }
+
+ @Override
+ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
+ return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), PortManager.nextFreePort()),
+ new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel socketChannel) throws Exception {
+ socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelActive(final ChannelHandlerContext ctx) {
+ final ByteBuf resp = ctx.alloc().buffer();
+ resp.writeBytes("ok".getBytes(StandardCharsets.UTF_8));
+
+ final ChannelFuture f = ctx.writeAndFlush(resp);
+ f.addListener((ChannelFutureListener) future -> ctx.close());
+ }
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ log.error("error", cause);
+ ctx.close();
+ }
+ });
+ }
+ });
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
+
+ private File tempDirectory;
+ private ProxyService proxyService;
+ private boolean useSeparateThreadPoolForProxyExtensions;
+ private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+ public SimpleProxyExtensionTestBase(boolean useSeparateThreadPoolForProxyExtensions) {
+ this.useSeparateThreadPoolForProxyExtensions = useSeparateThreadPoolForProxyExtensions;
+ }
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ tempDirectory = Files.createTempDirectory("SimpleProxyExtensionTest").toFile();
+
+ super.internalSetup();
+ proxyConfig.setUseSeparateThreadPoolForProxyExtensions(useSeparateThreadPoolForProxyExtensions);
+ proxyConfig.setProxyExtensionsDirectory(tempDirectory.getAbsolutePath());
+ proxyConfig.setProxyExtensions(Collections.singleton("test"));
+ buildMockNarFile(tempDirectory);
+ proxyConfig.setServicePort(Optional.ofNullable(0));
+ proxyConfig.setZookeeperServers(DUMMY_VALUE);
+ proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
+
+ proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
+ PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+
+ proxyService.start();
+ }
+
+ @Test
+ public void testBootstrapProtocolHandler() throws Exception {
+ SocketAddress address =
+ proxyService.getProxyExtensions()
+ .getEndpoints()
+ .entrySet()
+ .stream()
+ .filter(e -> e.getValue().equals("test"))
+ .map(Map.Entry::getKey)
+ .findAny()
+ .get();
+ try (Socket socket = new Socket();) {
+ socket.connect(address);
+ String res = IOUtils.toString(socket.getInputStream(), StandardCharsets.UTF_8);
+ assertEquals(res, "ok");
+ }
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ proxyService.close();
+
+ if (tempDirectory != null) {
+ FileUtils.deleteDirectory(tempDirectory);
+ }
+ }
+
+ private static void buildMockNarFile(File tempDirectory) throws Exception {
+ File file = new File(tempDirectory, "temp.nar");
+ try (ZipOutputStream zipfile = new ZipOutputStream(new FileOutputStream(file))) {
+
+ zipfile.putNextEntry(new ZipEntry("META-INF/"));
+ zipfile.putNextEntry(new ZipEntry("META-INF/services/"));
+ zipfile.putNextEntry(new ZipEntry("META-INF/bundled-dependencies/"));
+
+ ZipEntry manifest = new ZipEntry("META-INF/services/"
+ + ProxyExtensionsUtils.PROXY_EXTENSION_DEFINITION_FILE);
+ zipfile.putNextEntry(manifest);
+ String yaml = "name: test\n" +
+ "description: this is a test\n" +
+ "extensionClass: " + MyProxyExtension.class.getName() + "\n";
+ zipfile.write(yaml.getBytes(StandardCharsets.UTF_8));
+ zipfile.closeEntry();
+ }
+ }
+
+}