You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/26 06:34:57 UTC
[pulsar] branch master updated: Make proxy advertise protocol
version of client to broker (#2845)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5bdcdc5 Make proxy advertise protocol version of client to broker (#2845)
5bdcdc5 is described below
commit 5bdcdc584834714b7c660cbad84d748ec7b98fee
Author: massakam <ma...@yahoo-corp.jp>
AuthorDate: Fri Oct 26 15:34:52 2018 +0900
Make proxy advertise protocol version of client to broker (#2845)
* Make proxy advertise protocol version of client to broker
* Revert incorrect change
---
.../org/apache/pulsar/client/impl/ClientCnx.java | 10 +++-
.../org/apache/pulsar/common/api/Commands.java | 1 -
.../pulsar/proxy/server/DirectProxyHandler.java | 13 +++--
.../apache/pulsar/proxy/server/ProxyClientCnx.java | 40 ++++++++-------
.../pulsar/proxy/server/ProxyConnection.java | 28 ++++++++---
.../org/apache/pulsar/proxy/server/ProxyTest.java | 58 ++++++++++++++++++++++
6 files changed, 117 insertions(+), 33 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 9306a82..ccc20b3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -110,6 +110,7 @@ public class ClientCnx extends PulsarHandler {
private volatile int numberOfRejectRequests = 0;
private final int maxNumberOfRejectedRequestPerConnection;
private final int rejectedRequestResetTimeSec = 60;
+ private final int protocolVersion;
private final long operationTimeoutMs;
protected String proxyToTargetBrokerAddress = null;
@@ -123,6 +124,10 @@ public class ClientCnx extends PulsarHandler {
}
public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
+ this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
+ }
+
+ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, int protocolVersion) {
super(conf.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
checkArgument(conf.getMaxLookupRequest() > conf.getConcurrentLookupRequest());
this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), true);
@@ -135,6 +140,7 @@ public class ClientCnx extends PulsarHandler {
this.state = State.None;
this.isTlsHostnameVerificationEnable = conf.isTlsHostnameVerificationEnable();
this.hostnameVerifier = new DefaultHostnameVerifier();
+ this.protocolVersion = protocolVersion;
}
@Override
@@ -167,8 +173,8 @@ public class ClientCnx extends PulsarHandler {
if (authentication.getAuthData().hasDataFromCommand()) {
authData = authentication.getAuthData().getCommandData();
}
- return Commands.newConnect(authentication.getAuthMethodName(), authData,
- getPulsarClientVersion(), proxyToTargetBrokerAddress);
+ return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
+ getPulsarClientVersion(), proxyToTargetBrokerAddress, null, null, null);
}
@Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 16e47a0..31dcac1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -1116,7 +1116,6 @@ public class Commands {
return (ByteBufPair) ByteBufPair.get(headers, metadataAndPayload);
}
- @VisibleForTesting
public static int getCurrentProtocolVersion() {
// Return the last ProtocolVersion enum value
return ProtocolVersion.values()[ProtocolVersion.values().length - 1].getNumber();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 232db43..c7fa786 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -59,16 +59,19 @@ public class DirectProxyHandler {
private String originalPrincipal;
private String clientAuthData;
private String clientAuthMethod;
+ private int protocolVersion;
public static final String TLS_HANDLER = "tls";
private final Authentication authentication;
- public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl) {
+ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl,
+ int protocolVersion) {
this.authentication = proxyConnection.getClientAuthentication();
this.inboundChannel = proxyConnection.ctx().channel();
this.originalPrincipal = proxyConnection.clientAuthRole;
this.clientAuthData = proxyConnection.clientAuthData;
this.clientAuthMethod = proxyConnection.clientAuthMethod;
+ this.protocolVersion = protocolVersion;
ProxyConfiguration config = service.getConfiguration();
// Start the connection attempt.
@@ -97,7 +100,7 @@ public class DirectProxyHandler {
}
ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
- ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config));
+ ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config, protocolVersion));
}
});
@@ -136,9 +139,11 @@ public class DirectProxyHandler {
private String remoteHostName;
protected ChannelHandlerContext ctx;
private ProxyConfiguration config;
+ private int protocolVersion;
- public ProxyBackendHandler(ProxyConfiguration config) {
+ public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion) {
this.config = config;
+ this.protocolVersion = protocolVersion;
}
@Override
@@ -150,7 +155,7 @@ public class DirectProxyHandler {
authData = authentication.getAuthData().getCommandData();
}
ByteBuf command = null;
- command = Commands.newConnect(authentication.getAuthMethodName(), authData, "Pulsar proxy",
+ command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy",
null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod);
outboundChannel.writeAndFlush(command);
outboundChannel.read();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 9eb1fe7..a075840 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -32,32 +32,34 @@ import io.netty.channel.EventLoopGroup;
public class ProxyClientCnx extends ClientCnx {
- String clientAuthRole;
- String clientAuthData;
- String clientAuthMethod;
-
- public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
- String clientAuthData, String clientAuthMethod) {
- super(conf, eventLoopGroup);
- this.clientAuthRole = clientAuthRole;
- this.clientAuthData = clientAuthData;
- this.clientAuthMethod = clientAuthMethod;
- }
-
- @Override
- protected ByteBuf newConnectCommand() throws PulsarClientException {
- if (log.isDebugEnabled()) {
+ String clientAuthRole;
+ String clientAuthData;
+ String clientAuthMethod;
+ int protocolVersion;
+
+ public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
+ String clientAuthData, String clientAuthMethod, int protocolVersion) {
+ super(conf, eventLoopGroup);
+ this.clientAuthRole = clientAuthRole;
+ this.clientAuthData = clientAuthData;
+ this.clientAuthMethod = clientAuthMethod;
+ this.protocolVersion = protocolVersion;
+ }
+
+ @Override
+ protected ByteBuf newConnectCommand() throws PulsarClientException {
+ if (log.isDebugEnabled()) {
log.debug(
"New Connection opened via ProxyClientCnx with params clientAuthRole = {}, clientAuthData = {}, clientAuthMethod = {}",
clientAuthRole, clientAuthData, clientAuthMethod);
- }
- String authData = null;
+ }
+ String authData = null;
if (authentication.getAuthData().hasDataFromCommand()) {
authData = authentication.getAuthData().getCommandData();
}
- return Commands.newConnect(authentication.getAuthMethodName(), authData,
+ return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
getPulsarClientVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData, clientAuthMethod);
}
-
+
private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 2ebea8a..d1aa9dd 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -202,6 +202,14 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
return;
}
+ int protocolVersionToAdvertise = getProtocolVersionToAdvertise(connect);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "[{}] Protocol version to advertise to broker is {}, clientProtocolVersion={}, proxyProtocolVersion={}",
+ remoteAddress, protocolVersionToAdvertise, remoteEndpointProtocolVersion,
+ Commands.getCurrentProtocolVersion());
+ }
+
if (!authenticateAndCreateClient(connect)) {
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"));
close();
@@ -213,7 +221,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
// connection
// there and just pass bytes in both directions
state = State.ProxyConnectionToBroker;
- directProxyHandler = new DirectProxyHandler(service, this, connect.getProxyToBrokerUrl());
+ directProxyHandler = new DirectProxyHandler(service, this, connect.getProxyToBrokerUrl(),
+ protocolVersionToAdvertise);
cancelKeepAliveTask();
} else {
// Client is doing a lookup, we can consider the handshake complete
@@ -221,7 +230,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
// partitions metadata lookups
state = State.ProxyLookupRequests;
lookupProxyHandler = new LookupProxyHandler(service, this);
- ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion()));
+ ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise));
}
}
@@ -279,10 +288,11 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
ClientConfigurationData clientConf = createClientConfiguration();
this.clientAuthentication = clientConf.getAuthentication();
+ final int protocolVersion = getProtocolVersionToAdvertise(connect);
if (!service.getConfiguration().isAuthenticationEnabled()) {
this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup(),
- new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ClientCnx(clientConf,
- service.getWorkerGroup())));
+ new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+ () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)));
return true;
}
@@ -307,7 +317,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
this.clientAuthData = authData;
this.clientAuthMethod = authMethod;
}
- this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod);
+ this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersion);
return true;
} catch (Exception e) {
@@ -317,10 +327,14 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
}
private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final String clientAuthData,
- final String clientAuthMethod) throws PulsarClientException {
+ final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf,
- service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod)));
+ service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion)));
+ }
+
+ private static int getProtocolVersionToAdvertise(CommandConnect connect) {
+ return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion());
}
long newRequestId() {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 99d79fb..09fdbdb 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -24,8 +24,12 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -35,8 +39,16 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
+import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -193,4 +205,50 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
}
}
+ @Test
+ private void testProtocolVersionAdvertisement() throws Exception {
+ final String url = "pulsar://localhost:" + proxyConfig.getServicePort();
+ final String topic = "persistent://sample/test/local/protocol-version-advertisement";
+ final String sub = "my-sub";
+
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setServiceUrl(url);
+ PulsarClient client = getClientActiveConsumerChangeNotSupported(conf);
+
+ Producer<byte[]> producer = client.newProducer().topic(topic).create();
+ Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(sub)
+ .subscriptionType(SubscriptionType.Failover).subscribe();
+
+ for (int i = 0; i < 10; i++) {
+ producer.send("test-msg".getBytes());
+ }
+
+ for (int i = 0; i < 10; i++) {
+ Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
+ checkNotNull(msg);
+ consumer.acknowledge(msg);
+ }
+
+ producer.close();
+ consumer.close();
+ client.close();
+ }
+
+ private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
+ throws Exception {
+ ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());
+ EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
+
+ ConnectionPool cnxPool = new ConnectionPool(conf, eventLoopGroup, () -> {
+ return new ClientCnx(conf, eventLoopGroup, ProtocolVersion.v11_VALUE) {
+ @Override
+ protected void handleActiveConsumerChange(CommandActiveConsumerChange change) {
+ throw new UnsupportedOperationException();
+ }
+ };
+ });
+
+ return new PulsarClientImpl(conf, eventLoopGroup, cnxPool);
+ }
+
}