You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/01/31 00:09:44 UTC

[GitHub] jai1 closed pull request #1002: Making Pulsar Proxy more secure

jai1 closed pull request #1002: Making Pulsar Proxy more secure
URL: https://github.com/apache/incubator-pulsar/pull/1002
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of such a foreign pull request
(one coming from a fork) once it has been merged, the diff is
reproduced below for the sake of provenance:

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
index 9fa31ccfd..2bf7ce6df 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
@@ -21,6 +21,9 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
@@ -66,8 +69,8 @@ public boolean canProduce(DestinationName destination, String role) throws Excep
             log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
             throw e;
         } catch (Exception e) {
-            log.warn("Producer-client  with Role - {} failed to get permissions for destination - {}", role,
-                    destination, e);
+            log.warn("Producer-client  with Role - {} failed to get permissions for destination - {}. {}", role,
+                    destination, e.getMessage());
             throw e;
         }
     }
@@ -96,8 +99,9 @@ public boolean canProduce(DestinationName destination, String role) throws Excep
                         switch (policies.get().subscription_auth_mode) {
                         case Prefix:
                             if (!subscription.startsWith(role)) {
-                                PulsarServerException ex = new PulsarServerException(
-                                        String.format("Failed to create consumer - The subscription name needs to be prefixed by the authentication role, like %s-xxxx for destination: %s", role, destination));
+                                PulsarServerException ex = new PulsarServerException(String.format(
+                                        "Failed to create consumer - The subscription name needs to be prefixed by the authentication role, like %s-xxxx for destination: %s",
+                                        role, destination));
                                 permissionFuture.completeExceptionally(ex);
                                 return;
                             }
@@ -111,13 +115,12 @@ public boolean canProduce(DestinationName destination, String role) throws Excep
                     permissionFuture.complete(isAuthorized);
                 });
             }).exceptionally(ex -> {
-                log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination,
-                        ex);
+                log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination, ex.getMessage());
                 permissionFuture.completeExceptionally(ex);
                 return null;
             });
         } catch (Exception e) {
-            log.warn("Client  with Role - {} failed to get permissions for destination - {}", role, destination, e);
+            log.warn("Client  with Role - {} failed to get permissions for destination - {}. {}", role, destination, e.getMessage());
             permissionFuture.completeExceptionally(e);
         }
         return permissionFuture;
@@ -130,8 +133,8 @@ public boolean canConsume(DestinationName destination, String role, String subsc
             log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
             throw e;
         } catch (Exception e) {
-            log.warn("Consumer-client  with Role - {} failed to get permissions for destination - {}", role,
-                    destination, e);
+            log.warn("Consumer-client  with Role - {} failed to get permissions for destination - {}. {}", role,
+                    destination, e.getMessage());
             throw e;
         }
     }
@@ -150,8 +153,46 @@ public boolean canLookup(DestinationName destination, String role) throws Except
         return canProduce(destination, role) || canConsume(destination, role, null);
     }
 
-    private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role,
-            AuthAction action) {
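+    /**
+     * Check asynchronously whether the specified role can perform a lookup for the specified destination.
+     *
+     * For that the caller needs to have producer or consumer permission.
+     *
+     * @param destination the destination on which the lookup is requested
+     * @param role the role whose produce and consume permissions are checked
+     * @return a future that completes with true if the role has produce or consume permission, and with false
+     *         otherwise (including when the permission checks fail with an exception)
+     */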
+    public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role) {
+        CompletableFuture<Boolean> finalResult = new CompletableFuture<Boolean>();
+        canProduceAsync(destination, role).whenComplete((produceAuthorized, ex) -> {
+            if (ex == null) {
+                if (produceAuthorized) {
+                    finalResult.complete(produceAuthorized);
+                    return;
+                }
+            } else if (log.isDebugEnabled()) {
+                log.debug("Destination [{}] Role [{}] exception occured while trying to check Produce permissions. {}",
+                        destination.toString(), role, ex.getMessage());
+            }
+            canConsumeAsync(destination, role, null).whenComplete((consumeAuthorized, e) -> {
+                if (e == null) {
+                    if (consumeAuthorized) {
+                        finalResult.complete(consumeAuthorized);
+                        return;
+                    }
+                } else if (log.isDebugEnabled()) {
+                    log.debug(
+                            "Destination [{}] Role [{}] exception occured while trying to check Consume permissions. {}",
+                            destination.toString(), role, e.getMessage());
+                }
+                finalResult.complete(false);
+            });
+        });
+        return finalResult;
+    }
+
+    private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role, AuthAction action) {
         if (isSuperUser(role)) {
             return CompletableFuture.completedFuture(true);
         } else {
@@ -218,13 +259,13 @@ private boolean checkCluster(DestinationName destination) {
                 }
                 permissionFuture.complete(false);
             }).exceptionally(ex -> {
-                log.warn("Client  with Role - {} failed to get permissions for destination - {}", role, destination,
-                        ex);
+                log.warn("Client  with Role - {} failed to get permissions for destination - {}. {}", role, destination,
+                        ex.getMessage());
                 permissionFuture.completeExceptionally(ex);
                 return null;
             });
         } catch (Exception e) {
-            log.warn("Client  with Role - {} failed to get permissions for destination - {}", role, destination, e);
+            log.warn("Client  with Role - {} failed to get permissions for destination - {}. {}", role, destination, e.getMessage());
             permissionFuture.completeExceptionally(e);
         }
         return permissionFuture;
@@ -244,8 +285,7 @@ private boolean checkWildcardPermission(String checkedRole, AuthAction checkedAc
             }
 
             // Suffix match
-            if (permittedRole.charAt(0) == '*'
-                    && checkedRole.endsWith(permittedRole.substring(1))
+            if (permittedRole.charAt(0) == '*' && checkedRole.endsWith(permittedRole.substring(1))
                     && permittedActions.contains(checkedAction)) {
                 return true;
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
index b7361b027..f241cf50a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
@@ -1314,7 +1314,7 @@ public void expireMessages(String property, String cluster, String namespace, St
                     validateAdminAccessOnProperty(pulsar, clientAppId, dn.getProperty());
                 } catch (RestException authException) {
                     log.warn("Failed to authorize {} on cluster {}", clientAppId, dn.toString());
-                    throw new PulsarClientException(String.format("Authorization failed %s on cluster %s with error %s",
+                    throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
                             clientAppId, dn.toString(), authException.getMessage()));
                 }
             } catch (Exception ex) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bd49fc76f..8888c97e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -100,7 +100,7 @@
     private String clientVersion = null;
     private int nonPersistentPendingMessages = 0;
     private final int MaxNonPersistentPendingMessages;
-    private String originalPrincipal;
+    private String originalPrincipal = null;
 
     enum State {
         Start, Connected, Failed
@@ -187,73 +187,128 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
     @Override
     protected void handleLookup(CommandLookupTopic lookup) {
         final long requestId = lookup.getRequestId();
-        final String topic = lookup.getTopic();
+        final String topicName = lookup.getTopic();
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Received Lookup from {} for {}", topic, remoteAddress, requestId);
+            log.debug("[{}] Received Lookup from {} for {}", topicName, remoteAddress, requestId);
         }
+
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
-            lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(),
-                    getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> {
-                        if (ex == null) {
-                            ctx.writeAndFlush(lookupResponse);
-                        } else {
-                            // it should never happen
-                            log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex);
-                            ctx.writeAndFlush(
-                                    newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
-                        }
-                        lookupSemaphore.release();
-                        return null;
-                    });
+            final String originalPrincipal = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal()
+                    : this.originalPrincipal;
+            CompletableFuture<Boolean> isProxyAuthorizedFuture;
+            if (service.isAuthorizationEnabled() && originalPrincipal != null) {
+                isProxyAuthorizedFuture = service.getAuthorizationManager()
+                        .canLookupAsync(DestinationName.get(topicName), authRole);
+            } else {
+                isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+            }
+            
+            isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
+                if (isProxyAuthorized) {
+                    lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topicName),
+                            lookup.getAuthoritative(), originalPrincipal != null ? originalPrincipal : authRole,
+                            lookup.getRequestId()).handle((lookupResponse, ex) -> {
+                                if (ex == null) {
+                                    ctx.writeAndFlush(lookupResponse);
+                                } else {
+                                    // it should never happen
+                                    log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topicName,
+                                            ex.getMessage(), ex);
+                                    ctx.writeAndFlush(newLookupErrorResponse(ServerError.ServiceNotReady,
+                                            ex.getMessage(), requestId));
+                                }
+                                lookupSemaphore.release();
+                                return null;
+                            });
+                } else {
+                    final String msg = "Proxy Client is not authorized to Lookup";
+                    log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+                    ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
+                    lookupSemaphore.release();
+                }
+                return null;
+            }).exceptionally(ex -> {
+                final String msg = "Exception occured while trying to authorize lookup";
+                log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+                ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
+                lookupSemaphore.release();
+                return null;
+            });
         } else {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Failed lookup due to too many lookup-requests {}", remoteAddress, topic);
+                log.debug("[{}] Failed lookup due to too many lookup-requests {}", remoteAddress, topicName);
             }
             ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests,
                     "Failed due to too many pending lookup requests", requestId));
         }
-
     }
 
     @Override
     protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
         final long requestId = partitionMetadata.getRequestId();
-        final String topic = partitionMetadata.getTopic();
+        final String topicName = partitionMetadata.getTopic();
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Received PartitionMetadataLookup from {} for {}", topic, remoteAddress, requestId);
+            log.debug("[{}] Received PartitionMetadataLookup from {} for {}", topicName, remoteAddress, requestId);
         }
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
-            getPartitionedTopicMetadata(getBrokerService().pulsar(),
-                    originalPrincipal != null ? originalPrincipal : authRole, DestinationName.get(topic))
-                            .handle((metadata, ex) -> {
-                                if (ex == null) {
-                                    int partitions = metadata.partitions;
-                                    ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
-                                } else {
-                                    if (ex instanceof PulsarClientException) {
-                                        log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(),
-                                                remoteAddress, topic, ex.getMessage());
-                                        ctx.writeAndFlush(Commands.newPartitionMetadataResponse(
-                                                ServerError.AuthorizationError, ex.getMessage(), requestId));
+
+            final String originalPrincipal = partitionMetadata.hasOriginalPrincipal()
+                    ? partitionMetadata.getOriginalPrincipal() : this.originalPrincipal;
+            CompletableFuture<Boolean> isProxyAuthorizedFuture;
+            if (service.isAuthorizationEnabled() && originalPrincipal != null) {
+                isProxyAuthorizedFuture = service.getAuthorizationManager()
+                        .canLookupAsync(DestinationName.get(topicName), authRole);
+            } else {
+                isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+            }
+            isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
+                    if (isProxyAuthorized) {
+                        getPartitionedTopicMetadata(getBrokerService().pulsar(),
+                                originalPrincipal != null ? originalPrincipal : authRole,
+                                DestinationName.get(topicName)).handle((metadata, ex) -> {
+                                    if (ex == null) {
+                                        int partitions = metadata.partitions;
+                                        ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
                                     } else {
-                                        log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, topic,
-                                                ex.getMessage(), ex);
-                                        ServerError error = (ex instanceof RestException)
-                                                && ((RestException) ex).getResponse().getStatus() < 500
-                                                        ? ServerError.MetadataError : ServerError.ServiceNotReady;
-                                        ctx.writeAndFlush(Commands.newPartitionMetadataResponse(error, ex.getMessage(),
-                                                requestId));
+                                        if (ex instanceof PulsarClientException) {
+                                            log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(),
+                                                    remoteAddress, topicName, ex.getMessage());
+                                            ctx.writeAndFlush(Commands.newPartitionMetadataResponse(
+                                                    ServerError.AuthorizationError, ex.getMessage(), requestId));
+                                        } else {
+                                            log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress,
+                                                    topicName, ex.getMessage(), ex);
+                                            ServerError error = (ex instanceof RestException)
+                                                    && ((RestException) ex).getResponse().getStatus() < 500
+                                                            ? ServerError.MetadataError : ServerError.ServiceNotReady;
+                                            ctx.writeAndFlush(Commands.newPartitionMetadataResponse(error,
+                                                    ex.getMessage(), requestId));
+                                        }
                                     }
-                                }
-                                lookupSemaphore.release();
-                                return null;
-                            });
+                                    lookupSemaphore.release();
+                                    return null;
+                                });
+                    } else {
+                        final String msg = "Proxy Client is not authorized to Get Partition Metadata";
+                        log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+                        ctx.writeAndFlush(
+                                Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
+                        lookupSemaphore.release();
+                    }
+                    return null;
+            }).exceptionally(ex -> {
+                final String msg = "Exception occured while trying to authorize get Partition Metadata";
+                log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+                ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
+                lookupSemaphore.release();
+                return null;
+            });
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Failed Partition-Metadata lookup due to too many lookup-requests {}", remoteAddress,
-                        topic);
+                        topicName);
             }
             ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.TooManyRequests,
                     "Failed due to too many pending lookup requests", requestId));
@@ -330,11 +385,12 @@ protected void handleConnect(CommandConnect connect) {
                 if (sslHandler != null) {
                     sslSession = ((SslHandler) sslHandler).engine().getSession();
                 }
+
                 originalPrincipal = connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null;
                 authRole = getBrokerService().getAuthenticationService()
                         .authenticate(new AuthenticationDataCommand(authData, remoteAddress, sslSession), authMethod);
 
-                log.info("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod, authRole);
+                log.info("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}", remoteAddress, authMethod, authRole, originalPrincipal);
             } catch (AuthenticationException e) {
                 String msg = "Unable to authenticate";
                 log.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage());
@@ -358,119 +414,140 @@ protected void handleConnect(CommandConnect connect) {
     @Override
     protected void handleSubscribe(final CommandSubscribe subscribe) {
         checkArgument(state == State.Connected);
-        CompletableFuture<Boolean> authorizationFuture;
-        if (service.isAuthorizationEnabled()) {
-            authorizationFuture = service.getAuthorizationManager().canConsumeAsync(
-                    DestinationName.get(subscribe.getTopic()),
-                    originalPrincipal != null ? originalPrincipal : authRole,
-                    subscribe.getSubscription());
-        } else {
-            authorizationFuture = CompletableFuture.completedFuture(true);
-        }
         final String topicName = subscribe.getTopic();
-        final String subscriptionName = subscribe.getSubscription();
         final long requestId = subscribe.getRequestId();
         final long consumerId = subscribe.getConsumerId();
-        final SubType subType = subscribe.getSubType();
-        final String consumerName = subscribe.getConsumerName();
-        final boolean isDurable = subscribe.getDurable();
-        final MessageIdImpl startMessageId = subscribe.hasStartMessageId()
-                ? new BatchMessageIdImpl(subscribe.getStartMessageId().getLedgerId(),
-                        subscribe.getStartMessageId().getEntryId(), subscribe.getStartMessageId().getPartition(),
-                        subscribe.getStartMessageId().getBatchIndex())
-                : null;
-
-        final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
-        final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
-
-        authorizationFuture.thenApply(isAuthorized -> {
-            if (isAuthorized) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, authRole);
-                }
-
-                log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);
 
-                try {
-                    Metadata.validateMetadata(metadata);
-                } catch (IllegalArgumentException iae) {
-                    final String msg = iae.getMessage();
-                    ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
-                    return null;
+        CompletableFuture<Boolean> isProxyAuthorizedFuture;
+        if (service.isAuthorizationEnabled() && originalPrincipal != null) {
+            isProxyAuthorizedFuture = service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topicName),
+                    authRole, subscribe.getSubscription());
+        } else {
+            isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+        }
+        isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
+            if (isProxyAuthorized) {
+                CompletableFuture<Boolean> authorizationFuture;
+                if (service.isAuthorizationEnabled()) {
+                    authorizationFuture = service.getAuthorizationManager().canConsumeAsync(
+                            DestinationName.get(subscribe.getTopic()),
+                            originalPrincipal != null ? originalPrincipal : authRole, subscribe.getSubscription());
+                } else {
+                    authorizationFuture = CompletableFuture.completedFuture(true);
                 }
 
-                CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
-                CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId, consumerFuture);
-
-                if (existingConsumerFuture != null) {
-                    if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
-                        Consumer consumer = existingConsumerFuture.getNow(null);
-                        log.info("[{}] Consumer with the same id is already created: {}", remoteAddress, consumer);
-                        ctx.writeAndFlush(Commands.newSuccess(requestId));
-                        return null;
-                    } else {
-                        // There was an early request to create a consumer with same consumerId. This can happen when
-                        // client timeout is lower the broker timeouts. We need to wait until the previous consumer
-                        // creation request either complete or fails.
-                        log.warn("[{}][{}][{}] Consumer is already present on the connection", remoteAddress, topicName,
-                                subscriptionName);
-                        ServerError error = !existingConsumerFuture.isDone() ? ServerError.ServiceNotReady
-                                : getErrorCode(existingConsumerFuture);
-                        ctx.writeAndFlush(
-                                Commands.newError(requestId, error, "Consumer is already present on the connection"));
-                        return null;
-                    }
-                }
+                final String subscriptionName = subscribe.getSubscription();
+                final SubType subType = subscribe.getSubType();
+                final String consumerName = subscribe.getConsumerName();
+                final boolean isDurable = subscribe.getDurable();
+                final MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl(
+                        subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
+                        subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
+                        : null;
+
+                final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
+                final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
+
+                authorizationFuture.thenApply(isAuthorized -> {
+                    if (isAuthorized) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, authRole);
+                        }
 
-                service.getTopic(topicName).thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName,
-                        consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata))
-                        .thenAccept(consumer -> {
-                            if (consumerFuture.complete(consumer)) {
-                                log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName,
-                                        subscriptionName);
-                                ctx.writeAndFlush(Commands.newSuccess(requestId), ctx.voidPromise());
+                        log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);
+                        try {
+                            Metadata.validateMetadata(metadata);
+                        } catch (IllegalArgumentException iae) {
+                            final String msg = iae.getMessage();
+                            ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
+                            return null;
+                        }
+                        CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
+                        CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId,
+                                consumerFuture);
+
+                        if (existingConsumerFuture != null) {
+                            if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
+                                Consumer consumer = existingConsumerFuture.getNow(null);
+                                log.info("[{}] Consumer with the same id is already created: {}", remoteAddress,
+                                        consumer);
+                                ctx.writeAndFlush(Commands.newSuccess(requestId));
+                                return null;
                             } else {
-                                // The consumer future was completed before by a close command
-                                try {
-                                    consumer.close();
-                                    log.info("[{}] Cleared consumer created after timeout on client side {}",
-                                            remoteAddress, consumer);
-                                } catch (BrokerServiceException e) {
-                                    log.warn("[{}] Error closing consumer created after timeout on client side {}: {}",
-                                            remoteAddress, consumer, e.getMessage());
-                                }
-                                consumers.remove(consumerId, consumerFuture);
+                                // There was an earlier request to create a consumer with the
+                                // same consumerId. This can happen when the client timeout is
+                                // lower than the broker timeouts. We need to wait until the
+                                // previous consumer creation request either completes or
+                                // fails.
+                                log.warn("[{}][{}][{}] Consumer is already present on the connection", remoteAddress,
+                                        topicName, subscriptionName);
+                                ServerError error = !existingConsumerFuture.isDone() ? ServerError.ServiceNotReady
+                                        : getErrorCode(existingConsumerFuture);
+                                ctx.writeAndFlush(Commands.newError(requestId, error,
+                                        "Consumer is already present on the connection"));
+                                return null;
                             }
+                        }
 
-                        }) //
-                        .exceptionally(exception -> {
-                            if (exception.getCause() instanceof ConsumerBusyException) {
-                                if (log.isDebugEnabled()) {
-                                    log.debug(
-                                            "[{}][{}][{}] Failed to create consumer because exclusive consumer is already connected: {}",
-                                            remoteAddress, topicName, subscriptionName,
-                                            exception.getCause().getMessage());
-                                }
-                            } else {
-                                log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
-                                        subscriptionName, exception.getCause().getMessage(), exception);
-                            }
+                        service.getTopic(topicName)
+                                .thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
+                                        subType, priorityLevel, consumerName, isDurable, startMessageId, metadata))
+                                .thenAccept(consumer -> {
+                                    if (consumerFuture.complete(consumer)) {
+                                        log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName,
+                                                subscriptionName);
+                                        ctx.writeAndFlush(Commands.newSuccess(requestId), ctx.voidPromise());
+                                    } else {
+                                        // The consumer future was completed before by a close command
+                                        try {
+                                            consumer.close();
+                                            log.info("[{}] Cleared consumer created after timeout on client side {}",
+                                                    remoteAddress, consumer);
+                                        } catch (BrokerServiceException e) {
+                                            log.warn(
+                                                    "[{}] Error closing consumer created after timeout on client side {}: {}",
+                                                    remoteAddress, consumer, e.getMessage());
+                                        }
+                                        consumers.remove(consumerId, consumerFuture);
+                                    }
 
-                            // If client timed out, the future would have been completed by subsequent close. Send error
-                            // back to client, only if not completed already.
-                            if (consumerFuture.completeExceptionally(exception)) {
-                                ctx.writeAndFlush(Commands.newError(requestId,
-                                        BrokerServiceException.getClientErrorCode(exception.getCause()),
-                                        exception.getCause().getMessage()));
-                            }
-                            consumers.remove(consumerId, consumerFuture);
+                                }) //
+                                .exceptionally(exception -> {
+                                    if (exception.getCause() instanceof ConsumerBusyException) {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug(
+                                                    "[{}][{}][{}] Failed to create consumer because exclusive consumer is already connected: {}",
+                                                    remoteAddress, topicName, subscriptionName,
+                                                    exception.getCause().getMessage());
+                                        }
+                                    } else {
+                                        log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
+                                                subscriptionName, exception.getCause().getMessage(), exception);
+                                    }
 
-                            return null;
+                                    // If the client timed out, the future will already have been completed by
+                                    // the subsequent close command. Send the error back to the client only if
+                                    // the future was not completed already.
+                                    if (consumerFuture.completeExceptionally(exception)) {
+                                        ctx.writeAndFlush(Commands.newError(requestId,
+                                                BrokerServiceException.getClientErrorCode(exception.getCause()),
+                                                exception.getCause().getMessage()));
+                                    }
+                                    consumers.remove(consumerId, consumerFuture);
 
-                        });
+                                    return null;
+
+                                });
+                    } else {
+                        String msg = "Client is not authorized to subscribe";
+                        log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
+                        ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
+                    }
+                    return null;
+                });
             } else {
-                String msg = "Client is not authorized to subscribe";
-                log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
+                final String msg = "Proxy Client is not authorized to subscribe";
+                log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
                 ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
             }
             return null;
@@ -489,155 +566,198 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
+    /**
+     * Handles a producer-creation command received on this connection.
+     *
+     * <p>If authorization is enabled and an original principal is set, the connection's own role
+     * ({@code authRole}) must first be allowed to produce on the topic; the original principal (or
+     * {@code authRole} when there is none) is then checked as well. Producer metadata is validated before a
+     * future is registered under the producer id, and the producer is finally added to the topic.
+     *
+     * @param cmdProducer the producer command carrying the topic, producer id and request id
+     */
     @Override
     protected void handleProducer(final CommandProducer cmdProducer) {
         checkArgument(state == State.Connected);
-        CompletableFuture<Boolean> authorizationFuture;
-        if (service.isAuthorizationEnabled()) {
-            authorizationFuture = service.getAuthorizationManager().canProduceAsync(
-                    DestinationName.get(cmdProducer.getTopic()),
-                    originalPrincipal != null ? originalPrincipal : authRole);
-        } else {
-            authorizationFuture = CompletableFuture.completedFuture(true);
-        }
-
-        // Use producer name provided by client if present
-        final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
-                : service.generateUniqueProducerName();
         final String topicName = cmdProducer.getTopic();
         final long producerId = cmdProducer.getProducerId();
         final long requestId = cmdProducer.getRequestId();
-        final boolean isEncrypted = cmdProducer.getEncrypted();
-        final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
 
-        authorizationFuture.thenApply(isAuthorized -> {
-            if (isAuthorized) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, authRole);
-                }
-
-                try {
-                    Metadata.validateMetadata(metadata);
-                } catch (IllegalArgumentException iae) {
-                    final String msg = iae.getMessage();
-                    ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
-                    return null;
-                }
-
-                CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
-                CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId, producerFuture);
-
-                if (existingProducerFuture != null) {
-                    if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
-                        Producer producer = existingProducerFuture.getNow(null);
-                        log.info("[{}] Producer with the same id is already created: {}", remoteAddress, producer);
-                        ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName()));
-                        return null;
-                    } else {
-                        // There was an early request to create a producer with
-                        // same producerId. This can happen when
-                        // client
-                        // timeout is lower the broker timeouts. We need to wait
-                        // until the previous producer creation
-                        // request
-                        // either complete or fails.
-                        ServerError error = !existingProducerFuture.isDone() ? ServerError.ServiceNotReady
-                                : getErrorCode(existingProducerFuture);
-                        log.warn("[{}][{}] Producer is already present on the connection", remoteAddress, topicName);
-                        ctx.writeAndFlush(
-                                Commands.newError(requestId, error, "Producer is already present on the connection"));
-                        return null;
-                    }
+        CompletableFuture<Boolean> isProxyAuthorizedFuture;
+        if (service.isAuthorizationEnabled() && originalPrincipal != null) {
+            isProxyAuthorizedFuture = service.getAuthorizationManager().canProduceAsync(DestinationName.get(topicName),
+                    authRole);
+        } else {
+            isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+        }
+        isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
+            if (isProxyAuthorized) {
+                CompletableFuture<Boolean> authorizationFuture;
+                if (service.isAuthorizationEnabled()) {
+                    authorizationFuture = service.getAuthorizationManager().canProduceAsync(
+                            DestinationName.get(topicName),
+                            originalPrincipal != null ? originalPrincipal : authRole);
+                } else {
+                    authorizationFuture = CompletableFuture.completedFuture(true);
                 }
+                // Use producer name provided by client if present
+                final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
+                        : service.generateUniqueProducerName();
+                final boolean isEncrypted = cmdProducer.getEncrypted();
+                final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
+
+                authorizationFuture.thenApply(isAuthorized -> {
+                    if (isAuthorized) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, authRole);
+                        }
+                        // Reject invalid producer metadata before a producer future is registered
+                        try {
+                            Metadata.validateMetadata(metadata);
+                        } catch (IllegalArgumentException iae) {
+                            final String msg = iae.getMessage();
+                            ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
+                            return null;
+                        }
+                        CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
+                        CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId,
+                                producerFuture);
+
+                        if (existingProducerFuture != null) {
+                            if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
+                                Producer producer = existingProducerFuture.getNow(null);
+                                log.info("[{}] Producer with the same id is already created: {}", remoteAddress,
+                                        producer);
+                                ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName()));
+                                return null;
+                            } else {
+                                // There was an earlier request to create a producer
+                                // with the same producerId. This can happen when the
+                                // client timeout is lower than the broker timeout.
+                                // Until that earlier request completes, answer with
+                                // ServiceNotReady; if it has already failed, answer
+                                // with the error code derived from that failure
+                                // (see getErrorCode).
+                                ServerError error = !existingProducerFuture.isDone() ? ServerError.ServiceNotReady
+                                        : getErrorCode(existingProducerFuture);
+                                log.warn("[{}][{}] Producer is already present on the connection", remoteAddress,
+                                        topicName);
+                                ctx.writeAndFlush(Commands.newError(requestId, error,
+                                        "Producer is already present on the connection"));
+                                return null;
+                            }
+                        }
 
-                log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);
-
-                service.getTopic(topicName).thenAccept((Topic topic) -> {
-                    // Before creating producer, check if backlog quota exceeded
-                    // on topic
-                    if (topic.isBacklogQuotaExceeded(producerName)) {
-                        IllegalStateException illegalStateException = new IllegalStateException(
-                                "Cannot create producer on topic with backlog quota exceeded");
-                        BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy();
-                        if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
-                            ctx.writeAndFlush(Commands.newError(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededError, illegalStateException.getMessage()));
-                        } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
-                            ctx.writeAndFlush(
-                                    Commands.newError(requestId, ServerError.ProducerBlockedQuotaExceededException,
+                        log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);
+
+                        service.getTopic(topicName).thenAccept((Topic topic) -> {
+                            // Before creating producer, check if backlog quota exceeded
+                            // on topic
+                            if (topic.isBacklogQuotaExceeded(producerName)) {
+                                IllegalStateException illegalStateException = new IllegalStateException(
+                                        "Cannot create producer on topic with backlog quota exceeded");
+                                BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy();
+                                if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
+                                    ctx.writeAndFlush(
+                                            Commands.newError(requestId, ServerError.ProducerBlockedQuotaExceededError,
+                                                    illegalStateException.getMessage()));
+                                } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
+                                    ctx.writeAndFlush(Commands.newError(requestId,
+                                            ServerError.ProducerBlockedQuotaExceededException,
                                             illegalStateException.getMessage()));
-                        }
-                        producerFuture.completeExceptionally(illegalStateException);
-                        producers.remove(producerId, producerFuture);
-                        return;
-                    }
+                                }
+                                producerFuture.completeExceptionally(illegalStateException);
+                                producers.remove(producerId, producerFuture);
+                                return;
+                            }
 
-                    // Check whether the producer will publish encrypted messages or not
-                    if (topic.isEncryptionRequired() && !isEncrypted) {
-                        String msg = String.format("Encryption is required in %s", topicName);
-                        log.warn("[{}] {}", remoteAddress, msg);
-                        ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
-                        return;
-                    }
+                            // Check whether the producer will publish encrypted messages or not
+                            if (topic.isEncryptionRequired() && !isEncrypted) {
+                                String msg = String.format("Encryption is required in %s", topicName);
+                                log.warn("[{}] {}", remoteAddress, msg);
+                                ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
+                                // Fail and unregister the pending future so the producer id can be reused
+                                producerFuture.completeExceptionally(new IllegalStateException(msg));
+                                producers.remove(producerId, producerFuture);
+                                return;
+                            }
 
-                    disableTcpNoDelayIfNeeded(topicName, producerName);
+                            disableTcpNoDelayIfNeeded(topicName, producerName);
 
-                    Producer producer =
-                            new Producer(topic, ServerCnx.this, producerId, producerName, authRole, isEncrypted, metadata);
+                            Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole,
+                                    isEncrypted, metadata);
 
-                    try {
-                        topic.addProducer(producer);
+                            try {
+                                topic.addProducer(producer);
 
-                        if (isActive()) {
-                            if (producerFuture.complete(producer)) {
-                                log.info("[{}] Created new producer: {}", remoteAddress, producer);
-                                ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,
-                                        producer.getLastSequenceId()));
-                                return;
-                            } else {
-                                // The producer's future was completed before by
-                                // a close command
-                                producer.closeNow();
-                                log.info("[{}] Cleared producer created after timeout on client side {}", remoteAddress,
-                                        producer);
+                                if (isActive()) {
+                                    if (producerFuture.complete(producer)) {
+                                        log.info("[{}] Created new producer: {}", remoteAddress, producer);
+                                        ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,
+                                                producer.getLastSequenceId()));
+                                        return;
+                                    } else {
+                                        // The producer's future was completed before by
+                                        // a close command
+                                        producer.closeNow();
+                                        log.info("[{}] Cleared producer created after timeout on client side {}",
+                                                remoteAddress, producer);
+                                    }
+                                } else {
+                                    producer.closeNow();
+                                    log.info("[{}] Cleared producer created after connection was closed: {}",
+                                            remoteAddress, producer);
+                                    producerFuture.completeExceptionally(
+                                            new IllegalStateException("Producer created after connection was closed"));
+                                }
+                            } catch (BrokerServiceException ise) {
+                                log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName,
+                                        ise.getMessage());
+                                ctx.writeAndFlush(Commands.newError(requestId,
+                                        BrokerServiceException.getClientErrorCode(ise), ise.getMessage()));
+                                producerFuture.completeExceptionally(ise);
                             }
-                        } else {
-                            producer.closeNow();
-                            log.info("[{}] Cleared producer created after connection was closed: {}", remoteAddress,
-                                    producer);
-                            producerFuture.completeExceptionally(
-                                    new IllegalStateException("Producer created after connection was closed"));
-                        }
-                    } catch (BrokerServiceException ise) {
-                        log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName,
-                                ise.getMessage());
-                        ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(ise),
-                                ise.getMessage()));
-                        producerFuture.completeExceptionally(ise);
-                    }
 
-                    producers.remove(producerId, producerFuture);
-                }).exceptionally(exception -> {
-                    Throwable cause = exception.getCause();
-                    if (!(cause instanceof ServiceUnitNotReadyException)) {
-                        // Do not print stack traces for expected exceptions
-                        log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception);
-                    }
+                            producers.remove(producerId, producerFuture);
+                        }).exceptionally(exception -> {
+                            Throwable cause = exception.getCause();
+                            if (!(cause instanceof ServiceUnitNotReadyException)) {
+                                // Do not print stack traces for expected exceptions
+                                log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception);
+                            }
 
-                    // If client timed out, the future would have been completed
-                    // by subsequent close. Send error back to
-                    // client, only if not completed already.
-                    if (producerFuture.completeExceptionally(exception)) {
-                        ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(cause),
-                                cause.getMessage()));
-                    }
-                    producers.remove(producerId, producerFuture);
+                            // If client timed out, the future would have been completed
+                            // by subsequent close. Send error back to
+                            // client, only if not completed already.
+                            if (producerFuture.completeExceptionally(exception)) {
+                                ctx.writeAndFlush(Commands.newError(requestId,
+                                        BrokerServiceException.getClientErrorCode(cause), cause.getMessage()));
+                            }
+                            producers.remove(producerId, producerFuture);
 
+                            return null;
+                        });
+                    } else {
+                        String msg = "Client is not authorized to Produce";
+                        log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
+                        ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
+                    }
                     return null;
-                });
+                }).exceptionally(ex -> {
+                    // The role/original-principal check itself failed; without this handler the client would
+                    // get no response for this request.
+                    log.warn("[{}] {} with role {}", remoteAddress, ex.getMessage(), authRole);
+                    ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, ex.getMessage()));
+                    return null;
+                });
             } else {
-                String msg = "Client is not authorized to Produce";
-                log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
+                final String msg = "Proxy Client is not authorized to Produce";
+                log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
                 ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
             }
             return null;
+        }).exceptionally(ex -> {
+            String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), authRole);
+            log.warn(msg);
+            ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, ex.getMessage()));
+            return null;
         });
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index e7b670a63..7d847ee92 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -449,16 +449,7 @@ public static ByteBuf newPartitionMetadataResponse(ServerError error, String err
     }
 
     public static ByteBuf newPartitionMetadataRequest(String topic, long requestId) {
-        CommandPartitionedTopicMetadata.Builder partitionMetadataBuilder = CommandPartitionedTopicMetadata.newBuilder();
-        partitionMetadataBuilder.setTopic(topic);
-        partitionMetadataBuilder.setRequestId(requestId);
-
-        CommandPartitionedTopicMetadata partitionMetadata = partitionMetadataBuilder.build();
-        ByteBuf res = serializeWithSize(
-                BaseCommand.newBuilder().setType(Type.PARTITIONED_METADATA).setPartitionMetadata(partitionMetadata));
-        partitionMetadataBuilder.recycle();
-        partitionMetadata.recycle();
-        return res;
+        return Commands.newPartitionMetadataRequest(topic, requestId, null); // null: no original principal is set
     }
 
     public static ByteBuf newPartitionMetadataResponse(int partitions, long requestId) {
@@ -477,16 +468,7 @@ public static ByteBuf newPartitionMetadataResponse(int partitions, long requestI
     }
 
     public static ByteBuf newLookup(String topic, boolean authoritative, long requestId) {
-        CommandLookupTopic.Builder lookupTopicBuilder = CommandLookupTopic.newBuilder();
-        lookupTopicBuilder.setTopic(topic);
-        lookupTopicBuilder.setRequestId(requestId);
-        lookupTopicBuilder.setAuthoritative(authoritative);
-
-        CommandLookupTopic lookupBroker = lookupTopicBuilder.build();
-        ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.LOOKUP).setLookupTopic(lookupBroker));
-        lookupTopicBuilder.recycle();
-        lookupBroker.recycle();
-        return res;
+        return Commands.newLookup(topic, authoritative, null, requestId); // null: no original principal is set
     }
 
     public static ByteBuf newLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boolean authoritative,
@@ -640,7 +622,7 @@ static ByteBuf newPing() {
     }
 
     static ByteBuf newPong() {
-    	return cmdPong.retainedDuplicate();
+        return cmdPong.retainedDuplicate(); // each caller gets its own retained duplicate of the cmdPong buffer
     }
 
     private static ByteBuf serializeWithSize(BaseCommand.Builder cmdBuilder) {
@@ -891,4 +873,34 @@ private static int getCurrentProtocolVersion() {
         Crc32c,
         None;
     }
+
+    /** Builds a PARTITIONED_METADATA request; a non-null clientAuthRole is set as its original principal. */
+    public static ByteBuf newPartitionMetadataRequest(String topic, long requestId, String clientAuthRole) {
+        CommandPartitionedTopicMetadata.Builder builder = CommandPartitionedTopicMetadata.newBuilder()
+                .setTopic(topic).setRequestId(requestId);
+        if (clientAuthRole != null) {
+            builder.setOriginalPrincipal(clientAuthRole);
+        }
+        CommandPartitionedTopicMetadata request = builder.build();
+        ByteBuf serialized = serializeWithSize(
+                BaseCommand.newBuilder().setType(Type.PARTITIONED_METADATA).setPartitionMetadata(request));
+        builder.recycle();
+        request.recycle();
+        return serialized;
+    }
+
+    /** Builds a LOOKUP request; a non-null clientAuthRole is set as its original principal. */
+    public static ByteBuf newLookup(String topic, boolean authoritative, String clientAuthRole, long requestId) {
+        CommandLookupTopic.Builder builder = CommandLookupTopic.newBuilder().setTopic(topic)
+                .setRequestId(requestId).setAuthoritative(authoritative);
+        if (clientAuthRole != null) {
+            builder.setOriginalPrincipal(clientAuthRole);
+        }
+        CommandLookupTopic lookupRequest = builder.build();
+        BaseCommand.Builder command = BaseCommand.newBuilder().setType(Type.LOOKUP).setLookupTopic(lookupRequest);
+        ByteBuf serialized = serializeWithSize(command);
+        builder.recycle();
+        lookupRequest.recycle();
+        return serialized;
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 6e344cbcd..eef365188 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -6779,6 +6779,10 @@ public Builder removeMetadata(int index) {
     // required uint64 request_id = 2;
     boolean hasRequestId();
     long getRequestId();
+    
+    // optional string original_principal = 3;
+    boolean hasOriginalPrincipal();
+    String getOriginalPrincipal();
   }
   public static final class CommandPartitionedTopicMetadata extends
       com.google.protobuf.GeneratedMessageLite
@@ -6859,9 +6863,42 @@ public long getRequestId() {
       return requestId_;
     }
     
+    // optional string original_principal = 3; presence is recorded in bit 0x00000004 of bitField0_
+    public static final int ORIGINAL_PRINCIPAL_FIELD_NUMBER = 3;
+    private java.lang.Object originalPrincipal_; // a String or a ByteString, see the accessors below
+    public boolean hasOriginalPrincipal() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public String getOriginalPrincipal() {
+      java.lang.Object ref = originalPrincipal_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) { // keep the decoded String only for valid UTF-8
+          originalPrincipal_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getOriginalPrincipalBytes() {
+      java.lang.Object ref = originalPrincipal_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        originalPrincipal_ = b; // store the UTF-8 bytes in place of the String
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
     private void initFields() {
       topic_ = "";
       requestId_ = 0L;
+      originalPrincipal_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -6894,6 +6931,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeUInt64(2, requestId_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(3, getOriginalPrincipalBytes());
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -6910,6 +6950,10 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeUInt64Size(2, requestId_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(3, getOriginalPrincipalBytes());
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -7027,6 +7071,8 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000001);
         requestId_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000002);
+        originalPrincipal_ = "";
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
       
@@ -7068,6 +7114,10 @@ public Builder clone() {
           to_bitField0_ |= 0x00000002;
         }
         result.requestId_ = requestId_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.originalPrincipal_ = originalPrincipal_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -7080,6 +7130,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandPar
         if (other.hasRequestId()) {
           setRequestId(other.getRequestId());
         }
+        if (other.hasOriginalPrincipal()) {
+          setOriginalPrincipal(other.getOriginalPrincipal());
+        }
         return this;
       }
       
@@ -7127,6 +7180,11 @@ public Builder mergeFrom(
               requestId_ = input.readUInt64();
               break;
             }
+            case 26: {
+              bitField0_ |= 0x00000004;
+              originalPrincipal_ = input.readBytes();
+              break;
+            }
           }
         }
       }
@@ -7190,6 +7248,42 @@ public Builder clearRequestId() {
         return this;
       }
       
+      // optional string original_principal = 3; presence is recorded in bit 0x00000004 of bitField0_
+      private java.lang.Object originalPrincipal_ = "";
+      public boolean hasOriginalPrincipal() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public String getOriginalPrincipal() {
+        java.lang.Object ref = originalPrincipal_;
+        if (!(ref instanceof String)) { // held as a ByteString: decode it and keep the String
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          originalPrincipal_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setOriginalPrincipal(String value) { // rejects null, marks the field as present
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        originalPrincipal_ = value;
+        
+        return this;
+      }
+      public Builder clearOriginalPrincipal() { // drops the presence bit and restores the default value
+        bitField0_ = (bitField0_ & ~0x00000004);
+        originalPrincipal_ = getDefaultInstance().getOriginalPrincipal();
+        
+        return this;
+      }
+      void setOriginalPrincipal(com.google.protobuf.ByteString value) { // ByteString variant, no null check
+        bitField0_ |= 0x00000004;
+        originalPrincipal_ = value;
+        
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandPartitionedTopicMetadata)
     }
     
@@ -7865,6 +7959,10 @@ void setMessage(com.google.protobuf.ByteString value) {
     // optional bool authoritative = 3 [default = false];
     boolean hasAuthoritative();
     boolean getAuthoritative();
+    
+    // optional string original_principal = 4;
+    boolean hasOriginalPrincipal();
+    String getOriginalPrincipal();
   }
   public static final class CommandLookupTopic extends
       com.google.protobuf.GeneratedMessageLite
@@ -7955,10 +8053,43 @@ public boolean getAuthoritative() {
       return authoritative_;
     }
     
+    // optional string original_principal = 4; presence is recorded in bit 0x00000008 of bitField0_
+    public static final int ORIGINAL_PRINCIPAL_FIELD_NUMBER = 4;
+    private java.lang.Object originalPrincipal_; // a String or a ByteString, see the accessors below
+    public boolean hasOriginalPrincipal() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public String getOriginalPrincipal() {
+      java.lang.Object ref = originalPrincipal_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) { // keep the decoded String only for valid UTF-8
+          originalPrincipal_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getOriginalPrincipalBytes() {
+      java.lang.Object ref = originalPrincipal_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        originalPrincipal_ = b; // store the UTF-8 bytes in place of the String
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
     private void initFields() {
       topic_ = "";
       requestId_ = 0L;
       authoritative_ = false;
+      originalPrincipal_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -7994,6 +8125,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeBool(3, authoritative_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeBytes(4, getOriginalPrincipalBytes());
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -8014,6 +8148,10 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(3, authoritative_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(4, getOriginalPrincipalBytes());
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -8133,6 +8271,8 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000002);
         authoritative_ = false;
         bitField0_ = (bitField0_ & ~0x00000004);
+        originalPrincipal_ = "";
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
       
@@ -8178,6 +8318,10 @@ public Builder clone() {
           to_bitField0_ |= 0x00000004;
         }
         result.authoritative_ = authoritative_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.originalPrincipal_ = originalPrincipal_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -8193,6 +8337,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandLoo
         if (other.hasAuthoritative()) {
           setAuthoritative(other.getAuthoritative());
         }
+        if (other.hasOriginalPrincipal()) {
+          setOriginalPrincipal(other.getOriginalPrincipal());
+        }
         return this;
       }
       
@@ -8245,6 +8392,11 @@ public Builder mergeFrom(
               authoritative_ = input.readBool();
               break;
             }
+            case 34: {
+              bitField0_ |= 0x00000008;
+              originalPrincipal_ = input.readBytes();
+              break;
+            }
           }
         }
       }
@@ -8329,6 +8481,42 @@ public Builder clearAuthoritative() {
         return this;
       }
       
+      // optional string original_principal = 4; presence is recorded in bit 0x00000008 of bitField0_
+      private java.lang.Object originalPrincipal_ = "";
+      public boolean hasOriginalPrincipal() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public String getOriginalPrincipal() {
+        java.lang.Object ref = originalPrincipal_;
+        if (!(ref instanceof String)) { // held as a ByteString: decode it and keep the String
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          originalPrincipal_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setOriginalPrincipal(String value) { // rejects null, marks the field as present
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000008;
+        originalPrincipal_ = value;
+        
+        return this;
+      }
+      public Builder clearOriginalPrincipal() { // drops the presence bit and restores the default value
+        bitField0_ = (bitField0_ & ~0x00000008);
+        originalPrincipal_ = getDefaultInstance().getOriginalPrincipal();
+        
+        return this;
+      }
+      void setOriginalPrincipal(com.google.protobuf.ByteString value) { // ByteString variant, no null check
+        bitField0_ |= 0x00000008;
+        originalPrincipal_ = value;
+        
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandLookupTopic)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 6c86530d4..faa894f8c 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -72,9 +72,9 @@ message MessageMetadata {
 	// differentiate single and batch message metadata
 	optional int32 num_messages_in_batch = 11 [default = 1];
 
-        // the timestamp that this event occurs. it is typically set by applications.
-        // if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
-        optional uint64 event_time = 12 [default = 0];
+	// the timestamp that this event occurs. it is typically set by applications.
+	// if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
+	optional uint64 event_time = 12 [default = 0];
 	// Contains encryption key name, encrypted key and metadata to describe the key
 	repeated EncryptionKeys encryption_keys = 13;
 	// Algorithm used to encrypt data key
@@ -186,6 +186,7 @@ message CommandSubscribe {
 message CommandPartitionedTopicMetadata {
 	required string topic            = 1;
 	required uint64 request_id       = 2;
+	optional string original_principal = 3; // role of the client on whose behalf a proxy forwards the request
 }
 
 message CommandPartitionedTopicMetadataResponse {
@@ -198,13 +199,13 @@ message CommandPartitionedTopicMetadataResponse {
 	optional LookupType response          = 3;
 	optional ServerError error            = 4;
 	optional string message               = 5;
-
 }
 
 message CommandLookupTopic {
 	required string topic            = 1;
 	required uint64 request_id       = 2;
 	optional bool authoritative      = 3 [default = false];
+	optional string original_principal = 4; // role of the client on whose behalf a proxy forwards the lookup
 }
 
 message CommandLookupTopicResponse {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 9dc876fc0..5d35b862f 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -33,7 +33,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
 import io.prometheus.client.Counter;
+import static org.apache.commons.lang3.StringUtils.isBlank;
 
 public class LookupProxyHandler {
     private final ProxyService service;
@@ -41,6 +43,7 @@
     private final boolean connectWithTLS;
 
     private SocketAddress clientAddress;
+    private String brokerServiceURL;
 
     private static final Counter lookupRequests = Counter
             .build("pulsar_proxy_lookup_requests", "Counter of topic lookup requests").create().register();
@@ -54,6 +57,8 @@ public LookupProxyHandler(ProxyService proxy, ProxyConnection proxyConnection) {
         this.proxyConnection = proxyConnection;
         this.clientAddress = proxyConnection.clientAddress();
         this.connectWithTLS = proxy.getConfiguration().isTlsEnabledWithBroker();
+        this.brokerServiceURL = this.connectWithTLS ? proxy.getConfiguration().getBrokerServiceURLTLS()
+                : proxy.getConfiguration().getBrokerServiceURL();
     }
 
     public void handleLookup(CommandLookupTopic lookup) {
@@ -64,20 +69,24 @@ public void handleLookup(CommandLookupTopic lookup) {
         lookupRequests.inc();
         long clientRequestId = lookup.getRequestId();
         String topic = lookup.getTopic();
-
-        ServiceLookupData availableBroker = null;
-        try {
-            availableBroker = service.getDiscoveryProvider().nextBroker();
-        } catch (Exception e) {
-            log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
-            proxyConnection.ctx().writeAndFlush(
-                    Commands.newLookupErrorResponse(ServerError.ServiceNotReady, e.getMessage(), clientRequestId));
-            return;
+        String serviceUrl;
+        if (isBlank(brokerServiceURL)) {
+            ServiceLookupData availableBroker = null;
+            try {
+                availableBroker = service.getDiscoveryProvider().nextBroker();
+            } catch (Exception e) {
+                log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
+                proxyConnection.ctx().writeAndFlush(
+                        Commands.newLookupErrorResponse(ServerError.ServiceNotReady, e.getMessage(), clientRequestId));
+                return;
+            }
+            serviceUrl = this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls()
+                    : availableBroker.getPulsarServiceUrl();
+        } else {
+            serviceUrl = this.connectWithTLS ? service.getConfiguration().getBrokerServiceURLTLS()
+                    : service.getConfiguration().getBrokerServiceURL();
         }
-
-        performLookup(clientRequestId, topic,
-                this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl(),
-                false, 10);
+        performLookup(clientRequestId, topic, serviceUrl, false, 10);
     }
 
     private void performLookup(long clientRequestId, String topic, String brokerServiceUrl, boolean authoritative,
@@ -99,31 +108,41 @@ private void performLookup(long clientRequestId, String topic, String brokerServ
 
         InetSocketAddress addr = InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
         if (log.isDebugEnabled()) {
-            log.debug("Getting connections to '{}'", addr);
+            log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr, topic,
+                    clientRequestId);
         }
         service.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
             // Connected to backend broker
             long requestId = service.newRequestId();
-            clientCnx.newLookup(Commands.newLookup(topic, authoritative, requestId), requestId).thenAccept(result -> {
-                if (result.redirect) {
-                    // Need to try the lookup again on a different broker
-                    performLookup(clientRequestId, topic, result.brokerUrl, authoritative, numberOfRetries - 1);
-                } else {
-                    // We have the result immediately
-                    String brokerUrl = connectWithTLS ? result.brokerUrlTls : result.brokerUrl;
-
-                    // Reply the same address for both TLS non-TLS. The reason is that whether we use TLS between proxy
-                    // and broker is independent of whether the client itself uses TLS, but we need to force the client
-                    // to use the appropriate target broker (and port) when it will connect back.
-                    proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
-                            LookupType.Connect, clientRequestId, true /* this is coming from proxy */));
-                }
-            }).exceptionally(ex -> {
-                log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, topic, ex.getMessage());
-                proxyConnection.ctx().writeAndFlush(
-                        Commands.newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
-                return null;
-            });
+            ByteBuf command;
+            if (service.getConfiguration().isAuthenticationEnabled()) {
+                command = Commands.newLookup(topic, authoritative, proxyConnection.clientAuthRole, requestId);
+            } else {
+                command = Commands.newLookup(topic, authoritative, requestId);
+            }
+            clientCnx.newLookup(command,
+                    requestId).thenAccept(result -> {
+                        if (result.redirect) {
+                            // Need to try the lookup again on a different broker
+                            performLookup(clientRequestId, topic, result.brokerUrl, authoritative, numberOfRetries - 1);
+                        } else {
+                            // We have the result immediately
+                            String brokerUrl = connectWithTLS ? result.brokerUrlTls : result.brokerUrl;
+
+                            // Reply the same address for both TLS non-TLS. The reason is that whether we use TLS
+                            // between proxy
+                            // and broker is independent of whether the client itself uses TLS, but we need to force the
+                            // client
+                            // to use the appropriate target broker (and port) when it will connect back.
+                            proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
+                                    LookupType.Connect, clientRequestId, true /* this is coming from proxy */));
+                        }
+                    }).exceptionally(ex -> {
+                        log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, topic, ex.getMessage());
+                        proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+                                ex.getMessage(), clientRequestId));
+                        return null;
+                    });
         }).exceptionally(ex -> {
             // Failed to connect to backend broker
             proxyConnection.ctx().writeAndFlush(
@@ -132,30 +151,90 @@ private void performLookup(long clientRequestId, String topic, String brokerServ
         });
     }
 
-    void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata) {
+    /**
+     * Answers a partitioned-topic-metadata request received from a client.
+     *
+     * <p>When no broker service URL is configured, the partition count is obtained from the discovery provider.
+     * Otherwise the request is forwarded to the configured broker, carrying the client's role as original
+     * principal when authentication is enabled. Every reply, success or error, is written to the client
+     * connection with the request id the client sent.
+     *
+     * @param partitionMetadata the request sent by the client
+     */
+    public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata) {
         partitionsMetadataRequests.inc();
         if (log.isDebugEnabled()) {
             log.debug("[{}] Received PartitionMetadataLookup", clientAddress);
         }
 
-        final long requestId = partitionMetadata.getRequestId();
+        final long clientRequestId = partitionMetadata.getRequestId();
         DestinationName dn = DestinationName.get(partitionMetadata.getTopic());
-
-        service.getDiscoveryProvider().getPartitionedTopicMetadata(service, dn, proxyConnection.clientAuthRole)
-                .thenAccept(metadata -> {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Total number of partitions for topic {} is {}", proxyConnection.clientAuthRole,
-                                dn, metadata.partitions);
-                    }
-                    proxyConnection.ctx()
-                            .writeAndFlush(Commands.newPartitionMetadataResponse(metadata.partitions, requestId));
-                }).exceptionally(ex -> {
-                    log.warn("[{}] Failed to get partitioned metadata for topic {} {}", clientAddress, dn,
-                            ex.getMessage(), ex);
-                    proxyConnection.ctx().writeAndFlush(Commands
-                            .newPartitionMetadataResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
-                    return null;
-                });
+        if (isBlank(brokerServiceURL)) {
+            // No broker service URL configured: ask the discovery provider for the partition count.
+            service.getDiscoveryProvider().getPartitionedTopicMetadata(service, dn, proxyConnection.clientAuthRole)
+                    .thenAccept(metadata -> {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Total number of partitions for topic {} is {}",
+                                    proxyConnection.clientAuthRole, dn, metadata.partitions);
+                        }
+                        proxyConnection.ctx().writeAndFlush(
+                                Commands.newPartitionMetadataResponse(metadata.partitions, clientRequestId));
+                    }).exceptionally(ex -> {
+                        log.warn("[{}] Failed to get partitioned metadata for topic {} {}", clientAddress, dn,
+                                ex.getMessage(), ex);
+                        proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(
+                                ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
+                        return null;
+                    });
+        } else {
+            URI brokerURI;
+            try {
+                brokerURI = new URI(brokerServiceURL);
+            } catch (URISyntaxException e) {
+                proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
+                        e.getMessage(), clientRequestId));
+                return;
+            }
+            // Leave the address unresolved, as performLookup does, so that no DNS lookup is done here and both
+            // code paths hand the connection pool the same kind of address.
+            InetSocketAddress addr = InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
+
+            if (log.isDebugEnabled()) {
+                log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr,
+                        dn.getPartitionedTopicName(), clientRequestId);
+            }
+
+            service.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
+                // Connected to backend broker
+                long requestId = service.newRequestId();
+                ByteBuf command;
+                if (service.getConfiguration().isAuthenticationEnabled()) {
+                    command = Commands.newPartitionMetadataRequest(dn.toString(), requestId, proxyConnection.clientAuthRole);
+                } else {
+                    command = Commands.newPartitionMetadataRequest(dn.toString(), requestId);
+                }
+                clientCnx.newLookup(
+                        command,
+                        requestId).thenAccept(lookupDataResult -> {
+                            proxyConnection.ctx().writeAndFlush(Commands
+                                    .newPartitionMetadataResponse(lookupDataResult.partitions, clientRequestId));
+                        }).exceptionally((ex) -> {
+                            // The failure is not necessarily wrapped, so getCause() may be null.
+                            Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
+                            log.warn("[{}] failed to get Partitioned metadata : {}", dn.toString(),
+                                    cause.getMessage(), ex);
+                            // Reply with a partition-metadata error, like every other failure path of this method.
+                            proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(
+                                    ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
+                            return null;
+                        });
+            }).exceptionally(ex -> {
+                // Failed to connect to backend broker
+                proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
+                        ex.getMessage(), clientRequestId));
+                return null;
+            });
+        }
     }
 
     private static final Logger log = LoggerFactory.getLogger(LookupProxyHandler.class);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 0d0f160a6..1c45c4fed 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -34,7 +34,11 @@
 
     // ZooKeeper session timeout
     private int zookeeperSessionTimeoutMs = 30_000;
-
+    
+    // If service discovery is disabled, these URLs (plain and TLS) should point to the discovery service provider.
+    private String brokerServiceURL;
+    private String brokerServiceURLTLS;
+    
     // Port to use to server binary-proto request
     private int servicePort = 6650;
     // Port to use to server binary-proto-tls request
@@ -78,6 +82,22 @@
 
     private Properties properties = new Properties();
 
+    public String getBrokerServiceURLTLS() { // returns the configured TLS broker service URL
+        return brokerServiceURLTLS;
+    }
+    
+    public void setBrokerServiceURLTLS(String discoveryServiceURLTLS) { // argument is stored as brokerServiceURLTLS
+        this.brokerServiceURLTLS = discoveryServiceURLTLS;
+    }
+    
+    public String getBrokerServiceURL() { // returns the configured non-TLS broker service URL
+        return brokerServiceURL;
+    }
+    
+    public void setBrokerServiceURL(String discoveryServiceURL) { // argument is stored as brokerServiceURL
+        this.brokerServiceURL = discoveryServiceURL;
+    }
+    
     public String getZookeeperServers() {
         return zookeeperServers;
     }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index fed6e15c8..e5c2e9198 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.lang3.StringUtils.isBlank;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -110,21 +111,23 @@ public ProxyService(ProxyConfiguration proxyConfig) throws IOException {
     }
 
     public void start() throws Exception {
-        localZooKeeperConnectionService = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
-                proxyConfig.getZookeeperServers(), proxyConfig.getZookeeperSessionTimeoutMs());
-        localZooKeeperConnectionService.start(new ShutdownService() {
-            @Override
-            public void shutdown(int exitCode) {
-                LOG.error("Lost local ZK session. Shutting down the proxy");
-                Runtime.getRuntime().halt(-1);
-            }
-        });
-
-        discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, getZooKeeperClientFactory());
-        this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache);
         ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(proxyConfig);
         authenticationService = new AuthenticationService(serviceConfiguration);
-        authorizationManager = new AuthorizationManager(serviceConfiguration, configurationCacheService);
+
+        if (!isBlank(proxyConfig.getZookeeperServers()) && !isBlank(proxyConfig.getGlobalZookeeperServers())) {
+            localZooKeeperConnectionService = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
+                    proxyConfig.getZookeeperServers(), proxyConfig.getZookeeperSessionTimeoutMs());
+            localZooKeeperConnectionService.start(new ShutdownService() {
+                @Override
+                public void shutdown(int exitCode) {
+                    LOG.error("Lost local ZK session. Shutting down the proxy");
+                    Runtime.getRuntime().halt(-1);
+                }
+            });
+            discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, getZooKeeperClientFactory());
+            this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache);
+            authorizationManager = new AuthorizationManager(serviceConfiguration, configurationCacheService);
+        }
 
         ServerBootstrap bootstrap = new ServerBootstrap();
         bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
@@ -145,7 +148,7 @@ public void shutdown(int exitCode) {
             ServerBootstrap tlsBootstrap = bootstrap.clone();
             tlsBootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, true));
             tlsBootstrap.bind(proxyConfig.getServicePortTls()).sync();
-            LOG.info("Started Pulsar TLS Proxy on port {}", proxyConfig.getWebServicePortTls());
+            LOG.info("Started Pulsar TLS Proxy on port {}", proxyConfig.getServicePortTls());
         }
     }
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index b7e9394e7..d9fc4af55 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -88,8 +88,13 @@ public ProxyServiceStarter(String[] args) throws Exception {
             config.setGlobalZookeeperServers(globalZookeeperServers);
         }
 
-        checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
-        checkArgument(!isEmpty(config.getGlobalZookeeperServers()), "globalZookeeperServers must be provided");
+        if ((isBlank(config.getBrokerServiceURL()) && isBlank(config.getBrokerServiceURLTLS()))
+                || config.isAuthorizationEnabled()) {
+            checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
+            checkArgument(!isEmpty(config.getGlobalZookeeperServers()), "globalZookeeperServers must be provided");
+        }
+
+        java.security.Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
 
         // create broker service
         ProxyService discoveryService = new ProxyService(config);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index 556a8b548..c62bbc1f1 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -61,7 +61,8 @@
     private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
     private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
     private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
-
+    private final String DUMMY_VALUE = "DUMMY_VALUE";
+    
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
     private final String configClusterName = "use";
@@ -117,10 +118,12 @@ protected void setup() throws Exception {
         proxyConfig.setBrokerClientAuthenticationParameters(
                 "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
         proxyConfig.setAuthenticationProviders(providers);
+        
+        proxyConfig.setZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig));
         doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
-
         proxyService.start();
     }
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 17696c7ff..05d79efef 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -40,6 +40,8 @@
 
 public class ProxyTest extends MockedPulsarServiceBaseTest {
 
+    private final String DUMMY_VALUE = "DUMMY_VALUE";
+
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
 
@@ -49,6 +51,9 @@ protected void setup() throws Exception {
         internalSetup();
 
         proxyConfig.setServicePort(PortManager.nextFreePort());
+        proxyConfig.setZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
+        
         proxyService = Mockito.spy(new ProxyService(proxyConfig));
         doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index 89426d597..a87ef0920 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -43,7 +43,7 @@
     private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
     private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem";
     private final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
-    
+    private final String DUMMY_VALUE = "DUMMY_VALUE";
     
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
@@ -61,6 +61,9 @@ protected void setup() throws Exception {
         proxyConfig.setTlsEnabledWithBroker(false);
         proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
         proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
+        proxyConfig.setZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
+        
         proxyService = Mockito.spy(new ProxyService(proxyConfig));
         doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationNegTest.java
new file mode 100644
index 000000000..04717ceac
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationNegTest.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class ProxyWithProxyAuthorizationNegTest extends ProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(ProxyWithProxyAuthorizationNegTest.class);
+
+    private static final String TLS_PROXY_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem"; // constants are static: one copy per class, not per test instance
+    private static final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-cert.pem";
+    private static final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-key.pem";
+    private static final String TLS_SERVER_CERT_TRUST_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem";
+    private static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-cert.pem";
+    private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-key.pem";
+    private static final String TLS_CLIENT_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem";
+    private static final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-cert.pem";
+    private static final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-key.pem";
+    private static final String TLS_SUPERUSER_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem"; // the TLS_SUPERUSER_* files live one directory up
+    private static final String TLS_SUPERUSER_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
+    private static final String TLS_SUPERUSER_CLIENT_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
+    
+    private ProxyService proxyService;
+    private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        // Per-test fixture (@BeforeMethod): configure the broker, call init(), then start a proxy in front of it.
+        // Broker: switch on both authentication and authorization.
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+        // Broker TLS: trust store, certificate and key; insecure connections are allowed.
+        conf.setTlsEnabled(true);
+        conf.setTlsTrustCertsFilePath(TLS_SERVER_CERT_TRUST_FILE_PATH);
+        conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        conf.setTlsAllowInsecureConnection(true);
+        // "superUser" is the only super-user role configured on the broker.
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+        // Presumably the credentials the broker uses when it acts as a client: its own cert/key pair.
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        conf.setBrokerClientAuthenticationParameters(
+                "tlsCertFile:" + TLS_SERVER_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
+        // TLS is the only authentication provider; the same set is handed to the proxy further down.
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderTls.class.getName());
+        conf.setAuthenticationProviders(providers);
+        // Same name as the cluster segment of the namespace used by the test method.
+        conf.setClusterName("proxy-authorization-neg");
+        // NOTE(review): init() is inherited; presumably it boots the broker from conf - confirm.
+        super.init();
+
+        // Proxy: authentication is enabled, authorization is explicitly disabled on the proxy itself.
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
+        proxyConfig.setBrokerServiceURLTLS("pulsar://localhost:" + BROKER_PORT_TLS); // NOTE(review): TLS URL uses the plain pulsar:// scheme - confirm intended
+        // Four distinct free ports: binary, binary-TLS, web, web-TLS.
+        proxyConfig.setServicePort(PortManager.nextFreePort());
+        proxyConfig.setServicePortTls(PortManager.nextFreePort());
+        proxyConfig.setWebServicePort(PortManager.nextFreePort());
+        proxyConfig.setWebServicePortTls(PortManager.nextFreePort());
+        proxyConfig.setTlsEnabledInProxy(true);
+        proxyConfig.setTlsEnabledWithBroker(true);
+
+        // TLS material for the proxy (certificate, key, trust store).
+        proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
+        proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
+        proxyConfig.setTlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH);
+        // The proxy authenticates to the broker with its own cert/key pair.
+        proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(
+                "tlsCertFile:" + TLS_PROXY_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_PROXY_KEY_FILE_PATH);
+        proxyConfig.setAuthenticationProviders(providers);
+        // Wrapped in a Mockito spy; no stubbing is applied to it in this method.
+        proxyService = Mockito.spy(new ProxyService(proxyConfig));
+
+        proxyService.start();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception { // @AfterMethod: runs after each test method
+        super.internalCleanup(); // inherited teardown; NOTE(review): if this throws, the proxy below is never closed
+        proxyService.close(); // shuts down the proxy instance created in setup()
+    }
+
+    /**
+     * <pre>
+     * Negative e2e check of tls + Authentication + Authorization (client -> proxy -> broker):
+     * a request made through the proxy must be rejected while the "Proxy" role lacks the action.
+     *
+     * 1. client connects to proxy over tls and pass auth-data
+     * 2. "Proxy" holds only produce, "Client" holds produce + consume: subscribe must fail
+     * 3. "Proxy" is re-granted consume only: subscribe succeeds, createProducer must fail
+     * 4. "Proxy" is re-granted produce + consume: 10 messages are produced and received back
+     *
+     * </pre>
+     *
+     * @throws Exception if an admin or client call fails outside the two expected rejections
+     */
+    @Test
+    public void testProxyAuthorization() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        createAdminClient();
+        final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
+        // create a client which connects to proxy over tls and pass authData
+        PulsarClient proxyClient = createPulsarClient(proxyServiceUrl);
+
+        final String namespaceName = "my-property/proxy-authorization-neg/my-ns";
+        final String topicName = "persistent://" + namespaceName + "/my-topic1";
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("proxy-authorization-neg")));
+        admin.namespaces().createNamespace(namespaceName);
+
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy", Sets.newHashSet(AuthAction.produce));
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client", Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+        ConsumerConfiguration consumerConf = new ConsumerConfiguration();
+        consumerConf.setSubscriptionType(SubscriptionType.Exclusive);
+        try {
+            proxyClient.subscribe(topicName, "my-subscriber-name", consumerConf);
+            Assert.fail("subscribe must be rejected while the Proxy role has no consume permission");
+        } catch (PulsarClientException expected) {
+            // expected: without this rejection the negative test would prove nothing
+        }
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy", Sets.newHashSet(AuthAction.consume));
+        log.info("-- Admin permissions {} ---", admin.namespaces().getPermissions(namespaceName));
+        Consumer consumer = proxyClient.subscribe(topicName, "my-subscriber-name", consumerConf);
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        try {
+            proxyClient.createProducer(topicName, producerConf);
+            Assert.fail("createProducer must be rejected while the Proxy role has no produce permission");
+        } catch (PulsarClientException expected) {
+            // expected: the grant above is assumed to replace the role's actions, leaving "Proxy" with consume only
+        }
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy", Sets.newHashSet(AuthAction.produce, AuthAction.consume));
+        Producer producer = proxyClient.createProducer(topicName, producerConf);
+        final int msgs = 10;
+        for (int i = 0; i < msgs; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        Message msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        int count = 0;
+        for (int i = 0; i < msgs; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            Assert.assertNotNull(msg, "timed out waiting for message " + i); // receive() yields null on timeout
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, "my-message-" + i);
+            count++;
+        }
+        Assert.assertEquals(count, msgs); // TestNG order: (actual, expected)
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+        producer.close();
+        proxyClient.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    /** Assigns {@code admin}: a spied PulsarAdmin on brokerUrlTls configured with the TLS_SUPERUSER_CLIENT_* files. */
+    protected final void createAdminClient() throws Exception {
+        admin = spy(new PulsarAdmin(brokerUrlTls, tlsClientConf(TLS_SUPERUSER_CLIENT_CERT_FILE_PATH,
+                TLS_SUPERUSER_CLIENT_KEY_FILE_PATH, TLS_SUPERUSER_CLIENT_TRUST_CERT_FILE_PATH)));
+    }
+
+    /** Opens a PulsarClient on {@code proxyServiceUrl} configured with the TLS_CLIENT_* files. */
+    private PulsarClient createPulsarClient(String proxyServiceUrl) throws PulsarClientException {
+        return PulsarClient.create(proxyServiceUrl,
+                tlsClientConf(TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH, TLS_CLIENT_TRUST_CERT_FILE_PATH));
+    }
+
+    /**
+     * Shared by both factories: TLS on, insecure connections allowed, stats interval 0, TLS auth from the cert/key pair.
+     */
+    private static org.apache.pulsar.client.api.ClientConfiguration tlsClientConf(String certFile, String keyFile,
+            String trustCertsFile) throws PulsarClientException {
+        Map<String, String> tlsParams = Maps.newHashMap();
+        tlsParams.put("tlsCertFile", certFile);
+        tlsParams.put("tlsKeyFile", keyFile);
+        Authentication tlsAuth = new AuthenticationTls();
+        tlsAuth.configure(tlsParams);
+        org.apache.pulsar.client.api.ClientConfiguration tlsConf = new org.apache.pulsar.client.api.ClientConfiguration();
+        tlsConf.setStatsInterval(0, TimeUnit.SECONDS);
+        tlsConf.setTlsTrustCertsFilePath(trustCertsFile);
+        tlsConf.setTlsAllowInsecureConnection(true);
+        tlsConf.setAuthentication(tlsAuth);
+        tlsConf.setUseTls(true);
+        return tlsConf;
+    }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
new file mode 100644
index 000000000..558f5e0e8
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class ProxyWithProxyAuthorizationTest extends ProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(ProxyWithProxyAuthorizationTest.class);
+
+    private static final String TLS_PROXY_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem"; // constants are static: one copy per class, not per test instance
+    private static final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-cert.pem";
+    private static final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-key.pem";
+    private static final String TLS_SERVER_CERT_TRUST_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem";
+    private static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-cert.pem";
+    private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-key.pem";
+    private static final String TLS_CLIENT_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem";
+    private static final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-cert.pem";
+    private static final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-key.pem";
+    private static final String TLS_SUPERUSER_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem"; // the TLS_SUPERUSER_* files live one directory up
+    private static final String TLS_SUPERUSER_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
+    private static final String TLS_SUPERUSER_CLIENT_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
+    
+    private ProxyService proxyService;
+    private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        // Per-test fixture (@BeforeMethod): configure the broker, call init(), then start a proxy in front of it.
+        // Broker: switch on both authentication and authorization.
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+        // Broker TLS: trust store, certificate and key; insecure connections are allowed.
+        conf.setTlsEnabled(true);
+        conf.setTlsTrustCertsFilePath(TLS_SERVER_CERT_TRUST_FILE_PATH);
+        conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        conf.setTlsAllowInsecureConnection(true);
+        // "superUser" is the only super-user role configured on the broker.
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+        // Presumably the credentials the broker uses when it acts as a client: its own cert/key pair.
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        conf.setBrokerClientAuthenticationParameters(
+                "tlsCertFile:" + TLS_SERVER_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
+        // TLS is the only authentication provider; the same set is handed to the proxy further down.
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderTls.class.getName());
+        conf.setAuthenticationProviders(providers);
+        // Same name as the cluster segment of the namespace used by the test method.
+        conf.setClusterName("proxy-authorization");
+        // NOTE(review): init() is inherited; presumably it boots the broker from conf - confirm.
+        super.init();
+
+        // Proxy: authentication is enabled, authorization is explicitly disabled on the proxy itself.
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
+        proxyConfig.setBrokerServiceURLTLS("pulsar://localhost:" + BROKER_PORT_TLS); // NOTE(review): TLS URL uses the plain pulsar:// scheme - confirm intended
+        // Four distinct free ports: binary, binary-TLS, web, web-TLS.
+        proxyConfig.setServicePort(PortManager.nextFreePort());
+        proxyConfig.setServicePortTls(PortManager.nextFreePort());
+        proxyConfig.setWebServicePort(PortManager.nextFreePort());
+        proxyConfig.setWebServicePortTls(PortManager.nextFreePort());
+        proxyConfig.setTlsEnabledInProxy(true);
+        proxyConfig.setTlsEnabledWithBroker(true);
+
+        // TLS material for the proxy (certificate, key, trust store).
+        proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
+        proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
+        proxyConfig.setTlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH);
+        // The proxy authenticates to the broker with its own cert/key pair.
+        proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(
+                "tlsCertFile:" + TLS_PROXY_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_PROXY_KEY_FILE_PATH);
+        proxyConfig.setAuthenticationProviders(providers);
+        // Wrapped in a Mockito spy; no stubbing is applied to it in this method.
+        proxyService = Mockito.spy(new ProxyService(proxyConfig));
+
+        proxyService.start();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception { // @AfterMethod: runs after each test method
+        super.internalCleanup(); // inherited teardown; NOTE(review): if this throws, the proxy below is never closed
+        proxyService.close(); // shuts down the proxy instance created in setup()
+    }
+
+    /**
+     * <pre>
+     * It verifies e2e tls + Authentication + Authorization (client -> proxy -> broker)
+     * with both the "Proxy" and the "Client" role granted produce + consume on the namespace.
+     *
+     * 1. client connects to proxy over tls and pass auth-data
+     * 2. proxy authenticate client and retrieve client-role
+     *    and send it to broker as originalPrincipal over tls
+     * 3. client creates producer/consumer via proxy
+     * 4. broker authorize producer/consumer create request using originalPrincipal
+     * </pre>
+     * NOTE(review): the method name looks like a typo for "testProxyAuthorization"; it is left unchanged here.
+     * @throws Exception if any admin or client call fails
+     */
+    @Test
+    public void textProxyAuthorization() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        createAdminClient();
+        final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
+        // create a client which connects to proxy over tls and pass authData
+        PulsarClient proxyClient = createPulsarClient(proxyServiceUrl);
+
+        final String namespaceName = "my-property/proxy-authorization/my-ns";
+        final String topicName = "persistent://" + namespaceName + "/my-topic1";
+
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
+        admin.namespaces().createNamespace(namespaceName);
+
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy", Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client", Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+        ConsumerConfiguration consumerConf = new ConsumerConfiguration(); // not "conf": that name is the inherited broker config
+        consumerConf.setSubscriptionType(SubscriptionType.Exclusive);
+        Consumer consumer = proxyClient.subscribe(topicName, "my-subscriber-name", consumerConf);
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+        Producer producer = proxyClient.createProducer(topicName, producerConf);
+        final int msgs = 10;
+        for (int i = 0; i < msgs; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        Message msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        int count = 0;
+        for (int i = 0; i < msgs; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            Assert.assertNotNull(msg, "timed out waiting for message " + i); // receive() yields null on timeout
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, "my-message-" + i);
+            count++;
+        }
+        Assert.assertEquals(count, msgs); // TestNG order: (actual, expected)
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+        producer.close();
+        proxyClient.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    /** Assigns {@code admin}: a spied PulsarAdmin on brokerUrlTls configured with the TLS_SUPERUSER_CLIENT_* files. */
+    protected final void createAdminClient() throws Exception {
+        admin = spy(new PulsarAdmin(brokerUrlTls, tlsClientConf(TLS_SUPERUSER_CLIENT_CERT_FILE_PATH,
+                TLS_SUPERUSER_CLIENT_KEY_FILE_PATH, TLS_SUPERUSER_CLIENT_TRUST_CERT_FILE_PATH)));
+    }
+
+    /** Opens a PulsarClient on {@code proxyServiceUrl} configured with the TLS_CLIENT_* files. */
+    private PulsarClient createPulsarClient(String proxyServiceUrl) throws PulsarClientException {
+        return PulsarClient.create(proxyServiceUrl,
+                tlsClientConf(TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH, TLS_CLIENT_TRUST_CERT_FILE_PATH));
+    }
+
+    /**
+     * Shared by both factories: TLS on, insecure connections allowed, stats interval 0, TLS auth from the cert/key pair.
+     */
+    private static org.apache.pulsar.client.api.ClientConfiguration tlsClientConf(String certFile, String keyFile,
+            String trustCertsFile) throws PulsarClientException {
+        Map<String, String> tlsParams = Maps.newHashMap();
+        tlsParams.put("tlsCertFile", certFile);
+        tlsParams.put("tlsKeyFile", keyFile);
+        Authentication tlsAuth = new AuthenticationTls();
+        tlsAuth.configure(tlsParams);
+        org.apache.pulsar.client.api.ClientConfiguration tlsConf = new org.apache.pulsar.client.api.ClientConfiguration();
+        tlsConf.setStatsInterval(0, TimeUnit.SECONDS);
+        tlsConf.setTlsTrustCertsFilePath(trustCertsFile);
+        tlsConf.setTlsAllowInsecureConnection(true);
+        tlsConf.setAuthentication(tlsAuth);
+        tlsConf.setUseTls(true);
+        return tlsConf;
+    }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
new file mode 100644
index 000000000..6b47d2db9
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.spy;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.collections.Maps;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.testng.Assert;
+
+/**
+ * End-to-end test of a {@link ProxyService} that is configured with fixed broker service URLs
+ * ({@code setBrokerServiceURL} / {@code setBrokerServiceURLTLS}). TLS transport and TLS
+ * authentication are enabled on both the broker and the proxy; authorization is disabled on both.
+ */
+public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(ProxyWithoutServiceDiscoveryTest.class);
+
+    // TLS material shared by the broker, the proxy and the test client.
+    private static final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
+    private static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem";
+    private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
+    private static final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
+    private static final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
+
+    // Auth parameters for AuthenticationTls. The client certificate must be paired with the
+    // client key; defining the string once keeps the broker and proxy settings in sync.
+    private static final String CLIENT_TLS_AUTH_PARAMS =
+            "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH;
+
+    private static final String CLUSTER_NAME = "without-service-discovery";
+
+    private ProxyService proxyService;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    /**
+     * Starts the broker (through {@code super.init()}) and then a proxy pointing at it.
+     *
+     * @throws Exception if the broker or the proxy fails to start
+     */
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        // Broker: TLS transport and TLS authentication enabled, authorization disabled.
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(false);
+
+        conf.setTlsEnabled(true);
+        conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        conf.setTlsAllowInsecureConnection(true);
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+
+        // Fix: previously the client certificate was combined with the server key here.
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        conf.setBrokerClientAuthenticationParameters(CLIENT_TLS_AUTH_PARAMS);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderTls.class.getName());
+        conf.setAuthenticationProviders(providers);
+
+        conf.setClusterName(CLUSTER_NAME);
+
+        super.init();
+
+        // Proxy: broker addresses are supplied directly through the two service URLs.
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
+        proxyConfig.setBrokerServiceURLTLS("pulsar://localhost:" + BROKER_PORT_TLS);
+
+        proxyConfig.setServicePort(PortManager.nextFreePort());
+        proxyConfig.setServicePortTls(PortManager.nextFreePort());
+        proxyConfig.setWebServicePort(PortManager.nextFreePort());
+        proxyConfig.setWebServicePortTls(PortManager.nextFreePort());
+        proxyConfig.setTlsEnabledInProxy(true);
+        proxyConfig.setTlsEnabledWithBroker(true);
+
+        // Proxy: TLS certificates and the TLS authentication provider.
+        proxyConfig.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        proxyConfig.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        proxyConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+
+        proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(CLIENT_TLS_AUTH_PARAMS);
+        proxyConfig.setAuthenticationProviders(providers);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig));
+
+        proxyService.start();
+    }
+
+    /**
+     * Runs the base-class cleanup and then closes the proxy. The proxy is closed even when the
+     * base-class cleanup throws, and is skipped if {@link #setup()} never created it.
+     *
+     * @throws Exception if either cleanup step fails
+     */
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        try {
+            super.internalCleanup();
+        } finally {
+            if (proxyService != null) {
+                proxyService.close();
+            }
+        }
+    }
+
+    /**
+     * <pre>
+     * It verifies e2e tls + Authentication (client -> proxy -> broker)
+     *
+     * 1. client connects to proxy over tls and passes auth-data
+     * 2. client creates a consumer and a producer via the proxy
+     * 3. client publishes messages and receives them back in order, without duplicates
+     *
+     * Authorization is disabled on both broker and proxy in {@link #setup()}, so no
+     * authorization decision is exercised by this test.
+     * </pre>
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testDiscoveryService() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
+        final String topic = "persistent://my-property/" + CLUSTER_NAME + "/my-ns/my-topic1";
+
+        Map<String, String> authParams = Maps.newHashMap();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
+        // create a client which connects to proxy over tls and pass authData
+        // (this call also replaces the inherited admin with a TLS-authenticated one)
+        PulsarClient proxyClient = createPulsarClient(authTls, proxyServiceUrl);
+        try {
+            admin.properties().createProperty("my-property",
+                    new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet(CLUSTER_NAME)));
+            admin.namespaces().createNamespace("my-property/" + CLUSTER_NAME + "/my-ns");
+
+            // Named consumerConf so it does not shadow the inherited broker "conf" field.
+            ConsumerConfiguration consumerConf = new ConsumerConfiguration();
+            consumerConf.setSubscriptionType(SubscriptionType.Exclusive);
+            Consumer consumer = proxyClient.subscribe(topic, "my-subscriber-name", consumerConf);
+
+            ProducerConfiguration producerConf = new ProducerConfiguration();
+            Producer producer = proxyClient.createProducer(topic, producerConf);
+
+            final int msgs = 10;
+            for (int i = 0; i < msgs; i++) {
+                String message = "my-message-" + i;
+                producer.send(message.getBytes());
+            }
+
+            Message msg = null;
+            Set<String> messageSet = Sets.newHashSet();
+            int count = 0;
+            for (int i = 0; i < msgs; i++) {
+                msg = consumer.receive(5, TimeUnit.SECONDS);
+                // A timed-out receive yields null; fail with a clear message instead of an NPE.
+                Assert.assertNotNull(msg, "Timed out waiting for message " + i);
+                String receivedMessage = new String(msg.getData());
+                log.debug("Received message: [{}]", receivedMessage);
+                String expectedMessage = "my-message-" + i;
+                testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+                count++;
+            }
+            // TestNG argument order is (actual, expected).
+            Assert.assertEquals(count, msgs);
+            // Acknowledge the consumption of all messages at once
+            consumer.acknowledgeCumulative(msg);
+            consumer.close();
+            producer.close();
+        } finally {
+            proxyClient.close();
+        }
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    /**
+     * Builds a TLS client configuration around {@code auth} and returns a {@link PulsarClient}
+     * for {@code lookupUrl}.
+     *
+     * <p>Side effect: the inherited {@code admin} field is replaced with a spied
+     * {@link PulsarAdmin} that talks to {@code brokerUrlTls} using the same configuration.
+     *
+     * @param auth authentication plugin to install on the client configuration
+     * @param lookupUrl service URL passed to {@link PulsarClient#create}
+     * @return the newly created client
+     * @throws Exception if the admin or the client cannot be created
+     */
+    protected final PulsarClient createPulsarClient(Authentication auth, String lookupUrl) throws Exception {
+        org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
+        clientConf.setStatsInterval(0, TimeUnit.SECONDS);
+        clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        clientConf.setTlsAllowInsecureConnection(true);
+        clientConf.setAuthentication(auth);
+        clientConf.setUseTls(true);
+
+        admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
+        return PulsarClient.create(lookupUrl, clientConf);
+    }
+
+}
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-cert.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-cert.pem
new file mode 100644
index 000000000..63fcf38fe
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-cert.pem
@@ -0,0 +1,72 @@
+Certificate:
+    Data:
+        Version: 3 (0x2)
+        Serial Number:
+            ac:a4:b3:6b:f5:b4:5f:c9
+        Signature Algorithm: sha1WithRSAEncryption
+        Issuer: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=New CA
+        Validity
+            Not Before: Dec 20 02:22:54 2017 GMT
+            Not After : Dec 20 02:22:54 2018 GMT
+        Subject: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=Broker
+        Subject Public Key Info:
+            Public Key Algorithm: rsaEncryption
+            RSA Public Key: (2048 bit)
+                Modulus (2048 bit):
+                    00:ba:ab:bd:1d:68:9e:1f:6d:99:8a:8e:95:8d:dc:
+                    b7:e5:95:1a:40:ff:9e:5d:be:38:e6:19:1c:39:0d:
+                    39:e3:e0:cd:96:42:09:41:9f:ca:f1:7f:63:6f:be:
+                    a5:46:1b:07:06:01:43:11:ed:e9:f9:a2:41:2a:29:
+                    ac:10:d3:df:30:4a:f5:9b:5d:b9:97:2b:d4:10:82:
+                    92:55:e7:ca:b1:eb:94:6a:63:e6:28:a3:75:0e:f2:
+                    5b:ff:1a:df:0b:3e:2d:6b:c8:c1:49:98:2b:c1:5f:
+                    9a:c6:1d:94:26:7f:eb:6f:7e:81:c2:27:23:13:90:
+                    4f:89:04:dd:2c:8d:de:4c:f8:9f:33:b9:28:ed:7e:
+                    3a:14:fa:6f:d0:cc:50:5e:75:40:39:e2:57:46:af:
+                    b7:67:8f:c9:57:f2:85:b0:54:59:02:76:c8:92:2c:
+                    af:19:3e:09:d8:5f:a4:d0:9c:a7:35:77:c9:aa:90:
+                    50:86:2a:9a:3c:8f:3b:50:a5:01:88:b9:d3:eb:4d:
+                    23:24:f2:58:65:1c:03:7a:0a:2c:20:30:b6:46:8d:
+                    b1:65:1c:16:0c:bf:bd:87:df:1c:e6:46:c8:f7:4f:
+                    60:fd:a1:91:c9:e4:ff:21:e7:e8:65:70:ba:9f:d6:
+                    44:07:27:45:1d:69:e7:d6:72:d8:d0:3e:df:2e:61:
+                    9e:4d
+                Exponent: 65537 (0x10001)
+        X509v3 extensions:
+            X509v3 Basic Constraints: 
+                CA:FALSE
+            Netscape Comment: 
+                OpenSSL Generated Certificate
+            X509v3 Subject Key Identifier: 
+                1C:C6:F7:DB:06:C1:1D:1C:7C:9E:64:AF:E5:47:47:80:00:6C:C8:26
+            X509v3 Authority Key Identifier: 
+                keyid:E5:15:C2:1D:E7:EE:28:3C:FA:B6:3E:58:FB:0B:61:52:6E:B0:81:5B
+
+    Signature Algorithm: sha1WithRSAEncryption
+        7f:b4:f8:d6:9c:ea:01:1b:74:19:a9:ee:ea:83:66:11:df:90:
+        c5:f0:e6:bc:05:bd:b4:8a:64:d6:08:fd:75:da:2e:f5:f9:20:
+        e0:62:8b:b8:b7:bd:c3:92:0f:a3:61:c7:78:6a:68:ea:74:20:
+        8e:a8:b7:0f:28:d1:54:8a:55:af:38:8c:a7:64:79:1c:95:f6:
+        b8:f3:48:0e:14:2b:78:75:ff:96:70:85:28:30:1f:fa:94:a9:
+        43:cd:98:6e:7b:80:68:bc:08:cc:35:1d:df:34:df:3d:58:52:
+        c3:5d:55:65:b6:be:ef:a2:78:a0:3c:41:c8:af:9f:74:e6:d8:
+        0a:d3
+-----BEGIN CERTIFICATE-----
+MIIDKzCCApSgAwIBAgIJAKyks2v1tF/JMA0GCSqGSIb3DQEBBQUAMFcxCzAJBgNV
+BAYTAlVTMQswCQYDVQQIEwJDQTEPMA0GA1UEChMGQXBhY2hlMRkwFwYDVQQLExBB
+cGFjaGUgSW5jdWJhdG9yMQ8wDQYDVQQDEwZOZXcgQ0EwHhcNMTcxMjIwMDIyMjU0
+WhcNMTgxMjIwMDIyMjU0WjBXMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExDzAN
+BgNVBAoTBkFwYWNoZTEZMBcGA1UECxMQQXBhY2hlIEluY3ViYXRvcjEPMA0GA1UE
+AxMGQnJva2VyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuqu9HWie
+H22Zio6Vjdy35ZUaQP+eXb445hkcOQ054+DNlkIJQZ/K8X9jb76lRhsHBgFDEe3p
++aJBKimsENPfMEr1m125lyvUEIKSVefKseuUamPmKKN1DvJb/xrfCz4ta8jBSZgr
+wV+axh2UJn/rb36BwicjE5BPiQTdLI3eTPifM7ko7X46FPpv0MxQXnVAOeJXRq+3
+Z4/JV/KFsFRZAnbIkiyvGT4J2F+k0JynNXfJqpBQhiqaPI87UKUBiLnT600jJPJY
+ZRwDegosIDC2Ro2xZRwWDL+9h98c5kbI909g/aGRyeT/IefoZXC6n9ZEBydFHWnn
+1nLY0D7fLmGeTQIDAQABo3sweTAJBgNVHRMEAjAAMCwGCWCGSAGG+EIBDQQfFh1P
+cGVuU1NMIEdlbmVyYXRlZCBDZXJ0aWZpY2F0ZTAdBgNVHQ4EFgQUHMb32wbBHRx8
+nmSv5UdHgABsyCYwHwYDVR0jBBgwFoAU5RXCHefuKDz6tj5Y+wthUm6wgVswDQYJ
+KoZIhvcNAQEFBQADgYEAf7T41pzqARt0Ganu6oNmEd+QxfDmvAW9tIpk1gj9ddou
+9fkg4GKLuLe9w5IPo2HHeGpo6nQgjqi3DyjRVIpVrziMp2R5HJX2uPNIDhQreHX/
+lnCFKDAf+pSpQ82YbnuAaLwIzDUd3zTfPVhSw11VZba+76J4oDxByK+fdObYCtM=
+-----END CERTIFICATE-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-key.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-key.pem
new file mode 100644
index 000000000..8e47938b8
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-key.pem
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQC6q70daJ4fbZmK
+jpWN3LfllRpA/55dvjjmGRw5DTnj4M2WQglBn8rxf2NvvqVGGwcGAUMR7en5okEq
+KawQ098wSvWbXbmXK9QQgpJV58qx65RqY+Yoo3UO8lv/Gt8LPi1ryMFJmCvBX5rG
+HZQmf+tvfoHCJyMTkE+JBN0sjd5M+J8zuSjtfjoU+m/QzFBedUA54ldGr7dnj8lX
+8oWwVFkCdsiSLK8ZPgnYX6TQnKc1d8mqkFCGKpo8jztQpQGIudPrTSMk8lhlHAN6
+CiwgMLZGjbFlHBYMv72H3xzmRsj3T2D9oZHJ5P8h5+hlcLqf1kQHJ0UdaefWctjQ
+Pt8uYZ5NAgMBAAECggEBAIY3Tx1jCDYOppQiGtPKPAr9XsgXQrWiPOTsbwdyRApd
+q1P7HQ6rJs7mygcha1HxwuYFaETu7AkKKZJ4LfhXbiUZ8GgKRpOz9qD8UN0lcO7m
+NGsecvELPfJGPfE5T9+UkDHsQVV57RP3eqAxykC4Pv6GViPT4fuCCj25WpFbW9e4
+uuKFF3yVY3uJofPQGwLZ2b9WwujqgSyaozyKlTM4nPXwEEz56wPVuAsNfmTEtIb3
+N0d0uQpM69irH3sAO7nVDo6e/eP3Emq4kUDvhS04BafG+T7T9g0C74EGoJX5wrrk
+LzuEAkO84n6ESF6r+FI1XH4yskau3Jab8/x8f9sVj+ECgYEA9II7MZ2PSq2pHTsY
+1ZxZx3MKe/yiTMGkHhtQY6HKzzQXgEozK/uPTvMt7lKnBsseUydEXygMcgPXracF
+rFdiAQpD8Dq2jrmjtFcPk40DtLjdUUD4I2stTKprTfTrhx5X/JIX8iBflMTFWBYp
+ALM9qP0u3KZwVCGxEsGz5yaxtZkCgYEAw3Gj5eKw2pzRyNEdNsye3eQxp4QneM+X
+YozWzNrbGEdmJ1CHuMWXPTxAkxtMhH95QonySEP4R1fNxHJNMKPu7h2TiZiLvC/J
+UtE+SdETiEGF14SEfr/LflreTJnHCmK/pp19t1Q1cAn3FHws2D5qiA8eoBmnko6k
+irYydJn5dtUCgYBVOzRhJjg14vVJgDk29QqCsQJdmAIHWZTY/dJ2+IYW1mS+zp6p
+3UXmUnSXV+5rOtC2UcDOnso/0EEVglxC6C78h9SI4B6U//clvRdr6sL481wKn+gf
+iJPA3sMK6K5VamlnXJHGUCyhUjosa4Udfl2nE6KLPeV4Hkp4bFdG40EdOQKBgQCQ
+Y4dDUbt4dnyh0KO1lWwU3/4zFPYYUb00iHo0c8eDY1Q73Um3nvqBud63D2bzSD2s
+g78j1ls5Ucvpwsv2EFZ3QhB6ieFKET+52G4dGMJGWqnns7Yy8b0Dx1wN2Vnr+VI/
+ZIC5DRRBhossbiSvSUVo6Uql2u4q3wj+lWYnMI3VVQKBgQDs+sHMotTK976HKaRh
+sDepJnZwdnma1QBzsAXkZ0EJPqYCIFmbKGeXn/z2Fr62oGqe9suzuGLBYm4ukwoD
+xI8lDzxOoElFNaAHl6nIcFcj6I98idkU05NvV59aeLJngejJv3WmI2GH7jNK8dNs
+ELazMuTsmf+MdG/Q9C/kiHDvng==
+-----END PRIVATE KEY-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem
new file mode 100644
index 000000000..c77dd6cd7
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem
@@ -0,0 +1,62 @@
+Certificate:
+    Data:
+        Version: 3 (0x2)
+        Serial Number:
+            ac:a4:b3:6b:f5:b4:5f:c8
+        Signature Algorithm: sha1WithRSAEncryption
+        Issuer: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=New CA
+        Validity
+            Not Before: Dec 20 02:21:42 2017 GMT
+            Not After : Dec 19 02:21:42 2020 GMT
+        Subject: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=New CA
+        Subject Public Key Info:
+            Public Key Algorithm: rsaEncryption
+            RSA Public Key: (1024 bit)
+                Modulus (1024 bit):
+                    00:99:c1:1e:58:35:af:c1:38:38:45:8c:8c:f4:d9:
+                    6d:cc:ff:37:31:f9:ba:76:fa:fb:56:41:04:da:d2:
+                    a1:ea:a8:ca:6d:3b:b2:bf:4c:e7:55:ab:1c:a1:7e:
+                    d4:ec:54:d8:92:c6:f9:1f:e8:e8:d2:27:fa:4e:bb:
+                    e6:b2:21:59:bd:19:63:9f:4b:a1:3d:c0:25:d3:70:
+                    a4:9c:96:33:c6:53:c4:40:c1:de:a5:75:40:f7:db:
+                    51:f4:f6:19:9a:8d:a8:fa:0c:4b:fe:1f:11:70:23:
+                    31:76:c2:6c:41:6b:aa:c6:71:22:58:7b:4f:d8:2b:
+                    46:d6:e0:84:4d:57:e0:9c:09
+                Exponent: 65537 (0x10001)
+        X509v3 extensions:
+            X509v3 Subject Key Identifier: 
+                E5:15:C2:1D:E7:EE:28:3C:FA:B6:3E:58:FB:0B:61:52:6E:B0:81:5B
+            X509v3 Authority Key Identifier: 
+                keyid:E5:15:C2:1D:E7:EE:28:3C:FA:B6:3E:58:FB:0B:61:52:6E:B0:81:5B
+                DirName:/C=US/ST=CA/O=Apache/OU=Apache Incubator/CN=New CA
+                serial:AC:A4:B3:6B:F5:B4:5F:C8
+
+            X509v3 Basic Constraints: 
+                CA:TRUE
+    Signature Algorithm: sha1WithRSAEncryption
+        7c:15:8d:92:14:c2:cf:b6:72:17:ba:ba:e0:7c:48:a0:fb:02:
+        86:b1:50:90:d0:b2:dd:40:9f:b5:e1:9e:ab:4a:bc:6c:f1:3e:
+        c3:7f:b5:b6:18:ab:f7:f0:a2:35:c6:5b:d7:2d:84:e1:d9:3d:
+        8c:88:c2:1c:44:61:a8:14:ab:b1:00:b4:00:a5:2d:66:43:86:
+        53:a2:d6:4a:73:96:b3:4f:63:b5:8d:8d:7f:e4:ff:82:37:81:
+        63:00:0e:d1:ef:59:0c:7c:2b:79:24:97:06:60:cd:a1:b3:37:
+        94:68:3d:6c:27:ee:8e:87:88:c1:21:0a:d5:04:66:11:06:11:
+        69:92
+-----BEGIN CERTIFICATE-----
+MIIC6DCCAlGgAwIBAgIJAKyks2v1tF/IMA0GCSqGSIb3DQEBBQUAMFcxCzAJBgNV
+BAYTAlVTMQswCQYDVQQIEwJDQTEPMA0GA1UEChMGQXBhY2hlMRkwFwYDVQQLExBB
+cGFjaGUgSW5jdWJhdG9yMQ8wDQYDVQQDEwZOZXcgQ0EwHhcNMTcxMjIwMDIyMTQy
+WhcNMjAxMjE5MDIyMTQyWjBXMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExDzAN
+BgNVBAoTBkFwYWNoZTEZMBcGA1UECxMQQXBhY2hlIEluY3ViYXRvcjEPMA0GA1UE
+AxMGTmV3IENBMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCZwR5YNa/BODhF
+jIz02W3M/zcx+bp2+vtWQQTa0qHqqMptO7K/TOdVqxyhftTsVNiSxvkf6OjSJ/pO
+u+ayIVm9GWOfS6E9wCXTcKScljPGU8RAwd6ldUD321H09hmajaj6DEv+HxFwIzF2
+wmxBa6rGcSJYe0/YK0bW4IRNV+CcCQIDAQABo4G7MIG4MB0GA1UdDgQWBBTlFcId
+5+4oPPq2Plj7C2FSbrCBWzCBiAYDVR0jBIGAMH6AFOUVwh3n7ig8+rY+WPsLYVJu
+sIFboVukWTBXMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExDzANBgNVBAoTBkFw
+YWNoZTEZMBcGA1UECxMQQXBhY2hlIEluY3ViYXRvcjEPMA0GA1UEAxMGTmV3IENB
+ggkArKSza/W0X8gwDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQUFAAOBgQB8FY2S
+FMLPtnIXurrgfEig+wKGsVCQ0LLdQJ+14Z6rSrxs8T7Df7W2GKv38KI1xlvXLYTh
+2T2MiMIcRGGoFKuxALQApS1mQ4ZTotZKc5azT2O1jY1/5P+CN4FjAA7R71kMfCt5
+JJcGYM2hszeUaD1sJ+6Oh4jBIQrVBGYRBhFpkg==
+-----END CERTIFICATE-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-cert.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-cert.pem
new file mode 100644
index 000000000..741e10afa
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-cert.pem
@@ -0,0 +1,72 @@
+Certificate:
+    Data:
+        Version: 3 (0x2)
+        Serial Number:
+            ac:a4:b3:6b:f5:b4:5f:ca
+        Signature Algorithm: sha1WithRSAEncryption
+        Issuer: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=New CA
+        Validity
+            Not Before: Dec 20 02:36:47 2017 GMT
+            Not After : Dec 20 02:36:47 2018 GMT
+        Subject: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=Client
+        Subject Public Key Info:
+            Public Key Algorithm: rsaEncryption
+            RSA Public Key: (2048 bit)
+                Modulus (2048 bit):
+                    00:fd:b6:bb:bc:a3:54:2b:06:b3:8e:68:31:e1:f3:
+                    3a:c6:3d:98:83:db:f8:fc:58:c6:35:47:4c:58:c1:
+                    40:81:71:8e:25:2c:6f:14:a0:5f:f2:85:97:fa:e5:
+                    d1:a6:65:26:3f:4b:52:f1:4a:11:1b:f6:af:22:fb:
+                    24:74:d7:d3:bd:c3:11:dc:7f:1e:49:96:19:4a:f5:
+                    9c:b3:4c:85:5d:33:57:08:43:04:3d:b0:69:1a:15:
+                    b3:08:c7:0d:68:09:02:09:37:90:1b:fa:51:e1:c9:
+                    6d:58:e3:d0:4e:e9:f9:a5:b5:4c:1a:5d:98:62:a2:
+                    d6:cd:a2:89:dc:91:52:c7:f5:19:53:97:5f:58:86:
+                    6b:5e:48:6c:81:8d:2f:5c:0e:38:96:d2:b7:f7:47:
+                    21:2e:54:2a:51:32:92:0d:f3:c3:94:f5:59:98:2c:
+                    11:1a:88:ad:ee:16:5c:72:6b:31:e3:bf:ca:9e:38:
+                    4b:49:d2:87:e1:44:69:ef:ba:4d:b9:1d:4b:3f:e0:
+                    c1:af:c5:04:6f:5f:2d:6e:d9:12:ac:bb:f1:f8:7f:
+                    fc:bd:3a:6a:99:e6:45:f9:91:98:c9:d1:b1:f0:d5:
+                    6a:e1:fd:c0:6e:e2:8e:ab:0c:03:87:ad:9c:26:9a:
+                    8e:93:4c:82:ec:de:25:49:14:91:ce:80:9f:22:17:
+                    aa:cf
+                Exponent: 65537 (0x10001)
+        X509v3 extensions:
+            X509v3 Basic Constraints: 
+                CA:FALSE
+            Netscape Comment: 
+                OpenSSL Generated Certificate
+            X509v3 Subject Key Identifier: 
+                B2:8F:75:E3:D7:7A:4C:62:B8:5C:04:66:A0:56:14:16:AF:82:43:5A
+            X509v3 Authority Key Identifier: 
+                keyid:E5:15:C2:1D:E7:EE:28:3C:FA:B6:3E:58:FB:0B:61:52:6E:B0:81:5B
+
+    Signature Algorithm: sha1WithRSAEncryption
+        5f:e0:ec:f3:b4:bb:08:a6:15:85:f2:7d:c4:50:c4:87:e5:af:
+        1a:38:11:98:b1:a1:d6:47:85:f6:c6:80:cc:b3:2b:f6:27:8e:
+        24:1b:66:98:48:e7:d0:dd:cd:37:ea:a2:ad:cf:d8:a7:17:39:
+        59:be:72:a1:2a:24:f5:d6:23:14:b9:42:b4:2f:b1:cd:15:98:
+        d9:1a:8a:55:3c:f2:78:be:b4:ba:6b:79:3d:29:e8:54:4b:d8:
+        0f:1b:bd:69:ef:d2:ca:5c:0f:da:b4:b6:b8:cc:7f:b7:51:3c:
+        fc:3a:dd:6d:9c:3c:9e:71:ad:59:72:84:ac:01:6e:c5:66:8b:
+        b0:70
+-----BEGIN CERTIFICATE-----
+MIIDKzCCApSgAwIBAgIJAKyks2v1tF/KMA0GCSqGSIb3DQEBBQUAMFcxCzAJBgNV
+BAYTAlVTMQswCQYDVQQIEwJDQTEPMA0GA1UEChMGQXBhY2hlMRkwFwYDVQQLExBB
+cGFjaGUgSW5jdWJhdG9yMQ8wDQYDVQQDEwZOZXcgQ0EwHhcNMTcxMjIwMDIzNjQ3
+WhcNMTgxMjIwMDIzNjQ3WjBXMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExDzAN
+BgNVBAoTBkFwYWNoZTEZMBcGA1UECxMQQXBhY2hlIEluY3ViYXRvcjEPMA0GA1UE
+AxMGQ2xpZW50MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA/ba7vKNU
+Kwazjmgx4fM6xj2Yg9v4/FjGNUdMWMFAgXGOJSxvFKBf8oWX+uXRpmUmP0tS8UoR
+G/avIvskdNfTvcMR3H8eSZYZSvWcs0yFXTNXCEMEPbBpGhWzCMcNaAkCCTeQG/pR
+4cltWOPQTun5pbVMGl2YYqLWzaKJ3JFSx/UZU5dfWIZrXkhsgY0vXA44ltK390ch
+LlQqUTKSDfPDlPVZmCwRGoit7hZccmsx47/KnjhLSdKH4URp77pNuR1LP+DBr8UE
+b18tbtkSrLvx+H/8vTpqmeZF+ZGYydGx8NVq4f3AbuKOqwwDh62cJpqOk0yC7N4l
+SRSRzoCfIheqzwIDAQABo3sweTAJBgNVHRMEAjAAMCwGCWCGSAGG+EIBDQQfFh1P
+cGVuU1NMIEdlbmVyYXRlZCBDZXJ0aWZpY2F0ZTAdBgNVHQ4EFgQUso9149d6TGK4
+XARmoFYUFq+CQ1owHwYDVR0jBBgwFoAU5RXCHefuKDz6tj5Y+wthUm6wgVswDQYJ
+KoZIhvcNAQEFBQADgYEAX+Ds87S7CKYVhfJ9xFDEh+WvGjgRmLGh1keF9saAzLMr
+9ieOJBtmmEjn0N3NN+qirc/Ypxc5Wb5yoSok9dYjFLlCtC+xzRWY2RqKVTzyeL60
+umt5PSnoVEvYDxu9ae/SylwP2rS2uMx/t1E8/DrdbZw8nnGtWXKErAFuxWaLsHA=
+-----END CERTIFICATE-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-key.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-key.pem
new file mode 100644
index 000000000..81d00f9ce
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-key.pem
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQD9tru8o1QrBrOO
+aDHh8zrGPZiD2/j8WMY1R0xYwUCBcY4lLG8UoF/yhZf65dGmZSY/S1LxShEb9q8i
++yR019O9wxHcfx5JlhlK9ZyzTIVdM1cIQwQ9sGkaFbMIxw1oCQIJN5Ab+lHhyW1Y
+49BO6fmltUwaXZhiotbNoonckVLH9RlTl19YhmteSGyBjS9cDjiW0rf3RyEuVCpR
+MpIN88OU9VmYLBEaiK3uFlxyazHjv8qeOEtJ0ofhRGnvuk25HUs/4MGvxQRvXy1u
+2RKsu/H4f/y9OmqZ5kX5kZjJ0bHw1Wrh/cBu4o6rDAOHrZwmmo6TTILs3iVJFJHO
+gJ8iF6rPAgMBAAECggEAEJmkLvOAzk/h769hlCcV8WKWWApMgDZOwa2okSYT0mRb
+qJL/sZnMrVGQYBopXXnAxuNmyeLOu8WoL+G+wOZeNExPHt4yXR41CXKIjjKzhyWU
+zDWWUXL5bXt9+1UKy4PLXk8EXtBCC0Pio65EMuWcL/tsv0zga5O7+jhoTMY1ZF/D
+rsddf2mIncyEdhwAKLREmFv31lY1k+Jd+5eyXHIJEnK8lMXTcORNsb0YtlS5sRTU
+4llwQlBXjV06zIVRFxsRcPrgRYH0Hfg3hSIm3epNE+pbj0tcN0CfQFfrKJ9G2cDS
+jXimjvGsPKQ1PRMAcg93qZB3VtI+ag9bZt29cru0AQKBgQD/xzXZP5hKoqOy+8qH
+HyPvyM0QCpQ6KwHzgf5ATybPIPlyWQmT2eeR3ez4qskowNvc4Fc/q10Ao+q6jC3E
+721Wz6+iCb7Qus37KnEqVW7mWDLsDT5q7vIyRR22wWhrTpu0uZmxd9XxYRU6KUe1
+FMkI5VijJ27NoYtO+gLn9u6J6QKBgQD97xCNVaUNMNRZ1+HOKoBqcGBj91KrL74K
+/avYL0EprYwzN1lm4ZmNX8GaBeAftwnIDyxaM3Apw8BcqFFz/IslY/5sCyUmVjgI
+ZULkhCBy5ZamFNMxLvaN6njtdpgdBRxR9gzke1V/xxJgN7J39h9FI+pElwMW6314
+6AFHYQ/j9wKBgQCNwfjEOQzMgKs9bXNnxAiEwsN0GojgXCmureMd/UBDF8FocJRw
+Txqaq2bEwtLONWUlW2i/rtfSnQZg8YQEW7Y7oMt0gPYydPXoODOUBNl77HH8hbKM
+TXYKCmhXe4XFw0FkvmDCDOqT5vx+yZYmdCifN40Sj65HZTryQHoP2bmG0QKBgG/U
+ntd/hka+4GYIuvsOoKs/flPIEfIt/mXcvZdhiDMQqRPNJmQ2qmcmap6oQ8Hz3Czs
+8b1vtc/O06J6xhRsfeMjnGJ8rgmqItcfsUvuHFQ9ZBEUTsX0RsTNJCCAABGXtJcr
+4xWkc0zooOEa5lAKZk8OuBco4kVvxDxBAH8s8dCVAoGAeEZICuDGR8cOV64Eyx2X
+Ej1PQJrleMmzCwth7UhREGUgEVglhMeoBxmWCukYxpkVBY0DUy6OWH5lpTfCerFZ
+ho1AHMt9DsfUWo4hApMXEMyCZTOJwg9M4vQ1UTbFtr0mt0jnVWTUm3mVxmJnfrtz
+/DgLrvcJd7QCGAYICMNxrDs=
+-----END PRIVATE KEY-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-cert.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-cert.pem
new file mode 100644
index 000000000..8b524c82e
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-cert.pem
@@ -0,0 +1,72 @@
+Certificate:
+    Data:
+        Version: 3 (0x2)
+        Serial Number:
+            ac:a4:b3:6b:f5:b4:5f:cb
+        Signature Algorithm: sha1WithRSAEncryption
+        Issuer: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=New CA
+        Validity
+            Not Before: Dec 20 02:45:24 2017 GMT
+            Not After : Dec 20 02:45:24 2018 GMT
+        Subject: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=Proxy
+        Subject Public Key Info:
+            Public Key Algorithm: rsaEncryption
+            RSA Public Key: (2048 bit)
+                Modulus (2048 bit):
+                    00:e1:e1:06:cc:f5:98:38:88:33:e0:f7:0a:5d:8e:
+                    a8:89:ae:8f:cd:c7:77:62:17:c2:a1:d8:fc:fc:d0:
+                    d0:86:f1:c8:3c:78:ec:b8:e9:73:1c:d1:72:55:97:
+                    c6:47:5a:4c:33:18:32:a1:9c:e1:84:2e:de:40:2f:
+                    a7:16:ed:a0:44:d6:4c:2c:04:ef:21:11:0b:6b:cb:
+                    36:8d:eb:5a:3d:a1:b6:9b:b5:23:be:bd:66:23:26:
+                    c9:82:62:44:51:f8:3a:94:07:6c:52:84:2c:d0:d9:
+                    24:8b:0a:f5:1b:c8:31:a2:29:4c:bc:b7:bf:96:e1:
+                    56:78:d2:75:08:c9:cb:0d:1a:1d:93:2d:bf:bf:86:
+                    10:06:d7:5c:b8:e6:99:05:89:6f:ad:3b:a6:37:45:
+                    15:3a:63:8b:d1:d6:0d:e4:d0:c6:06:c6:63:13:21:
+                    92:65:c1:1a:ae:1a:72:97:cf:86:ed:6f:a1:77:d8:
+                    18:67:f2:27:36:1f:ff:40:6e:57:97:90:5a:28:04:
+                    a4:a8:54:cf:a8:87:36:af:26:49:a6:4e:2d:d4:be:
+                    e6:17:e2:1a:da:c4:08:87:fd:3f:fe:7b:d8:1e:f2:
+                    66:0f:34:1a:02:5d:39:ec:66:3d:46:bc:37:ce:84:
+                    a2:51:0b:c8:72:f5:7c:5a:b8:1a:1b:0a:5d:2b:e9:
+                    56:4f
+                Exponent: 65537 (0x10001)
+        X509v3 extensions:
+            X509v3 Basic Constraints: 
+                CA:FALSE
+            Netscape Comment: 
+                OpenSSL Generated Certificate
+            X509v3 Subject Key Identifier: 
+                3F:A7:4A:6A:B1:6A:E1:51:8D:56:19:A2:2D:6A:A8:49:07:D6:87:8A
+            X509v3 Authority Key Identifier: 
+                keyid:E5:15:C2:1D:E7:EE:28:3C:FA:B6:3E:58:FB:0B:61:52:6E:B0:81:5B
+
+    Signature Algorithm: sha1WithRSAEncryption
+        98:89:57:fd:96:0e:78:06:ce:9f:83:48:28:c9:34:a4:32:93:
+        d2:65:fb:2f:a9:39:51:ff:7a:89:57:26:6a:59:0d:81:09:20:
+        75:ae:c6:aa:f6:8c:d4:d2:7f:f0:78:88:df:74:90:28:11:15:
+        77:d3:60:3d:2d:d2:ef:34:1b:03:59:9f:23:1c:21:64:e5:b8:
+        a1:99:c3:08:82:31:3d:58:01:23:52:b8:96:c8:d5:42:b3:3b:
+        50:43:cc:7d:43:08:1d:c4:46:06:7f:c3:7f:3e:6d:01:f2:25:
+        91:4b:70:fd:0f:e3:25:a6:d4:d8:c9:f6:35:65:00:87:c7:03:
+        c2:d7
+-----BEGIN CERTIFICATE-----
+MIIDKjCCApOgAwIBAgIJAKyks2v1tF/LMA0GCSqGSIb3DQEBBQUAMFcxCzAJBgNV
+BAYTAlVTMQswCQYDVQQIEwJDQTEPMA0GA1UEChMGQXBhY2hlMRkwFwYDVQQLExBB
+cGFjaGUgSW5jdWJhdG9yMQ8wDQYDVQQDEwZOZXcgQ0EwHhcNMTcxMjIwMDI0NTI0
+WhcNMTgxMjIwMDI0NTI0WjBWMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExDzAN
+BgNVBAoTBkFwYWNoZTEZMBcGA1UECxMQQXBhY2hlIEluY3ViYXRvcjEOMAwGA1UE
+AxMFUHJveHkwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDh4QbM9Zg4
+iDPg9wpdjqiJro/Nx3diF8Kh2Pz80NCG8cg8eOy46XMc0XJVl8ZHWkwzGDKhnOGE
+Lt5AL6cW7aBE1kwsBO8hEQtryzaN61o9obabtSO+vWYjJsmCYkRR+DqUB2xShCzQ
+2SSLCvUbyDGiKUy8t7+W4VZ40nUIycsNGh2TLb+/hhAG11y45pkFiW+tO6Y3RRU6
+Y4vR1g3k0MYGxmMTIZJlwRquGnKXz4btb6F32Bhn8ic2H/9AbleXkFooBKSoVM+o
+hzavJkmmTi3UvuYX4hraxAiH/T/+e9ge8mYPNBoCXTnsZj1GvDfOhKJRC8hy9Xxa
+uBobCl0r6VZPAgMBAAGjezB5MAkGA1UdEwQCMAAwLAYJYIZIAYb4QgENBB8WHU9w
+ZW5TU0wgR2VuZXJhdGVkIENlcnRpZmljYXRlMB0GA1UdDgQWBBQ/p0pqsWrhUY1W
+GaItaqhJB9aHijAfBgNVHSMEGDAWgBTlFcId5+4oPPq2Plj7C2FSbrCBWzANBgkq
+hkiG9w0BAQUFAAOBgQCYiVf9lg54Bs6fg0goyTSkMpPSZfsvqTlR/3qJVyZqWQ2B
+CSB1rsaq9ozU0n/weIjfdJAoERV302A9LdLvNBsDWZ8jHCFk5bihmcMIgjE9WAEj
+UriWyNVCsztQQ8x9QwgdxEYGf8N/Pm0B8iWRS3D9D+MlptTYyfY1ZQCHxwPC1w==
+-----END CERTIFICATE-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-key.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-key.pem
new file mode 100644
index 000000000..9856807af
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-key.pem
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDh4QbM9Zg4iDPg
+9wpdjqiJro/Nx3diF8Kh2Pz80NCG8cg8eOy46XMc0XJVl8ZHWkwzGDKhnOGELt5A
+L6cW7aBE1kwsBO8hEQtryzaN61o9obabtSO+vWYjJsmCYkRR+DqUB2xShCzQ2SSL
+CvUbyDGiKUy8t7+W4VZ40nUIycsNGh2TLb+/hhAG11y45pkFiW+tO6Y3RRU6Y4vR
+1g3k0MYGxmMTIZJlwRquGnKXz4btb6F32Bhn8ic2H/9AbleXkFooBKSoVM+ohzav
+JkmmTi3UvuYX4hraxAiH/T/+e9ge8mYPNBoCXTnsZj1GvDfOhKJRC8hy9XxauBob
+Cl0r6VZPAgMBAAECggEBAIXa6UHKhKNzq3K0UxMwOBYnORbUDp41wGRTB1D2maxu
+WZ/kdTv7M/ku8VdhsuGT1DYvL8nwAwBnGdPlqVoABYrlh4xKfD8XL7J4YWLmxrph
+O6q4RG+DI6TPFnlKrHv64xPX9kxMAZbeJzayjqAhGbCkUtI+/a126dx9s1c65jZj
+VyEDrfogOi3CUVHnTxZ3Yayy0gqldPAYdtt9p5YYyTxJYmuKqHBTh7FToX3RhyT0
+pZ4+IE7YV4HiBev2K8K6c4E2/UOZtkENCLy7DAQuQgokHYk0YeoG+tYfnBcIFkVD
+169Z766il027ILS8F7HMoBPQVYdf24YUgfQC3k8h8HECgYEA/VGEr3vFwxCUHtOK
+SKXCpFWpK0KvcYBQvgzLuKkTNbTWnezUwAugq+Ybao/hqsF5jEd9U8Iv35myHI8j
+EHHF9J8/zb1EcIZgTAPO4Uvc2rYxwt/c0kwy7F/FovVKg5yEscJ35iXQWFO5Yxyu
+rYU8yNVPBqXGCeUS1jJbryg1JZcCgYEA5EUmDfPHp6gWx9MmeuDqxvb2L/WHyxGb
+ojSsV5GFlCLa3QMKc1H/1+6lxLbMiGvtk2S1B9YeGWAvRB+10GSgn7AhiObxv20C
+8oqRtLPxO/eCCGOBnUiGTqKibFNyTVJ/+FgWpywQSUY8tk58fPBZvydE6XV0Wa6T
+1INerLxVnAkCgYAxkXn9PKL+AIh7X7l3bbggoAJyTKI3+3vRNH/IqozvvWshi+41
+hhDykhxbRbxKxYEbSgHkGeN0RYbsv7WEyj6KF39MqvRxcFn3hec9frLAuVYTY+q5
+2987EaKCuKzUBBSTFBKSHmQeZIOqOTqVCbVTNyo3isittv1wnHoEVEHSEQKBgQCM
+oQkjuVb8M/Ls4mmndB9Pul/LBhHFijB+isLOJAnOTHbXiAMNLqxWpGCdwxxYw10W
+3AknLcNXUMltx7dkDkpidskCJX0zuH4DXFkNoXnxvrbuYhc9Bawwj8NOx0340uWh
+4ur5zIywB8RpcAsDkbNIr3Gl/kVS5tmOJ+zQsCpxuQKBgQCKV6CDtKgGLgWvERUE
+Dei9pUx2uXtvThZomqoZqr+hZE3YmvtHZcLMK8sXJWDdkYVQ4bwDkmrSSkk5F9Nh
+PClfyOObFbOXLD0TrJZSJd/zrnmnWk8u4eE5XSwAQ+0XiO4LgQHDOutXpvW9ZVvT
+om8NGk5mEUz39XN0tuWzcN2FIQ==
+-----END PRIVATE KEY-----


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services