You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ja...@apache.org on 2018/05/03 07:04:27 UTC

[incubator-pulsar] branch master updated: Fixed authentication flow via Pulsar Proxy (#1707)

This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9348d34  Fixed authentication flow via Pulsar Proxy (#1707)
9348d34 is described below

commit 9348d34e830f558e3fc0ca096115bcc0d569ab0d
Author: Jai Asher <ja...@ccs.neu.edu>
AuthorDate: Thu May 3 00:04:24 2018 -0700

    Fixed authentication flow via Pulsar Proxy (#1707)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  52 +---
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  30 ++-
 .../apache/pulsar/client/impl/ConnectionPool.java  |   7 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   6 +-
 .../org/apache/pulsar/common/api/Commands.java     |  66 ++---
 pulsar-common/src/main/proto/PulsarApi.proto       |   2 +
 .../pulsar/proxy/server/DirectProxyHandler.java    |  29 ++-
 .../pulsar/proxy/server/LookupProxyHandler.java    |  35 ++-
 .../apache/pulsar/proxy/server/ProxyClientCnx.java |  63 +++++
 .../pulsar/proxy/server/ProxyConnection.java       |  95 ++++++--
 .../apache/pulsar/proxy/server/ProxyService.java   |  57 +----
 .../proxy/server/ProxyAuthenticationTest.java      | 268 +++++++++++++++++++++
 .../proxy/server/ProxyForwardAuthDataTest.java     |   2 +-
 13 files changed, 502 insertions(+), 210 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4fac97d..f8ad145 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -227,21 +227,6 @@ public class ServerCnx extends PulsarHandler {
             return;
         }
 
-        String originalPrincipal = null;
-        if (authenticateOriginalAuthData && lookup.hasOriginalAuthData()) {
-            originalPrincipal = validateOriginalPrincipal(
-                    lookup.hasOriginalAuthData() ? lookup.getOriginalAuthData() : null,
-                    lookup.hasOriginalAuthMethod() ? lookup.getOriginalAuthMethod() : null,
-                    lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() : this.originalPrincipal, requestId,
-                    lookup);
-
-            if (originalPrincipal == null) {
-                return;
-            }
-        } else {
-            originalPrincipal = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() : this.originalPrincipal;
-        }
-
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
             if (invalidOriginalPrincipal(originalPrincipal)) {
@@ -312,22 +297,7 @@ public class ServerCnx extends PulsarHandler {
         if (topicName == null) {
             return;
         }
-        String originalPrincipal = null;
-        if (authenticateOriginalAuthData && partitionMetadata.hasOriginalAuthData()) {
-            originalPrincipal = validateOriginalPrincipal(
-                    partitionMetadata.hasOriginalAuthData() ? partitionMetadata.getOriginalAuthData() : null,
-                    partitionMetadata.hasOriginalAuthMethod() ? partitionMetadata.getOriginalAuthMethod() : null,
-                    partitionMetadata.hasOriginalPrincipal() ? partitionMetadata.getOriginalPrincipal()
-                            : this.originalPrincipal,
-                    requestId, partitionMetadata);
-
-            if (originalPrincipal == null) {
-                return;
-            }
-        } else {
-            originalPrincipal = partitionMetadata.hasOriginalPrincipal() ? partitionMetadata.getOriginalPrincipal() : this.originalPrincipal;
-        }
-
+        
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
             if (invalidOriginalPrincipal(originalPrincipal)) {
@@ -450,26 +420,6 @@ public class ServerCnx extends PulsarHandler {
         return commandConsumerStatsResponseBuilder;
     }
 
-    private String validateOriginalPrincipal(String originalAuthData, String originalAuthMethod, String originalPrincipal, Long requestId, GeneratedMessageLite request) {
-        ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
-        SSLSession sslSession = null;
-        if (sslHandler != null) {
-            sslSession = ((SslHandler) sslHandler).engine().getSession();
-        }
-        try {
-            return getOriginalPrincipal(originalAuthData, originalAuthMethod, originalPrincipal, sslSession);
-        } catch (AuthenticationException e) {
-            String msg = "Unable to authenticate original authdata ";
-            log.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage());
-            if (request instanceof CommandLookupTopic) {
-                ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthenticationError, msg, requestId));
-            } else if (request instanceof CommandPartitionedTopicMetadata) {
-                ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthenticationError, msg, requestId));
-            }
-            return null;
-        }
-    }
-
     private String getOriginalPrincipal(String originalAuthData, String originalAuthMethod, String originalPrincipal,
             SSLSession sslSession) throws AuthenticationException {
         if (authenticateOriginalAuthData) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 038e6ae..6a8ab74 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.client.impl.HttpClient.getPulsarClientVersion;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
@@ -36,7 +37,6 @@ import java.nio.channels.ClosedChannelException;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -73,7 +73,7 @@ import org.slf4j.LoggerFactory;
 
 public class ClientCnx extends PulsarHandler {
 
-    private final Authentication authentication;
+    protected final Authentication authentication;
     private State state;
 
     private final ConcurrentLongHashMap<CompletableFuture<ProducerResponse>> pendingRequests =
@@ -102,7 +102,7 @@ public class ClientCnx extends PulsarHandler {
     private final int rejectedRequestResetTimeSec = 60;
     private final long operationTimeoutMs;
 
-    private String proxyToTargetBrokerAddress = null;
+    protected String proxyToTargetBrokerAddress = null;
     // Remote hostName with which client is connected
     private String remoteHostName = null;
     private boolean isTlsHostnameVerificationEnable;
@@ -130,7 +130,6 @@ public class ClientCnx extends PulsarHandler {
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         super.channelActive(ctx);
-
         if (proxyToTargetBrokerAddress == null) {
             if (log.isDebugEnabled()) {
                 log.debug("{} Connected to broker", ctx.channel());
@@ -138,13 +137,8 @@ public class ClientCnx extends PulsarHandler {
         } else {
             log.info("{} Connected through proxy to target broker at {}", ctx.channel(), proxyToTargetBrokerAddress);
         }
-        String authData = "";
-        if (authentication.getAuthData().hasDataFromCommand()) {
-            authData = authentication.getAuthData().getCommandData();
-        }
         // Send CONNECT command
-        ctx.writeAndFlush(Commands.newConnect(authentication.getAuthMethodName(), authData,
-                getPulsarClientVersion(), proxyToTargetBrokerAddress))
+        ctx.writeAndFlush(newConnectCommand())
                 .addListener(future -> {
                     if (future.isSuccess()) {
                         if (log.isDebugEnabled()) {
@@ -157,6 +151,15 @@ public class ClientCnx extends PulsarHandler {
                     }
                 });
     }
+    
+    protected ByteBuf newConnectCommand() throws PulsarClientException {
+        String authData = "";
+        if (authentication.getAuthData().hasDataFromCommand()) {
+            authData = authentication.getAuthData().getCommandData();
+        }
+        return Commands.newConnect(authentication.getAuthMethodName(), authData,
+                getPulsarClientVersion(), proxyToTargetBrokerAddress);
+    }
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
@@ -757,6 +760,13 @@ public class ClientCnx extends PulsarHandler {
             return new PulsarClientException(errorMsg);
         }
     }
+    
+    @VisibleForTesting
+    public void close() {
+       if (ctx != null) {
+           ctx.close();
+       }
+    }
 
     private static final Logger log = LoggerFactory.getLogger(ClientCnx.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 52d85ca..9b08169 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -29,6 +29,7 @@ import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
 
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -69,6 +70,10 @@ public class ConnectionPool implements Closeable {
     public static final String TLS_HANDLER = "tls";
 
     public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
+        this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup));
+    }
+    
+    public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, Supplier<ClientCnx> clientCnxSupplier) {
         this.eventLoopGroup = eventLoopGroup;
         this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
 
@@ -99,7 +104,7 @@ public class ConnectionPool implements Closeable {
 
                 ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
                 ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4));
-                ch.pipeline().addLast("handler", new ClientCnx(conf, eventLoopGroup));
+                ch.pipeline().addLast("handler", clientCnxSupplier.get());
             }
         });
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a446dd4..38950b6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -25,6 +25,7 @@ import static java.lang.String.format;
 import static org.apache.pulsar.common.api.Commands.hasChecksum;
 import static org.apache.pulsar.common.api.Commands.readChecksum;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
 import io.netty.buffer.ByteBuf;
@@ -1349,8 +1350,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     void connectionClosed(ClientCnx cnx) {
         this.connectionHandler.connectionClosed(cnx);
     }
-
-    ClientCnx getClientCnx() {
+    
+    @VisibleForTesting
+    public ClientCnx getClientCnx() {
         return this.connectionHandler.getClientCnx();
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 1228876..4f971b6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -516,7 +516,15 @@ public class Commands {
     }
 
     public static ByteBuf newPartitionMetadataRequest(String topic, long requestId) {
-        return Commands.newPartitionMetadataRequest(topic, requestId, null, null, null);
+        CommandPartitionedTopicMetadata.Builder partitionMetadataBuilder = CommandPartitionedTopicMetadata.newBuilder();
+        partitionMetadataBuilder.setTopic(topic);
+        partitionMetadataBuilder.setRequestId(requestId);
+        CommandPartitionedTopicMetadata partitionMetadata = partitionMetadataBuilder.build();
+        ByteBuf res = serializeWithSize(
+                BaseCommand.newBuilder().setType(Type.PARTITIONED_METADATA).setPartitionMetadata(partitionMetadata));
+        partitionMetadataBuilder.recycle();
+        partitionMetadata.recycle();
+        return res;
     }
 
     public static ByteBuf newPartitionMetadataResponse(int partitions, long requestId) {
@@ -535,7 +543,15 @@ public class Commands {
     }
 
     public static ByteBuf newLookup(String topic, boolean authoritative, long requestId) {
-        return Commands.newLookup(topic, authoritative, null, null, null, requestId);
+        CommandLookupTopic.Builder lookupTopicBuilder = CommandLookupTopic.newBuilder();
+        lookupTopicBuilder.setTopic(topic);
+        lookupTopicBuilder.setRequestId(requestId);
+        lookupTopicBuilder.setAuthoritative(authoritative);
+        CommandLookupTopic lookupBroker = lookupTopicBuilder.build();
+        ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.LOOKUP).setLookupTopic(lookupBroker));
+        lookupTopicBuilder.recycle();
+        lookupBroker.recycle();
+        return res;
     }
 
     public static ByteBuf newLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boolean authoritative,
@@ -1033,52 +1049,6 @@ public class Commands {
         None;
     }
 
-    public static ByteBuf newPartitionMetadataRequest(String topic, long requestId, String originalAuthRole,
-            String originalAuthData, String originalAuthMethod) {
-        CommandPartitionedTopicMetadata.Builder partitionMetadataBuilder = CommandPartitionedTopicMetadata.newBuilder();
-        partitionMetadataBuilder.setTopic(topic);
-        partitionMetadataBuilder.setRequestId(requestId);
-        if (originalAuthRole != null) {
-            partitionMetadataBuilder.setOriginalPrincipal(originalAuthRole);
-        }
-        if (originalAuthData != null) {
-            partitionMetadataBuilder.setOriginalAuthData(originalAuthData);
-        }
-
-        if (originalAuthMethod != null) {
-            partitionMetadataBuilder.setOriginalAuthMethod(originalAuthMethod);
-        }
-        CommandPartitionedTopicMetadata partitionMetadata = partitionMetadataBuilder.build();
-        ByteBuf res = serializeWithSize(
-                BaseCommand.newBuilder().setType(Type.PARTITIONED_METADATA).setPartitionMetadata(partitionMetadata));
-        partitionMetadataBuilder.recycle();
-        partitionMetadata.recycle();
-        return res;
-    }
-
-    public static ByteBuf newLookup(String topic, boolean authoritative, String originalAuthRole,
-            String originalAuthData, String originalAuthMethod, long requestId) {
-        CommandLookupTopic.Builder lookupTopicBuilder = CommandLookupTopic.newBuilder();
-        lookupTopicBuilder.setTopic(topic);
-        lookupTopicBuilder.setRequestId(requestId);
-        lookupTopicBuilder.setAuthoritative(authoritative);
-        if (originalAuthRole != null) {
-            lookupTopicBuilder.setOriginalPrincipal(originalAuthRole);
-        }
-        if (originalAuthData != null) {
-            lookupTopicBuilder.setOriginalAuthData(originalAuthData);
-        }
-
-        if (originalAuthMethod != null) {
-            lookupTopicBuilder.setOriginalAuthMethod(originalAuthMethod);
-        }
-        CommandLookupTopic lookupBroker = lookupTopicBuilder.build();
-        ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.LOOKUP).setLookupTopic(lookupBroker));
-        lookupTopicBuilder.recycle();
-        lookupBroker.recycle();
-        return res;
-    }
-
     public static boolean peerSupportsGetLastMessageId(int peerVersion) {
         return peerVersion >= ProtocolVersion.v12.getNumber();
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 7e76521..0e55287 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -227,6 +227,7 @@ message CommandSubscribe {
 message CommandPartitionedTopicMetadata {
 	required string topic            = 1;
 	required uint64 request_id       = 2;
+	// TODO - Remove original_principal, original_auth_data, original_auth_method 
 	// Original principal that was verified by
 	// a Pulsar proxy.
 	optional string original_principal = 3;
@@ -254,6 +255,7 @@ message CommandLookupTopic {
 	required uint64 request_id       = 2;
 	optional bool authoritative      = 3 [default = false];
 
+	// TODO - Remove original_principal, original_auth_data, original_auth_method 
 	// Original principal that was verified by
 	// a Pulsar proxy.
 	optional string original_principal = 4;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 8b224f6..5b3814d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -63,7 +63,7 @@ public class DirectProxyHandler {
     private final Authentication authentication;
 
     public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl) {
-        this.authentication = service.getClientAuthentication();
+        this.authentication = proxyConnection.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
         this.originalPrincipal = proxyConnection.clientAuthRole;
         this.clientAuthData = proxyConnection.clientAuthData;
@@ -72,7 +72,8 @@ public class DirectProxyHandler {
 
         // Start the connection attempt.
         Bootstrap b = new Bootstrap();
-        // Tie the backend connection on the same thread to avoid context switches when passing data between the 2
+        // Tie the backend connection on the same thread to avoid context
+        // switches when passing data between the 2
         // connections
         b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
         b.group(inboundChannel.eventLoop()).channel(inboundChannel.getClass()).option(ChannelOption.AUTO_READ, false);
@@ -85,8 +86,8 @@ public class DirectProxyHandler {
                     AuthenticationDataProvider authData = authentication.getAuthData();
                     if (authData.hasDataForTls()) {
                         sslCtx = SecurityUtility.createNettySslContextForClient(config.isTlsAllowInsecureConnection(),
-                                config.getBrokerClientTrustCertsFilePath(), (X509Certificate[]) authData.getTlsCertificates(),
-                                authData.getTlsPrivateKey());
+                                config.getBrokerClientTrustCertsFilePath(),
+                                (X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey());
                     } else {
                         sslCtx = SecurityUtility.createNettySslContextForClient(config.isTlsAllowInsecureConnection(),
                                 config.getBrokerClientTrustCertsFilePath());
@@ -101,7 +102,8 @@ public class DirectProxyHandler {
 
         URI targetBroker;
         try {
-            // targetBrokerUrl is coming in the "hostname:6650" form, so we need to extract host and port
+            // targetBrokerUrl is coming in the "hostname:6650" form, so we need
+            // to extract host and port
             targetBroker = new URI("pulsar://" + targetBrokerUrl);
         } catch (URISyntaxException e) {
             log.warn("[{}] Failed to parse broker url '{}'", inboundChannel, targetBrokerUrl, e);
@@ -117,7 +119,8 @@ public class DirectProxyHandler {
                 inboundChannel.close();
                 return;
             }
-            final ProxyBackendHandler cnx = (ProxyBackendHandler) outboundChannel.pipeline().get("proxyOutboundHandler");
+            final ProxyBackendHandler cnx = (ProxyBackendHandler) outboundChannel.pipeline()
+                    .get("proxyOutboundHandler");
             cnx.setRemoteHostName(targetBroker.getHost());
         });
     }
@@ -132,7 +135,7 @@ public class DirectProxyHandler {
         private String remoteHostName;
         protected ChannelHandlerContext ctx;
         private ProxyConfiguration config;
-        
+
         public ProxyBackendHandler(ProxyConfiguration config) {
             this.config = config;
         }
@@ -177,7 +180,8 @@ public class DirectProxyHandler {
 
         @Override
         public void operationComplete(Future<Void> future) throws Exception {
-            // This is invoked when the write operation on the paired connection is completed
+            // This is invoked when the write operation on the paired connection
+            // is completed
             if (future.isSuccess()) {
                 outboundChannel.read();
             } else {
@@ -197,15 +201,16 @@ public class DirectProxyHandler {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] Received Connected from broker", inboundChannel, outboundChannel);
             }
-            
+
             if (config.isTlsHostnameVerificationEnabled() && remoteHostName != null
                     && !verifyTlsHostName(remoteHostName, ctx)) {
-                // close the connection if host-verification failed with the broker
+                // close the connection if host-verification failed with the
+                // broker
                 log.warn("[{}] Failed to verify hostname of {}", ctx.channel(), remoteHostName);
                 ctx.close();
                 return;
             }
-            
+
             state = BackendState.HandshakeCompleted;
 
             inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion())).addListener(future -> {
@@ -231,7 +236,7 @@ public class DirectProxyHandler {
             log.warn("[{}] [{}] Caught exception: {}", inboundChannel, outboundChannel, cause.getMessage(), cause);
             ctx.close();
         }
-        
+
         public void setRemoteHostName(String remoteHostName) {
             this.remoteHostName = remoteHostName;
         }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index bbf1c44..bed7ed7 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.URI;
@@ -35,7 +37,6 @@ import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
 import io.prometheus.client.Counter;
-import static org.apache.commons.lang3.StringUtils.isBlank;
 
 public class LookupProxyHandler {
     private final String throttlingErrorMessage = "Too many concurrent lookup and partitionsMetadata requests";
@@ -131,26 +132,24 @@ public class LookupProxyHandler {
             log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr, topic,
                     clientRequestId);
         }
-        service.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
+        proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
             // Connected to backend broker
-            long requestId = service.newRequestId();
+            long requestId = proxyConnection.newRequestId();
             ByteBuf command;
-            if (service.getConfiguration().isAuthenticationEnabled()) {
-                command = Commands.newLookup(topic, authoritative, proxyConnection.clientAuthRole,
-                        proxyConnection.clientAuthData, proxyConnection.clientAuthMethod, requestId);
-            } else {
-                command = Commands.newLookup(topic, authoritative, requestId);
-            }
+            command = Commands.newLookup(topic, authoritative, requestId);
             clientCnx.newLookup(command, requestId).thenAccept(result -> {
                 String brokerUrl = connectWithTLS ? result.brokerUrlTls : result.brokerUrl;
                 if (result.redirect) {
                     // Need to try the lookup again on a different broker
                     performLookup(clientRequestId, topic, brokerUrl, result.authoritative, numberOfRetries - 1);
                 } else {
-                    // Reply the same address for both TLS non-TLS. The reason is that whether we use TLS
-                    // and broker is independent of whether the client itself uses TLS, but we need to force the
+                    // Reply with the same address for both TLS and non-TLS. The
+                    // reason is that whether we use TLS
+                    // with the broker is independent of whether the client itself
+                    // uses TLS, but we need to force the
                     // client
-                    // to use the appropriate target broker (and port) when it will connect back.
+                    // to use the appropriate target broker (and port) when it
+                    // will connect back.
                     proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
                             LookupType.Connect, clientRequestId, true /* this is coming from proxy */));
                 }
@@ -223,17 +222,11 @@ public class LookupProxyHandler {
                         topicName.getPartitionedTopicName(), clientRequestId);
             }
 
-            service.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
+            proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
                 // Connected to backend broker
-                long requestId = service.newRequestId();
+                long requestId = proxyConnection.newRequestId();
                 ByteBuf command;
-                if (service.getConfiguration().isAuthenticationEnabled()) {
-                    command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId,
-                            proxyConnection.clientAuthRole, proxyConnection.clientAuthData,
-                            proxyConnection.clientAuthMethod);
-                } else {
-                    command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
-                }
+                command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
                 clientCnx.newLookup(command, requestId).thenAccept(lookupDataResult -> {
                     proxyConnection.ctx().writeAndFlush(
                             Commands.newPartitionMetadataResponse(lookupDataResult.partitions, clientRequestId));
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
new file mode 100644
index 0000000..9eb1fe7
--- /dev/null
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.apache.pulsar.client.impl.HttpClient.getPulsarClientVersion;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.Commands;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.EventLoopGroup;
+
+public class ProxyClientCnx extends ClientCnx {
+
+	String clientAuthRole;
+	String clientAuthData;
+	String clientAuthMethod;
+	
+	public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
+			String clientAuthData, String clientAuthMethod) {
+		super(conf, eventLoopGroup);
+		this.clientAuthRole = clientAuthRole;
+		this.clientAuthData = clientAuthData;
+		this.clientAuthMethod = clientAuthMethod;
+	}
+    
+	@Override
+	protected ByteBuf newConnectCommand() throws PulsarClientException {
+	    if (log.isDebugEnabled()) {
+            log.debug(
+                    "New Connection opened via ProxyClientCnx with params clientAuthRole = {}, clientAuthData = {}, clientAuthMethod = {}",
+                    clientAuthRole, clientAuthData, clientAuthMethod);
+	    }
+	    String authData = null;
+        if (authentication.getAuthData().hasDataFromCommand()) {
+            authData = authentication.getAuthData().getCommandData();
+        }
+        return Commands.newConnect(authentication.getAuthMethodName(), authData,
+                getPulsarClientVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData, clientAuthMethod);
+    }
+	
+    private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index eaa006d..8e9effd 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -23,12 +23,18 @@ import static com.google.common.base.Preconditions.checkArgument;
 import java.net.SocketAddress;
 import java.util.concurrent.TimeUnit;
 
-import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarHandler;
 import org.apache.pulsar.common.api.proto.PulsarApi;
@@ -52,16 +58,18 @@ import io.prometheus.client.Gauge;
  *
  */
 public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
-
+    // Client whose ConnectionPool is used by the proxy to issue lookup requests
+    private PulsarClientImpl client;
     private ProxyService service;
-    String clientAuthRole = null;
-    String clientAuthData = null;
-    String clientAuthMethod = null;
+    private Authentication clientAuthentication;
     AuthenticationDataSource authenticationData;
     private State state;
 
     private LookupProxyHandler lookupProxyHandler = null;
     private DirectProxyHandler directProxyHandler = null;
+    String clientAuthRole;
+    String clientAuthData;
+    String clientAuthMethod;
 
     enum State {
         Init,
@@ -78,6 +86,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         Closed,
     }
 
+    ConnectionPool getConnectionPool() {
+        return client.getCnxPool();
+    }
+
     private static final Gauge activeConnections = Gauge
             .build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create()
             .register();
@@ -149,7 +161,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             break;
 
         case ProxyConnectionToBroker:
-            // Pass the buffer to the outbound connection and schedule next read only
+            // Pass the buffer to the outbound connection and schedule next read
+            // only
             // if we can write on the connection
             directProxyHandler.outboundChannel.writeAndFlush(msg).addListener(this);
             break;
@@ -161,7 +174,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
 
     @Override
     public void operationComplete(Future<Void> future) throws Exception {
-        // This is invoked when the write operation on the paired connection is completed
+        // This is invoked when the write operation on the paired connection is
+        // completed
         if (future.isSuccess()) {
             ctx.read();
         } else {
@@ -189,20 +203,22 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             return;
         }
 
-        if (!verifyAuthenticationIfNeeded(connect)) {
+        if (!authenticateAndCreateClient(connect)) {
             ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"));
             close();
             return;
         }
 
         if (connect.hasProxyToBrokerUrl()) {
-            // Client already knows which broker to connect. Let's open a connection
+            // Client already knows which broker to connect. Let's open a
+            // connection
             // there and just pass bytes in both directions
             state = State.ProxyConnectionToBroker;
             directProxyHandler = new DirectProxyHandler(service, this, connect.getProxyToBrokerUrl());
             cancelKeepAliveTask();
         } else {
-            // Client is doing a lookup, we can consider the handshake complete and we'll take care of just topics and
+            // Client is doing a lookup, we can consider the handshake complete
+            // and we'll take care of just topics and
             // partitions metadata lookups
             state = State.ProxyLookupRequests;
             lookupProxyHandler = new LookupProxyHandler(service, this);
@@ -229,14 +245,47 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
+    /**
+     * Marks this connection as closed, closes the channel and closes the Pulsar client held by it, if any.
+     */
     private void close() {
         state = State.Closed;
         ctx.close();
+        // client is only assigned once authenticateAndCreateClient() succeeds, so it is still
+        // null when close() is called after a failed handshake
+        if (client != null) {
+            try {
+                client.close();
+            } catch (PulsarClientException e) {
+                // the trailing throwable argument makes SLF4J log the stack trace as well
+                LOG.error("Unable to close pulsar client - {}. Error - {}", client, e.getMessage(), e);
+            }
+        }
     }
 
-    private boolean verifyAuthenticationIfNeeded(CommandConnect connect) {
-        if (!service.getConfiguration().isAuthenticationEnabled()) {
-            return true;
+    ClientConfigurationData createClientConfiguration() throws UnsupportedAuthenticationException {
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl(service.getServiceUrl());
+        ProxyConfiguration proxyConfig = service.getConfiguration();
+        if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
+            clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                    proxyConfig.getBrokerClientAuthenticationParameters()));
         }
+        if (proxyConfig.isTlsEnabledWithBroker()) {
+            clientConf.setUseTls(true);
+            clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
+            clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
+        }
+        return clientConf;
+    }
 
+    private boolean authenticateAndCreateClient(CommandConnect connect) {
         try {
+            ClientConfigurationData clientConf = createClientConfiguration();
+            this.clientAuthentication = clientConf.getAuthentication();
+
+            if (!service.getConfiguration().isAuthenticationEnabled()) {
+                this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup());
+                return true;
+            }
+            
             String authMethod = "none";
             if (connect.hasAuthMethodName()) {
                 authMethod = connect.getAuthMethodName();
@@ -245,11 +286,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
                 authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
             }
             String authData = connect.getAuthData().toStringUtf8();
-
-            if (service.getConfiguration().forwardAuthorizationCredentials()) {
-                clientAuthData = authData;
-                clientAuthMethod = authMethod;
-            }
             ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");
             SSLSession sslSession = null;
             if (sslHandler != null) {
@@ -259,13 +295,34 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             clientAuthRole = service.getAuthenticationService().authenticate(authenticationData, authMethod);
             LOG.info("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod,
                     clientAuthRole);
+            if (service.getConfiguration().forwardAuthorizationCredentials()) {
+                this.clientAuthData = authData;
+                this.clientAuthMethod = authMethod;
+            }
+            this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod);
+
             return true;
-        } catch (AuthenticationException e) {
+        } catch (Exception e) {
             LOG.warn("[{}] Unable to authenticate: {}", remoteAddress, e.getMessage());
             return false;
         }
     }
 
+    private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final String clientAuthData,
+            final String clientAuthMethod) throws PulsarClientException {
+        return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
+                new ConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf,
+                        service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod)));
+    }
+
+    long newRequestId() {
+        return client.newRequestId();
+    }
+
+    public Authentication getClientAuthentication() {
+        return clientAuthentication;
+    }
+
     @Override
     protected boolean isHandshakeCompleted() {
         return state != State.Init;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 48293be..28af1ab 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -21,13 +21,6 @@ package org.apache.pulsar.proxy.server;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.AdaptiveRecvByteBufAllocator;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -39,11 +32,6 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.impl.ConnectionPool;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
@@ -51,6 +39,13 @@ import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.AdaptiveRecvByteBufAllocator;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
 /**
  * Pulsar proxy service
  */
@@ -66,14 +61,10 @@ public class ProxyService implements Closeable {
 
     private final EventLoopGroup acceptorGroup;
     private final EventLoopGroup workerGroup;
+
     private final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-discovery-acceptor");
     private final DefaultThreadFactory workersThreadFactory = new DefaultThreadFactory("pulsar-discovery-io");
 
-    // ConnectionPool is used by the proxy to issue lookup requests
-    private final PulsarClientImpl client;
-
-    private final Authentication clientAuthentication;
-
     private BrokerDiscoveryProvider discoveryProvider;
 
     protected final AtomicReference<Semaphore> lookupRequestSemaphore;
@@ -98,21 +89,6 @@ public class ProxyService implements Closeable {
 
         this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory);
         this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory);
-
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        clientConf.setServiceUrl(serviceUrl);
-        if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
-            clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
-                    proxyConfig.getBrokerClientAuthenticationParameters()));
-        }
-        if (proxyConfig.isTlsEnabledWithBroker()) {
-            clientConf.setUseTls(true);
-            clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath());
-            clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection());
-        }
-
-        this.client = new PulsarClientImpl(clientConf, workerGroup);
-        this.clientAuthentication = clientConf.getAuthentication();
     }
 
     public void start() throws Exception {
@@ -148,14 +124,6 @@ public class ProxyService implements Closeable {
         }
     }
 
-    long newRequestId() {
-        return client.newRequestId();
-    }
-
-    ConnectionPool getConnectionPool() {
-        return client.getCnxPool();
-    }
-
     public ZooKeeperClientFactory getZooKeeperClientFactory() {
         if (zkClientFactory == null) {
             zkClientFactory = new ZookeeperClientFactoryImpl();
@@ -174,7 +142,6 @@ public class ProxyService implements Closeable {
         }
         acceptorGroup.shutdownGracefully();
         workerGroup.shutdownGracefully();
-        client.close();
     }
 
     public String getServiceUrl() {
@@ -197,10 +164,6 @@ public class ProxyService implements Closeable {
         return authorizationService;
     }
 
-    public Authentication getClientAuthentication() {
-        return clientAuthentication;
-    }
-
     public ConfigurationCacheService getConfigurationCacheService() {
         return configurationCacheService;
     }
@@ -212,6 +175,10 @@ public class ProxyService implements Closeable {
     public Semaphore getLookupRequestSemaphore() {
         return lookupRequestSemaphore.get();
     }
+    
+    public EventLoopGroup getWorkerGroup() {
+        return workerGroup;
+    }
 
     private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class);
 }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
new file mode 100644
index 0000000..f572e5d
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.Set;
+
+import javax.naming.AuthenticationException;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Sets;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+public class ProxyAuthenticationTest extends ProducerConsumerBase {
+	private static final Logger log = LoggerFactory.getLogger(ProxyAuthenticationTest.class);
+    private int webServicePort;
+    private int servicePort;
+    
+	public static class BasicAuthenticationData implements AuthenticationDataProvider {
+		private String authParam;
+
+		public BasicAuthenticationData(String authParam) {
+			this.authParam = authParam;
+		}
+
+		public boolean hasDataFromCommand() {
+			return true;
+		}
+
+		public String getCommandData() {
+			return authParam;
+		}
+
+		public boolean hasDataForHttp() {
+			return true;
+		}
+
+		@Override
+		public Set<Entry<String, String>> getHttpHeaders() {
+			Map<String, String> headers = new HashMap<>();
+			headers.put("BasicAuthentication", authParam);
+			return headers.entrySet();
+		}
+	}
+
+	public static class BasicAuthentication implements Authentication {
+
+		private String authParam;
+
+		@Override
+		public void close() throws IOException {
+			// noop
+		}
+
+		@Override
+		public String getAuthMethodName() {
+			return "BasicAuthentication";
+		}
+
+		@Override
+		public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+			try {
+				return new BasicAuthenticationData(authParam);
+			} catch (Exception e) {
+				throw new PulsarClientException(e);
+			}
+		}
+
+		@Override
+		public void configure(Map<String, String> authParams) {
+			this.authParam = String.format("{\"entityType\": \"%s\", \"expiryTime\": \"%s\"}",
+					authParams.get("entityType"), authParams.get("expiryTime"));
+		}
+
+		@Override
+		public void start() throws PulsarClientException {
+			// noop
+		}
+	}
+
+	public static class BasicAuthenticationProvider implements AuthenticationProvider {
+
+		@Override
+		public void close() throws IOException {
+		}
+
+		@Override
+		public void initialize(ServiceConfiguration config) throws IOException {
+		}
+
+		@Override
+		public String getAuthMethodName() {
+			return "BasicAuthentication";
+		}
+
+		@Override
+		public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
+			String commandData = null;
+			if (authData.hasDataFromCommand()) {
+				commandData = authData.getCommandData();
+			} else if (authData.hasDataFromHttp()) {
+				commandData = authData.getHttpHeader("BasicAuthentication");
+			}
+
+			JsonParser parser = new JsonParser();
+			JsonObject element = parser.parse(commandData).getAsJsonObject();
+			long expiryTimeInMillis = Long.parseLong(element.get("expiryTime").getAsString());
+			long currentTimeInMillis = System.currentTimeMillis();
+			if (expiryTimeInMillis < currentTimeInMillis) {
+				throw new AuthenticationException("Authentication data has been expired");
+			}
+			return element.get("entityType").getAsString();
+		}
+	}
+
+	@BeforeMethod
+	@Override
+	protected void setup() throws Exception {
+		webServicePort = PortManager.nextFreePort();
+		servicePort = PortManager.nextFreePort();
+		conf.setAuthenticationEnabled(true);
+		conf.setAuthorizationEnabled(true);
+		conf.setTlsEnabled(false);
+		conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+		// Expires after an hour
+		conf.setBrokerClientAuthenticationParameters(
+				"entityType:broker,expiryTime:" + (System.currentTimeMillis() + 3600 * 1000));
+
+		Set<String> superUserRoles = new HashSet<String>();
+		superUserRoles.add("admin");
+		conf.setSuperUserRoles(superUserRoles);
+
+		Set<String> providers = new HashSet<String>();
+		providers.add(BasicAuthenticationProvider.class.getName());
+		conf.setAuthenticationProviders(providers);
+
+		conf.setClusterName("test");
+		Set<String> proxyRoles = new HashSet<String>();
+		proxyRoles.add("proxy");
+		conf.setProxyRoles(proxyRoles);
+        conf.setAuthenticateOriginalAuthData(true);
+		super.init();
+
+		updateAdminClient();
+		producerBaseSetup();
+	}
+
+	@Override
+	@AfterMethod
+	protected void cleanup() throws Exception {
+		super.internalCleanup();
+	}
+
+	@Test
+	void testAuthentication() throws Exception {
+		log.info("-- Starting {} test --", methodName);
+
+		// Step 1: Create Admin Client
+		updateAdminClient();
+		final String proxyServiceUrl = "pulsar://localhost:" + servicePort;
+		// create a client which connects to proxy and pass authData
+		String namespaceName = "my-property/my-ns";
+		String topicName = "persistent://my-property/my-ns/my-topic1";
+		String subscriptionName = "my-subscriber-name";
+		// expires after 6 seconds
+		String clientAuthParams = "entityType:client,expiryTime:" + (System.currentTimeMillis() + 6 * 1000);
+		// expires after 3 seconds
+		String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 3 * 1000);
+
+		admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
+				Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+		admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
+				Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+		// Step 2: Configure and start a proxy that authenticates to the broker with its own short-lived credentials
+		ProxyConfiguration proxyConfig = new ProxyConfiguration();
+		proxyConfig.setAuthenticationEnabled(true);
+		proxyConfig.setServicePort(servicePort);
+		proxyConfig.setWebServicePort(webServicePort);
+		proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
+
+		proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+		proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+
+		Set<String> providers = new HashSet<>();
+		providers.add(BasicAuthenticationProvider.class.getName());
+		proxyConfig.setAuthenticationProviders(providers);
+		proxyConfig.setForwardAuthorizationCredentials(true);
+		ProxyService proxyService = new ProxyService(proxyConfig);
+
+		proxyService.start();
+
+		// Step 3: Pass correct client params
+		PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 1);
+		proxyClient.newProducer().topic(topicName).create();
+		// Sleep for 4 seconds - wait for proxy auth params to expire
+		Thread.sleep(4 * 1000);
+		proxyClient.newProducer().topic(topicName).create();
+		// Sleep for 3 seconds - wait for client auth params to expire
+		Thread.sleep(3 * 1000);
+		proxyClient.newProducer().topic(topicName).create();
+		proxyClient.close();
+		proxyService.close();
+	}
+
+	private void updateAdminClient() throws PulsarClientException {
+		// Expires after an hour
+		String adminAuthParams = "entityType:admin,expiryTime:" + (System.currentTimeMillis() + 3600 * 1000);
+		admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+				.authentication(BasicAuthentication.class.getName(), adminAuthParams).build());
+	}
+
+	private PulsarClient createPulsarClient(String proxyServiceUrl, String authParams, int numberOfConnections) throws PulsarClientException {
+		return PulsarClient.builder().serviceUrl(proxyServiceUrl)
+				.authentication(BasicAuthentication.class.getName(), authParams).connectionsPerBroker(numberOfConnections).build();
+	}
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index bdd2e86..de5bba1 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -119,7 +119,7 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase {
             proxyService.start();
             proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
             Assert.fail("Shouldn't be able to subscribe, auth required");
-        } catch (PulsarClientException.AuthorizationException e) {
+        } catch (Exception e) {
             // expected behaviour
         }
 

-- 
To stop receiving notification emails like this one, please contact
jai1@apache.org.