You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/26 06:34:54 UTC

[GitHub] merlimat closed pull request #2845: Make proxy advertise protocol version of client to broker

merlimat closed pull request #2845: Make proxy advertise protocol version of client to broker
URL: https://github.com/apache/pulsar/pull/2845
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 9306a8248b..ccc20b3ec3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -110,6 +110,7 @@
     private volatile int numberOfRejectRequests = 0;
     private final int maxNumberOfRejectedRequestPerConnection;
     private final int rejectedRequestResetTimeSec = 60;
+    private final int protocolVersion;
     private final long operationTimeoutMs;
 
     protected String proxyToTargetBrokerAddress = null;
@@ -123,6 +124,10 @@
     }
 
     public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
+        this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
+    }
+
+    public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, int protocolVersion) {
         super(conf.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
         checkArgument(conf.getMaxLookupRequest() > conf.getConcurrentLookupRequest());
         this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), true);
@@ -135,6 +140,7 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
         this.state = State.None;
         this.isTlsHostnameVerificationEnable = conf.isTlsHostnameVerificationEnable();
         this.hostnameVerifier = new DefaultHostnameVerifier();
+        this.protocolVersion = protocolVersion;
     }
 
     @Override
@@ -167,8 +173,8 @@ protected ByteBuf newConnectCommand() throws PulsarClientException {
         if (authentication.getAuthData().hasDataFromCommand()) {
             authData = authentication.getAuthData().getCommandData();
         }
-        return Commands.newConnect(authentication.getAuthMethodName(), authData,
-                getPulsarClientVersion(), proxyToTargetBrokerAddress);
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
+                getPulsarClientVersion(), proxyToTargetBrokerAddress, null, null, null);
     }
 
     @Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 16e47a0148..31dcac190e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -1116,7 +1116,6 @@ private static ByteBufPair serializeCommandMessageWithSize(BaseCommand cmd, Byte
         return (ByteBufPair) ByteBufPair.get(headers, metadataAndPayload);
     }
 
-    @VisibleForTesting
     public static int getCurrentProtocolVersion() {
         // Return the last ProtocolVersion enum value
         return ProtocolVersion.values()[ProtocolVersion.values().length - 1].getNumber();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 232db4307e..c7fa786123 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -59,16 +59,19 @@
     private String originalPrincipal;
     private String clientAuthData;
     private String clientAuthMethod;
+    private int protocolVersion;
     public static final String TLS_HANDLER = "tls";
 
     private final Authentication authentication;
 
-    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl) {
+    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl,
+            int protocolVersion) {
         this.authentication = proxyConnection.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
         this.originalPrincipal = proxyConnection.clientAuthRole;
         this.clientAuthData = proxyConnection.clientAuthData;
         this.clientAuthMethod = proxyConnection.clientAuthMethod;
+        this.protocolVersion = protocolVersion;
         ProxyConfiguration config = service.getConfiguration();
 
         // Start the connection attempt.
@@ -97,7 +100,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
                 }
                 ch.pipeline().addLast("frameDecoder",
                         new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
-                ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config));
+                ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config, protocolVersion));
             }
         });
 
@@ -136,9 +139,11 @@ protected void initChannel(SocketChannel ch) throws Exception {
         private String remoteHostName;
         protected ChannelHandlerContext ctx;
         private ProxyConfiguration config;
+        private int protocolVersion;
 
-        public ProxyBackendHandler(ProxyConfiguration config) {
+        public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion) {
             this.config = config;
+            this.protocolVersion = protocolVersion;
         }
 
         @Override
@@ -150,7 +155,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
                 authData = authentication.getAuthData().getCommandData();
             }
             ByteBuf command = null;
-            command = Commands.newConnect(authentication.getAuthMethodName(), authData, "Pulsar proxy",
+            command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy",
                     null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod);
             outboundChannel.writeAndFlush(command);
             outboundChannel.read();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 9eb1fe75fc..a075840311 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -32,32 +32,34 @@
 
 public class ProxyClientCnx extends ClientCnx {
 
-	String clientAuthRole;
-	String clientAuthData;
-	String clientAuthMethod;
-	
-	public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
-			String clientAuthData, String clientAuthMethod) {
-		super(conf, eventLoopGroup);
-		this.clientAuthRole = clientAuthRole;
-		this.clientAuthData = clientAuthData;
-		this.clientAuthMethod = clientAuthMethod;
-	}
-    
-	@Override
-	protected ByteBuf newConnectCommand() throws PulsarClientException {
-	    if (log.isDebugEnabled()) {
+    String clientAuthRole;
+    String clientAuthData;
+    String clientAuthMethod;
+    int protocolVersion;
+
+    public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
+            String clientAuthData, String clientAuthMethod, int protocolVersion) {
+        super(conf, eventLoopGroup);
+        this.clientAuthRole = clientAuthRole;
+        this.clientAuthData = clientAuthData;
+        this.clientAuthMethod = clientAuthMethod;
+        this.protocolVersion = protocolVersion;
+    }
+
+    @Override
+    protected ByteBuf newConnectCommand() throws PulsarClientException {
+        if (log.isDebugEnabled()) {
             log.debug(
                     "New Connection opened via ProxyClientCnx with params clientAuthRole = {}, clientAuthData = {}, clientAuthMethod = {}",
                     clientAuthRole, clientAuthData, clientAuthMethod);
-	    }
-	    String authData = null;
+        }
+        String authData = null;
         if (authentication.getAuthData().hasDataFromCommand()) {
             authData = authentication.getAuthData().getCommandData();
         }
-        return Commands.newConnect(authentication.getAuthMethodName(), authData,
+        return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
                 getPulsarClientVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData, clientAuthMethod);
     }
