You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/05/08 10:30:32 UTC

[pulsar] branch master updated: pip28-v2 add unit test (#3915)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 95f07f8  pip28-v2 add unit test (#3915)
95f07f8 is described below

commit 95f07f819ca8980fbf4b8a4136ba5ea5cfb5e597
Author: Samuel <fo...@yahoo.com>
AuthorDate: Wed May 8 18:30:23 2019 +0800

    pip28-v2 add unit test (#3915)
    
    ### Motivation
    
    https://github.com/apache/pulsar/wiki/PIP-28%3A-Pulsar-Proxy-Gateway-Improvement
    
    
    ### Modifications
    
    added a new handler **ParserProxyHandler.java** to parse requests independently and output
---
 .../pulsar/proxy/server/DirectProxyHandler.java    |  26 ++-
 .../pulsar/proxy/server/ParserProxyHandler.java    | 179 +++++++++++++++
 .../pulsar/proxy/server/ProxyConfiguration.java    |  16 ++
 .../apache/pulsar/proxy/server/ProxyService.java   |   7 +
 .../pulsar/proxy/server/ProxyParserTest.java       | 254 +++++++++++++++++++++
 5 files changed, 479 insertions(+), 3 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index bfe3485..ffa4c2c 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -26,6 +26,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import java.net.URI;
 import java.net.URISyntaxException;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.net.ssl.SSLSession;
 
 import org.apache.http.conn.ssl.DefaultHostnameVerifier;
@@ -44,6 +46,7 @@ import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
@@ -60,6 +63,7 @@ public class DirectProxyHandler {
 
     private Channel inboundChannel;
     Channel outboundChannel;
+    protected static Map<ChannelId, ChannelId> inboundOutboundChannelMap = new ConcurrentHashMap<>();
     private String originalPrincipal;
     private AuthData clientAuthData;
     private String clientAuthMethod;
@@ -122,6 +126,15 @@ public class DirectProxyHandler {
             final ProxyBackendHandler cnx = (ProxyBackendHandler) outboundChannel.pipeline()
                     .get("proxyOutboundHandler");
             cnx.setRemoteHostName(targetBroker.getHost());
+
+            // if enable full parsing feature
+            if (ProxyService.proxyLogLevel == 2) {
+                //Set a map between inbound and outbound,
+                //so can find inbound by outbound or find outbound by inbound
+                inboundOutboundChannelMap.put(outboundChannel.id() , inboundChannel.id());
+            }
+
+
         });
     }
 
@@ -250,9 +263,16 @@ public class DirectProxyHandler {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] Removing decoder from pipeline", inboundChannel, outboundChannel);
                 }
-                inboundChannel.pipeline().remove("frameDecoder");
-                outboundChannel.pipeline().remove("frameDecoder");
-
+                if (ProxyService.proxyLogLevel == 0) {
+                    // direct tcp proxy
+                    inboundChannel.pipeline().remove("frameDecoder");
+                    outboundChannel.pipeline().remove("frameDecoder");
+                } else {
+                    // Enable parsing feature, proxyLogLevel(1 or 2)
+                    // Add parser handler
+                    inboundChannel.pipeline().addBefore("handler" , "inboundParser" , new ParserProxyHandler(inboundChannel , ParserProxyHandler.FRONTEND_CONN));
+                    outboundChannel.pipeline().addBefore("proxyOutboundHandler" , "outboundParser" , new ParserProxyHandler(outboundChannel , ParserProxyHandler.BACKEND_CONN));
+                }
                 // Start reading from both connections
                 inboundChannel.read();
                 outboundChannel.read();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
