You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/05/08 10:30:32 UTC
[pulsar] branch master updated: pip28-v2 add unit test (#3915)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 95f07f8 pip28-v2 add unit test (#3915)
95f07f8 is described below
commit 95f07f819ca8980fbf4b8a4136ba5ea5cfb5e597
Author: Samuel <fo...@yahoo.com>
AuthorDate: Wed May 8 18:30:23 2019 +0800
pip28-v2 add unit test (#3915)
### Motivation
https://github.com/apache/pulsar/wiki/PIP-28%3A-Pulsar-Proxy-Gateway-Improvement
### Modifications
Added a new handler, **ParserProxyHandler.java**, which parses requests independently and writes the parsed output to the proxy log.
---
.../pulsar/proxy/server/DirectProxyHandler.java | 26 ++-
.../pulsar/proxy/server/ParserProxyHandler.java | 179 +++++++++++++++
.../pulsar/proxy/server/ProxyConfiguration.java | 16 ++
.../apache/pulsar/proxy/server/ProxyService.java | 7 +
.../pulsar/proxy/server/ProxyParserTest.java | 254 +++++++++++++++++++++
5 files changed, 479 insertions(+), 3 deletions(-)
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index bfe3485..ffa4c2c 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -26,6 +26,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import javax.net.ssl.SSLSession;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
@@ -44,6 +46,7 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@@ -60,6 +63,7 @@ public class DirectProxyHandler {
private Channel inboundChannel;
Channel outboundChannel;
+ protected static Map<ChannelId, ChannelId> inboundOutboundChannelMap = new ConcurrentHashMap<>();
private String originalPrincipal;
private AuthData clientAuthData;
private String clientAuthMethod;
@@ -122,6 +126,15 @@ public class DirectProxyHandler {
final ProxyBackendHandler cnx = (ProxyBackendHandler) outboundChannel.pipeline()
.get("proxyOutboundHandler");
cnx.setRemoteHostName(targetBroker.getHost());
+
+ // if enable full parsing feature
+ if (ProxyService.proxyLogLevel == 2) {
+ //Set a map between inbound and outbound,
+ //so can find inbound by outbound or find outbound by inbound
+ inboundOutboundChannelMap.put(outboundChannel.id() , inboundChannel.id());
+ }
+
+
});
}
@@ -250,9 +263,16 @@ public class DirectProxyHandler {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Removing decoder from pipeline", inboundChannel, outboundChannel);
}
- inboundChannel.pipeline().remove("frameDecoder");
- outboundChannel.pipeline().remove("frameDecoder");
-
+ if (ProxyService.proxyLogLevel == 0) {
+ // direct tcp proxy
+ inboundChannel.pipeline().remove("frameDecoder");
+ outboundChannel.pipeline().remove("frameDecoder");
+ } else {
+ // Enable parsing feature, proxyLogLevel(1 or 2)
+ // Add parser handler
+ inboundChannel.pipeline().addBefore("handler" , "inboundParser" , new ParserProxyHandler(inboundChannel , ParserProxyHandler.FRONTEND_CONN));
+ outboundChannel.pipeline().addBefore("proxyOutboundHandler" , "outboundParser" , new ParserProxyHandler(outboundChannel , ParserProxyHandler.BACKEND_CONN));
+ }
// Start reading from both connections
inboundChannel.read();
outboundChannel.read();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
new file mode 100644
index 0000000..040a834
--- /dev/null
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.proxy.server;
+
+
+import avro.shaded.com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.raw.MessageParser;
+import org.apache.pulsar.common.api.raw.RawMessage;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+
+
+public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
+
+
+ private Channel channel;
+ //inbound
+ protected static final String FRONTEND_CONN = "frontendconn";
+ //outbound
+ protected static final String BACKEND_CONN = "backendconn";
+
+ private String connType;
+
+
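+ // producerHashMap key: "<producerId>,<channelId>", value: topic (written when a PRODUCER command is seen)
+ // consumerHashMap key: "<consumerId>,<channelId>", value: topic (written when a SUBSCRIBE command is seen)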
+ private static Map<String, String> producerHashMap = new ConcurrentHashMap<>();
+ private static Map<String, String> consumerHashMap = new ConcurrentHashMap<>();
+
+ public ParserProxyHandler(Channel channel, String type){
+ this.channel = channel;
+ this.connType=type;
+ }
+
+ private void logging(Channel conn, PulsarApi.BaseCommand.Type cmdtype, String info, List<RawMessage> messages) throws Exception{
+ // Emits one INFO line for a parsed command; connType picks the label and the order of the two addresses.
+ if (messages != null) {
+ // append "[<lag>] <payload>" for every message: lag is the current time minus the message publish time
+ for (int i=0; i<messages.size(); i++) {
+ info = info + "["+ (System.currentTimeMillis() - messages.get(i).getPublishTime()) + "] " + new String(ByteBufUtil.getBytes((messages.get(i)).getData()), "UTF8");
+ }
+ }
+ // addresses are printed source-to-target: frontend logs remote then local, backend logs local then remote
+ switch (this.connType) {
+ case ParserProxyHandler.FRONTEND_CONN:
+ log.info(ParserProxyHandler.FRONTEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.remoteAddress() + conn.localAddress() + "]", cmdtype, info);
+ break;
+ case ParserProxyHandler.BACKEND_CONN:
+ log.info(ParserProxyHandler.BACKEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.localAddress() + conn.remoteAddress() + "]", cmdtype, info);
+ break;
+ }
+ }
+
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ PulsarApi.BaseCommand cmd = null;
+ PulsarApi.BaseCommand.Builder cmdBuilder = null;
+ TopicName topicName ;
+ List<RawMessage> messages = Lists.newArrayList();
+ ByteBuf buffer = (ByteBuf)(msg);
+ // The command is decoded for logging only; the frame is always forwarded from the finally block.
+ try {
+ buffer.markReaderIndex();
+ buffer.markWriterIndex();
+ // read the 4-byte command size, then cap the writer index so only the command bytes are parsed
+ int cmdSize = (int) buffer.readUnsignedInt();
+ int writerIndex = buffer.writerIndex();
+ buffer.writerIndex(buffer.readerIndex() + cmdSize);
+
+ ByteBufCodedInputStream cmdInputStream = ByteBufCodedInputStream.get(buffer);
+ cmdBuilder = PulsarApi.BaseCommand.newBuilder();
+ cmd = cmdBuilder.mergeFrom(cmdInputStream, null).build();
+ buffer.writerIndex(writerIndex);
+ cmdInputStream.recycle();
+
+ switch (cmd.getType()) {
+ case PRODUCER:
+ ParserProxyHandler.producerHashMap.put(String.valueOf(cmd.getProducer().getProducerId()) + "," + String.valueOf(ctx.channel().id()), cmd.getProducer().getTopic());
+
+ logging(ctx.channel() , cmd.getType() , "{producer:" + cmd.getProducer().getProducerName() + ",topic:" + cmd.getProducer().getTopic() + "}", null);
+ break;
+ // a SEND frame carries its producer id in the send sub-command, so the topic lookup must use getSend()
+ case SEND:
+ if (ProxyService.proxyLogLevel != 2) {
+ logging(ctx.channel() , cmd.getType() , "", null);
+ break;
+ }
+ topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(String.valueOf(cmd.getSend().getProducerId()) + "," + String.valueOf(ctx.channel().id())));
+ MessageParser.parseMessage(topicName, -1L,
+ -1L,buffer,(message) -> {
+ messages.add(message);
+ });
+
+ logging(ctx.channel() , cmd.getType() , "" , messages);
+ break;
+
+ case SUBSCRIBE:
+ ParserProxyHandler.consumerHashMap.put(String.valueOf(cmd.getSubscribe().getConsumerId()) + "," + String.valueOf(ctx.channel().id()) , cmd.getSubscribe().getTopic());
+
+ logging(ctx.channel() , cmd.getType() , "{consumer:" + cmd.getSubscribe().getConsumerName() + ",topic:" + cmd.getSubscribe().getTopic() + "}" , null);
+ break;
+ // this channel's id is translated through inboundOutboundChannelMap to rebuild the key stored on SUBSCRIBE
+ case MESSAGE:
+ if (ProxyService.proxyLogLevel != 2) {
+ logging(ctx.channel() , cmd.getType() , "" , null);
+ break;
+ }
+ topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(String.valueOf(cmd.getMessage().getConsumerId()) + "," + DirectProxyHandler.inboundOutboundChannelMap.get(ctx.channel().id())));
+ MessageParser.parseMessage(topicName, -1L,
+ -1L,buffer,(message) -> {
+ messages.add(message);
+ });
+
+
+ logging(ctx.channel() , cmd.getType() , "" , messages);
+ break;
+
+ default:
+ logging(ctx.channel() , cmd.getType() , "" , null);
+ break;
+ }
+ } catch (Exception e){
+ // parsing is best-effort: pass the exception as the last argument so the full stack trace is logged
+ log.error("Failed to parse proxied command: {}" , e.getMessage() , e);
+
+ } finally {
+
+ if (cmdBuilder != null) {
+ cmdBuilder.recycle();
+ }
+ if (cmd != null) {
+ cmd.recycle();
+ }
+ buffer.resetReaderIndex();
+ buffer.resetWriterIndex();
+ // re-frame: put a 4-byte total-size field in front of the original bytes;
+ // the writer index is sized from readable bytes because capacity() may be larger than the data present
+ ByteBuf totalSizeBuf = Unpooled.buffer(4);
+ totalSizeBuf.writeInt(buffer.readableBytes());
+ CompositeByteBuf compBuf = Unpooled.compositeBuffer();
+ compBuf.addComponents(totalSizeBuf,buffer);
+ compBuf.writerIndex(totalSizeBuf.readableBytes()+buffer.readableBytes());
+
+ // hand the re-framed buffer to the next handler in the pipeline
+ ctx.fireChannelRead(compBuf);
+ }
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(ParserProxyHandler.class);
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 7b9a2c5..1ca1159 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -147,6 +147,15 @@ public class ProxyConfiguration implements PulsarConfiguration {
private Integer webServicePortTls;
@FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Proxy log level, default is 0."
+ + " 0: Do not log any tcp channel info"
+ + " 1: Parse and log any tcp channel info and command info without message body"
+ + " 2: Parse and log channel info, command info and message body"
+ )
+ private Integer proxyLogLevel = 0;
+
+ @FieldContext(
category = CATEGORY_SERVER,
doc = "Path for the file used to determine the rotation status for the proxy instance"
+ " when responding to service discovery health checks"
@@ -365,6 +374,13 @@ public class ProxyConfiguration implements PulsarConfiguration {
return Optional.ofNullable(servicePort);
}
+ public Optional<Integer> getproxyLogLevel() {
+ return Optional.ofNullable(proxyLogLevel);
+ }
+ public void setProxyLogLevel(int proxyLogLevel) {
+ this.proxyLogLevel = proxyLogLevel;
+ }
+
public Optional<Integer> getServicePortTls() {
return Optional.ofNullable(servicePortTls);
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 317daa3..a591af1 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -70,6 +70,8 @@ public class ProxyService implements Closeable {
protected final AtomicReference<Semaphore> lookupRequestSemaphore;
+ protected static int proxyLogLevel;
+
private static final int numThreads = Runtime.getRuntime().availableProcessors();
static final Gauge activeConnections = Gauge
@@ -116,6 +118,11 @@ public class ProxyService implements Closeable {
this.serviceUrlTls = null;
}
+ if (proxyConfig.getproxyLogLevel().isPresent()) {
+ ProxyService.proxyLogLevel = Integer.valueOf(proxyConfig.getproxyLogLevel().get());
+ } else {
+ ProxyService.proxyLogLevel = 0;
+ }
this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory);
this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory);
this.authenticationService = authenticationService;
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
new file mode 100644
index 0000000..b9ecd41
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
+import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Mockito.doReturn;
+import static org.testng.Assert.assertEquals;
+
+public class ProxyParserTest extends MockedPulsarServiceBaseTest {
+
+ private static final Logger log = LoggerFactory.getLogger(ProxyParserTest.class);
+
+ private final String DUMMY_VALUE = "DUMMY_VALUE";
+
+ private ProxyService proxyService;
+ private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+ @Override
+ @BeforeClass
+ protected void setup() throws Exception {
+ internalSetup();
+
+
+
+ proxyConfig.setServicePort(PortManager.nextFreePort());
+ proxyConfig.setZookeeperServers(DUMMY_VALUE);
+ proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
+ //enable full parsing feature
+ proxyConfig.setProxyLogLevel(2);
+
+ proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
+ PulsarConfigurationLoader.convertFrom(proxyConfig))));
+ doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
+
+ Optional<Integer> proxyLogLevel = Optional.of(2);
+ assertEquals( proxyLogLevel , proxyService.getConfiguration().getproxyLogLevel());
+ proxyService.start();
+ }
+
+ @Override
+ @AfterClass
+ protected void cleanup() throws Exception {
+ internalCleanup();
+
+ proxyService.close();
+ }
+
+ @Test
+ public void testProducer() throws Exception {
+ PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get())
+ .build();
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
+ .create();
+
+ for (int i = 0; i < 10; i++) {
+ producer.send("test".getBytes());
+ }
+
+ client.close();
+ }
+
+ @Test
+ public void testProducerConsumer() throws Exception {
+ PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get())
+ .build();
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+ .topic("persistent://sample/test/local/producer-consumer-topic")
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ // Create a consumer directly attached to broker
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic("persistent://sample/test/local/producer-consumer-topic").subscriptionName("my-sub").subscribe();
+
+ for (int i = 0; i < 10; i++) {
+ producer.send("test".getBytes());
+ }
+
+ for (int i = 0; i < 10; i++) {
+ Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
+ checkNotNull(msg);
+ consumer.acknowledge(msg);
+ }
+
+ Message<byte[]> msg = consumer.receive(0, TimeUnit.SECONDS);
+ checkArgument(msg == null);
+
+ consumer.close();
+ client.close();
+ }
+
+ @Test
+ public void testPartitions() throws Exception {
+ admin.tenants().createTenant("sample", new TenantInfo());
+ PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get())
+ .build();
+ admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2);
+
+ Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+ .topic("persistent://sample/test/local/partitioned-topic")
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+
+ // Create a consumer directly attached to broker
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://sample/test/local/partitioned-topic")
+ .subscriptionName("my-sub").subscribe();
+
+ for (int i = 0; i < 10; i++) {
+ producer.send("test".getBytes());
+ }
+
+ for (int i = 0; i < 10; i++) {
+ Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
+ checkNotNull(msg);
+ }
+
+ client.close();
+ }
+
+ @Test
+ public void testRegexSubscription() throws Exception {
+ PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get())
+ .connectionsPerBroker(5).ioThreads(5).build();
+
+ // create two topics by subscribing to a topic and closing it
+ try (Consumer<byte[]> ignored = client.newConsumer()
+ .topic("persistent://sample/test/local/topic1")
+ .subscriptionName("ignored")
+ .subscribe()) {
+ }
+ try (Consumer<byte[]> ignored = client.newConsumer()
+ .topic("persistent://sample/test/local/topic2")
+ .subscriptionName("ignored")
+ .subscribe()) {
+ }
+
+ // make sure regex subscription
+ String regexSubscriptionPattern = "persistent://sample/test/local/topic.*";
+ log.info("Regex subscribe to topics {}", regexSubscriptionPattern);
+ try (Consumer<byte[]> consumer = client.newConsumer()
+ .topicsPattern(regexSubscriptionPattern)
+ .subscriptionName("regex-sub")
+ .subscribe()) {
+ log.info("Successfully subscribe to topics using regex {}", regexSubscriptionPattern);
+
+ final int numMessages = 20;
+
+ try (Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+ .topic("persistent://sample/test/local/topic1")
+ .create()) {
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(("message-" + i).getBytes(UTF_8));
+ }
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ Message<byte[]> msg = consumer.receive();
+ assertEquals("message-" + i, new String(msg.getValue(), UTF_8));
+ }
+ }
+ }
+
+ @Test
+ private void testProtocolVersionAdvertisement() throws Exception {
+ final String url = "pulsar://localhost:" + proxyConfig.getServicePort().get();
+ final String topic = "persistent://sample/test/local/protocol-version-advertisement";
+ final String sub = "my-sub";
+
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setServiceUrl(url);
+ PulsarClient client = getClientActiveConsumerChangeNotSupported(conf);
+
+ Producer<byte[]> producer = client.newProducer().topic(topic).create();
+ Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(sub)
+ .subscriptionType(SubscriptionType.Failover).subscribe();
+
+ for (int i = 0; i < 10; i++) {
+ producer.send("test-msg".getBytes());
+ }
+
+ for (int i = 0; i < 10; i++) {
+ Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
+ checkNotNull(msg);
+ consumer.acknowledge(msg);
+ }
+
+ producer.close();
+ consumer.close();
+ client.close();
+ }
+
+ private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
+ throws Exception {
+ ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());
+ EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
+
+ ConnectionPool cnxPool = new ConnectionPool(conf, eventLoopGroup, () -> {
+ return new ClientCnx(conf, eventLoopGroup, ProtocolVersion.v11_VALUE) {
+ @Override
+ protected void handleActiveConsumerChange(CommandActiveConsumerChange change) {
+ throw new UnsupportedOperationException();
+ }
+ };
+ });
+
+ return new PulsarClientImpl(conf, eventLoopGroup, cnxPool);
+ }
+
+}