You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/26 06:34:57 UTC

[pulsar] branch master updated: Make proxy advertise protocol version of client to broker (#2845)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bdcdc5  Make proxy advertise protocol version of client to broker (#2845)
5bdcdc5 is described below

commit 5bdcdc584834714b7c660cbad84d748ec7b98fee
Author: massakam <ma...@yahoo-corp.jp>
AuthorDate: Fri Oct 26 15:34:52 2018 +0900

    Make proxy advertise protocol version of client to broker (#2845)
    
    * Make proxy advertise protocol version of client to broker
    
    * Revert incorrect change
---
 .../org/apache/pulsar/client/impl/ClientCnx.java   | 10 +++-
 .../org/apache/pulsar/common/api/Commands.java     |  1 -
 .../pulsar/proxy/server/DirectProxyHandler.java    | 13 +++--
 .../apache/pulsar/proxy/server/ProxyClientCnx.java | 40 ++++++++-------
 .../pulsar/proxy/server/ProxyConnection.java       | 28 ++++++++---
 .../org/apache/pulsar/proxy/server/ProxyTest.java  | 58 ++++++++++++++++++++++
 6 files changed, 117 insertions(+), 33 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 9306a82..ccc20b3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -110,6 +110,7 @@ public class ClientCnx extends PulsarHandler {
     private volatile int numberOfRejectRequests = 0;
     private final int maxNumberOfRejectedRequestPerConnection;
     private final int rejectedRequestResetTimeSec = 60;
+    private final int protocolVersion;
     private final long operationTimeoutMs;
 
     protected String proxyToTargetBrokerAddress = null;
@@ -123,6 +124,10 @@ public class ClientCnx extends PulsarHandler {
     }
 
     public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
+        this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
+    }
+
+    public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, int protocolVersion) {
         super(conf.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
         checkArgument(conf.getMaxLookupRequest() > conf.getConcurrentLookupRequest());
         this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), true);
@@ -135,6 +140,7 @@ public class ClientCnx extends PulsarHandler {
         this.state = State.None;
         this.isTlsHostnameVerificationEnable = conf.isTlsHostnameVerificationEnable();
         this.hostnameVerifier = new DefaultHostnameVerifier();
+        this.protocolVersion = protocolVersion;
     }
 
     @Override
@@ -167,8 +173,8 @@ public class ClientCnx extends PulsarHandler {
         if (authentication.getAuthData().hasDataFromCommand()) {
             authData = authentication.getAuthData().getCommandData();
         }
-        return Commands.newConnect(authentication.getAuthMethodName(), authData,
-                getPulsarClientVersion(), proxyToTargetBrokerAddress);
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
+                getPulsarClientVersion(), proxyToTargetBrokerAddress, null, null, null);
     }
 
     @Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 16e47a0..31dcac1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -1116,7 +1116,6 @@ public class Commands {
         return (ByteBufPair) ByteBufPair.get(headers, metadataAndPayload);
     }
 
-    @VisibleForTesting
     public static int getCurrentProtocolVersion() {
         // Return the last ProtocolVersion enum value
         return ProtocolVersion.values()[ProtocolVersion.values().length - 1].getNumber();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 232db43..c7fa786 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -59,16 +59,19 @@ public class DirectProxyHandler {
     private String originalPrincipal;
     private String clientAuthData;
     private String clientAuthMethod;
+    private int protocolVersion;
     public static final String TLS_HANDLER = "tls";
 
     private final Authentication authentication;
 
-    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl) {
+    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl,
+            int protocolVersion) {
         this.authentication = proxyConnection.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
         this.originalPrincipal = proxyConnection.clientAuthRole;
         this.clientAuthData = proxyConnection.clientAuthData;
         this.clientAuthMethod = proxyConnection.clientAuthMethod;
+        this.protocolVersion = protocolVersion;
         ProxyConfiguration config = service.getConfiguration();
 
         // Start the connection attempt.
@@ -97,7 +100,7 @@ public class DirectProxyHandler {
                 }
                 ch.pipeline().addLast("frameDecoder",
                         new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
-                ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config));
+                ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config, protocolVersion));
             }
         });
 
