You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ja...@apache.org on 2018/01/31 00:09:39 UTC

[incubator-pulsar] branch master updated: Making Pulsar Proxy more secure (#1002)

This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dde5865  Making Pulsar Proxy more secure (#1002)
dde5865 is described below

commit dde58652aa441afb86e4d9b0541df8222b648ab0
Author: Jai Asher <ja...@ccs.neu.edu>
AuthorDate: Tue Jan 30 16:09:37 2018 -0800

    Making Pulsar Proxy more secure (#1002)
---
 .../broker/authorization/AuthorizationManager.java |  72 ++-
 .../pulsar/broker/admin/PersistentTopics.java      |   2 +-
 .../apache/pulsar/broker/service/ServerCnx.java    | 635 ++++++++++++---------
 .../org/apache/pulsar/common/api/Commands.java     |  54 +-
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 188 ++++++
 pulsar-common/src/main/proto/PulsarApi.proto       |   9 +-
 .../pulsar/proxy/server/LookupProxyHandler.java    | 167 ++++--
 .../pulsar/proxy/server/ProxyConfiguration.java    |  22 +-
 .../apache/pulsar/proxy/server/ProxyService.java   |  31 +-
 .../pulsar/proxy/server/ProxyServiceStarter.java   |   9 +-
 .../ProxyAuthenticatedProducerConsumerTest.java    |   7 +-
 .../org/apache/pulsar/proxy/server/ProxyTest.java  |   5 +
 .../apache/pulsar/proxy/server/ProxyTlsTest.java   |   5 +-
 ...ava => ProxyWithProxyAuthorizationNegTest.java} | 127 +++--
 ...t.java => ProxyWithProxyAuthorizationTest.java} | 107 ++--
 ....java => ProxyWithoutServiceDiscoveryTest.java} |  35 +-
 .../broker-cert.pem                                |  72 +++
 .../ProxyWithProxyAuthorizationTest/broker-key.pem |  28 +
 .../tls/ProxyWithProxyAuthorizationTest/cacert.pem |  62 ++
 .../client-cert.pem                                |  72 +++
 .../ProxyWithProxyAuthorizationTest/client-key.pem |  28 +
 .../ProxyWithProxyAuthorizationTest/proxy-cert.pem |  72 +++
 .../ProxyWithProxyAuthorizationTest/proxy-key.pem  |  28 +
 23 files changed, 1349 insertions(+), 488 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
index 9fa31cc..2bf7ce6 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
@@ -21,6 +21,9 @@ package org.apache.pulsar.broker.authorization;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
@@ -66,8 +69,8 @@ public class AuthorizationManager {
             log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
             throw e;
         } catch (Exception e) {
-            log.warn("Producer-client  with Role - {} failed to get permissions for destination - {}", role,
-                    destination, e);
+            log.warn("Producer-client  with Role - {} failed to get permissions for destination - {}. {}", role,
+                    destination, e.getMessage());
             throw e;
         }
     }
@@ -96,8 +99,9 @@ public class AuthorizationManager {
                         switch (policies.get().subscription_auth_mode) {
                         case Prefix:
                             if (!subscription.startsWith(role)) {
-                                PulsarServerException ex = new PulsarServerException(
-                                        String.format("Failed to create consumer - The subscription name needs to be prefixed by the authentication role, like %s-xxxx for destination: %s", role, destination));
+                                PulsarServerException ex = new PulsarServerException(String.format(
+                                        "Failed to create consumer - The subscription name needs to be prefixed by the authentication role, like %s-xxxx for destination: %s",
+                                        role, destination));
                                 permissionFuture.completeExceptionally(ex);
                                 return;
                             }
@@ -111,13 +115,12 @@ public class AuthorizationManager {
                     permissionFuture.complete(isAuthorized);
                 });
             }).exceptionally(ex -> {
-                log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination,
-                        ex);
+                log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination, ex.getMessage());
                 permissionFuture.completeExceptionally(ex);
                 return null;
             });
         } catch (Exception e) {
-            log.warn("Client  with Role - {} failed to get permissions for destination - {}", role, destination, e);
+            log.warn("Client  with Role - {} failed to get permissions for destination - {}. {}", role, destination, e.getMessage());
             permissionFuture.completeExceptionally(e);
         }
         return permissionFuture;
@@ -130,8 +133,8 @@ public class AuthorizationManager {
             log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
             throw e;
         } catch (Exception e) {
-            log.warn("Consumer-client  with Role - {} failed to get permissions for destination - {}", role,
-                    destination, e);
+            log.warn("Consumer-client  with Role - {} failed to get permissions for destination - {}. {}", role,
+                    destination, e.getMessage());
             throw e;
         }
     }
@@ -150,8 +153,46 @@ public class AuthorizationManager {
         return canProduce(destination, role) || canConsume(destination, role, null);
     }
 
-    private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role,
-            AuthAction action) {
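+    /**
+     * Check whether the specified role can perform a lookup for the specified destination.
+     *
+     * For that the caller needs to have producer or consumer permission.
+     *
+     * @param destination the destination (topic) the lookup is requested for
+     * @param role the role whose produce/consume permissions are checked
+     * @return a future completed with true if the role may produce or consume on the destination, and with
+     *         false otherwise; a permission check that fails with an exception is treated as not authorized
+     */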
+    public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role) {
+        CompletableFuture<Boolean> finalResult = new CompletableFuture<Boolean>();
+        canProduceAsync(destination, role).whenComplete((produceAuthorized, ex) -> {
+            if (ex == null) {
+                if (produceAuthorized) {
+                    finalResult.complete(produceAuthorized);
+                    return;
+                }
+            } else if (log.isDebugEnabled()) {
+                log.debug("Destination [{}] Role [{}] exception occured while trying to check Produce permissions. {}",
+                        destination.toString(), role, ex.getMessage());
+            }
+            canConsumeAsync(destination, role, null).whenComplete((consumeAuthorized, e) -> {
+                if (e == null) {
+                    if (consumeAuthorized) {
+                        finalResult.complete(consumeAuthorized);
+                        return;
+                    }
+                } else if (log.isDebugEnabled()) {
+                    log.debug(
+                            "Destination [{}] Role [{}] exception occured while trying to check Consume permissions. {}",
+                            destination.toString(), role, e.getMessage());
+                }
+                finalResult.complete(false);
+            });
+        });
+        return finalResult;
+    }
+
+    private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role, AuthAction action) {
         if (isSuperUser(role)) {
             return CompletableFuture.completedFuture(true);
         } else {
@@ -218,13 +259,13 @@ public class AuthorizationManager {
                 }
                 permissionFuture.complete(false);
             }).exceptionally(ex -> {
-                log.warn("Client  with Role - {} failed to get permissions for destination - {}", role, destination,
-                        ex);
+                log.warn("Client  with Role - {} failed to get permissions for destination - {}. {}", role, destination,
+                        ex.getMessage());
                 permissionFuture.completeExceptionally(ex);
                 return null;
             });
         } catch (Exception e) {
-            log.warn("Client  with Role - {} failed to get permissions for destination - {}", role, destination, e);
+            log.warn("Client  with Role - {} failed to get permissions for destination - {}. {}", role, destination, e.getMessage());
             permissionFuture.completeExceptionally(e);
         }
         return permissionFuture;
@@ -244,8 +285,7 @@ public class AuthorizationManager {
             }
 
             // Suffix match
-            if (permittedRole.charAt(0) == '*'
-                    && checkedRole.endsWith(permittedRole.substring(1))
+            if (permittedRole.charAt(0) == '*' && checkedRole.endsWith(permittedRole.substring(1))
                     && permittedActions.contains(checkedAction)) {
                 return true;
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
index b7361b0..f241cf5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
@@ -1314,7 +1314,7 @@ public class PersistentTopics extends AdminResource {
                     validateAdminAccessOnProperty(pulsar, clientAppId, dn.getProperty());
                 } catch (RestException authException) {
                     log.warn("Failed to authorize {} on cluster {}", clientAppId, dn.toString());
-                    throw new PulsarClientException(String.format("Authorization failed %s on cluster %s with error %s",
+                    throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
                             clientAppId, dn.toString(), authException.getMessage()));
                 }
             } catch (Exception ex) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bd49fc7..8888c97 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -100,7 +100,7 @@ public class ServerCnx extends PulsarHandler {
     private String clientVersion = null;
     private int nonPersistentPendingMessages = 0;
     private final int MaxNonPersistentPendingMessages;
-    private String originalPrincipal;
+    private String originalPrincipal = null;
 
     enum State {
         Start, Connected, Failed
@@ -187,73 +187,128 @@ public class ServerCnx extends PulsarHandler {
     @Override
     protected void handleLookup(CommandLookupTopic lookup) {
         final long requestId = lookup.getRequestId();
-        final String topic = lookup.getTopic();
+        final String topicName = lookup.getTopic();
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Received Lookup from {} for {}", topic, remoteAddress, requestId);
+            log.debug("[{}] Received Lookup from {} for {}", topicName, remoteAddress, requestId);
         }
+
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
-            lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(),
-                    getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> {
-                        if (ex == null) {
-                            ctx.writeAndFlush(lookupResponse);
-                        } else {
-                            // it should never happen
-                            log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex);
-                            ctx.writeAndFlush(
-                                    newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
-                        }
-                        lookupSemaphore.release();
-                        return null;
-                    });
+            final String originalPrincipal = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal()
+                    : this.originalPrincipal;
+            CompletableFuture<Boolean> isProxyAuthorizedFuture;
+            if (service.isAuthorizationEnabled() && originalPrincipal != null) {
+                isProxyAuthorizedFuture = service.getAuthorizationManager()
+                        .canLookupAsync(DestinationName.get(topicName), authRole);
+            } else {
+                isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+            }
+            
+            isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
+                if (isProxyAuthorized) {
+                    lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topicName),
+                            lookup.getAuthoritative(), originalPrincipal != null ? originalPrincipal : authRole,
+                            lookup.getRequestId()).handle((lookupResponse, ex) -> {
+                                if (ex == null) {
+                                    ctx.writeAndFlush(lookupResponse);
+                                } else {
+                                    // it should never happen
+                                    log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topicName,
+                                            ex.getMessage(), ex);
+                                    ctx.writeAndFlush(newLookupErrorResponse(ServerError.ServiceNotReady,
+                                            ex.getMessage(), requestId));
+                                }
+                                lookupSemaphore.release();
+                                return null;
+                            });
+                } else {
+                    final String msg = "Proxy Client is not authorized to Lookup";
+                    log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+                    ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
+                    lookupSemaphore.release();
+                }
+                return null;
+            }).exceptionally(ex -> {
+                final String msg = "Exception occured while trying to authorize lookup";
+                log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+                ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
+                lookupSemaphore.release();
+                return null;
+            });
         } else {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Failed lookup due to too many lookup-requests {}", remoteAddress, topic);
+                log.debug("[{}] Failed lookup due to too many lookup-requests {}", remoteAddress, topicName);
             }
             ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests,
                     "Failed due to too many pending lookup requests", requestId));
         }
-
     }
 
     @Override
     protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
         final long requestId = partitionMetadata.getRequestId();
-        final String topic = partitionMetadata.getTopic();
+        final String topicName = partitionMetadata.getTopic();
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Received PartitionMetadataLookup from {} for {}", topic, remoteAddress, requestId);
+            log.debug("[{}] Received PartitionMetadataLookup from {} for {}", topicName, remoteAddress, requestId);
         }
         final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
         if (lookupSemaphore.tryAcquire()) {
-            getPartitionedTopicMetadata(getBrokerService().pulsar(),
-                    originalPrincipal != null ? originalPrincipal : authRole, DestinationName.get(topic))
-                            .handle((metadata, ex) -> {
-                                if (ex == null) {
-                                    int partitions = metadata.partitions;
-                                    ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
-                                } else {
-                                    if (ex instanceof PulsarClientException) {
-                                        log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(),
-                                                remoteAddress, topic, ex.getMessage());
-                                        ctx.writeAndFlush(Commands.newPartitionMetadataResponse(
-                                                ServerError.AuthorizationError, ex.getMessage(), requestId));
+
+            final String originalPrincipal = partitionMetadata.hasOriginalPrincipal()
+                    ? partitionMetadata.getOriginalPrincipal() : this.originalPrincipal;
+            CompletableFuture<Boolean> isProxyAuthorizedFuture;
+            if (service.isAuthorizationEnabled() && originalPrincipal != null) {
+                isProxyAuthorizedFuture = service.getAuthorizationManager()
+                        .canLookupAsync(DestinationName.get(topicName), authRole);
+            } else {
+                isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+            }
+            isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
+                    if (isProxyAuthorized) {
+                        getPartitionedTopicMetadata(getBrokerService().pulsar(),
+                                originalPrincipal != null ? originalPrincipal : authRole,
+                                DestinationName.get(topicName)).handle((metadata, ex) -> {
+                                    if (ex == null) {
+                                        int partitions = metadata.partitions;
+                                        ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
                                     } else {
-                                        log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, topic,
-                                                ex.getMessage(), ex);
-                                        ServerError error = (ex instanceof RestException)
-                                                && ((RestException) ex).getResponse().getStatus() < 500
-                                                        ? ServerError.MetadataError : ServerError.ServiceNotReady;
-                                        ctx.writeAndFlush(Commands.newPartitionMetadataResponse(error, ex.getMessage(),
-                                                requestId));
+                                        if (ex instanceof PulsarClientException) {
+                                            log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(),
+                                                    remoteAddress, topicName, ex.getMessage());
+                                            ctx.writeAndFlush(Commands.newPartitionMetadataResponse(
+                                                    ServerError.AuthorizationError, ex.getMessage(), requestId));
+                                        } else {
+                                            log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress,
+                                                    topicName, ex.getMessage(), ex);
+                                            ServerError error = (ex instanceof RestException)
+                                                    && ((RestException) ex).getResponse().getStatus() < 500
+                                                            ? ServerError.MetadataError : ServerError.ServiceNotReady;
+                                            ctx.writeAndFlush(Commands.newPartitionMetadataResponse(error,
+                                                    ex.getMessage(), requestId));
+                                        }
                                     }
-                                }
-                                lookupSemaphore.release();
-                                return null;
-                            });
+                                    lookupSemaphore.release();
+                                    return null;
+                                });
+                    } else {
+                        final String msg = "Proxy Client is not authorized to Get Partition Metadata";
+                        log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+                        ctx.writeAndFlush(
+                                Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
+                        lookupSemaphore.release();
+                    }
+                    return null;
+            }).exceptionally(ex -> {
+                final String msg = "Exception occured while trying to authorize get Partition Metadata";
+                log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+                ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
+                lookupSemaphore.release();
+                return null;
+            });
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Failed Partition-Metadata lookup due to too many lookup-requests {}", remoteAddress,
-                        topic);
+                        topicName);
             }
             ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.TooManyRequests,
                     "Failed due to too many pending lookup requests", requestId));
@@ -330,11 +385,12 @@ public class ServerCnx extends PulsarHandler {
                 if (sslHandler != null) {
                     sslSession = ((SslHandler) sslHandler).engine().getSession();
                 }
+
                 originalPrincipal = connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null;
                 authRole = getBrokerService().getAuthenticationService()
                         .authenticate(new AuthenticationDataCommand(authData, remoteAddress, sslSession), authMethod);
 
-                log.info("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod, authRole);
+                log.info("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}", remoteAddress, authMethod, authRole, originalPrincipal);
             } catch (AuthenticationException e) {
                 String msg = "Unable to authenticate";
                 log.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage());
@@ -358,119 +414,140 @@ public class ServerCnx extends PulsarHandler {
     @Override
     protected void handleSubscribe(final CommandSubscribe subscribe) {
         checkArgument(state == State.Connected);
-        CompletableFuture<Boolean> authorizationFuture;
-        if (service.isAuthorizationEnabled()) {
-            authorizationFuture = service.getAuthorizationManager().canConsumeAsync(
-                    DestinationName.get(subscribe.getTopic()),
-                    originalPrincipal != null ? originalPrincipal : authRole,
-                    subscribe.getSubscription());
-        } else {
-            authorizationFuture = CompletableFuture.completedFuture(true);
-        }
         final String topicName = subscribe.getTopic();
-        final String subscriptionName = subscribe.getSubscription();
         final long requestId = subscribe.getRequestId();
         final long consumerId = subscribe.getConsumerId();
-        final SubType subType = subscribe.getSubType();
-        final String consumerName = subscribe.getConsumerName();
-        final boolean isDurable = subscribe.getDurable();
-        final MessageIdImpl startMessageId = subscribe.hasStartMessageId()
-                ? new BatchMessageIdImpl(subscribe.getStartMessageId().getLedgerId(),
-                        subscribe.getStartMessageId().getEntryId(), subscribe.getStartMessageId().getPartition(),
-                        subscribe.getStartMessageId().getBatchIndex())
-                : null;
-
-        final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
-        final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
-
-        authorizationFuture.thenApply(isAuthorized -> {
-            if (isAuthorized) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, authRole);
-                }
-
-                log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);
 
-                try {
-                    Metadata.validateMetadata(metadata);
-                } catch (IllegalArgumentException iae) {
-                    final String msg = iae.getMessage();
-                    ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
-                    return null;
+        CompletableFuture<Boolean> isProxyAuthorizedFuture;
+        if (service.isAuthorizationEnabled() && originalPrincipal != null) {
+            isProxyAuthorizedFuture = service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topicName),
+                    authRole, subscribe.getSubscription());
+        } else {
+            isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+        }
+        isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
+            if (isProxyAuthorized) {
+                CompletableFuture<Boolean> authorizationFuture;
+                if (service.isAuthorizationEnabled()) {
+                    authorizationFuture = service.getAuthorizationManager().canConsumeAsync(
+                            DestinationName.get(subscribe.getTopic()),
+                            originalPrincipal != null ? originalPrincipal : authRole, subscribe.getSubscription());
+                } else {
+                    authorizationFuture = CompletableFuture.completedFuture(true);
                 }
 
-                CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
-                CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId, consumerFuture);
-
-                if (existingConsumerFuture != null) {
-                    if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
-                        Consumer consumer = existingConsumerFuture.getNow(null);
-                        log.info("[{}] Consumer with the same id is already created: {}", remoteAddress, consumer);
-                        ctx.writeAndFlush(Commands.newSuccess(requestId));
-                        return null;
-                    } else {
-                        // There was an early request to create a consumer with same consumerId. This can happen when
-                        // client timeout is lower the broker timeouts. We need to wait until the previous consumer
-                        // creation request either complete or fails.
-                        log.warn("[{}][{}][{}] Consumer is already present on the connection", remoteAddress, topicName,
-                                subscriptionName);
-                        ServerError error = !existingConsumerFuture.isDone() ? ServerError.ServiceNotReady
-                                : getErrorCode(existingConsumerFuture);
-                        ctx.writeAndFlush(
-                                Commands.newError(requestId, error, "Consumer is already present on the connection"));
-                        return null;
-                    }
-                }
+                final String subscriptionName = subscribe.getSubscription();
+                final SubType subType = subscribe.getSubType();
+                final String consumerName = subscribe.getConsumerName();
+                final boolean isDurable = subscribe.getDurable();
+                final MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl(
+                        subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
+                        subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
+                        : null;
+
+                final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
+                final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
+
+                authorizationFuture.thenApply(isAuthorized -> {
+                    if (isAuthorized) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, authRole);
+                        }
 
-                service.getTopic(topicName).thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName,
-                        consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata))
-                        .thenAccept(consumer -> {
-                            if (consumerFuture.complete(consumer)) {
-                                log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName,
-                                        subscriptionName);
-                                ctx.writeAndFlush(Commands.newSuccess(requestId), ctx.voidPromise());
+                        log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);
+                        try {
+                            Metadata.validateMetadata(metadata);
+                        } catch (IllegalArgumentException iae) {
+                            final String msg = iae.getMessage();
+                            ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
+                            return null;
+                        }
+                        CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
+                        CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId,
+                                consumerFuture);
+
+                        if (existingConsumerFuture != null) {
+                            if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
+                                Consumer consumer = existingConsumerFuture.getNow(null);
+                                log.info("[{}] Consumer with the same id is already created: {}", remoteAddress,
+                                        consumer);
+                                ctx.writeAndFlush(Commands.newSuccess(requestId));
+                                return null;
                             } else {
-                                // The consumer future was completed before by a close command
-                                try {
-                                    consumer.close();
-                                    log.info("[{}] Cleared consumer created after timeout on client side {}",
-                                            remoteAddress, consumer);
-                                } catch (BrokerServiceException e) {
-                                    log.warn("[{}] Error closing consumer created after timeout on client side {}: {}",
-                                            remoteAddress, consumer, e.getMessage());
-                                }
-                                consumers.remove(consumerId, consumerFuture);
+                                // There was an earlier request to create a consumer with the same
+                                // consumerId. This can happen when the client timeout is lower than
+                                // the broker timeouts. We need to wait until the previous consumer
+                                // creation request either completes or fails, so this request is
+                                // rejected with an error.
+                                log.warn("[{}][{}][{}] Consumer is already present on the connection", remoteAddress,
+                                        topicName, subscriptionName);
+                                ServerError error = !existingConsumerFuture.isDone() ? ServerError.ServiceNotReady
+                                        : getErrorCode(existingConsumerFuture);
+                                ctx.writeAndFlush(Commands.newError(requestId, error,
+                                        "Consumer is already present on the connection"));
+                                return null;
                             }
+                        }
 
-                        }) //
-                        .exceptionally(exception -> {
-                            if (exception.getCause() instanceof ConsumerBusyException) {
-                                if (log.isDebugEnabled()) {
-                                    log.debug(
-                                            "[{}][{}][{}] Failed to create consumer because exclusive consumer is already connected: {}",
-                                            remoteAddress, topicName, subscriptionName,
-                                            exception.getCause().getMessage());
-                                }
-                            } else {
-                                log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
-                                        subscriptionName, exception.getCause().getMessage(), exception);
-                            }
+                        service.getTopic(topicName)
+                                .thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
+                                        subType, priorityLevel, consumerName, isDurable, startMessageId, metadata))
+                                .thenAccept(consumer -> {
+                                    if (consumerFuture.complete(consumer)) {
+                                        log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName,
+                                                subscriptionName);
+                                        ctx.writeAndFlush(Commands.newSuccess(requestId), ctx.voidPromise());
+                                    } else {
+                                        // The consumer future was completed before by a close command
+                                        try {
+                                            consumer.close();
+                                            log.info("[{}] Cleared consumer created after timeout on client side {}",
+                                                    remoteAddress, consumer);
+                                        } catch (BrokerServiceException e) {
+                                            log.warn(
+                                                    "[{}] Error closing consumer created after timeout on client side {}: {}",
+                                                    remoteAddress, consumer, e.getMessage());
+                                        }
+                                        consumers.remove(consumerId, consumerFuture);
+                                    }
 
-                            // If client timed out, the future would have been completed by subsequent close. Send error
-                            // back to client, only if not completed already.
-                            if (consumerFuture.completeExceptionally(exception)) {
-                                ctx.writeAndFlush(Commands.newError(requestId,
-                                        BrokerServiceException.getClientErrorCode(exception.getCause()),
-                                        exception.getCause().getMessage()));
-                            }
-                            consumers.remove(consumerId, consumerFuture);
+                                }) //
+                                .exceptionally(exception -> {
+                                    if (exception.getCause() instanceof ConsumerBusyException) {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug(
+                                                    "[{}][{}][{}] Failed to create consumer because exclusive consumer is already connected: {}",
+                                                    remoteAddress, topicName, subscriptionName,
+                                                    exception.getCause().getMessage());
+                                        }
+                                    } else {
+                                        log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
+                                                subscriptionName, exception.getCause().getMessage(), exception);
+                                    }
 
-                            return null;
+                                    // If client timed out, the future would have been completed by subsequent close.
+                                    // Send error
+                                    // back to client, only if not completed already.
+                                    if (consumerFuture.completeExceptionally(exception)) {
+                                        ctx.writeAndFlush(Commands.newError(requestId,
+                                                BrokerServiceException.getClientErrorCode(exception.getCause()),
+                                                exception.getCause().getMessage()));
+                                    }
+                                    consumers.remove(consumerId, consumerFuture);
 
-                        });
+                                    return null;
+
+                                });
+                    } else {
+                        String msg = "Client is not authorized to subscribe";
+                        log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
+                        ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
+                    }
+                    return null;
+                });
             } else {
-                String msg = "Client is not authorized to subscribe";
-                log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
+                final String msg = "Proxy Client is not authorized to subscribe";
+                log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
                 ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
             }
             return null;
@@ -489,155 +566,180 @@ public class ServerCnx extends PulsarHandler {
     @Override
     protected void handleProducer(final CommandProducer cmdProducer) {
         checkArgument(state == State.Connected);
-        CompletableFuture<Boolean> authorizationFuture;
-        if (service.isAuthorizationEnabled()) {
-            authorizationFuture = service.getAuthorizationManager().canProduceAsync(
-                    DestinationName.get(cmdProducer.getTopic()),
-                    originalPrincipal != null ? originalPrincipal : authRole);
-        } else {
-            authorizationFuture = CompletableFuture.completedFuture(true);
-        }
-
-        // Use producer name provided by client if present
-        final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
-                : service.generateUniqueProducerName();
         final String topicName = cmdProducer.getTopic();
         final long producerId = cmdProducer.getProducerId();
         final long requestId = cmdProducer.getRequestId();
-        final boolean isEncrypted = cmdProducer.getEncrypted();
-        final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
 
-        authorizationFuture.thenApply(isAuthorized -> {
-            if (isAuthorized) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, authRole);
-                }
-
-                try {
-                    Metadata.validateMetadata(metadata);
-                } catch (IllegalArgumentException iae) {
-                    final String msg = iae.getMessage();
-                    ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
-                    return null;
-                }
-
-                CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
-                CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId, producerFuture);
-
-                if (existingProducerFuture != null) {
-                    if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
-                        Producer producer = existingProducerFuture.getNow(null);
-                        log.info("[{}] Producer with the same id is already created: {}", remoteAddress, producer);
-                        ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName()));
-                        return null;
-                    } else {
-                        // There was an early request to create a producer with
-                        // same producerId. This can happen when
-                        // client
-                        // timeout is lower the broker timeouts. We need to wait
-                        // until the previous producer creation
-                        // request
-                        // either complete or fails.
-                        ServerError error = !existingProducerFuture.isDone() ? ServerError.ServiceNotReady
-                                : getErrorCode(existingProducerFuture);
-                        log.warn("[{}][{}] Producer is already present on the connection", remoteAddress, topicName);
-                        ctx.writeAndFlush(
-                                Commands.newError(requestId, error, "Producer is already present on the connection"));
-                        return null;
-                    }
+        // When an original principal is present (request forwarded on behalf of another client), the
+        // connection's own role must be authorized first; the original principal is checked afterwards.
+        CompletableFuture<Boolean> isProxyAuthorizedFuture;
+        if (service.isAuthorizationEnabled() && originalPrincipal != null) {
+            isProxyAuthorizedFuture = service.getAuthorizationManager().canProduceAsync(DestinationName.get(topicName),
+                    authRole);
+        } else {
+            isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+        }
+        isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
+            if (isProxyAuthorized) {
+                CompletableFuture<Boolean> authorizationFuture;
+                if (service.isAuthorizationEnabled()) {
+                    authorizationFuture = service.getAuthorizationManager().canProduceAsync(
+                            DestinationName.get(cmdProducer.getTopic().toString()),
+                            originalPrincipal != null ? originalPrincipal : authRole);
+                } else {
+                    authorizationFuture = CompletableFuture.completedFuture(true);
                 }
+                // Use producer name provided by client if present
+                final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
+                        : service.generateUniqueProducerName();
+                final boolean isEncrypted = cmdProducer.getEncrypted();
+                final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
+
+                authorizationFuture.thenApply(isAuthorized -> {
+                    if (isAuthorized) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, authRole);
+                        }
+
+                        // Reject invalid producer metadata before the producer future is registered
+                        try {
+                            Metadata.validateMetadata(metadata);
+                        } catch (IllegalArgumentException iae) {
+                            final String msg = iae.getMessage();
+                            ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
+                            return null;
+                        }
+
+                        CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
+                        CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId,
+                                producerFuture);
+
+                        if (existingProducerFuture != null) {
+                            if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
+                                Producer producer = existingProducerFuture.getNow(null);
+                                log.info("[{}] Producer with the same id is already created: {}", remoteAddress,
+                                        producer);
+                                ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName()));
+                                return null;
+                            } else {
+                                // There was an earlier request to create a producer with the same
+                                // producerId. This can happen when the client timeout is lower than
+                                // the broker timeouts. We need to wait until the previous producer
+                                // creation request either completes or fails.
+                                ServerError error = !existingProducerFuture.isDone() ? ServerError.ServiceNotReady
+                                        : getErrorCode(existingProducerFuture);
+                                log.warn("[{}][{}] Producer is already present on the connection", remoteAddress,
+                                        topicName);
+                                ctx.writeAndFlush(Commands.newError(requestId, error,
+                                        "Producer is already present on the connection"));
+                                return null;
+                            }
+                        }
 
-                log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);
-
-                service.getTopic(topicName).thenAccept((Topic topic) -> {
-                    // Before creating producer, check if backlog quota exceeded
-                    // on topic
-                    if (topic.isBacklogQuotaExceeded(producerName)) {
-                        IllegalStateException illegalStateException = new IllegalStateException(
-                                "Cannot create producer on topic with backlog quota exceeded");
-                        BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy();
-                        if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
-                            ctx.writeAndFlush(Commands.newError(requestId,
-                                    ServerError.ProducerBlockedQuotaExceededError, illegalStateException.getMessage()));
-                        } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
-                            ctx.writeAndFlush(
-                                    Commands.newError(requestId, ServerError.ProducerBlockedQuotaExceededException,
+                        log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);
+
+                        service.getTopic(topicName).thenAccept((Topic topic) -> {
+                            // Before creating producer, check if backlog quota exceeded
+                            // on topic
+                            if (topic.isBacklogQuotaExceeded(producerName)) {
+                                IllegalStateException illegalStateException = new IllegalStateException(
+                                        "Cannot create producer on topic with backlog quota exceeded");
+                                BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy();
+                                if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
+                                    ctx.writeAndFlush(
+                                            Commands.newError(requestId, ServerError.ProducerBlockedQuotaExceededError,
+                                                    illegalStateException.getMessage()));
+                                } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
+                                    ctx.writeAndFlush(Commands.newError(requestId,
+                                            ServerError.ProducerBlockedQuotaExceededException,
                                             illegalStateException.getMessage()));
-                        }
-                        producerFuture.completeExceptionally(illegalStateException);
-                        producers.remove(producerId, producerFuture);
-                        return;
-                    }
+                                }
+                                producerFuture.completeExceptionally(illegalStateException);
+                                producers.remove(producerId, producerFuture);
+                                return;
+                            }
 
-                    // Check whether the producer will publish encrypted messages or not
-                    if (topic.isEncryptionRequired() && !isEncrypted) {
-                        String msg = String.format("Encryption is required in %s", topicName);
-                        log.warn("[{}] {}", remoteAddress, msg);
-                        ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
-                        return;
-                    }
+                            // Check whether the producer will publish encrypted messages or not
+                            if (topic.isEncryptionRequired() && !isEncrypted) {
+                                String msg = String.format("Encryption is required in %s", topicName);
+                                log.warn("[{}] {}", remoteAddress, msg);
+                                ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, msg));
+                                return;
+                            }
 
-                    disableTcpNoDelayIfNeeded(topicName, producerName);
+                            disableTcpNoDelayIfNeeded(topicName, producerName);
 
-                    Producer producer =
-                            new Producer(topic, ServerCnx.this, producerId, producerName, authRole, isEncrypted, metadata);
+                            Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole,
+                                    isEncrypted, metadata);
 
-                    try {
-                        topic.addProducer(producer);
+                            try {
+                                topic.addProducer(producer);
 
-                        if (isActive()) {
-                            if (producerFuture.complete(producer)) {
-                                log.info("[{}] Created new producer: {}", remoteAddress, producer);
-                                ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,
-                                        producer.getLastSequenceId()));
-                                return;
-                            } else {
-                                // The producer's future was completed before by
-                                // a close command
-                                producer.closeNow();
-                                log.info("[{}] Cleared producer created after timeout on client side {}", remoteAddress,
-                                        producer);
+                                if (isActive()) {
+                                    if (producerFuture.complete(producer)) {
+                                        log.info("[{}] Created new producer: {}", remoteAddress, producer);
+                                        ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,
+                                                producer.getLastSequenceId()));
+                                        return;
+                                    } else {
+                                        // The producer's future was completed before by
+                                        // a close command
+                                        producer.closeNow();
+                                        log.info("[{}] Cleared producer created after timeout on client side {}",
+                                                remoteAddress, producer);
+                                    }
+                                } else {
+                                    producer.closeNow();
+                                    log.info("[{}] Cleared producer created after connection was closed: {}",
+                                            remoteAddress, producer);
+                                    producerFuture.completeExceptionally(
+                                            new IllegalStateException("Producer created after connection was closed"));
+                                }
+                            } catch (BrokerServiceException ise) {
+                                log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName,
+                                        ise.getMessage());
+                                ctx.writeAndFlush(Commands.newError(requestId,
+                                        BrokerServiceException.getClientErrorCode(ise), ise.getMessage()));
+                                producerFuture.completeExceptionally(ise);
                             }
-                        } else {
-                            producer.closeNow();
-                            log.info("[{}] Cleared producer created after connection was closed: {}", remoteAddress,
-                                    producer);
-                            producerFuture.completeExceptionally(
-                                    new IllegalStateException("Producer created after connection was closed"));
-                        }
-                    } catch (BrokerServiceException ise) {
-                        log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName,
-                                ise.getMessage());
-                        ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(ise),
-                                ise.getMessage()));
-                        producerFuture.completeExceptionally(ise);
-                    }
 
-                    producers.remove(producerId, producerFuture);
-                }).exceptionally(exception -> {
-                    Throwable cause = exception.getCause();
-                    if (!(cause instanceof ServiceUnitNotReadyException)) {
-                        // Do not print stack traces for expected exceptions
-                        log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception);
-                    }
+                            producers.remove(producerId, producerFuture);
+                        }).exceptionally(exception -> {
+                            Throwable cause = exception.getCause();
+                            if (!(cause instanceof ServiceUnitNotReadyException)) {
+                                // Do not print stack traces for expected exceptions
+                                log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception);
+                            }
 
-                    // If client timed out, the future would have been completed
-                    // by subsequent close. Send error back to
-                    // client, only if not completed already.
-                    if (producerFuture.completeExceptionally(exception)) {
-                        ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(cause),
-                                cause.getMessage()));
-                    }
-                    producers.remove(producerId, producerFuture);
+                            // If client timed out, the future would have been completed
+                            // by subsequent close. Send error back to
+                            // client, only if not completed already.
+                            if (producerFuture.completeExceptionally(exception)) {
+                                ctx.writeAndFlush(Commands.newError(requestId,
+                                        BrokerServiceException.getClientErrorCode(cause), cause.getMessage()));
+                            }
+                            producers.remove(producerId, producerFuture);
 
+                            return null;
+                        });
+                    } else {
+                        String msg = "Client is not authorized to Produce";
+                        log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
+                        ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
+                    }
                     return null;
                 });
             } else {
-                String msg = "Client is not authorized to Produce";
-                log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
+                final String msg = "Proxy Client is not authorized to Produce";
+                log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
                 ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
             }
             return null;
+        }).exceptionally(ex -> {
+            String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), authRole);
+            log.warn(msg);
+            ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, ex.getMessage()));
+            return null;
         });
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index e7b670a..7d847ee 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -449,16 +449,7 @@ public class Commands {
     }
 
     public static ByteBuf newPartitionMetadataRequest(String topic, long requestId) {
-        CommandPartitionedTopicMetadata.Builder partitionMetadataBuilder = CommandPartitionedTopicMetadata.newBuilder();
-        partitionMetadataBuilder.setTopic(topic);
-        partitionMetadataBuilder.setRequestId(requestId);
-
-        CommandPartitionedTopicMetadata partitionMetadata = partitionMetadataBuilder.build();
-        ByteBuf res = serializeWithSize(
-                BaseCommand.newBuilder().setType(Type.PARTITIONED_METADATA).setPartitionMetadata(partitionMetadata));
-        partitionMetadataBuilder.recycle();
-        partitionMetadata.recycle();
-        return res;
+        return Commands.newPartitionMetadataRequest(topic, requestId, null);
     }
 
     public static ByteBuf newPartitionMetadataResponse(int partitions, long requestId) {
@@ -477,16 +468,7 @@ public class Commands {
     }
 
     public static ByteBuf newLookup(String topic, boolean authoritative, long requestId) {
-        CommandLookupTopic.Builder lookupTopicBuilder = CommandLookupTopic.newBuilder();
-        lookupTopicBuilder.setTopic(topic);
-        lookupTopicBuilder.setRequestId(requestId);
-        lookupTopicBuilder.setAuthoritative(authoritative);
-
-        CommandLookupTopic lookupBroker = lookupTopicBuilder.build();
-        ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.LOOKUP).setLookupTopic(lookupBroker));
-        lookupTopicBuilder.recycle();
-        lookupBroker.recycle();
-        return res;
+        return Commands.newLookup(topic, authoritative, null, requestId);
     }
 
     public static ByteBuf newLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boolean authoritative,
@@ -640,7 +622,7 @@ public class Commands {
     }
 
     static ByteBuf newPong() {
-    	return cmdPong.retainedDuplicate();
+        return cmdPong.retainedDuplicate();
     }
 
     private static ByteBuf serializeWithSize(BaseCommand.Builder cmdBuilder) {
@@ -891,4 +873,51 @@ public class Commands {
         Crc32c,
         None;
     }
+
+    /**
+     * Builds a serialized PARTITIONED_METADATA request for a topic.
+     *
+     * @param topic topic set on the request
+     * @param requestId request id set on the request
+     * @param clientAuthRole value set as the request's original principal; left unset when {@code null}
+     * @return the buffer produced by {@code serializeWithSize} for the request
+     */
+    public static ByteBuf newPartitionMetadataRequest(String topic, long requestId, String clientAuthRole) {
+        CommandPartitionedTopicMetadata.Builder partitionMetadataBuilder = CommandPartitionedTopicMetadata.newBuilder();
+        partitionMetadataBuilder.setTopic(topic);
+        partitionMetadataBuilder.setRequestId(requestId);
+        if (clientAuthRole != null) {
+            partitionMetadataBuilder.setOriginalPrincipal(clientAuthRole);
+        }
+        CommandPartitionedTopicMetadata partitionMetadata = partitionMetadataBuilder.build();
+        ByteBuf res = serializeWithSize(
+                BaseCommand.newBuilder().setType(Type.PARTITIONED_METADATA).setPartitionMetadata(partitionMetadata));
+        partitionMetadataBuilder.recycle();
+        partitionMetadata.recycle();
+        return res;
+    }
+
+    /**
+     * Builds a serialized LOOKUP request for a topic.
+     *
+     * @param topic topic set on the request
+     * @param authoritative value of the request's authoritative flag
+     * @param clientAuthRole value set as the request's original principal; left unset when {@code null}
+     * @param requestId request id set on the request
+     * @return the buffer produced by {@code serializeWithSize} for the request
+     */
+    public static ByteBuf newLookup(String topic, boolean authoritative, String clientAuthRole, long requestId) {
+        CommandLookupTopic.Builder lookupTopicBuilder = CommandLookupTopic.newBuilder();
+        lookupTopicBuilder.setTopic(topic);
+        lookupTopicBuilder.setRequestId(requestId);
+        lookupTopicBuilder.setAuthoritative(authoritative);
+        if (clientAuthRole != null) {
+            lookupTopicBuilder.setOriginalPrincipal(clientAuthRole);
+        }
+        CommandLookupTopic lookupBroker = lookupTopicBuilder.build();
+        ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.LOOKUP).setLookupTopic(lookupBroker));
+        lookupTopicBuilder.recycle();
+        lookupBroker.recycle();
+        return res;
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 6e344cb..eef3651 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -6779,6 +6779,10 @@ public final class PulsarApi {
     // required uint64 request_id = 2;
     boolean hasRequestId();
     long getRequestId();
+    
+    // optional string original_principal = 3;
+    boolean hasOriginalPrincipal();
+    String getOriginalPrincipal();
   }
   public static final class CommandPartitionedTopicMetadata extends
       com.google.protobuf.GeneratedMessageLite
@@ -6859,9 +6863,42 @@ public final class PulsarApi {
       return requestId_;
     }
     
+    // optional string original_principal = 3;
+    public static final int ORIGINAL_PRINCIPAL_FIELD_NUMBER = 3;
+    private java.lang.Object originalPrincipal_;
+    public boolean hasOriginalPrincipal() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public String getOriginalPrincipal() {
+      java.lang.Object ref = originalPrincipal_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          originalPrincipal_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getOriginalPrincipalBytes() {
+      java.lang.Object ref = originalPrincipal_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        originalPrincipal_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
     private void initFields() {
       topic_ = "";
       requestId_ = 0L;
+      originalPrincipal_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -6894,6 +6931,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeUInt64(2, requestId_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(3, getOriginalPrincipalBytes());
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -6910,6 +6950,10 @@ public final class PulsarApi {
         size += com.google.protobuf.CodedOutputStream
           .computeUInt64Size(2, requestId_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(3, getOriginalPrincipalBytes());
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -7027,6 +7071,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000001);
         requestId_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000002);
+        originalPrincipal_ = "";
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
       
@@ -7068,6 +7114,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000002;
         }
         result.requestId_ = requestId_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.originalPrincipal_ = originalPrincipal_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -7080,6 +7130,9 @@ public final class PulsarApi {
         if (other.hasRequestId()) {
           setRequestId(other.getRequestId());
         }
+        if (other.hasOriginalPrincipal()) {
+          setOriginalPrincipal(other.getOriginalPrincipal());
+        }
         return this;
       }
       
@@ -7127,6 +7180,11 @@ public final class PulsarApi {
               requestId_ = input.readUInt64();
               break;
             }
+            case 26: {
+              bitField0_ |= 0x00000004;
+              originalPrincipal_ = input.readBytes();
+              break;
+            }
           }
         }
       }
@@ -7190,6 +7248,42 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional string original_principal = 3;
+      private java.lang.Object originalPrincipal_ = "";
+      public boolean hasOriginalPrincipal() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public String getOriginalPrincipal() {
+        java.lang.Object ref = originalPrincipal_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          originalPrincipal_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setOriginalPrincipal(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        originalPrincipal_ = value;
+        
+        return this;
+      }
+      public Builder clearOriginalPrincipal() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        originalPrincipal_ = getDefaultInstance().getOriginalPrincipal();
+        
+        return this;
+      }
+      void setOriginalPrincipal(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000004;
+        originalPrincipal_ = value;
+        
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandPartitionedTopicMetadata)
     }
     
@@ -7865,6 +7959,10 @@ public final class PulsarApi {
     // optional bool authoritative = 3 [default = false];
     boolean hasAuthoritative();
     boolean getAuthoritative();
+    
+    // optional string original_principal = 4;
+    boolean hasOriginalPrincipal();
+    String getOriginalPrincipal();
   }
   public static final class CommandLookupTopic extends
       com.google.protobuf.GeneratedMessageLite
@@ -7955,10 +8053,43 @@ public final class PulsarApi {
       return authoritative_;
     }
     
+    // optional string original_principal = 4;
+    public static final int ORIGINAL_PRINCIPAL_FIELD_NUMBER = 4;
+    private java.lang.Object originalPrincipal_;
+    public boolean hasOriginalPrincipal() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public String getOriginalPrincipal() {
+      java.lang.Object ref = originalPrincipal_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          originalPrincipal_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getOriginalPrincipalBytes() {
+      java.lang.Object ref = originalPrincipal_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        originalPrincipal_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
     private void initFields() {
       topic_ = "";
       requestId_ = 0L;
       authoritative_ = false;
+      originalPrincipal_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -7994,6 +8125,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeBool(3, authoritative_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeBytes(4, getOriginalPrincipalBytes());
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -8014,6 +8148,10 @@ public final class PulsarApi {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(3, authoritative_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(4, getOriginalPrincipalBytes());
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -8133,6 +8271,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000002);
         authoritative_ = false;
         bitField0_ = (bitField0_ & ~0x00000004);
+        originalPrincipal_ = "";
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
       
@@ -8178,6 +8318,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000004;
         }
         result.authoritative_ = authoritative_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.originalPrincipal_ = originalPrincipal_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -8193,6 +8337,9 @@ public final class PulsarApi {
         if (other.hasAuthoritative()) {
           setAuthoritative(other.getAuthoritative());
         }
+        if (other.hasOriginalPrincipal()) {
+          setOriginalPrincipal(other.getOriginalPrincipal());
+        }
         return this;
       }
       
@@ -8245,6 +8392,11 @@ public final class PulsarApi {
               authoritative_ = input.readBool();
               break;
             }
+            case 34: {
+              bitField0_ |= 0x00000008;
+              originalPrincipal_ = input.readBytes();
+              break;
+            }
           }
         }
       }
@@ -8329,6 +8481,42 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional string original_principal = 4;
+      private java.lang.Object originalPrincipal_ = "";
+      public boolean hasOriginalPrincipal() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public String getOriginalPrincipal() {
+        java.lang.Object ref = originalPrincipal_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          originalPrincipal_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setOriginalPrincipal(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000008;
+        originalPrincipal_ = value;
+        
+        return this;
+      }
+      public Builder clearOriginalPrincipal() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        originalPrincipal_ = getDefaultInstance().getOriginalPrincipal();
+        
+        return this;
+      }
+      void setOriginalPrincipal(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000008;
+        originalPrincipal_ = value;
+        
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandLookupTopic)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 6c86530..faa894f 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -72,9 +72,9 @@ message MessageMetadata {
 	// differentiate single and batch message metadata
 	optional int32 num_messages_in_batch = 11 [default = 1];
 
-        // the timestamp that this event occurs. it is typically set by applications.
-        // if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
-        optional uint64 event_time = 12 [default = 0];
+	// the timestamp that this event occurs. it is typically set by applications.
+	// if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
+	optional uint64 event_time = 12 [default = 0];
 	// Contains encryption key name, encrypted key and metadata to describe the key
 	repeated EncryptionKeys encryption_keys = 13;
 	// Algorithm used to encrypt data key
@@ -186,6 +186,7 @@ message CommandSubscribe {
 message CommandPartitionedTopicMetadata {
 	required string topic            = 1;
 	required uint64 request_id       = 2;
+	optional string original_principal = 3; // auth role of the originating client, set by the proxy when forwarding
 }
 
 message CommandPartitionedTopicMetadataResponse {
@@ -198,13 +199,13 @@ message CommandPartitionedTopicMetadataResponse {
 	optional LookupType response          = 3;
 	optional ServerError error            = 4;
 	optional string message               = 5;
-
 }
 
 message CommandLookupTopic {
 	required string topic            = 1;
 	required uint64 request_id       = 2;
 	optional bool authoritative      = 3 [default = false];
+	optional string original_principal = 4; // auth role of the originating client, set by the proxy when forwarding
 }
 
 message CommandLookupTopicResponse {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 9dc876f..5d35b86 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -33,7 +33,9 @@ import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
 import io.prometheus.client.Counter;
+import static org.apache.commons.lang3.StringUtils.isBlank;
 
 public class LookupProxyHandler {
     private final ProxyService service;
@@ -41,6 +43,7 @@ public class LookupProxyHandler {
     private final boolean connectWithTLS;
 
     private SocketAddress clientAddress;
+    private String brokerServiceURL;
 
     private static final Counter lookupRequests = Counter
             .build("pulsar_proxy_lookup_requests", "Counter of topic lookup requests").create().register();
@@ -54,6 +57,8 @@ public class LookupProxyHandler {
         this.proxyConnection = proxyConnection;
         this.clientAddress = proxyConnection.clientAddress();
         this.connectWithTLS = proxy.getConfiguration().isTlsEnabledWithBroker();
+        this.brokerServiceURL = this.connectWithTLS ? proxy.getConfiguration().getBrokerServiceURLTLS()
+                : proxy.getConfiguration().getBrokerServiceURL();
     }
 
     public void handleLookup(CommandLookupTopic lookup) {
@@ -64,20 +69,24 @@ public class LookupProxyHandler {
         lookupRequests.inc();
         long clientRequestId = lookup.getRequestId();
         String topic = lookup.getTopic();
-
-        ServiceLookupData availableBroker = null;
-        try {
-            availableBroker = service.getDiscoveryProvider().nextBroker();
-        } catch (Exception e) {
-            log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
-            proxyConnection.ctx().writeAndFlush(
-                    Commands.newLookupErrorResponse(ServerError.ServiceNotReady, e.getMessage(), clientRequestId));
-            return;
+        String serviceUrl;
+        if (isBlank(brokerServiceURL)) {
+            ServiceLookupData availableBroker = null;
+            try {
+                availableBroker = service.getDiscoveryProvider().nextBroker();
+            } catch (Exception e) {
+                log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
+                proxyConnection.ctx().writeAndFlush(
+                        Commands.newLookupErrorResponse(ServerError.ServiceNotReady, e.getMessage(), clientRequestId));
+                return;
+            }
+            serviceUrl = this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls()
+                    : availableBroker.getPulsarServiceUrl();
+        } else {
+            serviceUrl = this.connectWithTLS ? service.getConfiguration().getBrokerServiceURLTLS()
+                    : service.getConfiguration().getBrokerServiceURL();
         }
-
-        performLookup(clientRequestId, topic,
-                this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl(),
-                false, 10);
+        performLookup(clientRequestId, topic, serviceUrl, false, 10);
     }
 
     private void performLookup(long clientRequestId, String topic, String brokerServiceUrl, boolean authoritative,
@@ -99,31 +108,41 @@ public class LookupProxyHandler {
 
         InetSocketAddress addr = InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
         if (log.isDebugEnabled()) {
-            log.debug("Getting connections to '{}'", addr);
+            log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr, topic,
+                    clientRequestId);
         }
         service.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
             // Connected to backend broker
             long requestId = service.newRequestId();
-            clientCnx.newLookup(Commands.newLookup(topic, authoritative, requestId), requestId).thenAccept(result -> {
-                if (result.redirect) {
-                    // Need to try the lookup again on a different broker
-                    performLookup(clientRequestId, topic, result.brokerUrl, authoritative, numberOfRetries - 1);
-                } else {
-                    // We have the result immediately
-                    String brokerUrl = connectWithTLS ? result.brokerUrlTls : result.brokerUrl;
-
-                    // Reply the same address for both TLS non-TLS. The reason is that whether we use TLS between proxy
-                    // and broker is independent of whether the client itself uses TLS, but we need to force the client
-                    // to use the appropriate target broker (and port) when it will connect back.
-                    proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
-                            LookupType.Connect, clientRequestId, true /* this is coming from proxy */));
-                }
-            }).exceptionally(ex -> {
-                log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, topic, ex.getMessage());
-                proxyConnection.ctx().writeAndFlush(
-                        Commands.newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
-                return null;
-            });
+            ByteBuf command;
+            if (service.getConfiguration().isAuthenticationEnabled()) {
+                command = Commands.newLookup(topic, authoritative, proxyConnection.clientAuthRole, requestId);
+            } else {
+                command = Commands.newLookup(topic, authoritative, requestId);
+            }
+            clientCnx.newLookup(command,
+                    requestId).thenAccept(result -> {
+                        if (result.redirect) {
+                            // Need to try the lookup again on a different broker
+                            performLookup(clientRequestId, topic, result.brokerUrl, authoritative, numberOfRetries - 1);
+                        } else {
+                            // We have the result immediately
+                            String brokerUrl = connectWithTLS ? result.brokerUrlTls : result.brokerUrl;
+
+                            // Reply with the same address for both TLS and non-TLS.
+                            // The reason is that whether we use TLS between proxy and broker
+                            // is independent of whether the client itself uses TLS, but we
+                            // need to force the client to use the appropriate target broker
+                            // (and port) when it connects back.
+                            proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl, true,
+                                    LookupType.Connect, clientRequestId, true /* this is coming from proxy */));
+                        }
+                    }).exceptionally(ex -> {
+                        log.warn("[{}] Failed to lookup topic {}: {}", clientAddress, topic, ex.getMessage());
+                        proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+                                ex.getMessage(), clientRequestId));
+                        return null;
+                    });
         }).exceptionally(ex -> {
             // Failed to connect to backend broker
             proxyConnection.ctx().writeAndFlush(
@@ -132,30 +151,74 @@ public class LookupProxyHandler {
         });
     }
 
-    void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata) {
+    /**
+     * Answers a client's partitioned-topic metadata request: through the discovery provider when no
+     * brokerServiceURL is configured, otherwise by forwarding the request to that broker.
+     */
+    public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata) {
         partitionsMetadataRequests.inc();
         if (log.isDebugEnabled()) {
             log.debug("[{}] Received PartitionMetadataLookup", clientAddress);
         }
 
-        final long requestId = partitionMetadata.getRequestId();
+        final long clientRequestId = partitionMetadata.getRequestId();
         DestinationName dn = DestinationName.get(partitionMetadata.getTopic());
-
-        service.getDiscoveryProvider().getPartitionedTopicMetadata(service, dn, proxyConnection.clientAuthRole)
-                .thenAccept(metadata -> {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Total number of partitions for topic {} is {}", proxyConnection.clientAuthRole,
-                                dn, metadata.partitions);
-                    }
-                    proxyConnection.ctx()
-                            .writeAndFlush(Commands.newPartitionMetadataResponse(metadata.partitions, requestId));
-                }).exceptionally(ex -> {
-                    log.warn("[{}] Failed to get partitioned metadata for topic {} {}", clientAddress, dn,
-                            ex.getMessage(), ex);
-                    proxyConnection.ctx().writeAndFlush(Commands
-                            .newPartitionMetadataResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
-                    return null;
-                });
+        if (isBlank(brokerServiceURL)) {
+            service.getDiscoveryProvider().getPartitionedTopicMetadata(service, dn, proxyConnection.clientAuthRole)
+                    .thenAccept(metadata -> {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Total number of partitions for topic {} is {}",
+                                    proxyConnection.clientAuthRole, dn, metadata.partitions);
+                        }
+                        proxyConnection.ctx().writeAndFlush(
+                                Commands.newPartitionMetadataResponse(metadata.partitions, clientRequestId));
+                    }).exceptionally(ex -> {
+                        log.warn("[{}] Failed to get partitioned metadata for topic {} {}", clientAddress, dn,
+                                ex.getMessage(), ex);
+                        proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(
+                                ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
+                        return null;
+                    });
+        } else {
+            URI brokerURI;
+            try {
+                brokerURI = new URI(brokerServiceURL);
+            } catch (URISyntaxException e) {
+                proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
+                        e.getMessage(), clientRequestId));
+                return;
+            }
+            // Stay unresolved like performLookup: new InetSocketAddress(host, port) would resolve the host inline.
+            InetSocketAddress addr = InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
+            if (log.isDebugEnabled()) {
+                log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr,
+                        dn.getPartitionedTopicName(), clientRequestId);
+            }
+
+            service.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
+                // Connected to backend broker
+                long requestId = service.newRequestId();
+                ByteBuf command = service.getConfiguration().isAuthenticationEnabled()
+                        ? Commands.newPartitionMetadataRequest(dn.toString(), requestId, proxyConnection.clientAuthRole)
+                        : Commands.newPartitionMetadataRequest(dn.toString(), requestId);
+                clientCnx.newLookup(command, requestId).thenAccept(lookupDataResult -> {
+                    proxyConnection.ctx().writeAndFlush(
+                            Commands.newPartitionMetadataResponse(lookupDataResult.partitions, clientRequestId));
+                }).exceptionally(ex -> {
+                    // ex.getCause() can be null; fall back to ex so the error reply below is always sent.
+                    Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
+                    log.warn("[{}] failed to get Partitioned metadata : {}", dn.toString(), cause.getMessage(), ex);
+                    proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(
+                            ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
+                    return null;
+                });
+            }).exceptionally(ex -> {
+                // Failed to connect to backend broker
+                proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
+                        ex.getMessage(), clientRequestId));
+                return null;
+            });
+        }
     }
 
     private static final Logger log = LoggerFactory.getLogger(LookupProxyHandler.class);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 0d0f160..1c45c4f 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -34,7 +34,11 @@ public class ProxyConfiguration implements PulsarConfiguration {
 
     // ZooKeeper session timeout
     private int zookeeperSessionTimeoutMs = 30_000;
-
+    
+    // If service discovery is disabled, lookups go to this URL; it should point to the discovery service provider.
+    private String brokerServiceURL;
+    private String brokerServiceURLTLS;
+    
     // Port to use to server binary-proto request
     private int servicePort = 6650;
     // Port to use to server binary-proto-tls request
@@ -78,6 +82,22 @@ public class ProxyConfiguration implements PulsarConfiguration {
 
     private Properties properties = new Properties();
 
+    public String getBrokerServiceURLTLS() {
+        return brokerServiceURLTLS;
+    }
+    
+    public void setBrokerServiceURLTLS(String discoveryServiceURLTLS) {
+        this.brokerServiceURLTLS = discoveryServiceURLTLS;
+    }
+    
+    public String getBrokerServiceURL() {
+        return brokerServiceURL;
+    }
+    
+    public void setBrokerServiceURL(String discoveryServiceURL) {
+        this.brokerServiceURL = discoveryServiceURL;
+    }
+    
     public String getZookeeperServers() {
         return zookeeperServers;
     }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index fed6e15..e5c2e91 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.lang3.StringUtils.isBlank;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -110,21 +111,23 @@ public class ProxyService implements Closeable {
     }
 
     public void start() throws Exception {
-        localZooKeeperConnectionService = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
-                proxyConfig.getZookeeperServers(), proxyConfig.getZookeeperSessionTimeoutMs());
-        localZooKeeperConnectionService.start(new ShutdownService() {
-            @Override
-            public void shutdown(int exitCode) {
-                LOG.error("Lost local ZK session. Shutting down the proxy");
-                Runtime.getRuntime().halt(-1);
-            }
-        });
-
-        discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, getZooKeeperClientFactory());
-        this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache);
         ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(proxyConfig);
         authenticationService = new AuthenticationService(serviceConfiguration);
-        authorizationManager = new AuthorizationManager(serviceConfiguration, configurationCacheService);
+
+        if (!isBlank(proxyConfig.getZookeeperServers()) && !isBlank(proxyConfig.getGlobalZookeeperServers())) {
+            localZooKeeperConnectionService = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
+                    proxyConfig.getZookeeperServers(), proxyConfig.getZookeeperSessionTimeoutMs());
+            localZooKeeperConnectionService.start(new ShutdownService() {
+                @Override
+                public void shutdown(int exitCode) {
+                    LOG.error("Lost local ZK session. Shutting down the proxy");
+                    Runtime.getRuntime().halt(-1);
+                }
+            });
+            discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, getZooKeeperClientFactory());
+            this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache);
+            authorizationManager = new AuthorizationManager(serviceConfiguration, configurationCacheService);
+        }
 
         ServerBootstrap bootstrap = new ServerBootstrap();
         bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
@@ -145,7 +148,7 @@ public class ProxyService implements Closeable {
             ServerBootstrap tlsBootstrap = bootstrap.clone();
             tlsBootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, true));
             tlsBootstrap.bind(proxyConfig.getServicePortTls()).sync();
-            LOG.info("Started Pulsar TLS Proxy on port {}", proxyConfig.getWebServicePortTls());
+            LOG.info("Started Pulsar TLS Proxy on port {}", proxyConfig.getServicePortTls());
         }
     }
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index b7e9394..d9fc4af 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -88,8 +88,13 @@ public class ProxyServiceStarter {
             config.setGlobalZookeeperServers(globalZookeeperServers);
         }
 
-        checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
-        checkArgument(!isEmpty(config.getGlobalZookeeperServers()), "globalZookeeperServers must be provided");
+        if ((isBlank(config.getBrokerServiceURL()) && isBlank(config.getBrokerServiceURLTLS()))
+                || config.isAuthorizationEnabled()) {
+            checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
+            checkArgument(!isEmpty(config.getGlobalZookeeperServers()), "globalZookeeperServers must be provided");
+        }
+
+        java.security.Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
 
         // create broker service
         ProxyService discoveryService = new ProxyService(config);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index 556a8b5..c62bbc1 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -61,7 +61,8 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
     private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
     private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
     private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
-
+    private final String DUMMY_VALUE = "DUMMY_VALUE";
+    
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
     private final String configClusterName = "use";
@@ -117,10 +118,12 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         proxyConfig.setBrokerClientAuthenticationParameters(
                 "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
         proxyConfig.setAuthenticationProviders(providers);
+        
+        proxyConfig.setZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
 
         proxyService = Mockito.spy(new ProxyService(proxyConfig));
         doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
-
         proxyService.start();
     }
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 17696c7..05d79ef 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -40,6 +40,8 @@ import org.testng.annotations.Test;
 
 public class ProxyTest extends MockedPulsarServiceBaseTest {
 
+    private final String DUMMY_VALUE = "DUMMY_VALUE";
+
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
 
@@ -49,6 +51,9 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(PortManager.nextFreePort());
+        proxyConfig.setZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
+        
         proxyService = Mockito.spy(new ProxyService(proxyConfig));
         doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index 89426d5..a87ef09 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -43,7 +43,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest {
     private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
     private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem";
     private final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
-    
+    private final String DUMMY_VALUE = "DUMMY_VALUE";
     
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
@@ -61,6 +61,9 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest {
         proxyConfig.setTlsEnabledWithBroker(false);
         proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
         proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
+        proxyConfig.setZookeeperServers(DUMMY_VALUE);
+        proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
+        
         proxyService = Mockito.spy(new ProxyService(proxyConfig));
         doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationNegTest.java
similarity index 57%
copy from pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
copy to pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationNegTest.java
index 556a8b5..04717ce 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationNegTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 
 import java.util.HashSet;
@@ -37,8 +36,10 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.mockito.Mockito;
@@ -53,18 +54,24 @@ import org.testng.collections.Maps;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
-public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase {
-    private static final Logger log = LoggerFactory.getLogger(ProxyAuthenticatedProducerConsumerTest.class);
-
-    private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
-    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem";
-    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
-    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
-    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
-
+public class ProxyWithProxyAuthorizationNegTest extends ProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(ProxyWithProxyAuthorizationNegTest.class);
+
+    private final String TLS_PROXY_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem";
+    private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-cert.pem";
+    private final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-key.pem";
+    private final String TLS_SERVER_CERT_TRUST_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem";
+    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-cert.pem";
+    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-key.pem";
+    private final String TLS_CLIENT_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem";
+    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-cert.pem";
+    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-key.pem";
+    private final String TLS_SUPERUSER_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
+    private final String TLS_SUPERUSER_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
+    private final String TLS_SUPERUSER_CLIENT_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
+    
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
-    private final String configClusterName = "use";
 
     @BeforeMethod
     @Override
@@ -75,32 +82,33 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         conf.setAuthorizationEnabled(true);
 
         conf.setTlsEnabled(true);
-        conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        conf.setTlsTrustCertsFilePath(TLS_SERVER_CERT_TRUST_FILE_PATH);
         conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         conf.setTlsAllowInsecureConnection(true);
 
         Set<String> superUserRoles = new HashSet<>();
-        superUserRoles.add("localhost");
         superUserRoles.add("superUser");
         conf.setSuperUserRoles(superUserRoles);
 
         conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
         conf.setBrokerClientAuthenticationParameters(
-                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
+                "tlsCertFile:" + TLS_SERVER_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
 
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
         conf.setAuthenticationProviders(providers);
 
-        conf.setClusterName(configClusterName);
+        conf.setClusterName("proxy-authorization-neg");
 
         super.init();
 
         // start proxy service
         proxyConfig.setAuthenticationEnabled(true);
-        proxyConfig.setAuthenticationEnabled(true);
-
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
+        proxyConfig.setBrokerServiceURLTLS("pulsar://localhost:" + BROKER_PORT_TLS);
+        
         proxyConfig.setServicePort(PortManager.nextFreePort());
         proxyConfig.setServicePortTls(PortManager.nextFreePort());
         proxyConfig.setWebServicePort(PortManager.nextFreePort());
@@ -109,17 +117,16 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         proxyConfig.setTlsEnabledWithBroker(true);
 
         // enable tls and auth&auth at proxy
-        proxyConfig.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
-        proxyConfig.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        proxyConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
+        proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
+        proxyConfig.setTlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH);
 
         proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
         proxyConfig.setBrokerClientAuthenticationParameters(
-                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
+                "tlsCertFile:" + TLS_PROXY_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_PROXY_KEY_FILE_PATH);
         proxyConfig.setAuthenticationProviders(providers);
-
+ 
         proxyService = Mockito.spy(new ProxyService(proxyConfig));
-        doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
 
         proxyService.start();
     }
@@ -146,32 +153,46 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
      * @throws Exception
      */
     @Test
-    public void testTlsSyncProducerAndConsumer() throws Exception {
+    public void testProxyAuthorization() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
+        createAdminClient();
         final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
-        Map<String, String> authParams = Maps.newHashMap();
-        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
-        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
-        Authentication authTls = new AuthenticationTls();
-        authTls.configure(authParams);
         // create a client which connects to proxy over tls and pass authData
-        PulsarClient proxyClient = createPulsarClient(authTls, proxyServiceUrl);
+        PulsarClient proxyClient = createPulsarClient(proxyServiceUrl);
 
-        admin.clusters().updateCluster(configClusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
-                "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
+        String namespaceName = "my-property/proxy-authorization-neg/my-ns";
+        
         admin.properties().createProperty("my-property",
-                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
-        admin.namespaces().createNamespace("my-property/use/my-ns");
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("proxy-authorization-neg")));
+        admin.namespaces().createNamespace(namespaceName);
+        
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy", Sets.newHashSet(AuthAction.produce));
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client", Sets.newHashSet(AuthAction.consume, AuthAction.produce));
 
+        
         ConsumerConfiguration conf = new ConsumerConfiguration();
         conf.setSubscriptionType(SubscriptionType.Exclusive);
-        Consumer consumer = proxyClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
-                conf);
-
+        Consumer consumer;
+        try {
+            consumer = proxyClient.subscribe("persistent://my-property/proxy-authorization-neg/my-ns/my-topic1", "my-subscriber-name",
+                    conf);
+        } catch (Exception ex) {
+            // expected
+            admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy", Sets.newHashSet(AuthAction.consume));
+            log.info("-- Admin permissions {} ---", admin.namespaces().getPermissions(namespaceName));
+            consumer = proxyClient.subscribe("persistent://my-property/proxy-authorization-neg/my-ns/my-topic1", "my-subscriber-name",
+                    conf);
+        }
         ProducerConfiguration producerConf = new ProducerConfiguration();
-
-        Producer producer = proxyClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
+        Producer producer;
+        try {
+            producer = proxyClient.createProducer("persistent://my-property/proxy-authorization-neg/my-ns/my-topic1", producerConf);
+        } catch(Exception ex) {
+            // expected
+            admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy", Sets.newHashSet(AuthAction.produce, AuthAction.consume));
+            producer = proxyClient.createProducer("persistent://my-property/proxy-authorization-neg/my-ns/my-topic1", producerConf);
+        }
         final int msgs = 10;
         for (int i = 0; i < msgs; i++) {
             String message = "my-message-" + i;
@@ -196,16 +217,34 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         log.info("-- Exiting {} test --", methodName);
     }
 
-    protected final PulsarClient createPulsarClient(Authentication auth, String lookupUrl) throws Exception {
+    protected final void createAdminClient() throws Exception { // replaces the 'admin' field with a TLS-authenticated PulsarAdmin spy built from the TLS_SUPERUSER_CLIENT_* files
+        Map<String, String> authParams = Maps.newHashMap();
+        authParams.put("tlsCertFile", TLS_SUPERUSER_CLIENT_CERT_FILE_PATH); // certificate from the top-level tls/ test resources, not the per-test client pair
+        authParams.put("tlsKeyFile", TLS_SUPERUSER_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
         org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
         clientConf.setStatsInterval(0, TimeUnit.SECONDS);
-        clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        clientConf.setTlsTrustCertsFilePath(TLS_SUPERUSER_CLIENT_TRUST_CERT_FILE_PATH); // CA file that sits next to the superuser certificate (tls/cacert.pem)
         clientConf.setTlsAllowInsecureConnection(true);
-        clientConf.setAuthentication(auth);
+        clientConf.setAuthentication(authTls); // credentials are now built locally instead of being passed in by the caller
         clientConf.setUseTls(true);
 
         admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
-        return PulsarClient.create(lookupUrl, clientConf);
     }
-
+    
+    private PulsarClient createPulsarClient(String proxyServiceUrl) throws PulsarClientException { // builds a TLS-authenticated client that connects to the given proxy URL
+        Map<String, String> authParams = Maps.newHashMap();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); // per-test client certificate (ProxyWithProxyAuthorizationTest/ resources), distinct from the superuser pair used for admin
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
+        org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
+        clientConf.setStatsInterval(0, TimeUnit.SECONDS); // NOTE(review): presumably turns off periodic client stats reporting - confirm against ClientConfiguration
+        clientConf.setTlsTrustCertsFilePath(TLS_CLIENT_TRUST_CERT_FILE_PATH);
+        clientConf.setTlsAllowInsecureConnection(true); // test-only setting: insecure TLS connections are permitted
+        clientConf.setAuthentication(authTls);
+        clientConf.setUseTls(true);
+        return PulsarClient.create(proxyServiceUrl, clientConf); // caller owns the returned client
+    }
 }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
similarity index 63%
copy from pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
copy to pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
index 556a8b5..558f5e0 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.proxy.server;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -37,8 +38,10 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.mockito.Mockito;
@@ -53,18 +56,24 @@ import org.testng.collections.Maps;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
-public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase {
-    private static final Logger log = LoggerFactory.getLogger(ProxyAuthenticatedProducerConsumerTest.class);
-
-    private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
-    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem";
-    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
-    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
-    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
-
+public class ProxyWithProxyAuthorizationTest extends ProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(ProxyWithProxyAuthorizationTest.class);
+
+    private final String TLS_PROXY_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem";
+    private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-cert.pem";
+    private final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-key.pem";
+    private final String TLS_SERVER_CERT_TRUST_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem";
+    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-cert.pem";
+    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-key.pem";
+    private final String TLS_CLIENT_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem";
+    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-cert.pem";
+    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-key.pem";
+    private final String TLS_SUPERUSER_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
+    private final String TLS_SUPERUSER_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
+    private final String TLS_SUPERUSER_CLIENT_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
+    
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
-    private final String configClusterName = "use";
 
     @BeforeMethod
     @Override
@@ -75,32 +84,33 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         conf.setAuthorizationEnabled(true);
 
         conf.setTlsEnabled(true);
-        conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        conf.setTlsTrustCertsFilePath(TLS_SERVER_CERT_TRUST_FILE_PATH);
         conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         conf.setTlsAllowInsecureConnection(true);
 
         Set<String> superUserRoles = new HashSet<>();
-        superUserRoles.add("localhost");
         superUserRoles.add("superUser");
         conf.setSuperUserRoles(superUserRoles);
 
         conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
         conf.setBrokerClientAuthenticationParameters(
-                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
+                "tlsCertFile:" + TLS_SERVER_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
 
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
         conf.setAuthenticationProviders(providers);
 
-        conf.setClusterName(configClusterName);
+        conf.setClusterName("proxy-authorization");
 
         super.init();
 
         // start proxy service
         proxyConfig.setAuthenticationEnabled(true);
-        proxyConfig.setAuthenticationEnabled(true);
-
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
+        proxyConfig.setBrokerServiceURLTLS("pulsar://localhost:" + BROKER_PORT_TLS);
+        
         proxyConfig.setServicePort(PortManager.nextFreePort());
         proxyConfig.setServicePortTls(PortManager.nextFreePort());
         proxyConfig.setWebServicePort(PortManager.nextFreePort());
@@ -109,17 +119,16 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         proxyConfig.setTlsEnabledWithBroker(true);
 
         // enable tls and auth&auth at proxy
-        proxyConfig.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
-        proxyConfig.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        proxyConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
+        proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
+        proxyConfig.setTlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH);
 
         proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
         proxyConfig.setBrokerClientAuthenticationParameters(
-                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
+                "tlsCertFile:" + TLS_PROXY_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_PROXY_KEY_FILE_PATH);
         proxyConfig.setAuthenticationProviders(providers);
-
+ 
         proxyService = Mockito.spy(new ProxyService(proxyConfig));
-        doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
 
         proxyService.start();
     }
@@ -146,32 +155,32 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
      * @throws Exception
      */
     @Test
-    public void testTlsSyncProducerAndConsumer() throws Exception {
+    public void textProxyAuthorization() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
+        createAdminClient();
         final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
-        Map<String, String> authParams = Maps.newHashMap();
-        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
-        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
-        Authentication authTls = new AuthenticationTls();
-        authTls.configure(authParams);
         // create a client which connects to proxy over tls and pass authData
-        PulsarClient proxyClient = createPulsarClient(authTls, proxyServiceUrl);
+        PulsarClient proxyClient = createPulsarClient(proxyServiceUrl);
 
-        admin.clusters().updateCluster(configClusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
-                "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
+        String namespaceName = "my-property/proxy-authorization/my-ns";
+        
         admin.properties().createProperty("my-property",
-                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
-        admin.namespaces().createNamespace("my-property/use/my-ns");
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
+        admin.namespaces().createNamespace(namespaceName);
+        
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy", Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client", Sets.newHashSet(AuthAction.consume, AuthAction.produce));
 
+        
         ConsumerConfiguration conf = new ConsumerConfiguration();
         conf.setSubscriptionType(SubscriptionType.Exclusive);
-        Consumer consumer = proxyClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
+        Consumer consumer = proxyClient.subscribe("persistent://my-property/proxy-authorization/my-ns/my-topic1", "my-subscriber-name",
                 conf);
 
         ProducerConfiguration producerConf = new ProducerConfiguration();
 
-        Producer producer = proxyClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
+        Producer producer = proxyClient.createProducer("persistent://my-property/proxy-authorization/my-ns/my-topic1", producerConf);
         final int msgs = 10;
         for (int i = 0; i < msgs; i++) {
             String message = "my-message-" + i;
@@ -196,16 +205,34 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         log.info("-- Exiting {} test --", methodName);
     }
 
-    protected final PulsarClient createPulsarClient(Authentication auth, String lookupUrl) throws Exception {
+    protected final void createAdminClient() throws Exception { // replaces the 'admin' field with a TLS-authenticated PulsarAdmin spy built from the TLS_SUPERUSER_CLIENT_* files
+        Map<String, String> authParams = Maps.newHashMap();
+        authParams.put("tlsCertFile", TLS_SUPERUSER_CLIENT_CERT_FILE_PATH); // certificate from the top-level tls/ test resources, not the per-test client pair
+        authParams.put("tlsKeyFile", TLS_SUPERUSER_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
         org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
         clientConf.setStatsInterval(0, TimeUnit.SECONDS);
-        clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        clientConf.setTlsTrustCertsFilePath(TLS_SUPERUSER_CLIENT_TRUST_CERT_FILE_PATH); // CA file that sits next to the superuser certificate (tls/cacert.pem)
         clientConf.setTlsAllowInsecureConnection(true);
-        clientConf.setAuthentication(auth);
+        clientConf.setAuthentication(authTls); // credentials are now built locally instead of being passed in by the caller
         clientConf.setUseTls(true);
 
         admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
-        return PulsarClient.create(lookupUrl, clientConf);
     }
-
+    
+    private PulsarClient createPulsarClient(String proxyServiceUrl) throws PulsarClientException { // builds a TLS-authenticated client that connects to the given proxy URL
+        Map<String, String> authParams = Maps.newHashMap();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); // per-test client certificate (ProxyWithProxyAuthorizationTest/ resources), distinct from the superuser pair used for admin
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
+        org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
+        clientConf.setStatsInterval(0, TimeUnit.SECONDS); // NOTE(review): presumably turns off periodic client stats reporting - confirm against ClientConfiguration
+        clientConf.setTlsTrustCertsFilePath(TLS_CLIENT_TRUST_CERT_FILE_PATH);
+        clientConf.setTlsAllowInsecureConnection(true); // test-only setting: insecure TLS connections are permitted
+        clientConf.setAuthentication(authTls);
+        clientConf.setUseTls(true);
+        return PulsarClient.create(proxyServiceUrl, clientConf); // caller owns the returned client
+    }
 }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
similarity index 89%
copy from pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
copy to pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
index 556a8b5..6b47d2d 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 
 import java.util.HashSet;
@@ -44,7 +43,6 @@ import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -53,8 +51,10 @@ import org.testng.collections.Maps;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
-public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase {
-    private static final Logger log = LoggerFactory.getLogger(ProxyAuthenticatedProducerConsumerTest.class);
+import org.testng.Assert;
+
+public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(ProxyWithoutServiceDiscoveryTest.class);
 
     private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
     private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem";
@@ -64,7 +64,6 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
-    private final String configClusterName = "use";
 
     @BeforeMethod
     @Override
@@ -72,7 +71,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         
         // enable tls and auth&auth at broker 
         conf.setAuthenticationEnabled(true);
-        conf.setAuthorizationEnabled(true);
+        conf.setAuthorizationEnabled(false);
 
         conf.setTlsEnabled(true);
         conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
@@ -81,7 +80,6 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         conf.setTlsAllowInsecureConnection(true);
 
         Set<String> superUserRoles = new HashSet<>();
-        superUserRoles.add("localhost");
         superUserRoles.add("superUser");
         conf.setSuperUserRoles(superUserRoles);
 
@@ -93,14 +91,16 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         providers.add(AuthenticationProviderTls.class.getName());
         conf.setAuthenticationProviders(providers);
 
-        conf.setClusterName(configClusterName);
+        conf.setClusterName("without-service-discovery");
 
         super.init();
 
         // start proxy service
         proxyConfig.setAuthenticationEnabled(true);
-        proxyConfig.setAuthenticationEnabled(true);
-
+        proxyConfig.setAuthorizationEnabled(false);
+        proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
+        proxyConfig.setBrokerServiceURLTLS("pulsar://localhost:" + BROKER_PORT_TLS);
+        
         proxyConfig.setServicePort(PortManager.nextFreePort());
         proxyConfig.setServicePortTls(PortManager.nextFreePort());
         proxyConfig.setWebServicePort(PortManager.nextFreePort());
@@ -117,9 +117,8 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         proxyConfig.setBrokerClientAuthenticationParameters(
                 "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
         proxyConfig.setAuthenticationProviders(providers);
-
+ 
         proxyService = Mockito.spy(new ProxyService(proxyConfig));
-        doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
 
         proxyService.start();
     }
@@ -146,7 +145,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
      * @throws Exception
      */
     @Test
-    public void testTlsSyncProducerAndConsumer() throws Exception {
+    public void testDiscoveryService() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
@@ -158,20 +157,18 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         // create a client which connects to proxy over tls and pass authData
         PulsarClient proxyClient = createPulsarClient(authTls, proxyServiceUrl);
 
-        admin.clusters().updateCluster(configClusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
-                "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
         admin.properties().createProperty("my-property",
-                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
-        admin.namespaces().createNamespace("my-property/use/my-ns");
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("without-service-discovery")));
+        admin.namespaces().createNamespace("my-property/without-service-discovery/my-ns");
 
         ConsumerConfiguration conf = new ConsumerConfiguration();
         conf.setSubscriptionType(SubscriptionType.Exclusive);
-        Consumer consumer = proxyClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
+        Consumer consumer = proxyClient.subscribe("persistent://my-property/without-service-discovery/my-ns/my-topic1", "my-subscriber-name",
                 conf);
 
         ProducerConfiguration producerConf = new ProducerConfiguration();
 
-        Producer producer = proxyClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
+        Producer producer = proxyClient.createProducer("persistent://my-property/without-service-discovery/my-ns/my-topic1", producerConf);
         final int msgs = 10;
         for (int i = 0; i < msgs; i++) {
             String message = "my-message-" + i;
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-cert.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-cert.pem
new file mode 100644
index 0000000..63fcf38
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-cert.pem
@@ -0,0 +1,72 @@
+Certificate:
+    Data:
+        Version: 3 (0x2)
+        Serial Number:
+            ac:a4:b3:6b:f5:b4:5f:c9
+        Signature Algorithm: sha1WithRSAEncryption
+        Issuer: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=New CA
+        Validity
+            Not Before: Dec 20 02:22:54 2017 GMT
+            Not After : Dec 20 02:22:54 2018 GMT
+        Subject: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=Broker
+        Subject Public Key Info:
+            Public Key Algorithm: rsaEncryption
+            RSA Public Key: (2048 bit)
+                Modulus (2048 bit):
+                    00:ba:ab:bd:1d:68:9e:1f:6d:99:8a:8e:95:8d:dc:
+                    b7:e5:95:1a:40:ff:9e:5d:be:38:e6:19:1c:39:0d:
+                    39:e3:e0:cd:96:42:09:41:9f:ca:f1:7f:63:6f:be:
+                    a5:46:1b:07:06:01:43:11:ed:e9:f9:a2:41:2a:29:
+                    ac:10:d3:df:30:4a:f5:9b:5d:b9:97:2b:d4:10:82:
+                    92:55:e7:ca:b1:eb:94:6a:63:e6:28:a3:75:0e:f2:
+                    5b:ff:1a:df:0b:3e:2d:6b:c8:c1:49:98:2b:c1:5f:
+                    9a:c6:1d:94:26:7f:eb:6f:7e:81:c2:27:23:13:90:
+                    4f:89:04:dd:2c:8d:de:4c:f8:9f:33:b9:28:ed:7e:
+                    3a:14:fa:6f:d0:cc:50:5e:75:40:39:e2:57:46:af:
+                    b7:67:8f:c9:57:f2:85:b0:54:59:02:76:c8:92:2c:
+                    af:19:3e:09:d8:5f:a4:d0:9c:a7:35:77:c9:aa:90:
+                    50:86:2a:9a:3c:8f:3b:50:a5:01:88:b9:d3:eb:4d:
+                    23:24:f2:58:65:1c:03:7a:0a:2c:20:30:b6:46:8d:
+                    b1:65:1c:16:0c:bf:bd:87:df:1c:e6:46:c8:f7:4f:
+                    60:fd:a1:91:c9:e4:ff:21:e7:e8:65:70:ba:9f:d6:
+                    44:07:27:45:1d:69:e7:d6:72:d8:d0:3e:df:2e:61:
+                    9e:4d
+                Exponent: 65537 (0x10001)
+        X509v3 extensions:
+            X509v3 Basic Constraints: 
+                CA:FALSE
+            Netscape Comment: 
+                OpenSSL Generated Certificate
+            X509v3 Subject Key Identifier: 
+                1C:C6:F7:DB:06:C1:1D:1C:7C:9E:64:AF:E5:47:47:80:00:6C:C8:26
+            X509v3 Authority Key Identifier: 
+                keyid:E5:15:C2:1D:E7:EE:28:3C:FA:B6:3E:58:FB:0B:61:52:6E:B0:81:5B
+
+    Signature Algorithm: sha1WithRSAEncryption
+        7f:b4:f8:d6:9c:ea:01:1b:74:19:a9:ee:ea:83:66:11:df:90:
+        c5:f0:e6:bc:05:bd:b4:8a:64:d6:08:fd:75:da:2e:f5:f9:20:
+        e0:62:8b:b8:b7:bd:c3:92:0f:a3:61:c7:78:6a:68:ea:74:20:
+        8e:a8:b7:0f:28:d1:54:8a:55:af:38:8c:a7:64:79:1c:95:f6:
+        b8:f3:48:0e:14:2b:78:75:ff:96:70:85:28:30:1f:fa:94:a9:
+        43:cd:98:6e:7b:80:68:bc:08:cc:35:1d:df:34:df:3d:58:52:
+        c3:5d:55:65:b6:be:ef:a2:78:a0:3c:41:c8:af:9f:74:e6:d8:
+        0a:d3
+-----BEGIN CERTIFICATE-----
+MIIDKzCCApSgAwIBAgIJAKyks2v1tF/JMA0GCSqGSIb3DQEBBQUAMFcxCzAJBgNV
+BAYTAlVTMQswCQYDVQQIEwJDQTEPMA0GA1UEChMGQXBhY2hlMRkwFwYDVQQLExBB
+cGFjaGUgSW5jdWJhdG9yMQ8wDQYDVQQDEwZOZXcgQ0EwHhcNMTcxMjIwMDIyMjU0
+WhcNMTgxMjIwMDIyMjU0WjBXMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExDzAN
+BgNVBAoTBkFwYWNoZTEZMBcGA1UECxMQQXBhY2hlIEluY3ViYXRvcjEPMA0GA1UE
+AxMGQnJva2VyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuqu9HWie
+H22Zio6Vjdy35ZUaQP+eXb445hkcOQ054+DNlkIJQZ/K8X9jb76lRhsHBgFDEe3p
++aJBKimsENPfMEr1m125lyvUEIKSVefKseuUamPmKKN1DvJb/xrfCz4ta8jBSZgr
+wV+axh2UJn/rb36BwicjE5BPiQTdLI3eTPifM7ko7X46FPpv0MxQXnVAOeJXRq+3
+Z4/JV/KFsFRZAnbIkiyvGT4J2F+k0JynNXfJqpBQhiqaPI87UKUBiLnT600jJPJY
+ZRwDegosIDC2Ro2xZRwWDL+9h98c5kbI909g/aGRyeT/IefoZXC6n9ZEBydFHWnn
+1nLY0D7fLmGeTQIDAQABo3sweTAJBgNVHRMEAjAAMCwGCWCGSAGG+EIBDQQfFh1P
+cGVuU1NMIEdlbmVyYXRlZCBDZXJ0aWZpY2F0ZTAdBgNVHQ4EFgQUHMb32wbBHRx8
+nmSv5UdHgABsyCYwHwYDVR0jBBgwFoAU5RXCHefuKDz6tj5Y+wthUm6wgVswDQYJ
+KoZIhvcNAQEFBQADgYEAf7T41pzqARt0Ganu6oNmEd+QxfDmvAW9tIpk1gj9ddou
+9fkg4GKLuLe9w5IPo2HHeGpo6nQgjqi3DyjRVIpVrziMp2R5HJX2uPNIDhQreHX/
+lnCFKDAf+pSpQ82YbnuAaLwIzDUd3zTfPVhSw11VZba+76J4oDxByK+fdObYCtM=
+-----END CERTIFICATE-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-key.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-key.pem
new file mode 100644
index 0000000..8e47938
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/broker-key.pem
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQC6q70daJ4fbZmK
+jpWN3LfllRpA/55dvjjmGRw5DTnj4M2WQglBn8rxf2NvvqVGGwcGAUMR7en5okEq
+KawQ098wSvWbXbmXK9QQgpJV58qx65RqY+Yoo3UO8lv/Gt8LPi1ryMFJmCvBX5rG
+HZQmf+tvfoHCJyMTkE+JBN0sjd5M+J8zuSjtfjoU+m/QzFBedUA54ldGr7dnj8lX
+8oWwVFkCdsiSLK8ZPgnYX6TQnKc1d8mqkFCGKpo8jztQpQGIudPrTSMk8lhlHAN6
+CiwgMLZGjbFlHBYMv72H3xzmRsj3T2D9oZHJ5P8h5+hlcLqf1kQHJ0UdaefWctjQ
+Pt8uYZ5NAgMBAAECggEBAIY3Tx1jCDYOppQiGtPKPAr9XsgXQrWiPOTsbwdyRApd
+q1P7HQ6rJs7mygcha1HxwuYFaETu7AkKKZJ4LfhXbiUZ8GgKRpOz9qD8UN0lcO7m
+NGsecvELPfJGPfE5T9+UkDHsQVV57RP3eqAxykC4Pv6GViPT4fuCCj25WpFbW9e4
+uuKFF3yVY3uJofPQGwLZ2b9WwujqgSyaozyKlTM4nPXwEEz56wPVuAsNfmTEtIb3
+N0d0uQpM69irH3sAO7nVDo6e/eP3Emq4kUDvhS04BafG+T7T9g0C74EGoJX5wrrk
+LzuEAkO84n6ESF6r+FI1XH4yskau3Jab8/x8f9sVj+ECgYEA9II7MZ2PSq2pHTsY
+1ZxZx3MKe/yiTMGkHhtQY6HKzzQXgEozK/uPTvMt7lKnBsseUydEXygMcgPXracF
+rFdiAQpD8Dq2jrmjtFcPk40DtLjdUUD4I2stTKprTfTrhx5X/JIX8iBflMTFWBYp
+ALM9qP0u3KZwVCGxEsGz5yaxtZkCgYEAw3Gj5eKw2pzRyNEdNsye3eQxp4QneM+X
+YozWzNrbGEdmJ1CHuMWXPTxAkxtMhH95QonySEP4R1fNxHJNMKPu7h2TiZiLvC/J
+UtE+SdETiEGF14SEfr/LflreTJnHCmK/pp19t1Q1cAn3FHws2D5qiA8eoBmnko6k
+irYydJn5dtUCgYBVOzRhJjg14vVJgDk29QqCsQJdmAIHWZTY/dJ2+IYW1mS+zp6p
+3UXmUnSXV+5rOtC2UcDOnso/0EEVglxC6C78h9SI4B6U//clvRdr6sL481wKn+gf
+iJPA3sMK6K5VamlnXJHGUCyhUjosa4Udfl2nE6KLPeV4Hkp4bFdG40EdOQKBgQCQ
+Y4dDUbt4dnyh0KO1lWwU3/4zFPYYUb00iHo0c8eDY1Q73Um3nvqBud63D2bzSD2s
+g78j1ls5Ucvpwsv2EFZ3QhB6ieFKET+52G4dGMJGWqnns7Yy8b0Dx1wN2Vnr+VI/
+ZIC5DRRBhossbiSvSUVo6Uql2u4q3wj+lWYnMI3VVQKBgQDs+sHMotTK976HKaRh
+sDepJnZwdnma1QBzsAXkZ0EJPqYCIFmbKGeXn/z2Fr62oGqe9suzuGLBYm4ukwoD
+xI8lDzxOoElFNaAHl6nIcFcj6I98idkU05NvV59aeLJngejJv3WmI2GH7jNK8dNs
+ELazMuTsmf+MdG/Q9C/kiHDvng==
+-----END PRIVATE KEY-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem
new file mode 100644
index 0000000..c77dd6c
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/cacert.pem
@@ -0,0 +1,62 @@
+Certificate:
+    Data:
+        Version: 3 (0x2)
+        Serial Number:
+            ac:a4:b3:6b:f5:b4:5f:c8
+        Signature Algorithm: sha1WithRSAEncryption
+        Issuer: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=New CA
+        Validity
+            Not Before: Dec 20 02:21:42 2017 GMT
+            Not After : Dec 19 02:21:42 2020 GMT
+        Subject: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=New CA
+        Subject Public Key Info:
+            Public Key Algorithm: rsaEncryption
+            RSA Public Key: (1024 bit)
+                Modulus (1024 bit):
+                    00:99:c1:1e:58:35:af:c1:38:38:45:8c:8c:f4:d9:
+                    6d:cc:ff:37:31:f9:ba:76:fa:fb:56:41:04:da:d2:
+                    a1:ea:a8:ca:6d:3b:b2:bf:4c:e7:55:ab:1c:a1:7e:
+                    d4:ec:54:d8:92:c6:f9:1f:e8:e8:d2:27:fa:4e:bb:
+                    e6:b2:21:59:bd:19:63:9f:4b:a1:3d:c0:25:d3:70:
+                    a4:9c:96:33:c6:53:c4:40:c1:de:a5:75:40:f7:db:
+                    51:f4:f6:19:9a:8d:a8:fa:0c:4b:fe:1f:11:70:23:
+                    31:76:c2:6c:41:6b:aa:c6:71:22:58:7b:4f:d8:2b:
+                    46:d6:e0:84:4d:57:e0:9c:09
+                Exponent: 65537 (0x10001)
+        X509v3 extensions:
+            X509v3 Subject Key Identifier: 
+                E5:15:C2:1D:E7:EE:28:3C:FA:B6:3E:58:FB:0B:61:52:6E:B0:81:5B
+            X509v3 Authority Key Identifier: 
+                keyid:E5:15:C2:1D:E7:EE:28:3C:FA:B6:3E:58:FB:0B:61:52:6E:B0:81:5B
+                DirName:/C=US/ST=CA/O=Apache/OU=Apache Incubator/CN=New CA
+                serial:AC:A4:B3:6B:F5:B4:5F:C8
+
+            X509v3 Basic Constraints: 
+                CA:TRUE
+    Signature Algorithm: sha1WithRSAEncryption
+        7c:15:8d:92:14:c2:cf:b6:72:17:ba:ba:e0:7c:48:a0:fb:02:
+        86:b1:50:90:d0:b2:dd:40:9f:b5:e1:9e:ab:4a:bc:6c:f1:3e:
+        c3:7f:b5:b6:18:ab:f7:f0:a2:35:c6:5b:d7:2d:84:e1:d9:3d:
+        8c:88:c2:1c:44:61:a8:14:ab:b1:00:b4:00:a5:2d:66:43:86:
+        53:a2:d6:4a:73:96:b3:4f:63:b5:8d:8d:7f:e4:ff:82:37:81:
+        63:00:0e:d1:ef:59:0c:7c:2b:79:24:97:06:60:cd:a1:b3:37:
+        94:68:3d:6c:27:ee:8e:87:88:c1:21:0a:d5:04:66:11:06:11:
+        69:92
+-----BEGIN CERTIFICATE-----
+MIIC6DCCAlGgAwIBAgIJAKyks2v1tF/IMA0GCSqGSIb3DQEBBQUAMFcxCzAJBgNV
+BAYTAlVTMQswCQYDVQQIEwJDQTEPMA0GA1UEChMGQXBhY2hlMRkwFwYDVQQLExBB
+cGFjaGUgSW5jdWJhdG9yMQ8wDQYDVQQDEwZOZXcgQ0EwHhcNMTcxMjIwMDIyMTQy
+WhcNMjAxMjE5MDIyMTQyWjBXMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExDzAN
+BgNVBAoTBkFwYWNoZTEZMBcGA1UECxMQQXBhY2hlIEluY3ViYXRvcjEPMA0GA1UE
+AxMGTmV3IENBMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCZwR5YNa/BODhF
+jIz02W3M/zcx+bp2+vtWQQTa0qHqqMptO7K/TOdVqxyhftTsVNiSxvkf6OjSJ/pO
+u+ayIVm9GWOfS6E9wCXTcKScljPGU8RAwd6ldUD321H09hmajaj6DEv+HxFwIzF2
+wmxBa6rGcSJYe0/YK0bW4IRNV+CcCQIDAQABo4G7MIG4MB0GA1UdDgQWBBTlFcId
+5+4oPPq2Plj7C2FSbrCBWzCBiAYDVR0jBIGAMH6AFOUVwh3n7ig8+rY+WPsLYVJu
+sIFboVukWTBXMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExDzANBgNVBAoTBkFw
+YWNoZTEZMBcGA1UECxMQQXBhY2hlIEluY3ViYXRvcjEPMA0GA1UEAxMGTmV3IENB
+ggkArKSza/W0X8gwDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQUFAAOBgQB8FY2S
+FMLPtnIXurrgfEig+wKGsVCQ0LLdQJ+14Z6rSrxs8T7Df7W2GKv38KI1xlvXLYTh
+2T2MiMIcRGGoFKuxALQApS1mQ4ZTotZKc5azT2O1jY1/5P+CN4FjAA7R71kMfCt5
+JJcGYM2hszeUaD1sJ+6Oh4jBIQrVBGYRBhFpkg==
+-----END CERTIFICATE-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-cert.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-cert.pem
new file mode 100644
index 0000000..741e10a
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-cert.pem
@@ -0,0 +1,72 @@
+Certificate:
+    Data:
+        Version: 3 (0x2)
+        Serial Number:
+            ac:a4:b3:6b:f5:b4:5f:ca
+        Signature Algorithm: sha1WithRSAEncryption
+        Issuer: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=New CA
+        Validity
+            Not Before: Dec 20 02:36:47 2017 GMT
+            Not After : Dec 20 02:36:47 2018 GMT
+        Subject: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=Client
+        Subject Public Key Info:
+            Public Key Algorithm: rsaEncryption
+            RSA Public Key: (2048 bit)
+                Modulus (2048 bit):
+                    00:fd:b6:bb:bc:a3:54:2b:06:b3:8e:68:31:e1:f3:
+                    3a:c6:3d:98:83:db:f8:fc:58:c6:35:47:4c:58:c1:
+                    40:81:71:8e:25:2c:6f:14:a0:5f:f2:85:97:fa:e5:
+                    d1:a6:65:26:3f:4b:52:f1:4a:11:1b:f6:af:22:fb:
+                    24:74:d7:d3:bd:c3:11:dc:7f:1e:49:96:19:4a:f5:
+                    9c:b3:4c:85:5d:33:57:08:43:04:3d:b0:69:1a:15:
+                    b3:08:c7:0d:68:09:02:09:37:90:1b:fa:51:e1:c9:
+                    6d:58:e3:d0:4e:e9:f9:a5:b5:4c:1a:5d:98:62:a2:
+                    d6:cd:a2:89:dc:91:52:c7:f5:19:53:97:5f:58:86:
+                    6b:5e:48:6c:81:8d:2f:5c:0e:38:96:d2:b7:f7:47:
+                    21:2e:54:2a:51:32:92:0d:f3:c3:94:f5:59:98:2c:
+                    11:1a:88:ad:ee:16:5c:72:6b:31:e3:bf:ca:9e:38:
+                    4b:49:d2:87:e1:44:69:ef:ba:4d:b9:1d:4b:3f:e0:
+                    c1:af:c5:04:6f:5f:2d:6e:d9:12:ac:bb:f1:f8:7f:
+                    fc:bd:3a:6a:99:e6:45:f9:91:98:c9:d1:b1:f0:d5:
+                    6a:e1:fd:c0:6e:e2:8e:ab:0c:03:87:ad:9c:26:9a:
+                    8e:93:4c:82:ec:de:25:49:14:91:ce:80:9f:22:17:
+                    aa:cf
+                Exponent: 65537 (0x10001)
+        X509v3 extensions:
+            X509v3 Basic Constraints: 
+                CA:FALSE
+            Netscape Comment: 
+                OpenSSL Generated Certificate
+            X509v3 Subject Key Identifier: 
+                B2:8F:75:E3:D7:7A:4C:62:B8:5C:04:66:A0:56:14:16:AF:82:43:5A
+            X509v3 Authority Key Identifier: 
+                keyid:E5:15:C2:1D:E7:EE:28:3C:FA:B6:3E:58:FB:0B:61:52:6E:B0:81:5B
+
+    Signature Algorithm: sha1WithRSAEncryption
+        5f:e0:ec:f3:b4:bb:08:a6:15:85:f2:7d:c4:50:c4:87:e5:af:
+        1a:38:11:98:b1:a1:d6:47:85:f6:c6:80:cc:b3:2b:f6:27:8e:
+        24:1b:66:98:48:e7:d0:dd:cd:37:ea:a2:ad:cf:d8:a7:17:39:
+        59:be:72:a1:2a:24:f5:d6:23:14:b9:42:b4:2f:b1:cd:15:98:
+        d9:1a:8a:55:3c:f2:78:be:b4:ba:6b:79:3d:29:e8:54:4b:d8:
+        0f:1b:bd:69:ef:d2:ca:5c:0f:da:b4:b6:b8:cc:7f:b7:51:3c:
+        fc:3a:dd:6d:9c:3c:9e:71:ad:59:72:84:ac:01:6e:c5:66:8b:
+        b0:70
+-----BEGIN CERTIFICATE-----
+MIIDKzCCApSgAwIBAgIJAKyks2v1tF/KMA0GCSqGSIb3DQEBBQUAMFcxCzAJBgNV
+BAYTAlVTMQswCQYDVQQIEwJDQTEPMA0GA1UEChMGQXBhY2hlMRkwFwYDVQQLExBB
+cGFjaGUgSW5jdWJhdG9yMQ8wDQYDVQQDEwZOZXcgQ0EwHhcNMTcxMjIwMDIzNjQ3
+WhcNMTgxMjIwMDIzNjQ3WjBXMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExDzAN
+BgNVBAoTBkFwYWNoZTEZMBcGA1UECxMQQXBhY2hlIEluY3ViYXRvcjEPMA0GA1UE
+AxMGQ2xpZW50MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA/ba7vKNU
+Kwazjmgx4fM6xj2Yg9v4/FjGNUdMWMFAgXGOJSxvFKBf8oWX+uXRpmUmP0tS8UoR
+G/avIvskdNfTvcMR3H8eSZYZSvWcs0yFXTNXCEMEPbBpGhWzCMcNaAkCCTeQG/pR
+4cltWOPQTun5pbVMGl2YYqLWzaKJ3JFSx/UZU5dfWIZrXkhsgY0vXA44ltK390ch
+LlQqUTKSDfPDlPVZmCwRGoit7hZccmsx47/KnjhLSdKH4URp77pNuR1LP+DBr8UE
+b18tbtkSrLvx+H/8vTpqmeZF+ZGYydGx8NVq4f3AbuKOqwwDh62cJpqOk0yC7N4l
+SRSRzoCfIheqzwIDAQABo3sweTAJBgNVHRMEAjAAMCwGCWCGSAGG+EIBDQQfFh1P
+cGVuU1NMIEdlbmVyYXRlZCBDZXJ0aWZpY2F0ZTAdBgNVHQ4EFgQUso9149d6TGK4
+XARmoFYUFq+CQ1owHwYDVR0jBBgwFoAU5RXCHefuKDz6tj5Y+wthUm6wgVswDQYJ
+KoZIhvcNAQEFBQADgYEAX+Ds87S7CKYVhfJ9xFDEh+WvGjgRmLGh1keF9saAzLMr
+9ieOJBtmmEjn0N3NN+qirc/Ypxc5Wb5yoSok9dYjFLlCtC+xzRWY2RqKVTzyeL60
+umt5PSnoVEvYDxu9ae/SylwP2rS2uMx/t1E8/DrdbZw8nnGtWXKErAFuxWaLsHA=
+-----END CERTIFICATE-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-key.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-key.pem
new file mode 100644
index 0000000..81d00f9
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/client-key.pem
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQD9tru8o1QrBrOO
+aDHh8zrGPZiD2/j8WMY1R0xYwUCBcY4lLG8UoF/yhZf65dGmZSY/S1LxShEb9q8i
++yR019O9wxHcfx5JlhlK9ZyzTIVdM1cIQwQ9sGkaFbMIxw1oCQIJN5Ab+lHhyW1Y
+49BO6fmltUwaXZhiotbNoonckVLH9RlTl19YhmteSGyBjS9cDjiW0rf3RyEuVCpR
+MpIN88OU9VmYLBEaiK3uFlxyazHjv8qeOEtJ0ofhRGnvuk25HUs/4MGvxQRvXy1u
+2RKsu/H4f/y9OmqZ5kX5kZjJ0bHw1Wrh/cBu4o6rDAOHrZwmmo6TTILs3iVJFJHO
+gJ8iF6rPAgMBAAECggEAEJmkLvOAzk/h769hlCcV8WKWWApMgDZOwa2okSYT0mRb
+qJL/sZnMrVGQYBopXXnAxuNmyeLOu8WoL+G+wOZeNExPHt4yXR41CXKIjjKzhyWU
+zDWWUXL5bXt9+1UKy4PLXk8EXtBCC0Pio65EMuWcL/tsv0zga5O7+jhoTMY1ZF/D
+rsddf2mIncyEdhwAKLREmFv31lY1k+Jd+5eyXHIJEnK8lMXTcORNsb0YtlS5sRTU
+4llwQlBXjV06zIVRFxsRcPrgRYH0Hfg3hSIm3epNE+pbj0tcN0CfQFfrKJ9G2cDS
+jXimjvGsPKQ1PRMAcg93qZB3VtI+ag9bZt29cru0AQKBgQD/xzXZP5hKoqOy+8qH
+HyPvyM0QCpQ6KwHzgf5ATybPIPlyWQmT2eeR3ez4qskowNvc4Fc/q10Ao+q6jC3E
+721Wz6+iCb7Qus37KnEqVW7mWDLsDT5q7vIyRR22wWhrTpu0uZmxd9XxYRU6KUe1
+FMkI5VijJ27NoYtO+gLn9u6J6QKBgQD97xCNVaUNMNRZ1+HOKoBqcGBj91KrL74K
+/avYL0EprYwzN1lm4ZmNX8GaBeAftwnIDyxaM3Apw8BcqFFz/IslY/5sCyUmVjgI
+ZULkhCBy5ZamFNMxLvaN6njtdpgdBRxR9gzke1V/xxJgN7J39h9FI+pElwMW6314
+6AFHYQ/j9wKBgQCNwfjEOQzMgKs9bXNnxAiEwsN0GojgXCmureMd/UBDF8FocJRw
+Txqaq2bEwtLONWUlW2i/rtfSnQZg8YQEW7Y7oMt0gPYydPXoODOUBNl77HH8hbKM
+TXYKCmhXe4XFw0FkvmDCDOqT5vx+yZYmdCifN40Sj65HZTryQHoP2bmG0QKBgG/U
+ntd/hka+4GYIuvsOoKs/flPIEfIt/mXcvZdhiDMQqRPNJmQ2qmcmap6oQ8Hz3Czs
+8b1vtc/O06J6xhRsfeMjnGJ8rgmqItcfsUvuHFQ9ZBEUTsX0RsTNJCCAABGXtJcr
+4xWkc0zooOEa5lAKZk8OuBco4kVvxDxBAH8s8dCVAoGAeEZICuDGR8cOV64Eyx2X
+Ej1PQJrleMmzCwth7UhREGUgEVglhMeoBxmWCukYxpkVBY0DUy6OWH5lpTfCerFZ
+ho1AHMt9DsfUWo4hApMXEMyCZTOJwg9M4vQ1UTbFtr0mt0jnVWTUm3mVxmJnfrtz
+/DgLrvcJd7QCGAYICMNxrDs=
+-----END PRIVATE KEY-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-cert.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-cert.pem
new file mode 100644
index 0000000..8b524c8
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-cert.pem
@@ -0,0 +1,72 @@
+Certificate:
+    Data:
+        Version: 3 (0x2)
+        Serial Number:
+            ac:a4:b3:6b:f5:b4:5f:cb
+        Signature Algorithm: sha1WithRSAEncryption
+        Issuer: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=New CA
+        Validity
+            Not Before: Dec 20 02:45:24 2017 GMT
+            Not After : Dec 20 02:45:24 2018 GMT
+        Subject: C=US, ST=CA, O=Apache, OU=Apache Incubator, CN=Proxy
+        Subject Public Key Info:
+            Public Key Algorithm: rsaEncryption
+            RSA Public Key: (2048 bit)
+                Modulus (2048 bit):
+                    00:e1:e1:06:cc:f5:98:38:88:33:e0:f7:0a:5d:8e:
+                    a8:89:ae:8f:cd:c7:77:62:17:c2:a1:d8:fc:fc:d0:
+                    d0:86:f1:c8:3c:78:ec:b8:e9:73:1c:d1:72:55:97:
+                    c6:47:5a:4c:33:18:32:a1:9c:e1:84:2e:de:40:2f:
+                    a7:16:ed:a0:44:d6:4c:2c:04:ef:21:11:0b:6b:cb:
+                    36:8d:eb:5a:3d:a1:b6:9b:b5:23:be:bd:66:23:26:
+                    c9:82:62:44:51:f8:3a:94:07:6c:52:84:2c:d0:d9:
+                    24:8b:0a:f5:1b:c8:31:a2:29:4c:bc:b7:bf:96:e1:
+                    56:78:d2:75:08:c9:cb:0d:1a:1d:93:2d:bf:bf:86:
+                    10:06:d7:5c:b8:e6:99:05:89:6f:ad:3b:a6:37:45:
+                    15:3a:63:8b:d1:d6:0d:e4:d0:c6:06:c6:63:13:21:
+                    92:65:c1:1a:ae:1a:72:97:cf:86:ed:6f:a1:77:d8:
+                    18:67:f2:27:36:1f:ff:40:6e:57:97:90:5a:28:04:
+                    a4:a8:54:cf:a8:87:36:af:26:49:a6:4e:2d:d4:be:
+                    e6:17:e2:1a:da:c4:08:87:fd:3f:fe:7b:d8:1e:f2:
+                    66:0f:34:1a:02:5d:39:ec:66:3d:46:bc:37:ce:84:
+                    a2:51:0b:c8:72:f5:7c:5a:b8:1a:1b:0a:5d:2b:e9:
+                    56:4f
+                Exponent: 65537 (0x10001)
+        X509v3 extensions:
+            X509v3 Basic Constraints: 
+                CA:FALSE
+            Netscape Comment: 
+                OpenSSL Generated Certificate
+            X509v3 Subject Key Identifier: 
+                3F:A7:4A:6A:B1:6A:E1:51:8D:56:19:A2:2D:6A:A8:49:07:D6:87:8A
+            X509v3 Authority Key Identifier: 
+                keyid:E5:15:C2:1D:E7:EE:28:3C:FA:B6:3E:58:FB:0B:61:52:6E:B0:81:5B
+
+    Signature Algorithm: sha1WithRSAEncryption
+        98:89:57:fd:96:0e:78:06:ce:9f:83:48:28:c9:34:a4:32:93:
+        d2:65:fb:2f:a9:39:51:ff:7a:89:57:26:6a:59:0d:81:09:20:
+        75:ae:c6:aa:f6:8c:d4:d2:7f:f0:78:88:df:74:90:28:11:15:
+        77:d3:60:3d:2d:d2:ef:34:1b:03:59:9f:23:1c:21:64:e5:b8:
+        a1:99:c3:08:82:31:3d:58:01:23:52:b8:96:c8:d5:42:b3:3b:
+        50:43:cc:7d:43:08:1d:c4:46:06:7f:c3:7f:3e:6d:01:f2:25:
+        91:4b:70:fd:0f:e3:25:a6:d4:d8:c9:f6:35:65:00:87:c7:03:
+        c2:d7
+-----BEGIN CERTIFICATE-----
+MIIDKjCCApOgAwIBAgIJAKyks2v1tF/LMA0GCSqGSIb3DQEBBQUAMFcxCzAJBgNV
+BAYTAlVTMQswCQYDVQQIEwJDQTEPMA0GA1UEChMGQXBhY2hlMRkwFwYDVQQLExBB
+cGFjaGUgSW5jdWJhdG9yMQ8wDQYDVQQDEwZOZXcgQ0EwHhcNMTcxMjIwMDI0NTI0
+WhcNMTgxMjIwMDI0NTI0WjBWMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExDzAN
+BgNVBAoTBkFwYWNoZTEZMBcGA1UECxMQQXBhY2hlIEluY3ViYXRvcjEOMAwGA1UE
+AxMFUHJveHkwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDh4QbM9Zg4
+iDPg9wpdjqiJro/Nx3diF8Kh2Pz80NCG8cg8eOy46XMc0XJVl8ZHWkwzGDKhnOGE
+Lt5AL6cW7aBE1kwsBO8hEQtryzaN61o9obabtSO+vWYjJsmCYkRR+DqUB2xShCzQ
+2SSLCvUbyDGiKUy8t7+W4VZ40nUIycsNGh2TLb+/hhAG11y45pkFiW+tO6Y3RRU6
+Y4vR1g3k0MYGxmMTIZJlwRquGnKXz4btb6F32Bhn8ic2H/9AbleXkFooBKSoVM+o
+hzavJkmmTi3UvuYX4hraxAiH/T/+e9ge8mYPNBoCXTnsZj1GvDfOhKJRC8hy9Xxa
+uBobCl0r6VZPAgMBAAGjezB5MAkGA1UdEwQCMAAwLAYJYIZIAYb4QgENBB8WHU9w
+ZW5TU0wgR2VuZXJhdGVkIENlcnRpZmljYXRlMB0GA1UdDgQWBBQ/p0pqsWrhUY1W
+GaItaqhJB9aHijAfBgNVHSMEGDAWgBTlFcId5+4oPPq2Plj7C2FSbrCBWzANBgkq
+hkiG9w0BAQUFAAOBgQCYiVf9lg54Bs6fg0goyTSkMpPSZfsvqTlR/3qJVyZqWQ2B
+CSB1rsaq9ozU0n/weIjfdJAoERV302A9LdLvNBsDWZ8jHCFk5bihmcMIgjE9WAEj
+UriWyNVCsztQQ8x9QwgdxEYGf8N/Pm0B8iWRS3D9D+MlptTYyfY1ZQCHxwPC1w==
+-----END CERTIFICATE-----
diff --git a/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-key.pem b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-key.pem
new file mode 100644
index 0000000..9856807
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/authentication/tls/ProxyWithProxyAuthorizationTest/proxy-key.pem
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDh4QbM9Zg4iDPg
+9wpdjqiJro/Nx3diF8Kh2Pz80NCG8cg8eOy46XMc0XJVl8ZHWkwzGDKhnOGELt5A
+L6cW7aBE1kwsBO8hEQtryzaN61o9obabtSO+vWYjJsmCYkRR+DqUB2xShCzQ2SSL
+CvUbyDGiKUy8t7+W4VZ40nUIycsNGh2TLb+/hhAG11y45pkFiW+tO6Y3RRU6Y4vR
+1g3k0MYGxmMTIZJlwRquGnKXz4btb6F32Bhn8ic2H/9AbleXkFooBKSoVM+ohzav
+JkmmTi3UvuYX4hraxAiH/T/+e9ge8mYPNBoCXTnsZj1GvDfOhKJRC8hy9XxauBob
+Cl0r6VZPAgMBAAECggEBAIXa6UHKhKNzq3K0UxMwOBYnORbUDp41wGRTB1D2maxu
+WZ/kdTv7M/ku8VdhsuGT1DYvL8nwAwBnGdPlqVoABYrlh4xKfD8XL7J4YWLmxrph
+O6q4RG+DI6TPFnlKrHv64xPX9kxMAZbeJzayjqAhGbCkUtI+/a126dx9s1c65jZj
+VyEDrfogOi3CUVHnTxZ3Yayy0gqldPAYdtt9p5YYyTxJYmuKqHBTh7FToX3RhyT0
+pZ4+IE7YV4HiBev2K8K6c4E2/UOZtkENCLy7DAQuQgokHYk0YeoG+tYfnBcIFkVD
+169Z766il027ILS8F7HMoBPQVYdf24YUgfQC3k8h8HECgYEA/VGEr3vFwxCUHtOK
+SKXCpFWpK0KvcYBQvgzLuKkTNbTWnezUwAugq+Ybao/hqsF5jEd9U8Iv35myHI8j
+EHHF9J8/zb1EcIZgTAPO4Uvc2rYxwt/c0kwy7F/FovVKg5yEscJ35iXQWFO5Yxyu
+rYU8yNVPBqXGCeUS1jJbryg1JZcCgYEA5EUmDfPHp6gWx9MmeuDqxvb2L/WHyxGb
+ojSsV5GFlCLa3QMKc1H/1+6lxLbMiGvtk2S1B9YeGWAvRB+10GSgn7AhiObxv20C
+8oqRtLPxO/eCCGOBnUiGTqKibFNyTVJ/+FgWpywQSUY8tk58fPBZvydE6XV0Wa6T
+1INerLxVnAkCgYAxkXn9PKL+AIh7X7l3bbggoAJyTKI3+3vRNH/IqozvvWshi+41
+hhDykhxbRbxKxYEbSgHkGeN0RYbsv7WEyj6KF39MqvRxcFn3hec9frLAuVYTY+q5
+2987EaKCuKzUBBSTFBKSHmQeZIOqOTqVCbVTNyo3isittv1wnHoEVEHSEQKBgQCM
+oQkjuVb8M/Ls4mmndB9Pul/LBhHFijB+isLOJAnOTHbXiAMNLqxWpGCdwxxYw10W
+3AknLcNXUMltx7dkDkpidskCJX0zuH4DXFkNoXnxvrbuYhc9Bawwj8NOx0340uWh
+4ur5zIywB8RpcAsDkbNIr3Gl/kVS5tmOJ+zQsCpxuQKBgQCKV6CDtKgGLgWvERUE
+Dei9pUx2uXtvThZomqoZqr+hZE3YmvtHZcLMK8sXJWDdkYVQ4bwDkmrSSkk5F9Nh
+PClfyOObFbOXLD0TrJZSJd/zrnmnWk8u4eE5XSwAQ+0XiO4LgQHDOutXpvW9ZVvT
+om8NGk5mEUz39XN0tuWzcN2FIQ==
+-----END PRIVATE KEY-----

-- 
To stop receiving notification emails like this one, please contact
jai1@apache.org.