-	
+
     private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 2ebea8a774..d1aa9dd0da 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -202,6 +202,14 @@ protected void handleConnect(CommandConnect connect) {
             return;
         }
 
+        int protocolVersionToAdvertise = getProtocolVersionToAdvertise(connect);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "[{}] Protocol version to advertise to broker is {}, clientProtocolVersion={}, proxyProtocolVersion={}",
+                    remoteAddress, protocolVersionToAdvertise, remoteEndpointProtocolVersion,
+                    Commands.getCurrentProtocolVersion());
+        }
+
         if (!authenticateAndCreateClient(connect)) {
             ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"));
             close();
@@ -213,7 +221,8 @@ protected void handleConnect(CommandConnect connect) {
             // connection
             // there and just pass bytes in both directions
             state = State.ProxyConnectionToBroker;
-            directProxyHandler = new DirectProxyHandler(service, this, connect.getProxyToBrokerUrl());
+            directProxyHandler = new DirectProxyHandler(service, this, connect.getProxyToBrokerUrl(),
+                    protocolVersionToAdvertise);
             cancelKeepAliveTask();
         } else {
             // Client is doing a lookup, we can consider the handshake complete
@@ -221,7 +230,7 @@ protected void handleConnect(CommandConnect connect) {
             // partitions metadata lookups
             state = State.ProxyLookupRequests;
             lookupProxyHandler = new LookupProxyHandler(service, this);
-            ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion()));
+            ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise));
         }
     }
 
@@ -279,10 +288,11 @@ private boolean authenticateAndCreateClient(CommandConnect connect) {
             ClientConfigurationData clientConf = createClientConfiguration();
             this.clientAuthentication = clientConf.getAuthentication();
 
+            final int protocolVersion = getProtocolVersionToAdvertise(connect);
             if (!service.getConfiguration().isAuthenticationEnabled()) {
                 this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup(),
-                        new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ClientCnx(clientConf,
-                                service.getWorkerGroup())));
+                        new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+                                () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)));
                 return true;
             }
 
@@ -307,7 +317,7 @@ private boolean authenticateAndCreateClient(CommandConnect connect) {
                 this.clientAuthData = authData;
                 this.clientAuthMethod = authMethod;
             }
-            this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod);
+            this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersion);
 
             return true;
         } catch (Exception e) {
@@ -317,10 +327,14 @@ private boolean authenticateAndCreateClient(CommandConnect connect) {
     }
 
     private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final String clientAuthData,
-            final String clientAuthMethod) throws PulsarClientException {
+            final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
         return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
                 new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf,
-                        service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod)));
+                        service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion)));
+    }
+
+    private static int getProtocolVersionToAdvertise(CommandConnect connect) {
+        return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion());
     }
 
     long newRequestId() {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 99d79fb639..09fdbdb159 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -24,8 +24,12 @@
 import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertEquals;
 
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -35,8 +39,16 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
+import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -193,4 +205,50 @@ public void testRegexSubscription() throws Exception {
         }
     }
 
+    @Test
+    private void testProtocolVersionAdvertisement() throws Exception {
+        final String url = "pulsar://localhost:" + proxyConfig.getServicePort();
+        final String topic = "persistent://sample/test/local/protocol-version-advertisement";
+        final String sub = "my-sub";
+
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl(url);
+        PulsarClient client = getClientActiveConsumerChangeNotSupported(conf);
+
+        Producer<byte[]> producer = client.newProducer().topic(topic).create();
+        Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(sub)
+                .subscriptionType(SubscriptionType.Failover).subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("test-msg".getBytes());
+        }
+
+        for (int i = 0; i < 10; i++) {
+            Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
+            checkNotNull(msg);
+            consumer.acknowledge(msg);
+        }
+
+        producer.close();
+        consumer.close();
+        client.close();
+    }
+
+    private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
+            throws Exception {
+        ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());
+        EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
+
+        ConnectionPool cnxPool = new ConnectionPool(conf, eventLoopGroup, () -> {
+            return new ClientCnx(conf, eventLoopGroup, ProtocolVersion.v11_VALUE) {
+                @Override
+                protected void handleActiveConsumerChange(CommandActiveConsumerChange change) {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        });
+
+        return new PulsarClientImpl(conf, eventLoopGroup, cnxPool);
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services