You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ja...@apache.org on 2018/05/03 07:04:27 UTC
[incubator-pulsar] branch master updated: Fixed authentication flow
via Pulsar Proxy (#1707)
This is an automated email from the ASF dual-hosted git repository.
jai1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9348d34 Fixed authentication flow via Pulsar Proxy (#1707)
9348d34 is described below
commit 9348d34e830f558e3fc0ca096115bcc0d569ab0d
Author: Jai Asher <ja...@ccs.neu.edu>
AuthorDate: Thu May 3 00:04:24 2018 -0700
Fixed authentication flow via Pulsar Proxy (#1707)
---
.../apache/pulsar/broker/service/ServerCnx.java | 52 +---
.../org/apache/pulsar/client/impl/ClientCnx.java | 30 ++-
.../apache/pulsar/client/impl/ConnectionPool.java | 7 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 6 +-
.../org/apache/pulsar/common/api/Commands.java | 66 ++---
pulsar-common/src/main/proto/PulsarApi.proto | 2 +
.../pulsar/proxy/server/DirectProxyHandler.java | 29 ++-
.../pulsar/proxy/server/LookupProxyHandler.java | 35 ++-
.../apache/pulsar/proxy/server/ProxyClientCnx.java | 63 +++++
.../pulsar/proxy/server/ProxyConnection.java | 95 ++++++--
.../apache/pulsar/proxy/server/ProxyService.java | 57 +----
.../proxy/server/ProxyAuthenticationTest.java | 268 +++++++++++++++++++++
.../proxy/server/ProxyForwardAuthDataTest.java | 2 +-
13 files changed, 502 insertions(+), 210 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4fac97d..f8ad145 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -227,21 +227,6 @@ public class ServerCnx extends PulsarHandler {
return;
}
- String originalPrincipal = null;
- if (authenticateOriginalAuthData && lookup.hasOriginalAuthData()) {
- originalPrincipal = validateOriginalPrincipal(
- lookup.hasOriginalAuthData() ? lookup.getOriginalAuthData() : null,
- lookup.hasOriginalAuthMethod() ? lookup.getOriginalAuthMethod() : null,
- lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() : this.originalPrincipal, requestId,
- lookup);
-
- if (originalPrincipal == null) {
- return;
- }
- } else {
- originalPrincipal = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() : this.originalPrincipal;
- }
-
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
if (invalidOriginalPrincipal(originalPrincipal)) {
@@ -312,22 +297,7 @@ public class ServerCnx extends PulsarHandler {
if (topicName == null) {
return;
}
- String originalPrincipal = null;
- if (authenticateOriginalAuthData && partitionMetadata.hasOriginalAuthData()) {
- originalPrincipal = validateOriginalPrincipal(
- partitionMetadata.hasOriginalAuthData() ? partitionMetadata.getOriginalAuthData() : null,
- partitionMetadata.hasOriginalAuthMethod() ? partitionMetadata.getOriginalAuthMethod() : null,
- partitionMetadata.hasOriginalPrincipal() ? partitionMetadata.getOriginalPrincipal()
- : this.originalPrincipal,
- requestId, partitionMetadata);
-
- if (originalPrincipal == null) {
- return;
- }
- } else {
- originalPrincipal = partitionMetadata.hasOriginalPrincipal() ? partitionMetadata.getOriginalPrincipal() : this.originalPrincipal;
- }
-
+
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
if (invalidOriginalPrincipal(originalPrincipal)) {
@@ -450,26 +420,6 @@ public class ServerCnx extends PulsarHandler {
return commandConsumerStatsResponseBuilder;
}
- private String validateOriginalPrincipal(String originalAuthData, String originalAuthMethod, String originalPrincipal, Long requestId, GeneratedMessageLite request) {
- ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
- SSLSession sslSession = null;
- if (sslHandler != null) {
- sslSession = ((SslHandler) sslHandler).engine().getSession();
- }
- try {
- return getOriginalPrincipal(originalAuthData, originalAuthMethod, originalPrincipal, sslSession);
- } catch (AuthenticationException e) {
- String msg = "Unable to authenticate original authdata ";
- log.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage());
- if (request instanceof CommandLookupTopic) {
- ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthenticationError, msg, requestId));
- } else if (request instanceof CommandPartitionedTopicMetadata) {
- ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthenticationError, msg, requestId));
- }
- return null;
- }
- }
-
private String getOriginalPrincipal(String originalAuthData, String originalAuthMethod, String originalPrincipal,
SSLSession sslSession) throws AuthenticationException {
if (authenticateOriginalAuthData) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 038e6ae..6a8ab74 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.client.impl.HttpClient.getPulsarClientVersion;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
@@ -36,7 +37,6 @@ import java.nio.channels.ClosedChannelException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -73,7 +73,7 @@ import org.slf4j.LoggerFactory;
public class ClientCnx extends PulsarHandler {
- private final Authentication authentication;
+ protected final Authentication authentication;
private State state;
private final ConcurrentLongHashMap<CompletableFuture<ProducerResponse>> pendingRequests =
@@ -102,7 +102,7 @@ public class ClientCnx extends PulsarHandler {
private final int rejectedRequestResetTimeSec = 60;
private final long operationTimeoutMs;
- private String proxyToTargetBrokerAddress = null;
+ protected String proxyToTargetBrokerAddress = null;
// Remote hostName with which client is connected
private String remoteHostName = null;
private boolean isTlsHostnameVerificationEnable;
@@ -130,7 +130,6 @@ public class ClientCnx extends PulsarHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
-
if (proxyToTargetBrokerAddress == null) {
if (log.isDebugEnabled()) {
log.debug("{} Connected to broker", ctx.channel());
@@ -138,13 +137,8 @@ public class ClientCnx extends PulsarHandler {
} else {
log.info("{} Connected through proxy to target broker at {}", ctx.channel(), proxyToTargetBrokerAddress);
}
- String authData = "";
- if (authentication.getAuthData().hasDataFromCommand()) {
- authData = authentication.getAuthData().getCommandData();
- }
// Send CONNECT command
- ctx.writeAndFlush(Commands.newConnect(authentication.getAuthMethodName(), authData,
- getPulsarClientVersion(), proxyToTargetBrokerAddress))
+ ctx.writeAndFlush(newConnectCommand())
.addListener(future -> {
if (future.isSuccess()) {
if (log.isDebugEnabled()) {
@@ -157,6 +151,15 @@ public class ClientCnx extends PulsarHandler {
}
});
}
+
+ protected ByteBuf newConnectCommand() throws PulsarClientException {
+ String authData = "";
+ if (authentication.getAuthData().hasDataFromCommand()) {
+ authData = authentication.getAuthData().getCommandData();
+ }
+ return Commands.newConnect(authentication.getAuthMethodName(), authData,
+ getPulsarClientVersion(), proxyToTargetBrokerAddress);
+ }
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
@@ -757,6 +760,13 @@ public class ClientCnx extends PulsarHandler {
return new PulsarClientException(errorMsg);
}
}
+
+ @VisibleForTesting
+ public void close() {
+ if (ctx != null) {
+ ctx.close();
+ }
+ }
private static final Logger log = LoggerFactory.getLogger(ClientCnx.class);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 52d85ca..9b08169 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -29,6 +29,7 @@ import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -69,6 +70,10 @@ public class ConnectionPool implements Closeable {
public static final String TLS_HANDLER = "tls";
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
+ this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup));
+ }
+
+ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, Supplier<ClientCnx> clientCnxSupplier) {
this.eventLoopGroup = eventLoopGroup;
this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
@@ -99,7 +104,7 @@ public class ConnectionPool implements Closeable {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4));
- ch.pipeline().addLast("handler", new ClientCnx(conf, eventLoopGroup));
+ ch.pipeline().addLast("handler", clientCnxSupplier.get());
}
});
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a446dd4..38950b6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -25,6 +25,7 @@ import static java.lang.String.format;
import static org.apache.pulsar.common.api.Commands.hasChecksum;
import static org.apache.pulsar.common.api.Commands.readChecksum;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import io.netty.buffer.ByteBuf;
@@ -1349,8 +1350,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
void connectionClosed(ClientCnx cnx) {
this.connectionHandler.connectionClosed(cnx);
}
-
- ClientCnx getClientCnx() {
+
+ @VisibleForTesting
+ public ClientCnx getClientCnx() {
return this.connectionHandler.getClientCnx();
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 1228876..4f971b6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -516,7 +516,15 @@ public class Commands {
}
public static ByteBuf newPartitionMetadataRequest(String topic, long requestId) {
- return Commands.newPartitionMetadataRequest(topic, requestId, null, null, null);
+ CommandPartitionedTopicMetadata.Builder partitionMetadataBuilder = CommandPartitionedTopicMetadata.newBuilder();
+ partitionMetadataBuilder.setTopic(topic);
+ partitionMetadataBuilder.setRequestId(requestId);
+ CommandPartitionedTopicMetadata partitionMetadata = partitionMetadataBuilder.build();
+ ByteBuf res = serializeWithSize(
+ BaseCommand.newBuilder().setType(Type.PARTITIONED_METADATA).setPartitionMetadata(partitionMetadata));
+ partitionMetadataBuilder.recycle();
+ partitionMetadata.recycle();
+ return res;
}
public static ByteBuf newPartitionMetadataResponse(int partitions, long requestId) {
@@ -535,7 +543,15 @@ public class Commands {
}
public static ByteBuf newLookup(String topic, boolean authoritative, long requestId) {
- return Commands.newLookup(topic, authoritative, null, null, null, requestId);
+ CommandLookupTopic.Builder lookupTopicBuilder = CommandLookupTopic.newBuilder();
+ lookupTopicBuilder.setTopic(topic);
+ lookupTopicBuilder.setRequestId(requestId);
+ lookupTopicBuilder.setAuthoritative(authoritative);
+ CommandLookupTopic lookupBroker = lookupTopicBuilder.build();
+ ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.LOOKUP).setLookupTopic(lookupBroker));
+ lookupTopicBuilder.recycle();
+ lookupBroker.recycle();
+ return res;
}
public static ByteBuf newLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boolean authoritative,
@@ -1033,52 +1049,6 @@ public class Commands {
None;
}
- public static ByteBuf newPartitionMetadataRequest(String topic, long requestId, String originalAuthRole,
- String originalAuthData, String originalAuthMethod) {
- CommandPartitionedTopicMetadata.Builder partitionMetadataBuilder = CommandPartitionedTopicMetadata.newBuilder();
- partitionMetadataBuilder.setTopic(topic);
- partitionMetadataBuilder.setRequestId(requestId);
- if (originalAuthRole != null) {
- partitionMetadataBuilder.setOriginalPrincipal(originalAuthRole);
- }
- if (originalAuthData != null) {
- partitionMetadataBuilder.setOriginalAuthData(originalAuthData);
- }
-
- if (originalAuthMethod != null) {
- partitionMetadataBuilder.setOriginalAuthMethod(originalAuthMethod);
- }
- CommandPartitionedTopicMetadata partitionMetadata = partitionMetadataBuilder.build();
- ByteBuf res = serializeWithSize(
- BaseCommand.newBuilder().setType(Type.PARTITIONED_METADATA).setPartitionMetadata(partitionMetadata));
- partitionMetadataBuilder.recycle();
- partitionMetadata.recycle();
- return res;
- }
-
- public static ByteBuf newLookup(String topic, boolean authoritative, String originalAuthRole,
- String originalAuthData, String originalAuthMethod, long requestId) {
- CommandLookupTopic.Builder lookupTopicBuilder = CommandLookupTopic.newBuilder();
- lookupTopicBuilder.setTopic(topic);
- lookupTopicBuilder.setRequestId(requestId);
- lookupTopicBuilder.setAuthoritative(authoritative);
- if (originalAuthRole != null) {
- lookupTopicBuilder.setOriginalPrincipal(originalAuthRole);
- }
- if (originalAuthData != null) {
- lookupTopicBuilder.setOriginalAuthData(originalAuthData);
- }
-
- if (originalAuthMethod != null) {
- lookupTopicBuilder.setOriginalAuthMethod(originalAuthMethod);
- }
- CommandLookupTopic lookupBroker = lookupTopicBuilder.build();
- ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.LOOKUP).setLookupTopic(lookupBroker));
- lookupTopicBuilder.recycle();
- lookupBroker.recycle();
- return res;
- }
-
public static boolean peerSupportsGetLastMessageId(int peerVersion) {
return peerVersion >= ProtocolVersion.v12.getNumber();
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 7e76521..0e55287 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -227,6 +227,7 @@ message CommandSubscribe {
message CommandPartitionedTopicMetadata {
required string topic = 1;
required uint64 request_id = 2;
+ // TODO - Remove original_principal, original_auth_data, original_auth_method
// Original principal that was verified by
// a Pulsar proxy.
optional string original_principal = 3;
@@ -254,6 +255,7 @@ message CommandLookupTopic {
required uint64 request_id = 2;
optional bool authoritative = 3 [default = false];
+ // TODO - Remove original_principal, original_auth_data, original_auth_method
// Original principal that was verified by
// a Pulsar proxy.
optional string original_principal = 4;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 8b224f6..5b3814d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -63,7 +63,7 @@ public class DirectProxyHandler {
private final Authentication authentication;
public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl) {
- this.authentication = service.getClientAuthentication();
+ this.authentication = proxyConnection.getClientAuthentication();
this.inboundChannel = proxyConnection.ctx().channel();
this.originalPrincipal = proxyConnection.clientAuthRole;
this.clientAuthData = proxyConnection.clientAuthData;
@@ -72,7 +72,8 @@ public class DirectProxyHandler {
// Start the connection attempt.
Bootstrap b = new Bootstrap();
- // Tie the backend connection on the same thread to avoid context switches when passing data between the 2
+ // Tie the backend connection on the same thread to avoid context
+ // switches when passing data between the 2
// connections
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(inboundChannel.eventLoop()).channel(inboundChannel.getClass()).option(ChannelOption.AUTO_READ, false);
@@ -85,8 +86,8 @@ public class DirectProxyHandler {
AuthenticationDataProvider authData = authentication.getAuthData();
if (authData.hasDataForTls()) {
sslCtx = SecurityUtility.createNettySslContextForClient(config.isTlsAllowInsecureConnection(),
- config.getBrokerClientTrustCertsFilePath(), (X509Certificate[]) authData.getTlsCertificates(),
- authData.getTlsPrivateKey());
+ config.getBrokerClientTrustCertsFilePath(),
+ (X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey());
} else {
sslCtx = SecurityUtility.createNettySslContextForClient(config.isTlsAllowInsecureConnection(),
config.getBrokerClientTrustCertsFilePath());
@@ -101,7 +102,8 @@ public class DirectProxyHandler {
URI targetBroker;
try {
- // targetBrokerUrl is coming in the "hostname:6650" form, so we need to extract host and port
+ // targetBrokerUrl is coming in the "hostname:6650" form, so we need
+ // to extract host and port
targetBroker = new URI("pulsar://" + targetBrokerUrl);
} catch (URISyntaxException e) {
log.warn("[{}] Failed to parse broker url '{}'", inboundChannel, targetBrokerUrl, e);
@@ -117,7 +119,8 @@ public class DirectProxyHandler {
inboundChannel.close();
return;
}
- final ProxyBackendHandler cnx = (ProxyBackendHandler) outboundChannel.pipeline().get("proxyOutboundHandler");
+ final ProxyBackendHandler cnx = (ProxyBackendHandler) outboundChannel.pipeline()
+ .get("proxyOutboundHandler");
cnx.setRemoteHostName(targetBroker.getHost());
});
}
@@ -132,7 +135,7 @@ public class DirectProxyHandler {
private String remoteHostName;
protected ChannelHandlerContext ctx;
private ProxyConfiguration config;
-
+
public ProxyBackendHandler(ProxyConfiguration config) {
this.config = config;
}
@@ -177,7 +180,8 @@ public class DirectProxyHandler {
@Override
public void operationComplete(Future<Void> future) throws Exception {
- // This is invoked when the write operation on the paired connection is completed
+ // This is invoked when the write operation on the paired connection
+ // is completed
if (future.isSuccess()) {
outboundChannel.read();
} else {
@@ -197,15 +201,16 @@ public class DirectProxyHandler {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received Connected from broker", inboundChannel, outboundChannel);
}
-
+
if (config.isTlsHostnameVerificationEnabled() && remoteHostName != null
&& !verifyTlsHostName(remoteHostName, ctx)) {
- // close the connection if host-verification failed with the broker
+ // close the connection if host-verification failed with the
+ // broker
log.warn("[{}] Failed to verify hostname of {}", ctx.channel(), remoteHostName);
ctx.close();
return;
}
-
+
state = BackendState.HandshakeCompleted;
inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion())).addListener(future -> {
@@ -231,7 +236,7 @@ public class DirectProxyHandler {
log.warn("[{}] [{}] Caught exception: {}", inboundChannel, outboundChannel, cause.getMessage(), cause);
ctx.close();
}
-
+
public void setRemoteHostName(String remoteHostName) {
this.remoteHostName = remoteHostName;
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index bbf1c44..bed7ed7 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.proxy.server;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
@@ -35,7 +37,6 @@ import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.prometheus.client.Counter;
-import static org.apache.commons.lang3.StringUtils.isBlank;
public class LookupProxyHandler {
private final String throttlingErrorMessage = "Too many concurrent lookup and partitionsMetadata requests";
@@ -131,26 +132,24 @@ public class LookupProxyHandler {
log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr, topic,
clientRequestId);
}
- service.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
+ proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
// Connected to backend broker
- long requestId = service.newRequestId();
+ long requestId = proxyConnection.newRequestId();
ByteBuf command;
- if (service.getConfiguration().isAuthenticationEnabled()) {
- command = Commands.newLookup(topic, authoritative, proxyConnection.clientAuthRole,
- proxyConnection.clientAuthData, proxyConnection.clientAuthMethod, requestId);
- } else {
- command = Commands.newLookup(topic, authoritative, requestId);
- }
+ command = Commands.newLookup(topic, authoritative, requestId);
clientCnx.newLookup(command, requestId).thenAccept(result -> {
String brokerUrl = connectWithTLS ? result.brokerUrlTls : result.brokerUrl;
if (result.redirect) {
// Need to try the lookup again on a different broker
performLookup(clientRequestId, topic, brokerUrl, result.authoritative, numberOfRetries - 1);
} else {
- // Reply the same address for both TLS non-TLS. The reason is that whether we use TLS
- // and broker is independent of whether the client itself uses TLS, but we need to force the
+ // Reply the same address for both TLS non-TLS. The reason
+ // is that whether we use TLS
+ // and broker is independent of whether the client itself
+ // uses TLS, but we need to force the
// client
- // to use the appropriate target broker (and port) when it will connect back.
+ // to use the appropriate target broker (and port) when it
+ // will connect back.
proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
LookupType.Connect, clientRequestId, true /* this is coming from proxy */));
}
@@ -223,17 +222,11 @@ public class LookupProxyHandler {
topicName.getPartitionedTopicName(), clientRequestId);
}
- service.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
+ proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
// Connected to backend broker
- long requestId = service.newRequestId();
+ long requestId = proxyConnection.newRequestId();
ByteBuf command;
- if (service.getConfiguration().isAuthenticationEnabled()) {
- command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId,
- proxyConnection.clientAuthRole, proxyConnection.clientAuthData,
- proxyConnection.clientAuthMethod);
- } else {
- command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
- }
+ command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
clientCnx.newLookup(command, requestId).thenAccept(lookupDataResult -> {
proxyConnection.ctx().writeAndFlush(
Commands.newPartitionMetadataResponse(lookupDataResult.partitions, clientRequestId));
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
new file mode 100644
index 0000000..9eb1fe7
--- /dev/null
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.apache.pulsar.client.impl.HttpClient.getPulsarClientVersion;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.Commands;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.EventLoopGroup;
+
+public class ProxyClientCnx extends ClientCnx {
+
+ String clientAuthRole;
+ String clientAuthData;
+ String clientAuthMethod;
+
+ public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
+ String clientAuthData, String clientAuthMethod) {
+ super(conf, eventLoopGroup);
+ this.clientAuthRole = clientAuthRole;
+ this.clientAuthData = clientAuthData;
+ this.clientAuthMethod = clientAuthMethod;
+ }
+
+ @Override
+ protected ByteBuf newConnectCommand() throws PulsarClientException {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "New Connection opened via ProxyClientCnx with params clientAuthRole = {}, clientAuthData = {}, clientAuthMethod = {}",
+ clientAuthRole, clientAuthData, clientAuthMethod);
+ }
+ String authData = null;
+ if (authentication.getAuthData().hasDataFromCommand()) {
+ authData = authentication.getAuthData().getCommandData();
+ }
+ return Commands.newConnect(authentication.getAuthMethodName(), authData,
+ getPulsarClientVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData, clientAuthMethod);
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index eaa006d..8e9effd 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -23,12 +23,18 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
-import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.proto.PulsarApi;
@@ -52,16 +58,18 @@ import io.prometheus.client.Gauge;
*
*/
public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
-
+ // ConnectionPool is used by the proxy to issue lookup requests
+ private PulsarClientImpl client;
private ProxyService service;
- String clientAuthRole = null;
- String clientAuthData = null;
- String clientAuthMethod = null;
+ private Authentication clientAuthentication;
AuthenticationDataSource authenticationData;
private State state;
private LookupProxyHandler lookupProxyHandler = null;
private DirectProxyHandler directProxyHandler = null;
+ String clientAuthRole;
+ String clientAuthData;
+ String clientAuthMethod;
enum State {
Init,
@@ -78,6 +86,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
Closed,
}
+ ConnectionPool getConnectionPool() {
+ return client.getCnxPool();
+ }
+
private static final Gauge activeConnections = Gauge
.build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create()
.register();
@@ -149,7 +161,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
break;
case ProxyConnectionToBroker:
- // Pass the buffer to the outbound connection and schedule next read only
+ // Pass the buffer to the outbound connection and schedule next read
+ // only
// if we can write on the connection
directProxyHandler.outboundChannel.writeAndFlush(msg).addListener(this);
break;
@@ -161,7 +174,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
@Override
public void operationComplete(Future<Void> future) throws Exception {
- // This is invoked when the write operation on the paired connection is completed
+ // This is invoked when the write operation on the paired connection is
+ // completed
if (future.isSuccess()) {
ctx.read();
} else {
@@ -189,20 +203,22 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
return;
}
- if (!verifyAuthenticationIfNeeded(connect)) {
+ if (!authenticateAndCreateClient(connect)) {
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"));
close();
return;
}
if (connect.hasProxyToBrokerUrl()) {
- // Client already knows which broker to connect. Let's open a connection
+ // Client already knows which broker to connect. Let's open a
+ // connection
// there and just pass bytes in both directions
state = State.ProxyConnectionToBroker;
directProxyHandler = new DirectProxyHandler(service, this, connect.getProxyToBrokerUrl());
cancelKeepAliveTask();
} else {
- // Client is doing a lookup, we can consider the handshake complete and we'll take care of just topics and
+ // Client is doing a lookup, we can consider the handshake complete
+ // and we'll take care of just topics and
// partitions metadata lookups
state = State.ProxyLookupRequests;
lookupProxyHandler = new LookupProxyHandler(service, this);
@@ -229,14 +245,39 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
private void close() {
state = State.Closed;
ctx.close();
+ try {
+ client.close();
+ } catch (PulsarClientException e) {
+ LOG.error("Unable to close pulsar client - {}. Error - {}", client, e.getMessage());
+ }
}
- private boolean verifyAuthenticationIfNeeded(CommandConnect connect) {
- if (!service.getConfiguration().isAuthenticationEnabled()) {
- return true;
+ ClientConfigurationData createClientConfiguration() throws UnsupportedAuthenticationException {
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ clientConf.setServiceUrl(service.getServiceUrl());
+ ProxyConfiguration proxyConfig = service.getConfiguration();
+ if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
+ clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters()));
}
+ if (proxyConfig.isTlsEnabledWithBroker()) {
+ clientConf.setUseTls(true);
+ clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
+ clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
+ }
+ return clientConf;
+ }
+ private boolean authenticateAndCreateClient(CommandConnect connect) {
try {
+ ClientConfigurationData clientConf = createClientConfiguration();
+ this.clientAuthentication = clientConf.getAuthentication();
+
+ if (!service.getConfiguration().isAuthenticationEnabled()) {
+ this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup());
+ return true;
+ }
+
String authMethod = "none";
if (connect.hasAuthMethodName()) {
authMethod = connect.getAuthMethodName();
@@ -245,11 +286,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
}
String authData = connect.getAuthData().toStringUtf8();
-
- if (service.getConfiguration().forwardAuthorizationCredentials()) {
- clientAuthData = authData;
- clientAuthMethod = authMethod;
- }
ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");
SSLSession sslSession = null;
if (sslHandler != null) {
@@ -259,13 +295,34 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
clientAuthRole = service.getAuthenticationService().authenticate(authenticationData, authMethod);
LOG.info("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod,
clientAuthRole);
+ if (service.getConfiguration().forwardAuthorizationCredentials()) {
+ this.clientAuthData = authData;
+ this.clientAuthMethod = authMethod;
+ }
+ this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod);
+
return true;
- } catch (AuthenticationException e) {
+ } catch (Exception e) {
LOG.warn("[{}] Unable to authenticate: {}", remoteAddress, e.getMessage());
return false;
}
}
+ private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final String clientAuthData,
+ final String clientAuthMethod) throws PulsarClientException {
+ return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
+ new ConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf,
+ service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod)));
+ }
+
+ long newRequestId() {
+ return client.newRequestId();
+ }
+
+ public Authentication getClientAuthentication() {
+ return clientAuthentication;
+ }
+
@Override
protected boolean isHandshakeCompleted() {
return state != State.Init;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 48293be..28af1ab 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -21,13 +21,6 @@ package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.AdaptiveRecvByteBufAllocator;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
@@ -39,11 +32,6 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.impl.ConnectionPool;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
@@ -51,6 +39,13 @@ import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.AdaptiveRecvByteBufAllocator;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
/**
* Pulsar proxy service
*/
@@ -66,14 +61,10 @@ public class ProxyService implements Closeable {
private final EventLoopGroup acceptorGroup;
private final EventLoopGroup workerGroup;
+
private final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-discovery-acceptor");
private final DefaultThreadFactory workersThreadFactory = new DefaultThreadFactory("pulsar-discovery-io");
- // ConnectionPool is used by the proxy to issue lookup requests
- private final PulsarClientImpl client;
-
- private final Authentication clientAuthentication;
-
private BrokerDiscoveryProvider discoveryProvider;
protected final AtomicReference<Semaphore> lookupRequestSemaphore;
@@ -98,21 +89,6 @@ public class ProxyService implements Closeable {
this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory);
this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory);
-
- ClientConfigurationData clientConf = new ClientConfigurationData();
- clientConf.setServiceUrl(serviceUrl);
- if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
- clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
- proxyConfig.getBrokerClientAuthenticationParameters()));
- }
- if (proxyConfig.isTlsEnabledWithBroker()) {
- clientConf.setUseTls(true);
- clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
- clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
- }
-
- this.client = new PulsarClientImpl(clientConf, workerGroup);
- this.clientAuthentication = clientConf.getAuthentication();
}
public void start() throws Exception {
@@ -148,14 +124,6 @@ public class ProxyService implements Closeable {
}
}
- long newRequestId() {
- return client.newRequestId();
- }
-
- ConnectionPool getConnectionPool() {
- return client.getCnxPool();
- }
-
public ZooKeeperClientFactory getZooKeeperClientFactory() {
if (zkClientFactory == null) {
zkClientFactory = new ZookeeperClientFactoryImpl();
@@ -174,7 +142,6 @@ public class ProxyService implements Closeable {
}
acceptorGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
- client.close();
}
public String getServiceUrl() {
@@ -197,10 +164,6 @@ public class ProxyService implements Closeable {
return authorizationService;
}
- public Authentication getClientAuthentication() {
- return clientAuthentication;
- }
-
public ConfigurationCacheService getConfigurationCacheService() {
return configurationCacheService;
}
@@ -212,6 +175,10 @@ public class ProxyService implements Closeable {
public Semaphore getLookupRequestSemaphore() {
return lookupRequestSemaphore.get();
}
+
+ public EventLoopGroup getWorkerGroup() {
+ return workerGroup;
+ }
private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class);
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
new file mode 100644
index 0000000..f572e5d
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.Set;
+
+import javax.naming.AuthenticationException;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Sets;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+public class ProxyAuthenticationTest extends ProducerConsumerBase {
+ private static final Logger log = LoggerFactory.getLogger(ProxyAuthenticationTest.class);
+ private int webServicePort;
+ private int servicePort;
+
+ public static class BasicAuthenticationData implements AuthenticationDataProvider {
+ private String authParam;
+
+ public BasicAuthenticationData(String authParam) {
+ this.authParam = authParam;
+ }
+
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ public String getCommandData() {
+ return authParam;
+ }
+
+ public boolean hasDataForHttp() {
+ return true;
+ }
+
+ @Override
+ public Set<Entry<String, String>> getHttpHeaders() {
+ Map<String, String> headers = new HashMap<>();
+ headers.put("BasicAuthentication", authParam);
+ return headers.entrySet();
+ }
+ }
+
+ public static class BasicAuthentication implements Authentication {
+
+ private String authParam;
+
+ @Override
+ public void close() throws IOException {
+ // noop
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "BasicAuthentication";
+ }
+
+ @Override
+ public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+ try {
+ return new BasicAuthenticationData(authParam);
+ } catch (Exception e) {
+ throw new PulsarClientException(e);
+ }
+ }
+
+ @Override
+ public void configure(Map<String, String> authParams) {
+ this.authParam = String.format("{\"entityType\": \"%s\", \"expiryTime\": \"%s\"}",
+ authParams.get("entityType"), authParams.get("expiryTime"));
+ }
+
+ @Override
+ public void start() throws PulsarClientException {
+ // noop
+ }
+ }
+
+ public static class BasicAuthenticationProvider implements AuthenticationProvider {
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void initialize(ServiceConfiguration config) throws IOException {
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "BasicAuthentication";
+ }
+
+ @Override
+ public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
+ String commandData = null;
+ if (authData.hasDataFromCommand()) {
+ commandData = authData.getCommandData();
+ } else if (authData.hasDataFromHttp()) {
+ commandData = authData.getHttpHeader("BasicAuthentication");
+ }
+
+ JsonParser parser = new JsonParser();
+ JsonObject element = parser.parse(commandData).getAsJsonObject();
+ long expiryTimeInMillis = Long.parseLong(element.get("expiryTime").getAsString());
+ long currentTimeInMillis = System.currentTimeMillis();
+ if (expiryTimeInMillis < currentTimeInMillis) {
+ throw new AuthenticationException("Authentication data has been expired");
+ }
+ return element.get("entityType").getAsString();
+ }
+ }
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ webServicePort = PortManager.nextFreePort();
+ servicePort = PortManager.nextFreePort();
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthorizationEnabled(true);
+ conf.setTlsEnabled(false);
+ conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+ // Expires after an hour
+ conf.setBrokerClientAuthenticationParameters(
+ "entityType:broker,expiryTime:" + (System.currentTimeMillis() + 3600 * 1000));
+
+ Set<String> superUserRoles = new HashSet<String>();
+ superUserRoles.add("admin");
+ conf.setSuperUserRoles(superUserRoles);
+
+ Set<String> providers = new HashSet<String>();
+ providers.add(BasicAuthenticationProvider.class.getName());
+ conf.setAuthenticationProviders(providers);
+
+ conf.setClusterName("test");
+ Set<String> proxyRoles = new HashSet<String>();
+ proxyRoles.add("proxy");
+ conf.setProxyRoles(proxyRoles);
+ conf.setAuthenticateOriginalAuthData(true);
+ super.init();
+
+ updateAdminClient();
+ producerBaseSetup();
+ }
+
+ @Override
+ @AfterMethod
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ void testAuthentication() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ // Step 1: Create Admin Client
+ updateAdminClient();
+ final String proxyServiceUrl = "pulsar://localhost:" + servicePort;
+ // create a client which connects to proxy and pass authData
+ String namespaceName = "my-property/my-ns";
+ String topicName = "persistent://my-property/my-ns/my-topic1";
+ String subscriptionName = "my-subscriber-name";
+ // expires after 6 seconds
+ String clientAuthParams = "entityType:client,expiryTime:" + (System.currentTimeMillis() + 6 * 1000);
+ // expires after 3 seconds
+ String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 3 * 1000);
+
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
+ Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
+ Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+ // Step 2: Try to use proxy Client as a normal Client - expect exception
+ ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ proxyConfig.setAuthenticationEnabled(true);
+ proxyConfig.setServicePort(servicePort);
+ proxyConfig.setWebServicePort(webServicePort);
+ proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
+
+ proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+ proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+
+ Set<String> providers = new HashSet<>();
+ providers.add(BasicAuthenticationProvider.class.getName());
+ proxyConfig.setAuthenticationProviders(providers);
+ proxyConfig.setForwardAuthorizationCredentials(true);
+ ProxyService proxyService = new ProxyService(proxyConfig);
+
+ proxyService.start();
+
+ // Step 3: Pass correct client params
+ PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 1);
+ proxyClient.newProducer().topic(topicName).create();
+ // Sleep for 4 seconds - wait for proxy auth params to expire
+ Thread.sleep(4 * 1000);
+ proxyClient.newProducer().topic(topicName).create();
+ // Sleep for 3 seconds - wait for client auth parans to expire
+ Thread.sleep(3 * 1000);
+ proxyClient.newProducer().topic(topicName).create();
+ proxyClient.close();
+ proxyService.close();
+ }
+
+ private void updateAdminClient() throws PulsarClientException {
+ // Expires after an hour
+ String adminAuthParams = "entityType:admin,expiryTime:" + (System.currentTimeMillis() + 3600 * 1000);
+ admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+ .authentication(BasicAuthentication.class.getName(), adminAuthParams).build());
+ }
+
+ private PulsarClient createPulsarClient(String proxyServiceUrl, String authParams, int numberOfConnections) throws PulsarClientException {
+ return PulsarClient.builder().serviceUrl(proxyServiceUrl)
+ .authentication(BasicAuthentication.class.getName(), authParams).connectionsPerBroker(numberOfConnections).build();
+ }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index bdd2e86..de5bba1 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -119,7 +119,7 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase {
proxyService.start();
proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
Assert.fail("Shouldn't be able to subscribe, auth required");
- } catch (PulsarClientException.AuthorizationException e) {
+ } catch (Exception e) {
// expected behaviour
}
--
To stop receiving notification emails like this one, please contact
jai1@apache.org.