@@ -136,9 +139,11 @@ public class DirectProxyHandler {
         private String remoteHostName;
         protected ChannelHandlerContext ctx;
         private ProxyConfiguration config;
+        private int protocolVersion;
 
-        public ProxyBackendHandler(ProxyConfiguration config) {
+        public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion) {
             this.config = config;
+            this.protocolVersion = protocolVersion;
         }
 
         @Override
@@ -150,7 +155,7 @@ public class DirectProxyHandler {
                 authData = authentication.getAuthData().getCommandData();
             }
             ByteBuf command = null;
-            command = Commands.newConnect(authentication.getAuthMethodName(), authData, "Pulsar proxy",
+            command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy",
                     null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod);
             outboundChannel.writeAndFlush(command);
             outboundChannel.read();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 9eb1fe7..a075840 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -32,32 +32,34 @@ import io.netty.channel.EventLoopGroup;
 
 public class ProxyClientCnx extends ClientCnx {
 
-	String clientAuthRole;
-	String clientAuthData;
-	String clientAuthMethod;
-	
-	public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
-			String clientAuthData, String clientAuthMethod) {
-		super(conf, eventLoopGroup);
-		this.clientAuthRole = clientAuthRole;
-		this.clientAuthData = clientAuthData;
-		this.clientAuthMethod = clientAuthMethod;
-	}
-    
-	@Override
-	protected ByteBuf newConnectCommand() throws PulsarClientException {
-	    if (log.isDebugEnabled()) {
+    String clientAuthRole;
+    String clientAuthData;
+    String clientAuthMethod;
+    int protocolVersion;
+
+    public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
+            String clientAuthData, String clientAuthMethod, int protocolVersion) {
+        super(conf, eventLoopGroup);
+        this.clientAuthRole = clientAuthRole;
+        this.clientAuthData = clientAuthData;
+        this.clientAuthMethod = clientAuthMethod;
+        this.protocolVersion = protocolVersion;
+    }
+
+    @Override
+    protected ByteBuf newConnectCommand() throws PulsarClientException {
+        if (log.isDebugEnabled()) {
             log.debug(
                     "New Connection opened via ProxyClientCnx with params clientAuthRole = {}, clientAuthData = {}, clientAuthMethod = {}",
                     clientAuthRole, clientAuthData, clientAuthMethod);
-	    }
-	    String authData = null;
+        }
+        String authData = null;
         if (authentication.getAuthData().hasDataFromCommand()) {
             authData = authentication.getAuthData().getCommandData();
         }
-        return Commands.newConnect(authentication.getAuthMethodName(), authData,
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
                 getPulsarClientVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData, clientAuthMethod);
     }
-	
+
     private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 2ebea8a..d1aa9dd 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -202,6 +202,14 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             return;
         }
 
+        int protocolVersionToAdvertise = getProtocolVersionToAdvertise(connect);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "[{}] Protocol version to advertise to broker is {}, clientProtocolVersion={}, proxyProtocolVersion={}",
+                    remoteAddress, protocolVersionToAdvertise, remoteEndpointProtocolVersion,
+                    Commands.getCurrentProtocolVersion());
+        }
+
         if (!authenticateAndCreateClient(connect)) {
             ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"));
             close();
@@ -213,7 +221,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             // connection
             // there and just pass bytes in both directions
             state = State.ProxyConnectionToBroker;
-            directProxyHandler = new DirectProxyHandler(service, this, connect.getProxyToBrokerUrl());
+            directProxyHandler = new DirectProxyHandler(service, this, connect.getProxyToBrokerUrl(),
+                    protocolVersionToAdvertise);
             cancelKeepAliveTask();
         } else {
             // Client is doing a lookup, we can consider the handshake complete
@@ -221,7 +230,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             // partitions metadata lookups
             state = State.ProxyLookupRequests;
             lookupProxyHandler = new LookupProxyHandler(service, this);
-            ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion()));
+            ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise));
         }
     }
 
@@ -279,10 +288,11 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             ClientConfigurationData clientConf = createClientConfiguration();
             this.clientAuthentication = clientConf.getAuthentication();
 
+            final int protocolVersion = getProtocolVersionToAdvertise(connect);
             if (!service.getConfiguration().isAuthenticationEnabled()) {
                 this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup(),
-                        new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ClientCnx(clientConf,
-                                service.getWorkerGroup())));
+                        new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+                                () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)));
                 return true;
             }
 
@@ -307,7 +317,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
                 this.clientAuthData = authData;
                 this.clientAuthMethod = authMethod;
             }
-            this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod);
+            this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersion);
 
             return true;
         } catch (Exception e) {
@@ -317,10 +327,14 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     }
 
     private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final String clientAuthData,
-            final String clientAuthMethod) throws PulsarClientException {
+            final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
         return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
                 new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf,
-                        service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod)));
+                        service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion)));
+    }
+
+    private static int getProtocolVersionToAdvertise(CommandConnect connect) {
+        return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion());
     }
 
     long newRequestId() {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 99d79fb..09fdbdb 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -24,8 +24,12 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertEquals;
 
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -35,8 +39,16 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
+import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -193,4 +205,50 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    private void testProtocolVersionAdvertisement() throws Exception {
+        final String url = "pulsar://localhost:" + proxyConfig.getServicePort();
+        final String topic = "persistent://sample/test/local/protocol-version-advertisement";
+        final String sub = "my-sub";
+
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl(url);
+        PulsarClient client = getClientActiveConsumerChangeNotSupported(conf);
+
+        Producer<byte[]> producer = client.newProducer().topic(topic).create();
+        Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(sub)
+                .subscriptionType(SubscriptionType.Failover).subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("test-msg".getBytes());
+        }
+
+        for (int i = 0; i < 10; i++) {
+            Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
+            checkNotNull(msg);
+            consumer.acknowledge(msg);
+        }
+
+        producer.close();
+        consumer.close();
+        client.close();
+    }
+
+    private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
+            throws Exception {
+        ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());
+        EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
+
+        ConnectionPool cnxPool = new ConnectionPool(conf, eventLoopGroup, () -> {
+            return new ClientCnx(conf, eventLoopGroup, ProtocolVersion.v11_VALUE) {
+                @Override
+                protected void handleActiveConsumerChange(CommandActiveConsumerChange change) {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        });
+
+        return new PulsarClientImpl(conf, eventLoopGroup, cnxPool);
+    }
+
 }