new file mode 100644
index 0000000..040a834
--- /dev/null
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.proxy.server;
+
+
+import avro.shaded.com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.raw.MessageParser;
+import org.apache.pulsar.common.api.raw.RawMessage;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+
+
+public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
+
+
+    private Channel channel;
+    //inbound
+    protected static final String FRONTEND_CONN = "frontendconn";
+    //outbound
+    protected static final String BACKEND_CONN = "backendconn";
+
+    private String connType;
+
+
+    //producerid+channelid as key
+    //or consumerid+channelid as key
+    private static Map<String, String> producerHashMap = new ConcurrentHashMap<>();
+    private static Map<String, String> consumerHashMap = new ConcurrentHashMap<>();
+
+    public ParserProxyHandler(Channel channel, String type){
+        this.channel = channel;
+        this.connType=type;
+    }
+
+    private void logging(Channel conn, PulsarApi.BaseCommand.Type cmdtype, String info, List<RawMessage> messages) throws Exception{
+
+        if (messages != null) {
+            // lag
+            for (int i=0; i<messages.size(); i++) {
+                info = info + "["+ (System.currentTimeMillis() - messages.get(i).getPublishTime()) + "] " + new String(ByteBufUtil.getBytes((messages.get(i)).getData()), "UTF8");
+            }
+        }
+        // log conn format is like from source to target
+        switch (this.connType) {
+            case ParserProxyHandler.FRONTEND_CONN:
+                log.info(ParserProxyHandler.FRONTEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.remoteAddress() + conn.localAddress() + "]", cmdtype, info);
+                break;
+            case ParserProxyHandler.BACKEND_CONN:
+                log.info(ParserProxyHandler.BACKEND_CONN + ":{} cmd:{} msg:{}", "[" + conn.localAddress() + conn.remoteAddress() + "]", cmdtype, info);
+                break;
+        }
+    }
+
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
+        PulsarApi.BaseCommand cmd = null;
+        PulsarApi.BaseCommand.Builder cmdBuilder = null;
+        TopicName topicName ;
+        List<RawMessage> messages = Lists.newArrayList();
+        ByteBuf buffer = (ByteBuf)(msg);
+
+        try {
+            buffer.markReaderIndex();
+            buffer.markWriterIndex();
+
+            int cmdSize = (int) buffer.readUnsignedInt();
+            int writerIndex = buffer.writerIndex();
+            buffer.writerIndex(buffer.readerIndex() + cmdSize);
+
+            ByteBufCodedInputStream cmdInputStream = ByteBufCodedInputStream.get(buffer);
+            cmdBuilder = PulsarApi.BaseCommand.newBuilder();
+            cmd = cmdBuilder.mergeFrom(cmdInputStream, null).build();
+            buffer.writerIndex(writerIndex);
+            cmdInputStream.recycle();
+
+            switch (cmd.getType()) {
+                case PRODUCER:
+                    ParserProxyHandler.producerHashMap.put(String.valueOf(cmd.getProducer().getProducerId()) + "," + String.valueOf(ctx.channel().id()), cmd.getProducer().getTopic());
+
+                    logging(ctx.channel() , cmd.getType() , "{producer:" + cmd.getProducer().getProducerName() + ",topic:" + cmd.getProducer().getTopic() + "}", null);
+                    break;
+
+                case SEND:
+                    if (ProxyService.proxyLogLevel != 2) {
+                        logging(ctx.channel() , cmd.getType() , "", null);
+                        break;
+                    }
+                    topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(String.valueOf(cmd.getProducer().getProducerId()) + "," + String.valueOf(ctx.channel().id())));
+                    MessageParser.parseMessage(topicName,  -1L,
+                            -1L,buffer,(message) -> {
+                                messages.add(message);
+                            });
+
+                    logging(ctx.channel() , cmd.getType() , "" , messages);
+                    break;
+
+                case SUBSCRIBE:
+                    ParserProxyHandler.consumerHashMap.put(String.valueOf(cmd.getSubscribe().getConsumerId()) + "," + String.valueOf(ctx.channel().id()) , cmd.getSubscribe().getTopic());
+
+                    logging(ctx.channel() , cmd.getType() , "{consumer:" + cmd.getSubscribe().getConsumerName() + ",topic:" + cmd.getSubscribe().getTopic() + "}" , null);
+                    break;
+
+                case MESSAGE:
+                    if (ProxyService.proxyLogLevel != 2) {
+                        logging(ctx.channel() , cmd.getType() , "" , null);
+                        break;
+                    }
+                    topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(String.valueOf(cmd.getMessage().getConsumerId()) + "," + DirectProxyHandler.inboundOutboundChannelMap.get(ctx.channel().id())));
+                    MessageParser.parseMessage(topicName,  -1L,
+                                -1L,buffer,(message) -> {
+                                    messages.add(message);
+                                });
+
+
+                    logging(ctx.channel() , cmd.getType() , "" , messages);
+                    break;
+
+                 default:
+                    logging(ctx.channel() , cmd.getType() , "" , null);
+                    break;
+            }
+        } catch (Exception e){
+
+            log.error("{},{},{}" , e.getMessage() , e.getStackTrace() ,  e.getCause());
+
+        } finally {
+
+            if (cmdBuilder != null) {
+                cmdBuilder.recycle();
+            }
+            if (cmd != null) {
+                cmd.recycle();
+            }
+            buffer.resetReaderIndex();
+            buffer.resetWriterIndex();
+
+            // add totalSize to buffer Head
+            ByteBuf totalSizeBuf = Unpooled.buffer(4);
+            totalSizeBuf.writeInt(buffer.readableBytes());
+            CompositeByteBuf compBuf = Unpooled.compositeBuffer();
+            compBuf.addComponents(totalSizeBuf,buffer);
+            compBuf.writerIndex(totalSizeBuf.capacity()+buffer.capacity());
+
+            //next handler
+            ctx.fireChannelRead(compBuf);
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(ParserProxyHandler.class);
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 7b9a2c5..1ca1159 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -147,6 +147,15 @@ public class ProxyConfiguration implements PulsarConfiguration {
     private Integer webServicePortTls;
 
     @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Proxy log level, default is 0."
+                    + " 0: Do not log any tcp channel info"
+                    + " 1: Parse and log any tcp channel info and command info without message body"
+                    + " 2: Parse and log channel info, command info and message body"
+    )
+    private Integer proxyLogLevel = 0;
+
+    @FieldContext(
         category = CATEGORY_SERVER,
         doc = "Path for the file used to determine the rotation status for the proxy instance"
             + " when responding to service discovery health checks"
@@ -365,6 +374,13 @@ public class ProxyConfiguration implements PulsarConfiguration {
         return Optional.ofNullable(servicePort);
     }
 
+    public Optional<Integer> getproxyLogLevel() {
+        return Optional.ofNullable(proxyLogLevel);
+    }
+    public void setProxyLogLevel(int proxyLogLevel) {
+        this.proxyLogLevel = proxyLogLevel;
+    }
+
     public Optional<Integer> getServicePortTls() {
         return Optional.ofNullable(servicePortTls);
     }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 317daa3..a591af1 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -70,6 +70,8 @@ public class ProxyService implements Closeable {
 
     protected final AtomicReference<Semaphore> lookupRequestSemaphore;
 
+    protected static int proxyLogLevel;
+
     private static final int numThreads = Runtime.getRuntime().availableProcessors();
 
     static final Gauge activeConnections = Gauge
@@ -116,6 +118,11 @@ public class ProxyService implements Closeable {
             this.serviceUrlTls = null;
         }
 
+        if (proxyConfig.getproxyLogLevel().isPresent()) {
+            ProxyService.proxyLogLevel = Integer.valueOf(proxyConfig.getproxyLogLevel().get());
+        } else {
+            ProxyService.proxyLogLevel = 0;
+        }
         this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory);
         this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory);
         this.authenticationService = authenticationService;
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
new file mode 100644
index 0000000..b9ecd41
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
+import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Mockito.doReturn;
+import static org.testng.Assert.assertEquals;
+
+public class ProxyParserTest extends MockedPulsarServiceBaseTest {
+
+    private static final Logger log = LoggerFactory.getLogger(ProxyParserTest.class);
+
+    private final String DUMMY_VALUE = "DUMMY_VALUE";
+
+    private ProxyService proxyService;
+    private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    @BeforeClass
+    protected void setup() throws Exception {
+        internalSetup();
+
+
+
+        proxyConfig.setServicePort(PortManager.nextFreePort());
+        proxyConfig.setZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
+        //enable full parsing feature
+        proxyConfig.setProxyLogLevel(2);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
+                                                            PulsarConfigurationLoader.convertFrom(proxyConfig))));
+        doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
+
+        Optional<Integer> proxyLogLevel = Optional.of(2);
+        assertEquals( proxyLogLevel , proxyService.getConfiguration().getproxyLogLevel());
+        proxyService.start();
+    }
+
+    @Override
+    @AfterClass
+    protected void cleanup() throws Exception {
+        internalCleanup();
+
+        proxyService.close();
+    }
+
+    @Test
+    public void testProducer() throws Exception {
+        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get())
+                .build();
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic")
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("test".getBytes());
+        }
+
+        client.close();
+    }
+
+    @Test
+    public void testProducerConsumer() throws Exception {
+        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get())
+                .build();
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+            .topic("persistent://sample/test/local/producer-consumer-topic")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create();
+
+        // Create a consumer directly attached to broker
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic("persistent://sample/test/local/producer-consumer-topic").subscriptionName("my-sub").subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("test".getBytes());
+        }
+
+        for (int i = 0; i < 10; i++) {
+            Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
+            checkNotNull(msg);
+            consumer.acknowledge(msg);
+        }
+
+        Message<byte[]> msg = consumer.receive(0, TimeUnit.SECONDS);
+        checkArgument(msg == null);
+
+        consumer.close();
+        client.close();
+    }
+
+    @Test
+    public void testPartitions() throws Exception {
+        admin.tenants().createTenant("sample", new TenantInfo());
+        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get())
+                .build();
+        admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2);
+
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+            .topic("persistent://sample/test/local/partitioned-topic")
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+
+        // Create a consumer directly attached to broker
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://sample/test/local/partitioned-topic")
+                .subscriptionName("my-sub").subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("test".getBytes());
+        }
+
+        for (int i = 0; i < 10; i++) {
+            Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
+            checkNotNull(msg);
+        }
+
+        client.close();
+    }
+
+    @Test
+    public void testRegexSubscription() throws Exception {
+        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get())
+            .connectionsPerBroker(5).ioThreads(5).build();
+
+        // create two topics by subscribing to a topic and closing it
+        try (Consumer<byte[]> ignored = client.newConsumer()
+            .topic("persistent://sample/test/local/topic1")
+            .subscriptionName("ignored")
+            .subscribe()) {
+        }
+        try (Consumer<byte[]> ignored = client.newConsumer()
+            .topic("persistent://sample/test/local/topic2")
+            .subscriptionName("ignored")
+            .subscribe()) {
+        }
+
+        // make sure regex subscription
+        String regexSubscriptionPattern = "persistent://sample/test/local/topic.*";
+        log.info("Regex subscribe to topics {}", regexSubscriptionPattern);
+        try (Consumer<byte[]> consumer = client.newConsumer()
+            .topicsPattern(regexSubscriptionPattern)
+            .subscriptionName("regex-sub")
+            .subscribe()) {
+            log.info("Successfully subscribe to topics using regex {}", regexSubscriptionPattern);
+
+            final int numMessages = 20;
+
+            try (Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+                .topic("persistent://sample/test/local/topic1")
+                .create()) {
+                for (int i = 0; i < numMessages; i++) {
+                    producer.send(("message-" + i).getBytes(UTF_8));
+                }
+            }
+
+            for (int i = 0; i < numMessages; i++) {
+                Message<byte[]> msg = consumer.receive();
+                assertEquals("message-" + i, new String(msg.getValue(), UTF_8));
+            }
+        }
+    }
+
+    @Test
+    private void testProtocolVersionAdvertisement() throws Exception {
+        final String url = "pulsar://localhost:" + proxyConfig.getServicePort().get();
+        final String topic = "persistent://sample/test/local/protocol-version-advertisement";
+        final String sub = "my-sub";
+
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl(url);
+        PulsarClient client = getClientActiveConsumerChangeNotSupported(conf);
+
+        Producer<byte[]> producer = client.newProducer().topic(topic).create();
+        Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(sub)
+                .subscriptionType(SubscriptionType.Failover).subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("test-msg".getBytes());
+        }
+
+        for (int i = 0; i < 10; i++) {
+            Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
+            checkNotNull(msg);
+            consumer.acknowledge(msg);
+        }
+
+        producer.close();
+        consumer.close();
+        client.close();
+    }
+
+    private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
+            throws Exception {
+        ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());
+        EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
+
+        ConnectionPool cnxPool = new ConnectionPool(conf, eventLoopGroup, () -> {
+            return new ClientCnx(conf, eventLoopGroup, ProtocolVersion.v11_VALUE) {
+                @Override
+                protected void handleActiveConsumerChange(CommandActiveConsumerChange change) {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        });
+
+        return new PulsarClientImpl(conf, eventLoopGroup, cnxPool);
+    }
+
+}