You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/12 19:35:54 UTC

[incubator-pulsar] branch master updated: Add pluggable authorization mechanism (#1200)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fb1c61d  Add pluggable authorization mechanism (#1200)
fb1c61d is described below

commit fb1c61d1520fa89f98a7c4582129bc39d42c7d8e
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Feb 12 11:35:51 2018 -0800

    Add pluggable authorization mechanism (#1200)
    
    * Add pluggable authorization service
    
    fix: move FutureUtils to common and fix import
    
    add grantpermission api
    
    take default auth method
    
    pass authData to authorization provider
    
    keep single authorization provider
    
    * fix rebase change
---
 conf/broker.conf                                   |   3 +
 conf/discovery.conf                                |   3 +
 conf/proxy.conf                                    |   3 +
 conf/standalone.conf                               |   3 +
 conf/websocket.conf                                |   3 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  11 +
 .../authorization/AuthorizationProvider.java       | 120 ++++
 .../broker/authorization/AuthorizationService.java | 265 ++++++++
 ...nager.java => PulsarAuthorizationProvider.java} | 200 ++++--
 .../broker/cache/ConfigurationCacheService.java    |   4 +
 .../org/apache/pulsar/broker/PulsarService.java    |   2 +-
 .../org/apache/pulsar/broker/admin/Namespaces.java |  46 +-
 .../pulsar/broker/admin/NonPersistentTopics.java   |   2 +-
 .../pulsar/broker/admin/PersistentTopics.java      |  12 +-
 .../broker/loadbalance/impl/LoadManagerShared.java |   2 +-
 .../pulsar/broker/lookup/DestinationLookup.java    |   5 +-
 .../pulsar/broker/namespace/OwnershipCache.java    |   2 +-
 .../pulsar/broker/service/BacklogQuotaManager.java |   2 +-
 .../pulsar/broker/service/BrokerService.java       |  14 +-
 .../org/apache/pulsar/broker/service/Consumer.java |   8 +-
 .../org/apache/pulsar/broker/service/Producer.java |   8 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  37 +-
 .../service/nonpersistent/NonPersistentTopic.java  |   2 +-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../pulsar/broker/web/AuthenticationFilter.java    |   5 +
 .../pulsar/broker/web/PulsarWebResource.java       |  16 +-
 .../pulsar/broker/auth/AuthorizationTest.java      | 152 ++---
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   4 +
 .../AntiAffinityNamespaceGroupTest.java            |   5 +-
 .../lookup/http/HttpDestinationLookupv2Test.java   |   8 +-
 .../pulsar/broker/service/BatchMessageTest.java    |   2 +-
 .../broker/service/PersistentFailoverE2ETest.java  |   2 +-
 .../broker/service/PersistentQueueE2ETest.java     |   2 +-
 .../pulsar/broker/service/ReplicatorTestBase.java  |   2 +-
 .../pulsar/broker/service/ServerCnxTest.java       |  67 +-
 .../broker/service/TopicTerminationTest.java       |   2 +-
 .../api/AuthorizationProducerConsumerTest.java     | 384 ++++++++++++
 .../pulsar/client/api/BrokerServiceLookupTest.java |   2 +-
 .../client/api/SimpleProducerConsumerTest.java     |   2 +-
 .../client/impl/BrokerClientIntegrationTest.java   | 116 ++--
 .../websocket/proxy/ProxyAuthorizationTest.java    |  28 +-
 .../pulsar/client/admin/internal/BaseResource.java |   2 +-
 .../clients/consumer/PulsarKafkaConsumer.java      |   2 +-
 .../pulsar/client/api/ClientConfiguration.java     |   1 +
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   4 +-
 .../apache/pulsar/client/impl/ConsumerBase.java    |   2 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   2 +-
 .../org/apache/pulsar/client/impl/HttpClient.java  |  18 +-
 .../pulsar/client/impl/HttpLookupService.java      |   7 +-
 .../client/impl/PartitionedConsumerImpl.java       |   2 +-
 .../client/impl/PartitionedProducerImpl.java       |   2 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |   4 +-
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 680 ++++++++++-----------
 .../org/apache/pulsar/common}/util/FutureUtil.java |  10 +-
 .../discovery/service/BrokerDiscoveryProvider.java |  10 +-
 .../pulsar/discovery/service/DiscoveryService.java |  10 +-
 .../pulsar/discovery/service/ServerConnection.java |   9 +-
 .../discovery/service/server/ServiceConfig.java    |  11 +
 .../discovery/service/DiscoveryServiceTest.java    |   4 +-
 .../proxy/server/BrokerDiscoveryProvider.java      |  13 +-
 .../pulsar/proxy/server/LookupProxyHandler.java    |   3 +-
 .../pulsar/proxy/server/ProxyConfiguration.java    |  13 +-
 .../pulsar/proxy/server/ProxyConnection.java       |   7 +-
 .../apache/pulsar/proxy/server/ProxyService.java   |  10 +-
 .../pulsar/testclient/PerformanceReader.java       |   2 +-
 .../pulsar/websocket/AbstractWebSocketHandler.java |   7 +-
 .../apache/pulsar/websocket/ConsumerHandler.java   |   6 +-
 .../apache/pulsar/websocket/ProducerHandler.java   |  11 +-
 .../org/apache/pulsar/websocket/ReaderHandler.java |   6 +-
 .../apache/pulsar/websocket/WebSocketService.java  |  12 +-
 .../websocket/admin/WebSocketWebResource.java      |  18 +-
 .../service/WebSocketProxyConfiguration.java       |  13 +
 72 files changed, 1682 insertions(+), 777 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 609cddd..d18ebac 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -203,6 +203,9 @@ authenticationProviders=
 # Enforce authorization
 authorizationEnabled=false
 
+# Authorization provider fully qualified class-name
+authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
+
 # Allow wildcard matching in authorization
 # (wildcard matching only applicable if wildcard-char:
 # * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
diff --git a/conf/discovery.conf b/conf/discovery.conf
index 7dc3f63..49f499a 100644
--- a/conf/discovery.conf
+++ b/conf/discovery.conf
@@ -52,6 +52,9 @@ authenticationProviders=
 # Enforce authorization
 authorizationEnabled=false
 
+# Authorization provider fully qualified class-name
+authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
+
 # Role names that are treated as "super-user", meaning they will be able to do all admin
 # operations and publish/consume from all topics (comma-separated)
 superUserRoles=
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 78ad925..d7c5afc 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -49,6 +49,9 @@ authenticationProviders=
 # Enforce authorization
 authorizationEnabled=false
 
+# Authorization provider fully qualified class-name
+authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
+
 # Authentication settings of the proxy itself. Used to connect to brokers
 brokerClientAuthenticationPlugin=
 brokerClientAuthenticationParameters=
diff --git a/conf/standalone.conf b/conf/standalone.conf
index d2533e1..1201dce 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -169,6 +169,9 @@ authenticationProviders=
 # Enforce authorization
 authorizationEnabled=false
 
+# Authorization provider fully qualified class-name
+authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
+
 # Allow wildcard matching in authorization
 # (wildcard matching only applicable if wildcard-char:
 # * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
diff --git a/conf/websocket.conf b/conf/websocket.conf
index cf5135d..404bdef 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -59,6 +59,9 @@ authenticationProviders=
 # Enforce authorization
 authorizationEnabled=false
 
+# Authorization provider fully qualified class-name
+authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
+
 # Allow wildcard matching in authorization
 # (wildcard matching only applicable if wildcard-char:
 # * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index de27fd9..74077f4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -24,6 +24,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
 
@@ -188,6 +189,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
 
     // Enforce authorization
     private boolean authorizationEnabled = false;
+    // Authorization provider fully qualified class-name
+    private String authorizationProvider = PulsarAuthorizationProvider.class.getName();
 
     // Role names that are treated as "super-user", meaning they will be able to
     // do all admin operations and publish/consume from all topics
@@ -783,6 +786,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
         this.authenticationEnabled = authenticationEnabled;
     }
 
+    public String getAuthorizationProvider() {
+        return authorizationProvider;
+    }
+
+    public void setAuthorizationProvider(String authorizationProvider) {
+        this.authorizationProvider = authorizationProvider;
+    }
+
     public void setAuthenticationProviders(Set<String> providersClassNames) {
         authenticationProviders = providersClassNames;
     }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
new file mode 100644
index 0000000..9962c05
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authorization;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+
+/**
+ * Provider of authorization mechanism; loaded by class-name and initialized by {@link AuthorizationService}.
+ */
+public interface AuthorizationProvider extends Closeable {
+
+    /**
+     * Perform initialization for the authorization provider
+     *
+     * @param conf
+     *            broker config object
+     * @param configCache
+     *            pulsar zk configuration cache service
+     * @throws IOException
+     *             if the initialization fails
+     */
+    void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException;
+
+    /**
+     * Check if the specified role has permission to send messages to the specified fully qualified destination name.
+     *
+     * @param destination the fully qualified destination name associated with the destination.
+     * @param role the app id used to send messages to the destination.
+     * @param authenticationData authentication data accompanying the role being checked
+     * @return future carrying the outcome of the permission check
+     */
+    CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role,
+            AuthenticationDataSource authenticationData);
+
+    /**
+     * Check if the specified role has permission to receive messages from the specified fully qualified destination
+     * name.
+     *
+     * @param destination the fully qualified destination name associated with the destination.
+     * @param role the app id used to receive messages from the destination.
+     * @param authenticationData authentication data accompanying the role being checked
+     * @param subscription the subscription name defined by the client
+     * @return future carrying the outcome of the permission check
+     */
+    CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
+            AuthenticationDataSource authenticationData, String subscription);
+
+    /**
+     * Check whether the specified role can perform a lookup for the specified destination.
+     *
+     * For that the caller needs to have producer or consumer permission.
+     *
+     * @param destination the destination being looked up
+     * @param role the role performing the lookup
+     * @param authenticationData authentication data accompanying the role being checked
+     * @return future carrying the outcome of the permission check
+     */
+    CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role,
+            AuthenticationDataSource authenticationData);
+
+    /**
+     * Grant authorization-action permission on a namespace to the given client.
+     *
+     * @param namespace the namespace on which the permission is granted
+     * @param actions the actions to grant
+     * @param role the role receiving the permission
+     * @param authDataJson
+     *            additional authdata in json format
+     * @return CompletableFuture signalling the outcome of the grant
+     * @completesWith <br/>
+     *                IllegalArgumentException when namespace not found<br/>
+     *                IllegalStateException when failed to grant permission
+     */
+    CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
+            String authDataJson);
+
+    /**
+     * Grant authorization-action permission on a topic to the given client.
+     *
+     * @param topicname the topic on which the permission is granted
+     * @param actions the actions to grant
+     * @param role the role receiving the permission
+     * @param authDataJson additional authdata in json format
+     * @return CompletableFuture signalling the outcome of the grant
+     * @completesWith <br/>
+     *                IllegalArgumentException when namespace not found<br/>
+     *                IllegalStateException when failed to grant permission
+     */
+    CompletableFuture<Void> grantPermissionAsync(DestinationName topicname, Set<AuthAction> actions, String role,
+            String authDataJson);
+
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
new file mode 100644
index 0000000..8cb9232
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authorization;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Authorization service that manages pluggable authorization provider and authorize requests accordingly.
+ * Every check is delegated to the single provider named by {@link ServiceConfiguration#getAuthorizationProvider()}.
+ */
+public class AuthorizationService {
+    private static final Logger log = LoggerFactory.getLogger(AuthorizationService.class);
+
+    private AuthorizationProvider provider;
+    private final ServiceConfiguration conf;
+
+    /**
+     * Loads and initializes the configured provider when authorization is enabled; otherwise leaves it unset.
+     *
+     * @throws PulsarServerException
+     *             if no provider class-name is configured or the provider cannot be loaded or initialized
+     */
+    public AuthorizationService(ServiceConfiguration conf, ConfigurationCacheService configCache)
+            throws PulsarServerException {
+        this.conf = conf;
+        if (this.conf.isAuthorizationEnabled()) {
+            try {
+                final String providerClassname = conf.getAuthorizationProvider();
+                if (StringUtils.isNotBlank(providerClassname)) {
+                    provider = (AuthorizationProvider) Class.forName(providerClassname).newInstance();
+                    provider.initialize(conf, configCache);
+                    log.info("{} has been loaded.", providerClassname);
+                } else {
+                    throw new PulsarServerException("No authorization providers are present.");
+                }
+            } catch (PulsarServerException e) {
+                throw e; // already specific; do not re-wrap it below
+            } catch (Throwable e) {
+                throw new PulsarServerException("Failed to load an authorization provider.", e);
+            }
+        } else {
+            log.info("Authorization is disabled");
+        }
+    }
+
+    /**
+     * Grant authorization-action permission on a namespace to the given client.
+     *
+     * @param namespace the namespace on which the permission is granted
+     * @param actions the actions to grant
+     * @param role the role receiving the permission
+     * @param authDataJson additional authdata in json for targeted authorization provider
+     * @return the provider's future, documented on {@link AuthorizationProvider} to fail with
+     *         IllegalArgumentException when namespace not found and IllegalStateException when failed to grant
+     *         permission; a future failed with IllegalStateException when no provider is loaded
+     */
+    public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
+            String authDataJson) {
+        if (provider != null) {
+            return provider.grantPermissionAsync(namespace, actions, role, authDataJson);
+        }
+        return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
+    }
+
+    /**
+     * Grant authorization-action permission on a topic to the given client.
+     *
+     * @param topicname the topic on which the permission is granted
+     * @param actions the actions to grant
+     * @param role the role receiving the permission
+     * @param authDataJson additional authdata in json for targeted authorization provider
+     * @return the provider's future, documented on {@link AuthorizationProvider} to fail with
+     *         IllegalArgumentException when namespace not found and IllegalStateException when failed to grant
+     *         permission; a future failed with IllegalStateException when no provider is loaded
+     */
+    public CompletableFuture<Void> grantPermissionAsync(DestinationName topicname, Set<AuthAction> actions, String role,
+            String authDataJson) {
+        if (provider != null) {
+            return provider.grantPermissionAsync(topicname, actions, role, authDataJson);
+        }
+        return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
+    }
+
+    /**
+     * Check if the specified role has permission to send messages to the specified fully qualified destination name.
+     *
+     * @param destination the fully qualified destination name associated with the destination.
+     * @param role the app id used to send messages to the destination.
+     * @param authenticationData authentication data forwarded unchanged to the provider
+     * @return future completing with {@code true} when authorization is disabled, otherwise the provider's result;
+     *         fails with IllegalStateException if authorization is enabled but no provider is loaded
+     */
+    public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role,
+            AuthenticationDataSource authenticationData) {
+        if (!this.conf.isAuthorizationEnabled()) {
+            return CompletableFuture.completedFuture(true);
+        }
+        if (provider != null) {
+            return provider.canProduceAsync(destination, role, authenticationData);
+        }
+        return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
+    }
+
+    /**
+     * Check if the specified role has permission to receive messages from the specified fully qualified destination
+     * name.
+     *
+     * @param destination the fully qualified destination name associated with the destination.
+     * @param role the app id used to receive messages from the destination.
+     * @param authenticationData authentication data forwarded unchanged to the provider
+     * @param subscription the subscription name defined by the client
+     * @return future completing with {@code true} when authorization is disabled, otherwise the provider's result
+     */
+    public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
+            AuthenticationDataSource authenticationData, String subscription) {
+        if (!this.conf.isAuthorizationEnabled()) {
+            return CompletableFuture.completedFuture(true);
+        }
+        if (provider != null) {
+            return provider.canConsumeAsync(destination, role, authenticationData, subscription);
+        }
+        return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
+    }
+
+    /**
+     * Blocking variant of {@link #canProduceAsync}; waits at most {@code cacheTimeOutInSec} seconds for the result.
+     *
+     * @throws Exception
+     *             TimeoutException when the wait elapses; otherwise whatever failure {@code get} reports
+     */
+    public boolean canProduce(DestinationName destination, String role, AuthenticationDataSource authenticationData) throws Exception {
+        try {
+            return canProduceAsync(destination, role, authenticationData).get(cacheTimeOutInSec, SECONDS);
+        } catch (java.util.concurrent.TimeoutException e) {
+            log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
+            throw e;
+        } catch (Exception e) {
+            log.warn("Producer-client  with Role - {} failed to get permissions for destination - {}. {}", role,
+                    destination, e.getMessage());
+            throw e;
+        }
+    }
+
+    /**
+     * Blocking variant of {@link #canConsumeAsync}; waits at most {@code cacheTimeOutInSec} seconds for the result.
+     *
+     * @throws Exception
+     *             TimeoutException when the wait elapses; otherwise whatever failure {@code get} reports
+     */
+    public boolean canConsume(DestinationName destination, String role, AuthenticationDataSource authenticationData,
+            String subscription) throws Exception {
+        try {
+            return canConsumeAsync(destination, role, authenticationData, subscription).get(cacheTimeOutInSec, SECONDS);
+        } catch (java.util.concurrent.TimeoutException e) {
+            log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
+            throw e;
+        } catch (Exception e) {
+            log.warn("Consumer-client  with Role - {} failed to get permissions for destination - {}. {}", role,
+                    destination, e.getMessage());
+            throw e;
+        }
+    }
+
+    /**
+     * Check whether the specified role can perform a lookup for the specified destination.
+     *
+     * For that the caller needs to have producer or consumer permission.
+     *
+     * @return {@code true} if the role may produce on, or consume from, the destination
+     * @throws Exception if either underlying check fails or times out
+     */
+    public boolean canLookup(DestinationName destination, String role, AuthenticationDataSource authenticationData) throws Exception {
+        return canProduce(destination, role, authenticationData)
+                || canConsume(destination, role, authenticationData, null);
+    }
+
+    /**
+     * Check whether the specified role can perform a lookup for the specified destination.
+     *
+     * For that the caller needs to have producer or consumer permission. The produce check runs first; the consume
+     * check runs only when produce is denied or fails, and receives the same {@code authenticationData}.
+     *
+     * @param destination the destination being looked up
+     * @param role the role performing the lookup
+     * @param authenticationData authentication data forwarded to both permission checks
+     * @return future yielding whether either check authorized the role; fails only if the consume check fails
+     */
+    public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role,
+            AuthenticationDataSource authenticationData) {
+        CompletableFuture<Boolean> finalResult = new CompletableFuture<Boolean>();
+        canProduceAsync(destination, role, authenticationData).whenComplete((produceAuthorized, ex) -> {
+            if (ex == null) {
+                if (produceAuthorized) {
+                    finalResult.complete(produceAuthorized);
+                    return;
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug(
+                            "Destination [{}] Role [{}] exception occured while trying to check Produce permissions. {}",
+                            destination.toString(), role, ex.getMessage());
+                }
+            }
+            // Produce was denied or failed: fall back to the consume check with the caller's authentication data.
+            canConsumeAsync(destination, role, authenticationData, null).whenComplete((consumeAuthorized, e) -> {
+                if (e == null) {
+                    if (consumeAuthorized) {
+                        finalResult.complete(consumeAuthorized);
+                        return;
+                    }
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug(
+                                "Destination [{}] Role [{}] exception occured while trying to check Consume permissions. {}",
+                                destination.toString(), role, e.getMessage());
+                    }
+                    finalResult.completeExceptionally(e);
+                    return;
+                }
+                finalResult.complete(false);
+            });
+        });
+        return finalResult;
+    }
+
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
similarity index 63%
rename from pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
rename to pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 0950ae2..dc0ba83 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -18,36 +18,59 @@
  */
 package org.apache.pulsar.broker.authorization;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+
+import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
 
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.Policies;
+import static org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal;
+import org.apache.pulsar.zookeeper.ZooKeeperCache;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
+ * Default authorization provider that stores authorization policies under local-zookeeper.
+ *
  */
-public class AuthorizationManager {
-    private static final Logger log = LoggerFactory.getLogger(AuthorizationManager.class);
+public class PulsarAuthorizationProvider implements AuthorizationProvider {
+    private static final Logger log = LoggerFactory.getLogger(PulsarAuthorizationProvider.class);
 
-    public final ServiceConfiguration conf;
-    public final ConfigurationCacheService configCache;
+    public ServiceConfiguration conf;
+    public ConfigurationCacheService configCache;
     private static final String POLICY_ROOT = "/admin/policies/";
+    private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
+
+    public PulsarAuthorizationProvider() {
+    }
+
+    public PulsarAuthorizationProvider(ServiceConfiguration conf, ConfigurationCacheService configCache)
+            throws IOException {
+        initialize(conf, configCache);
+    }
 
-    public AuthorizationManager(ServiceConfiguration conf, ConfigurationCacheService configCache) {
+    @Override
+    public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+        checkNotNull(conf, "ServiceConfiguration can't be null");
+        checkNotNull(configCache, "ConfigurationCacheService can't be null");
         this.conf = conf;
         this.configCache = configCache;
+
     }
 
     /**
@@ -58,23 +81,12 @@ public class AuthorizationManager {
      * @param role
      *            the app id used to send messages to the destination.
      */
-    public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role) {
+    @Override
+    public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role,
+            AuthenticationDataSource authenticationData) {
         return checkAuthorization(destination, role, AuthAction.produce);
     }
 
-    public boolean canProduce(DestinationName destination, String role) throws Exception {
-        try {
-            return canProduceAsync(destination, role).get(cacheTimeOutInSec, SECONDS);
-        } catch (InterruptedException e) {
-            log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
-            throw e;
-        } catch (Exception e) {
-            log.warn("Producer-client  with Role - {} failed to get permissions for destination - {}. {}", role,
-                    destination, e.getMessage());
-            throw e;
-        }
-    }
-
     /**
      * Check if the specified role has permission to receive messages from the specified fully qualified destination
      * name.
@@ -86,7 +98,9 @@ public class AuthorizationManager {
      * @param subscription
      *            the subscription name defined by the client
      */
-    public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role, String subscription) {
+    @Override
+    public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
+            AuthenticationDataSource authenticationData, String subscription) {
         CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
         try {
             configCache.policiesCache().getAsync(POLICY_ROOT + destination.getNamespace()).thenAccept(policies -> {
@@ -115,30 +129,19 @@ public class AuthorizationManager {
                     permissionFuture.complete(isAuthorized);
                 });
             }).exceptionally(ex -> {
-                log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination, ex.getMessage());
+                log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination,
+                        ex.getMessage());
                 permissionFuture.completeExceptionally(ex);
                 return null;
             });
         } catch (Exception e) {
-            log.warn("Client  with Role - {} failed to get permissions for destination - {}. {}", role, destination, e.getMessage());
+            log.warn("Client  with Role - {} failed to get permissions for destination - {}. {}", role, destination,
+                    e.getMessage());
             permissionFuture.completeExceptionally(e);
         }
         return permissionFuture;
     }
 
-    public boolean canConsume(DestinationName destination, String role, String subscription) throws Exception {
-        try {
-            return canConsumeAsync(destination, role, subscription).get(cacheTimeOutInSec, SECONDS);
-        } catch (InterruptedException e) {
-            log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
-            throw e;
-        } catch (Exception e) {
-            log.warn("Consumer-client  with Role - {} failed to get permissions for destination - {}. {}", role,
-                    destination, e.getMessage());
-            throw e;
-        }
-    }
-
     /**
      * Check whether the specified role can perform a lookup for the specified destination.
      *
@@ -149,23 +152,11 @@ public class AuthorizationManager {
      * @return
      * @throws Exception
      */
-    public boolean canLookup(DestinationName destination, String role) throws Exception {
-        return canProduce(destination, role) || canConsume(destination, role, null);
-    }
-
-    /**
-     * Check whether the specified role can perform a lookup for the specified destination.
-     *
-     * For that the caller needs to have producer or consumer permission.
-     *
-     * @param destination
-     * @param role
-     * @return
-     * @throws Exception
-     */
-    public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role) {
+    @Override
+    public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role,
+            AuthenticationDataSource authenticationData) {
         CompletableFuture<Boolean> finalResult = new CompletableFuture<Boolean>();
-        canProduceAsync(destination, role).whenComplete((produceAuthorized, ex) -> {
+        canProduceAsync(destination, role, authenticationData).whenComplete((produceAuthorized, ex) -> {
             if (ex == null) {
                 if (produceAuthorized) {
                     finalResult.complete(produceAuthorized);
@@ -178,7 +169,7 @@ public class AuthorizationManager {
                             destination.toString(), role, ex.getMessage());
                 }
             }
-            canConsumeAsync(destination, role, null).whenComplete((consumeAuthorized, e) -> {
+            canConsumeAsync(destination, role, authenticationData, null).whenComplete((consumeAuthorized, e) -> {
                 if (e == null) {
                     if (consumeAuthorized) {
                         finalResult.complete(consumeAuthorized);
@@ -200,6 +191,61 @@ public class AuthorizationManager {
         return finalResult;
     }
 
+    /** Grants {@code actions} to {@code role} on the namespace that owns {@code destination}. */
+    @Override
+    public CompletableFuture<Void> grantPermissionAsync(DestinationName destination, Set<AuthAction> actions,
+            String role, String authDataJson) {
+        return grantPermissionAsync(destination.getNamespaceObject(), actions, role, authDataJson);
+    }
+
+    /**
+     * Stores {@code actions} for {@code role} in the namespace policies node of the global zookeeper. Nothing is
+     * written when the read-only validation fails; a missing node fails the future with IllegalArgumentException.
+     */
+    @Override
+    public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespaceName, Set<AuthAction> actions,
+            String role, String authDataJson) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        try {
+            validatePoliciesReadOnlyAccess();
+        } catch (Exception e) {
+            result.completeExceptionally(e);
+            return result; // policies are not writable: do not fall through to the zookeeper write
+        }
+
+        ZooKeeper globalZk = configCache.getZooKeeper();
+        final String property = namespaceName.getProperty();
+        final String cluster = namespaceName.getCluster();
+        final String namespace = namespaceName.getLocalName();
+        final String policiesPath = String.format("/%s/%s/%s/%s/%s", "admin", POLICIES, property, cluster, namespace);
+        try {
+            Stat nodeStat = new Stat();
+            byte[] content = globalZk.getData(policiesPath, null, nodeStat);
+            Policies policies = getThreadLocal().readValue(content, Policies.class);
+            policies.auth_policies.namespace_auth.put(role, actions);
+            // Write back the new policies into zookeeper
+            globalZk.setData(policiesPath, getThreadLocal().writeValueAsBytes(policies), nodeStat.getVersion());
+            configCache.policiesCache().invalidate(policiesPath);
+            log.info("[{}] Successfully granted access for role {}: {} - namespace {}/{}/{}", role, role, actions,
+                    property, cluster, namespace);
+            result.complete(null);
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: does not exist", role, property, cluster,
+                    namespace);
+            result.completeExceptionally(new IllegalArgumentException("Namespace does not exist: " + namespace));
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: concurrent modification", role, property,
+                    cluster, namespace);
+            result.completeExceptionally(new IllegalStateException(
+                    "Concurrent modification on zk path: " + policiesPath + ", " + e.getMessage()));
+        } catch (Exception e) {
+            log.error("[{}] Failed to get permissions for namespace {}/{}/{}", role, property, cluster, namespace, e);
+            result.completeExceptionally(
+                    new IllegalStateException("Failed to get permissions for namespace " + namespace, e));
+        }
+        return result;
+    }
+
     private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role, AuthAction action) {
         if (isSuperUser(role)) {
             return CompletableFuture.completedFuture(true);
@@ -273,7 +319,8 @@ public class AuthorizationManager {
                 return null;
             });
         } catch (Exception e) {
-            log.warn("Client  with Role - {} failed to get permissions for destination - {}. {}", role, destination, e.getMessage());
+            log.warn("Client  with Role - {} failed to get permissions for destination - {}. {}", role, destination,
+                    e.getMessage());
             permissionFuture.completeExceptionally(e);
         }
         return permissionFuture;
@@ -312,4 +359,39 @@ public class AuthorizationManager {
         return role != null && superUserRoles.contains(role) ? true : false;
     }
 
+    @Override
+    public void close() throws IOException {
+        // No-op
+    }
+
+    /**
+     * Verifies that policies may be modified right now.
+     *
+     * @throws IllegalStateException if the read-only flag path cannot be checked or exists, or zk is not CONNECTED
+     */
+    private void validatePoliciesReadOnlyAccess() {
+        ZooKeeperCache globalZkCache = configCache.cache();
+        boolean arePoliciesReadOnly;
+        try {
+            arePoliciesReadOnly = globalZkCache.exists(POLICIES_READONLY_FLAG_PATH);
+        } catch (Exception e) {
+            log.warn("Unable to fetch contents of [{}] from global zookeeper", POLICIES_READONLY_FLAG_PATH, e);
+            throw new IllegalStateException("Unable to fetch content from global zk", e);
+        }
+        if (arePoliciesReadOnly) {
+            if (log.isDebugEnabled()) {
+                log.debug("Policies are read-only. Broker cannot do read-write operations");
+            }
+            throw new IllegalStateException("policies are in readonly mode");
+        }
+        // Make sure the broker is connected to the global zookeeper before writing. If not, throw an exception.
+        if (globalZkCache.getZooKeeper().getState() != States.CONNECTED) {
+            if (log.isDebugEnabled()) {
+                log.debug("Broker is not connected to the global zookeeper");
+            }
+            throw new IllegalStateException("not connected with global zookeeper");
+        }
+        log.debug("Broker is allowed to make read-write operations");
+    }
+
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
index 41a8f93..4ee2865 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
@@ -164,6 +164,10 @@ public class ConfigurationCacheService {
 
     }
 
+    public ZooKeeperCache cache() {
+        return cache;
+    }
+    
     public ZooKeeperDataCache<PropertyAdmin> propertiesCache() {
         return this.propertiesCache;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 6cd56ab..51e6ace 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -53,12 +53,12 @@ import org.apache.pulsar.broker.stats.MetricsGenerator;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.web.WebService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
index cb8f982..e2467f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
@@ -27,7 +27,6 @@ import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_PO
 
 import java.net.URI;
 import java.net.URL;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -36,6 +35,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
@@ -62,7 +62,6 @@ import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -79,6 +78,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.data.Stat;
@@ -451,34 +451,30 @@ public class Namespaces extends AdminResource {
     public void grantPermissionOnNamespace(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("role") String role, Set<AuthAction> actions) {
         validateAdminAccessOnProperty(property);
-        validatePoliciesReadOnlyAccess();
 
+        NamespaceName namespaceName = NamespaceName.get(property, cluster, namespace);
         try {
-            Stat nodeStat = new Stat();
-            byte[] content = globalZk().getData(path(POLICIES, property, cluster, namespace), null, nodeStat);
-            Policies policies = jsonMapper().readValue(content, Policies.class);
-            policies.auth_policies.namespace_auth.put(role, actions);
-
-            // Write back the new policies into zookeeper
-            globalZk().setData(path(POLICIES, property, cluster, namespace), jsonMapper().writeValueAsBytes(policies),
-                    nodeStat.getVersion());
-
-            policiesCache().invalidate(path(POLICIES, property, cluster, namespace));
-
-            log.info("[{}] Successfully granted access for role {}: {} - namespace {}/{}/{}", clientAppId(), role,
-                    actions, property, cluster, namespace);
-        } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: does not exist", clientAppId(), property,
-                    cluster, namespace);
-            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
-        } catch (KeeperException.BadVersionException e) {
-            log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: concurrent modification", clientAppId(),
-                    property, cluster, namespace);
-            throw new RestException(Status.CONFLICT, "Concurrent modification");
-        } catch (Exception e) {
+            pulsar().getBrokerService().getAuthorizationService()
+                    .grantPermissionAsync(namespaceName, actions, role, null/*additional auth-data json*/).get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt status cleared when the exception was thrown
             log.error("[{}] Failed to get permissions for namespace {}/{}/{}", clientAppId(), property, cluster,
                     namespace, e);
             throw new RestException(e);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof IllegalArgumentException) {
+                log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: does not exist", clientAppId(),
+                        property, cluster, namespace);
+                throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+            } else if (e.getCause() instanceof IllegalStateException) {
+                log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: concurrent modification",
+                        clientAppId(), property, cluster, namespace);
+                throw new RestException(Status.CONFLICT, "Concurrent modification");
+            } else {
+                log.error("[{}] Failed to get permissions for namespace {}/{}/{}", clientAppId(), property, cluster,
+                        namespace, e);
+                throw new RestException(e);
+            }
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/NonPersistentTopics.java
index f765934..68b4e83 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/NonPersistentTopics.java
@@ -42,7 +42,6 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.DestinationDomain;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -51,6 +50,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
index a3b246e..484a873 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
@@ -58,15 +58,14 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
-import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
@@ -84,7 +83,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedExc
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -101,8 +99,8 @@ import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicStats;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -271,7 +269,7 @@ public class PersistentTopics extends AdminResource {
             validateAdminAccessOnProperty(destination.getProperty());
         } catch (Exception ve) {
             try {
-                checkAuthorization(pulsar(), destination, clientAppId());
+                checkAuthorization(pulsar(), destination, clientAppId(), clientAuthData());
             } catch (RestException re) {
                 throw re;
             } catch (Exception e) {
@@ -1366,12 +1364,12 @@ public class PersistentTopics extends AdminResource {
     }
 
     public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsar,
-            String clientAppId, DestinationName dn) {
+            String clientAppId, AuthenticationDataSource authenticationData, DestinationName dn) {
         CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
         try {
             // (1) authorize client
             try {
-                checkAuthorization(pulsar, dn, clientAppId);
+                checkAuthorization(pulsar, dn, clientAppId, authenticationData);
             } catch (RestException e) {
                 try {
                     validateAdminAccessOnProperty(pulsar, clientAppId, dn.getProperty());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 2658a95..19d3f34 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -42,11 +42,11 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
 import org.apache.pulsar.broker.loadbalance.LoadData;
 import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.slf4j.Logger;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/DestinationLookup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/DestinationLookup.java
index a666166..0393d6a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/DestinationLookup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/DestinationLookup.java
@@ -42,6 +42,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.web.NoSwaggerDocumentation;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
@@ -198,7 +199,7 @@ public class DestinationLookup extends PulsarWebResource {
      * @return
      */
     public static CompletableFuture<ByteBuf> lookupDestinationAsync(PulsarService pulsarService, DestinationName fqdn, boolean authoritative,
-            String clientAppId, long requestId) {
+            String clientAppId, AuthenticationDataSource authenticationData, long requestId) {
 
         final CompletableFuture<ByteBuf> validationFuture = new CompletableFuture<>();
         final CompletableFuture<ByteBuf> lookupfuture = new CompletableFuture<>();
@@ -217,7 +218,7 @@ public class DestinationLookup extends PulsarWebResource {
             } else {
                 // (2) authorize client
                 try {
-                    checkAuthorization(pulsarService, fqdn, clientAppId);
+                    checkAuthorization(pulsarService, fqdn, clientAppId, authenticationData);
                 } catch (RestException authException) {
                     log.warn("Failed to authorized {} on cluster {}", clientAppId, fqdn.toString());
                     validationFuture.complete(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 48c7e71..e681fe7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -29,10 +29,10 @@ import java.util.concurrent.Executor;
 
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundles;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 3223a77..556a1f8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -27,12 +27,12 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.slf4j.Logger;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 74ca915..2f3b27a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -63,7 +63,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
-import org.apache.pulsar.broker.authorization.AuthorizationManager;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
@@ -83,7 +83,6 @@ import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.naming.DestinationDomain;
 import org.apache.pulsar.common.naming.DestinationName;
@@ -99,6 +98,7 @@ import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FieldParser;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
@@ -153,7 +153,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
 
     private final ConcurrentLinkedQueue<Pair<String, CompletableFuture<Topic>>> pendingTopicLoadingQueue;
 
-    private AuthorizationManager authorizationManager = null;
+    private AuthorizationService authorizationService = null;
     private final ScheduledExecutorService statsUpdater;
     private final ScheduledExecutorService backlogQuotaChecker;
 
@@ -210,7 +210,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         this.statsUpdater = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater"));
         if (pulsar.getConfiguration().isAuthorizationEnabled()) {
-            this.authorizationManager = new AuthorizationManager(pulsar.getConfiguration(),
+            this.authorizationService = new AuthorizationService(pulsar.getConfiguration(),
                     pulsar.getConfigurationCache());
         }
 
@@ -896,8 +896,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         return result;
     }
 
-    public AuthorizationManager getAuthorizationManager() {
-        return authorizationManager;
+    public AuthorizationService getAuthorizationService() {
+        return authorizationService;
     }
 
     public void removeTopicFromCache(String topic) {
@@ -983,7 +983,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     }
 
     public boolean isAuthorizationEnabled() {
-        return authorizationManager != null;
+        return authorizationService != null;
     }
 
     public int getKeepAliveIntervalSeconds() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 77c36e6..fd77ef2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.mledger.util.Rate;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
@@ -65,6 +66,7 @@ public class Consumer {
     private final SubType subType;
     private final ServerCnx cnx;
     private final String appId;
+    private AuthenticationDataSource authenticationData;
     private final String topicName;
 
     private final long consumerId;
@@ -116,6 +118,7 @@ public class Consumer {
         this.msgOut = new Rate();
         this.msgRedeliver = new Rate();
         this.appId = appId;
+        this.authenticationData = cnx.authenticationData;
         PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
         MESSAGE_PERMITS_UPDATER.set(this, 0);
         UNACKED_MESSAGES_UPDATER.set(this, 0);
@@ -464,9 +467,10 @@ public class Consumer {
 
     public void checkPermissions() {
         DestinationName destination = DestinationName.get(subscription.getDestination());
-        if (cnx.getBrokerService().getAuthorizationManager() != null) {
+        if (cnx.getBrokerService().getAuthorizationService() != null) {
             try {
-                if (cnx.getBrokerService().getAuthorizationManager().canConsume(destination, appId, subscription.getName())) {
+                if (cnx.getBrokerService().getAuthorizationService().canConsume(destination, appId, authenticationData,
+                        subscription.getName())) {
                     return;
                 }
             } catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index e964598..c0de7a6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 import org.apache.bookkeeper.mledger.util.Rate;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
 import org.apache.pulsar.broker.service.Topic.PublishContext;
@@ -64,6 +65,7 @@ public class Producer {
     private Rate msgIn;
     // it records msg-drop rate only for non-persistent topic
     private final Rate msgDrop;
+    private AuthenticationDataSource authenticationData;
 
     private volatile long pendingPublishAcks = 0;
     private static final AtomicLongFieldUpdater<Producer> pendingPublishAcksUpdater = AtomicLongFieldUpdater
@@ -88,6 +90,7 @@ public class Producer {
         this.producerName = checkNotNull(producerName);
         this.closeFuture = new CompletableFuture<>();
         this.appId = appId;
+        this.authenticationData = cnx.authenticationData;
         this.msgIn = new Rate();
         this.isNonPersistentTopic = topic instanceof NonPersistentTopic;
         this.msgDrop = this.isNonPersistentTopic ? new Rate() : null;
@@ -466,9 +469,10 @@ public class Producer {
 
     public void checkPermissions() {
         DestinationName destination = DestinationName.get(topic.getName());
-        if (cnx.getBrokerService().getAuthorizationManager() != null) {
+        if (cnx.getBrokerService().getAuthorizationService() != null) {
             try {
-                if (cnx.getBrokerService().getAuthorizationManager().canProduce(destination, appId)) {
+                if (cnx.getBrokerService().getAuthorizationService().canProduce(destination, appId,
+                        authenticationData)) {
                     return;
                 }
             } catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 05fb749..b50eee6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -41,6 +41,7 @@ import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.web.RestException;
@@ -95,6 +96,7 @@ public class ServerCnx extends PulsarHandler {
     private State state;
     private volatile boolean isActive = true;
     String authRole = null;
+    AuthenticationDataSource authenticationData;
 
     // Max number of pending requests per connections. If multiple producers are sharing the same connection the flow
     // control done by a single producer might not be enough to prevent write spikes on the broker.
@@ -244,16 +246,16 @@ public class ServerCnx extends PulsarHandler {
             }
             CompletableFuture<Boolean> isProxyAuthorizedFuture;
             if (service.isAuthorizationEnabled() && originalPrincipal != null) {
-                isProxyAuthorizedFuture = service.getAuthorizationManager()
-                        .canLookupAsync(topicName, authRole);
+                isProxyAuthorizedFuture = service.getAuthorizationService().canLookupAsync(topicName, authRole,
+                        authenticationData);
             } else {
                 isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
             }
             String finalOriginalPrincipal = originalPrincipal;
             isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
                 if (isProxyAuthorized) {
-                    lookupDestinationAsync(getBrokerService().pulsar(), topicName,
-                            lookup.getAuthoritative(), finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole,
+                    lookupDestinationAsync(getBrokerService().pulsar(), topicName, lookup.getAuthoritative(),
+                            finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole, authenticationData,
                             lookup.getRequestId()).handle((lookupResponse, ex) -> {
                                 if (ex == null) {
                                     ctx.writeAndFlush(lookupResponse);
@@ -331,8 +333,8 @@ public class ServerCnx extends PulsarHandler {
             }
             CompletableFuture<Boolean> isProxyAuthorizedFuture;
             if (service.isAuthorizationEnabled() && originalPrincipal != null) {
-                isProxyAuthorizedFuture = service.getAuthorizationManager()
-                        .canLookupAsync(topicName, authRole);
+                isProxyAuthorizedFuture = service.getAuthorizationService()
+                        .canLookupAsync(topicName, authRole, authenticationData);
             } else {
                 isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
             }
@@ -340,8 +342,8 @@ public class ServerCnx extends PulsarHandler {
             isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
                     if (isProxyAuthorized) {
                     getPartitionedTopicMetadata(getBrokerService().pulsar(),
-                            finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole, topicName)
-                                    .handle((metadata, ex) -> {
+                            finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole, authenticationData,
+                            topicName).handle((metadata, ex) -> {
                                     if (ex == null) {
                                         int partitions = metadata.partitions;
                                         ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
@@ -497,8 +499,9 @@ public class ServerCnx extends PulsarHandler {
                         connect.hasOriginalAuthMethod() ? connect.getOriginalAuthMethod() : null,
                         connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null,
                         sslSession);
+                authenticationData = new AuthenticationDataCommand(authData, remoteAddress, sslSession);
                 authRole = getBrokerService().getAuthenticationService()
-                        .authenticate(new AuthenticationDataCommand(authData, remoteAddress, sslSession), authMethod);
+                        .authenticate(authenticationData, authMethod);
 
                 log.info("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}", remoteAddress, authMethod, authRole, originalPrincipal);
             } catch (AuthenticationException e) {
@@ -554,8 +557,8 @@ public class ServerCnx extends PulsarHandler {
 
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
         if (service.isAuthorizationEnabled() && originalPrincipal != null) {
-            isProxyAuthorizedFuture = service.getAuthorizationManager().canConsumeAsync(topicName, authRole,
-                    subscribe.getSubscription());
+            isProxyAuthorizedFuture = service.getAuthorizationService().canConsumeAsync(topicName, authRole,
+                    authenticationData, subscribe.getSubscription());
         } else {
             isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
         }
@@ -563,8 +566,9 @@ public class ServerCnx extends PulsarHandler {
             if (isProxyAuthorized) {
                 CompletableFuture<Boolean> authorizationFuture;
                 if (service.isAuthorizationEnabled()) {
-                    authorizationFuture = service.getAuthorizationManager().canConsumeAsync(topicName,
-                            originalPrincipal != null ? originalPrincipal : authRole, subscriptionName);
+                    authorizationFuture = service.getAuthorizationService().canConsumeAsync(topicName,
+                            originalPrincipal != null ? originalPrincipal : authRole, authenticationData,
+                            subscriptionName);
                 } else {
                     authorizationFuture = CompletableFuture.completedFuture(true);
                 }
@@ -711,7 +715,8 @@ public class ServerCnx extends PulsarHandler {
 
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
         if (service.isAuthorizationEnabled() && originalPrincipal != null) {
-            isProxyAuthorizedFuture = service.getAuthorizationManager().canProduceAsync(topicName, authRole);
+            isProxyAuthorizedFuture = service.getAuthorizationService().canProduceAsync(topicName,
+                    authRole, authenticationData);
         } else {
             isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
         }
@@ -719,8 +724,8 @@ public class ServerCnx extends PulsarHandler {
             if (isProxyAuthorized) {
                 CompletableFuture<Boolean> authorizationFuture;
                 if (service.isAuthorizationEnabled()) {
-                    authorizationFuture = service.getAuthorizationManager().canProduceAsync(topicName,
-                            originalPrincipal != null ? originalPrincipal : authRole);
+                    authorizationFuture = service.getAuthorizationService().canProduceAsync(topicName,
+                            originalPrincipal != null ? originalPrincipal : authRole, authenticationData);
                 } else {
                     authorizationFuture = CompletableFuture.completedFuture(true);
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 5119edc..44a9e14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -56,7 +56,6 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -69,6 +68,7 @@ import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 57d1d75..20ae527 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -79,7 +79,6 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -94,6 +93,7 @@ import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.compaction.CompactedTopic;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
index d55716d..a384e35 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
@@ -31,6 +31,8 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +46,7 @@ public class AuthenticationFilter implements Filter {
     private final AuthenticationService authenticationService;
 
     public static final String AuthenticatedRoleAttributeName = AuthenticationFilter.class.getName() + "-role";
+    public static final String AuthenticatedDataAttributeName = AuthenticationFilter.class.getName() + "-data";
 
     public AuthenticationFilter(PulsarService pulsar) {
         this.authenticationService = pulsar.getBrokerService().getAuthenticationService();
@@ -56,6 +59,8 @@ public class AuthenticationFilter implements Filter {
         try {
             String role = authenticationService.authenticateHttpRequest((HttpServletRequest) request);
             request.setAttribute(AuthenticatedRoleAttributeName, role);
+            request.setAttribute(AuthenticatedDataAttributeName,
+                    new AuthenticationDataHttps((HttpServletRequest) request));
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("[{}] Authenticated HTTP request with role {}", request.getRemoteAddr(), role);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 40b7d03..c17182c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.web;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
 
@@ -41,11 +42,12 @@ import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
 
-import static org.apache.commons.lang3.StringUtils.isBlank;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.admin.Namespaces;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -127,6 +129,10 @@ public abstract class PulsarWebResource {
         return (String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName);
     }
 
+    public AuthenticationDataHttps clientAuthData() {
+        return (AuthenticationDataHttps) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedDataAttributeName);
+    }
+    
     public boolean isRequestHttps() {
     	return "https".equalsIgnoreCase(httpRequest.getScheme());
     }
@@ -635,17 +641,17 @@ public abstract class PulsarWebResource {
     }
 
     protected void checkConnect(DestinationName destination) throws RestException, Exception {
-        checkAuthorization(pulsar(), destination, clientAppId());
+        checkAuthorization(pulsar(), destination, clientAppId(), clientAuthData());
     }
 
-    protected static void checkAuthorization(PulsarService pulsarService, DestinationName destination, String role)
-            throws RestException, Exception {
+    protected static void checkAuthorization(PulsarService pulsarService, DestinationName destination, String role,
+            AuthenticationDataSource authenticationData) throws RestException, Exception {
         if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
             // No enforcing of authorization policies
             return;
         }
         // get zk policy manager
-        if (!pulsarService.getBrokerService().getAuthorizationManager().canLookup(destination, role)) {
+        if (!pulsarService.getBrokerService().getAuthorizationService().canLookup(destination, role, authenticationData)) {
             log.warn("[{}] Role {} is not allowed to lookup topic", destination, role);
             throw new RestException(Status.UNAUTHORIZED, "Don't have permission to connect to this namespace");
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index cee4bbe..69bfb35 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -23,7 +23,7 @@ import static org.testng.Assert.fail;
 
 import java.util.EnumSet;
 
-import org.apache.pulsar.broker.authorization.AuthorizationManager;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -60,9 +60,9 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest {
 
     @Test
     void simple() throws Exception {
-        AuthorizationManager auth = pulsar.getBrokerService().getAuthorizationManager();
+        AuthorizationService auth = pulsar.getBrokerService().getAuthorizationService();
 
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), false);
 
         admin.clusters().updateCluster("c1", new ClusterData());
         admin.properties().createProperty("p1", new PropertyAdmin(Lists.newArrayList("role1"), Sets.newHashSet("c1")));
@@ -70,69 +70,69 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest {
         admin.namespaces().createNamespace("p1/c1/ns1");
         waitForChange();
 
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), false);
 
         admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "my-role", EnumSet.of(AuthAction.produce));
         waitForChange();
 
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
-        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
+        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
 
         admin.persistentTopics().grantPermission("persistent://p1/c1/ns1/ds2", "other-role",
                 EnumSet.of(AuthAction.consume));
         waitForChange();
 
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), true);
-        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
-        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), false);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null), true);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null), true);
+        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
+        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null), false);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, null), true);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null, null), false);
 
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "no-access-role"), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "no-access-role", null), false);
 
         admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "my-role", EnumSet.allOf(AuthAction.class));
         waitForChange();
 
-        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
+        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null, null), true);
 
         // test for wildcard
 
         // namespace prefix match
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2"), false);
-        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2"), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2", null), false);
+        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null, null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2", null), false);
 
         admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "my.role.*", EnumSet.of(AuthAction.produce));
         waitForChange();
 
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), true);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2"), true);
-        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), true);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2"), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), true);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2", null), true);
+        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), true);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null, null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2", null), false);
 
         // namespace suffix match
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my"), false);
-        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my", null), false);
+        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null, null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other", null), false);
 
         admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "*.role.my", EnumSet.of(AuthAction.consume));
         waitForChange();
 
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), true);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my"), true);
-        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), true);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), true);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my", null), true);
+        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null, null), true);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other", null), false);
 
         // revoke for next test
         admin.namespaces().revokePermissionsOnNamespace("p1/c1/ns1", "my.role.*");
@@ -140,50 +140,50 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest {
         waitForChange();
 
         // destination prefix match
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2"), false);
-        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), false);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.1"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.2"), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2", null), false);
+        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null, null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.1", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.2", null), false);
 
         admin.persistentTopics().grantPermission("persistent://p1/c1/ns1/ds1", "my.*",
                 EnumSet.of(AuthAction.produce));
         waitForChange();
 
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), true);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2"), true);
-        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1"), true);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.1"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.2"), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), true);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.2", null), true);
+        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null), true);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null, null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.1", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "other.role.2", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.1", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "my.role.2", null), false);
 
         // destination suffix match
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my"), false);
-        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "1.role.my"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "2.role.my"), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my", null), false);
+        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null, null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "1.role.my", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "2.role.my", null), false);
 
         admin.persistentTopics().grantPermission("persistent://p1/c1/ns1/ds1", "*.my",
                 EnumSet.of(AuthAction.consume));
         waitForChange();
 
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), true);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my"), true);
-        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my"), false);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), true);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "1.role.my"), false);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "2.role.my"), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), true);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.my", null), true);
+        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null), false);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "1.role.my", null, null), true);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "2.role.other", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "1.role.my", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "2.role.my", null), false);
 
         admin.persistentTopics().revokePermissions("persistent://p1/c1/ns1/ds1", "my.*");
         admin.persistentTopics().revokePermissions("persistent://p1/c1/ns1/ds1", "*.my");
@@ -193,19 +193,19 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest {
         admin.namespaces().setSubscriptionAuthMode("p1/c1/ns1", SubscriptionAuthMode.Prefix);
         waitForChange();
 
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1"), true);
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2"), true);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1", null), true);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2", null), true);
         try {
-            assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1", "sub1"), false);
+            assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "sub1"), false);
             fail();
         } catch (Exception e) {}
         try {
-            assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2", "sub2"), false);
+            assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "sub2"), false);
             fail();
         } catch (Exception e) {}
 
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1", "role1-sub1"), true);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2", "role2-sub2"), true);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "role1-sub1"), true);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "role2-sub2"), true);
 
         admin.namespaces().deleteNamespace("p1/c1/ns1");
         admin.properties().deleteProperty("p1");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index c23d726..75b91c1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -176,7 +176,11 @@ public abstract class MockedPulsarServiceBaseTest {
         PulsarService pulsar = spy(new PulsarService(conf));
 
         setupBrokerMocks(pulsar);
+        boolean isAuthorizationEnabled = conf.isAuthorizationEnabled();
+        // enable authorization to initialize the authorization service, which is used by grant-permission
+        conf.setAuthorizationEnabled(true);
         pulsar.start();
+        conf.setAuthorizationEnabled(isAuthorizationEnabled);
         return pulsar;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index fb44cbf..e7161b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -125,12 +125,13 @@ public class AntiAffinityNamespaceGroupTest {
         config1.setBrokerServicePort(PRIMARY_BROKER_PORT);
         config1.setFailureDomainsEnabled(true);
         config1.setLoadBalancerEnabled(true);
+        config1.setAdvertisedAddress("localhost");
         createCluster(bkEnsemble.getZkClient(), config1);
         pulsar1 = new PulsarService(config1);
 
         pulsar1.start();
 
-        primaryHost = String.format("%s:%d", InetAddress.getLocalHost().getHostName(), PRIMARY_BROKER_WEBSERVICE_PORT);
+        primaryHost = String.format("%s:%d", "localhost", PRIMARY_BROKER_WEBSERVICE_PORT);
         url1 = new URL("http://127.0.0.1" + ":" + PRIMARY_BROKER_WEBSERVICE_PORT);
         admin1 = new PulsarAdmin(url1, (Authentication) null);
 
@@ -143,7 +144,7 @@ public class AntiAffinityNamespaceGroupTest {
         config2.setBrokerServicePort(SECONDARY_BROKER_PORT);
         config2.setFailureDomainsEnabled(true);
         pulsar2 = new PulsarService(config2);
-        secondaryHost = String.format("%s:%d", InetAddress.getLocalHost().getHostName(),
+        secondaryHost = String.format("%s:%d", "localhost",
                 SECONDARY_BROKER_WEBSERVICE_PORT);
 
         pulsar2.start();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpDestinationLookupv2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpDestinationLookupv2Test.java
index 12e2c66..645fff9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpDestinationLookupv2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpDestinationLookupv2Test.java
@@ -41,7 +41,7 @@ import javax.ws.rs.core.UriInfo;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.broker.authorization.AuthorizationManager;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.lookup.DestinationLookup;
 import org.apache.pulsar.broker.lookup.NamespaceData;
@@ -70,7 +70,7 @@ public class HttpDestinationLookupv2Test {
 
     private PulsarService pulsar;
     private NamespaceService ns;
-    private AuthorizationManager auth;
+    private AuthorizationService auth;
     private ServiceConfiguration config;
     private ConfigurationCacheService mockConfigCache;
     private ZooKeeperChildrenCache clustersListCache;
@@ -83,7 +83,7 @@ public class HttpDestinationLookupv2Test {
     public void setUp() throws Exception {
         pulsar = mock(PulsarService.class);
         ns = mock(NamespaceService.class);
-        auth = mock(AuthorizationManager.class);
+        auth = mock(AuthorizationService.class);
         mockConfigCache = mock(ConfigurationCacheService.class);
         clustersListCache = mock(ZooKeeperChildrenCache.class);
         clustersCache = mock(ZooKeeperDataCache.class);
@@ -112,7 +112,7 @@ public class HttpDestinationLookupv2Test {
         doReturn(ns).when(pulsar).getNamespaceService();
         BrokerService brokerService = mock(BrokerService.class);
         doReturn(brokerService).when(pulsar).getBrokerService();
-        doReturn(auth).when(brokerService).getAuthorizationManager();
+        doReturn(auth).when(brokerService).getAuthorizationService();
         doReturn(new Semaphore(1000)).when(brokerService).getLookupRequestSemaphore();
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 6b9509c..b34e7d3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 3385c7a..c172b01 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -42,9 +42,9 @@ import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index 64567d2..7a65cbc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -45,11 +45,11 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.eclipse.jetty.util.BlockingArrayQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 8b2c474..344ad23 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -43,10 +43,10 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.apache.pulsar.zookeeper.ZookeeperServerTest;
 import org.slf4j.Logger;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index bc0e561..0a5c154 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -38,6 +38,7 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -61,14 +62,14 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
-import org.apache.pulsar.broker.authorization.AuthorizationManager;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.Commands.ChecksumType;
@@ -428,9 +429,10 @@ public class ServerCnxTest {
 
     @Test(timeOut = 30000)
     public void testProducerCommandWithAuthorizationPositive() throws Exception {
-        AuthorizationManager authorizationManager = mock(AuthorizationManager.class);
-        doReturn(CompletableFuture.completedFuture(true)).when(authorizationManager).canProduceAsync(Mockito.any(), Mockito.any());
-        doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
+        AuthorizationService authorizationService = mock(AuthorizationService.class);
+        doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).canProduceAsync(Mockito.any(),
+                Mockito.any(), Mockito.any());
+        doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         resetChannel();
         setChannelConnected();
@@ -459,10 +461,15 @@ public class ServerCnxTest {
         doReturn(zkDataCache).when(configCacheService).policiesCache();
         doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkDataCache).getAsync(matches(".*nonexistent.*"));
 
-        AuthorizationManager authorizationManager = spy(new AuthorizationManager(svcConfig, configCacheService));
-        doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
+        AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, configCacheService));
+        doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
-        doReturn(false).when(authorizationManager).isSuperUser(Mockito.anyString());
+        svcConfig.setAuthorizationEnabled(true);
+        Field providerField = AuthorizationService.class.getDeclaredField("provider");
+        providerField.setAccessible(true);
+        PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
+        providerField.set(authorizationService, authorizationProvider);
+        doReturn(false).when(authorizationProvider).isSuperUser(Mockito.anyString());
 
         // Test producer creation
         resetChannel();
@@ -484,11 +491,16 @@ public class ServerCnxTest {
 
     @Test(timeOut = 30000)
     public void testClusterAccess() throws Exception {
-        AuthorizationManager authorizationManager = spy(new AuthorizationManager(svcConfig, configCacheService));
-        doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
+        svcConfig.setAuthorizationEnabled(true);
+        AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, configCacheService));
+        Field providerField = AuthorizationService.class.getDeclaredField("provider");
+        providerField.setAccessible(true);
+        PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
+        providerField.set(authorizationService, authorizationProvider);
+        doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
-        doReturn(false).when(authorizationManager).isSuperUser(Mockito.anyString());
-        doReturn(CompletableFuture.completedFuture(true)).when(authorizationManager).checkPermission(any(DestinationName.class), Mockito.anyString(),
+        doReturn(false).when(authorizationProvider).isSuperUser(Mockito.anyString());
+        doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).checkPermission(any(DestinationName.class), Mockito.anyString(),
                 any(AuthAction.class));
 
         resetChannel();
@@ -508,10 +520,14 @@ public class ServerCnxTest {
 
     @Test(timeOut = 30000)
     public void testNonExistentTopicSuperUserAccess() throws Exception {
-        AuthorizationManager authorizationManager = spy(new AuthorizationManager(svcConfig, configCacheService));
-        doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
+        AuthorizationService authorizationService = spy(new AuthorizationService(svcConfig, configCacheService));
+        doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
-        doReturn(true).when(authorizationManager).isSuperUser(Mockito.anyString());
+        Field providerField = AuthorizationService.class.getDeclaredField("provider");
+        providerField.setAccessible(true);
+        PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, configCacheService));
+        providerField.set(authorizationService, authorizationProvider);
+        doReturn(true).when(authorizationProvider).isSuperUser(Mockito.anyString());
 
         // Test producer creation
         resetChannel();
@@ -540,9 +556,10 @@ public class ServerCnxTest {
     }
 
     public void testProducerCommandWithAuthorizationNegative() throws Exception {
-        AuthorizationManager authorizationManager = mock(AuthorizationManager.class);
-        doReturn(CompletableFuture.completedFuture(false)).when(authorizationManager).canProduceAsync(Mockito.any(), Mockito.any());
-        doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
+        AuthorizationService authorizationService = mock(AuthorizationService.class);
+        doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).canProduceAsync(Mockito.any(),
+                Mockito.any(), Mockito.any());
+        doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
         doReturn("prod1").when(brokerService).generateUniqueProducerName();
@@ -1127,9 +1144,10 @@ public class ServerCnxTest {
 
     @Test(timeOut = 30000)
     public void testSubscribeCommandWithAuthorizationPositive() throws Exception {
-        AuthorizationManager authorizationManager = mock(AuthorizationManager.class);
-        doReturn(CompletableFuture.completedFuture(true)).when(authorizationManager).canConsumeAsync(Mockito.any(), Mockito.any(), Mockito.any());
-        doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
+        AuthorizationService authorizationService = mock(AuthorizationService.class);
+        doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).canConsumeAsync(Mockito.any(),
+                Mockito.any(), Mockito.any(), Mockito.any());
+        doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
         resetChannel();
@@ -1147,9 +1165,10 @@ public class ServerCnxTest {
 
     @Test(timeOut = 30000)
     public void testSubscribeCommandWithAuthorizationNegative() throws Exception {
-        AuthorizationManager authorizationManager = mock(AuthorizationManager.class);
-        doReturn(CompletableFuture.completedFuture(false)).when(authorizationManager).canConsumeAsync(Mockito.any(), Mockito.any(), Mockito.any());
-        doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
+        AuthorizationService authorizationService = mock(AuthorizationService.class);
+        doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).canConsumeAsync(Mockito.any(),
+                Mockito.any(), Mockito.any(), Mockito.any());
+        doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
index aefd645..bcc5470 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
@@ -44,7 +44,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderConfiguration;
 import org.apache.pulsar.client.api.ReaderListener;
-import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
new file mode 100644
index 0000000..4f4fcd6
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import javax.naming.AuthenticationException;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.broker.authorization.AuthorizationProvider;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(AuthorizationProducerConsumerTest.class);
+
+    private final static String clientRole = "plugbleRole";
+
+    /**
+     * Prepares the broker configuration shared by every test in this class and then calls {@code super.init()}.
+     * Authentication and authorization are switched on, {@code "superUser"} is the only super-user role and
+     * {@link TestAuthenticationProvider} is the only authentication provider. Each test selects its authorization
+     * provider on {@code conf} before invoking this method.
+     */
+    protected void setup() throws Exception {
+        final String authenticationProviderName = TestAuthenticationProvider.class.getName();
+
+        conf.setClusterName("use");
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+
+        Set<String> superUsers = new HashSet<>();
+        superUsers.add("superUser");
+        conf.setSuperUserRoles(superUsers);
+
+        Set<String> authenticationProviders = new HashSet<>();
+        authenticationProviders.add(authenticationProviderName);
+        conf.setAuthenticationProviders(authenticationProviders);
+
+        // NOTE(review): the broker *client* plugin is given the broker-side AuthenticationProvider class name
+        // rather than a client Authentication implementation - confirm this is intended
+        conf.setBrokerClientAuthenticationPlugin(authenticationProviderName);
+
+        super.init();
+    }
+
+    /**
+     * Runs after every test method and delegates to {@code super.internalCleanup()}.
+     */
+    @Override
+    @AfterMethod
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Verifies the pluggable authorization service with {@link TestAuthorizationProvider} configured on the broker.
+     *
+     * <pre>
+     * 1. Client authenticates with the role accepted by the provider: subscribe and createProducer SUCCEED
+     * 2. Client authenticates with any other role: subscribe and createProducer FAIL with AuthorizationException
+     * </pre>
+     *
+     * @throws Exception
+     *             if the broker, the admin client or one of the Pulsar clients cannot be set up
+     */
+    @Test
+    public void testProducerAndConsumerAuthorization() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        conf.setAuthorizationProvider(TestAuthorizationProvider.class.getName());
+        setup();
+
+        // admin operations are issued with the "superUser" role
+        ClientConfiguration adminConf = new ClientConfiguration();
+        Authentication adminAuthentication = new ClientAuthentication("superUser");
+        adminConf.setAuthentication(adminAuthentication);
+        admin = spy(new PulsarAdmin(brokerUrl, adminConf));
+
+        String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
+
+        ClientConfiguration clientConfValid = new ClientConfiguration();
+        Authentication authentication = new ClientAuthentication(clientRole);
+        clientConfValid.setAuthentication(authentication);
+
+        ClientConfiguration clientConfInvalidRole = new ClientConfiguration();
+        Authentication authenticationInvalidRole = new ClientAuthentication("test-role");
+        clientConfInvalidRole.setAuthentication(authenticationInvalidRole);
+
+        pulsarClient = PulsarClient.create(lookupUrl, clientConfValid);
+        PulsarClient pulsarClientInvalidRole = PulsarClient.create(lookupUrl, clientConfInvalidRole);
+        try {
+            admin.properties().createProperty("my-property",
+                    new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+            admin.namespaces().createNamespace("my-property/use/my-ns");
+
+            // (1) Valid Producer and consumer creation
+            Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic",
+                    "my-subscriber-name");
+            Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic");
+            consumer.close();
+            producer.close();
+
+            // (2) Invalid user auth-role will be rejected by authorization service
+            try {
+                pulsarClientInvalidRole.subscribe("persistent://my-property/use/my-ns/my-topic",
+                        "my-subscriber-name");
+                Assert.fail("should have failed with authorization error");
+            } catch (PulsarClientException.AuthorizationException pa) {
+                // Ok
+            }
+            try {
+                pulsarClientInvalidRole.createProducer("persistent://my-property/use/my-ns/my-topic");
+                Assert.fail("should have failed with authorization error");
+            } catch (PulsarClientException.AuthorizationException pa) {
+                // Ok
+            }
+        } finally {
+            // this client is local to the test, so nothing else can release it
+            pulsarClientInvalidRole.close();
+        }
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    /**
+     * Checks that an {@link AuthorizationService} configured with
+     * {@link TestAuthorizationProviderWithGrantPermission} rejects produce and consume for a role until
+     * {@code grantPermissionAsync} has completed for that role, and accepts both afterwards.
+     */
+    @Test
+    public void testGrantPermission() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        conf.setAuthorizationProvider(TestAuthorizationProviderWithGrantPermission.class.getName());
+        setup();
+
+        final AuthorizationService service = new AuthorizationService(conf, null);
+        final DestinationName topic = DestinationName.get("persistent://prop/cluster/ns/t1");
+        final String grantee = "test-role";
+
+        // nothing has been granted yet
+        Assert.assertFalse(service.canProduce(topic, grantee, null));
+        Assert.assertFalse(service.canConsume(topic, grantee, null, "sub1"));
+
+        service.grantPermissionAsync(topic, null, grantee, "auth-json").get();
+
+        // the grant has completed, so the same checks must now pass
+        Assert.assertTrue(service.canProduce(topic, grantee, null));
+        Assert.assertTrue(service.canConsume(topic, grantee, null, "sub1"));
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    /**
+     * Checks that the data handed to {@link AuthorizationService} shows up in the static fields of
+     * {@link TestAuthorizationProviderWithGrantPermission}: the grant payload in {@code authDataJson} and the
+     * authentication data of the latest produce/consume check in {@code authenticationData}.
+     */
+    @Test
+    public void testAuthData() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        conf.setAuthorizationProvider(TestAuthorizationProviderWithGrantPermission.class.getName());
+        setup();
+
+        final AuthorizationService service = new AuthorizationService(conf, null);
+        final DestinationName topic = DestinationName.get("persistent://prop/cluster/ns/t1");
+        final String grantee = "test-role";
+
+        service.grantPermissionAsync(topic, null, grantee, "auth-json").get();
+        Assert.assertEquals(TestAuthorizationProviderWithGrantPermission.authDataJson, "auth-json");
+
+        // produce check: the provider must have seen the producer's authentication data
+        boolean mayProduce = service.canProduce(topic, grantee, new AuthenticationDataCommand("prod-auth"));
+        Assert.assertTrue(mayProduce);
+        Assert.assertEquals(TestAuthorizationProviderWithGrantPermission.authenticationData.getCommandData(),
+                "prod-auth");
+
+        // consume check: the recorded data must now be the consumer's
+        boolean mayConsume = service.canConsume(topic, grantee, new AuthenticationDataCommand("cons-auth"), "sub1");
+        Assert.assertTrue(mayConsume);
+        Assert.assertEquals(TestAuthorizationProviderWithGrantPermission.authenticationData.getCommandData(),
+                "cons-auth");
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    /**
+     * Client-side {@link Authentication} used by these tests. It reports the auth method {@code "test"} and hands
+     * out the user name it was built with, both as the {@code "user"} HTTP header and as the command data.
+     */
+    public static class ClientAuthentication implements Authentication {
+        String user;
+
+        public ClientAuthentication(String user) {
+            this.user = user;
+        }
+
+        @Override
+        public String getAuthMethodName() {
+            return "test";
+        }
+
+        @Override
+        public void configure(Map<String, String> authParams) {
+            // nothing to configure
+        }
+
+        @Override
+        public void start() throws PulsarClientException {
+            // nothing to start
+        }
+
+        @Override
+        public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+            // every accessor reads the enclosing instance's user field at call time
+            return new AuthenticationDataProvider() {
+                public boolean hasDataFromCommand() {
+                    return true;
+                }
+
+                public String getCommandData() {
+                    return user;
+                }
+
+                public boolean hasDataForHttp() {
+                    return true;
+                }
+
+                @SuppressWarnings("unchecked")
+                public Set<Map.Entry<String, String>> getHttpHeaders() {
+                    return Sets.newHashSet(Maps.immutableEntry("user", user));
+                }
+            };
+        }
+
+        @Override
+        public void close() throws IOException {
+            // nothing to release
+        }
+
+    }
+
+    /**
+     * Broker-side {@link AuthenticationProvider} for the auth method {@code "test"}. The authenticated role is the
+     * command data when it is present, otherwise the value of the {@code "user"} HTTP header.
+     */
+    public static class TestAuthenticationProvider implements AuthenticationProvider {
+
+        @Override
+        public String getAuthMethodName() {
+            return "test";
+        }
+
+        @Override
+        public void initialize(ServiceConfiguration config) throws IOException {
+            // nothing to initialize
+        }
+
+        @Override
+        public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
+            if (authData.getCommandData() != null) {
+                return authData.getCommandData();
+            }
+            // no command data: fall back to the HTTP header
+            return authData.getHttpHeader("user");
+        }
+
+        @Override
+        public void close() throws IOException {
+            // nothing to release
+        }
+
+    }
+
+    /**
+     * {@link AuthorizationProvider} that authorizes produce, consume and lookup only when the role equals
+     * {@code clientRole}. Both {@code grantPermissionAsync} overloads complete immediately without recording
+     * anything.
+     */
+    public static class TestAuthorizationProvider implements AuthorizationProvider {
+
+        @Override
+        public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+            // nothing to initialize
+        }
+
+        @Override
+        public void close() throws IOException {
+            // nothing to release
+        }
+
+        @Override
+        public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role,
+                AuthenticationDataSource authenticationData) {
+            return onlyClientRole(role);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
+                AuthenticationDataSource authenticationData, String subscription) {
+            return onlyClientRole(role);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role,
+                AuthenticationDataSource authenticationData) {
+            return onlyClientRole(role);
+        }
+
+        @Override
+        public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
+                String role, String authenticationData) {
+            return nothingToGrant();
+        }
+
+        @Override
+        public CompletableFuture<Void> grantPermissionAsync(DestinationName topicname, Set<AuthAction> actions,
+                String role, String authenticationData) {
+            return nothingToGrant();
+        }
+
+        /** Completed future holding {@code true} only when {@code role} equals {@code clientRole}. */
+        private static CompletableFuture<Boolean> onlyClientRole(String role) {
+            return CompletableFuture.completedFuture(clientRole.equals(role));
+        }
+
+        /** Completed future with a {@code null} result, shared by both grant overloads. */
+        private static CompletableFuture<Void> nothingToGrant() {
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    /**
+     * Variant of {@link TestAuthorizationProvider} that ignores the role: produce and lookup are always
+     * authorized, consume is always denied. Everything else is inherited.
+     */
+    public static class TestAuthorizationProvider2 extends TestAuthorizationProvider {
+
+        @Override
+        public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role,
+                AuthenticationDataSource authenticationData) {
+            return CompletableFuture.completedFuture(Boolean.TRUE);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role,
+                AuthenticationDataSource authenticationData) {
+            return CompletableFuture.completedFuture(Boolean.TRUE);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
+                AuthenticationDataSource authenticationData, String subscription) {
+            // consuming is never allowed, whatever the role
+            return CompletableFuture.completedFuture(Boolean.FALSE);
+        }
+    }
+
+    /**
+     * Provider that starts without any permission and authorizes a role for produce, consume and lookup once either
+     * {@code grantPermissionAsync} overload has been called for that role on the same instance.
+     * <p>
+     * The most recent authentication data and grant payload are kept in static fields so that tests can assert on
+     * what the provider received; they are shared by every instance of this class.
+     */
+    public static class TestAuthorizationProviderWithGrantPermission extends TestAuthorizationProvider {
+
+        private final Set<String> grantRoles = Sets.newHashSet();
+        static AuthenticationDataSource authenticationData;
+        static String authDataJson;
+
+        @Override
+        public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role,
+                AuthenticationDataSource authenticationData) {
+            return recordAndCheck(role, authenticationData);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
+                AuthenticationDataSource authenticationData, String subscription) {
+            return recordAndCheck(role, authenticationData);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role,
+                AuthenticationDataSource authenticationData) {
+            return recordAndCheck(role, authenticationData);
+        }
+
+        @Override
+        public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
+                String role, String authData) {
+            return recordGrant(role, authData);
+        }
+
+        @Override
+        public CompletableFuture<Void> grantPermissionAsync(DestinationName topicname, Set<AuthAction> actions,
+                String role, String authData) {
+            return recordGrant(role, authData);
+        }
+
+        /** Remembers the authentication data of this check and reports whether {@code role} has been granted. */
+        private CompletableFuture<Boolean> recordAndCheck(String role, AuthenticationDataSource authData) {
+            // the static field is written through the class name: going through `this` would hide that it is shared
+            TestAuthorizationProviderWithGrantPermission.authenticationData = authData;
+            return CompletableFuture.completedFuture(grantRoles.contains(role));
+        }
+
+        /** Remembers the grant payload and adds {@code role} to the roles that pass the checks. */
+        private CompletableFuture<Void> recordGrant(String role, String authData) {
+            TestAuthorizationProviderWithGrantPermission.authDataJson = authData;
+            grantRoles.add(role);
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 458321c..80d2d00 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -250,7 +250,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
 
         // enable authorization: so, broker can validate cluster and redirect if finds different cluster
         pulsar.getConfiguration().setAuthorizationEnabled(true);
-        // restart broker with authorization enabled: it initialize AuthorizationManager
+        // restart broker with authorization enabled: it initializes AuthorizationService
         stopBroker();
         startBroker();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 4200d92..9635c43 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -56,9 +56,9 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 342e232..5454bb7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -47,7 +47,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.collect.Lists;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -63,12 +63,12 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.HandlerBase.State;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.api.PulsarHandler;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
@@ -81,6 +81,7 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 public class BrokerClientIntegrationTest extends ProducerConsumerBase {
@@ -580,6 +581,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
         int concurrentTopic = pulsar.getConfiguration().getMaxConcurrentTopicLoadRequest();
 
         try {
+            pulsar.getConfiguration().setAuthorizationEnabled(false);
             final int concurrentLookupRequests = 20;
             ClientConfiguration clientConf = new ClientConfiguration();
             clientConf.setMaxNumberOfRejectedRequestPerConnection(0);
@@ -632,78 +634,72 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
+    // Creates a producer, nulls PulsarService's globalZkCache field via reflection, then expects a second
+    // createProducer on the same client to throw and the first producer's channel to be inactive afterwards.
     @Test(timeOut = 5000)
     public void testCloseConnectionOnInternalServerError() throws Exception {
 
-        try {
-            final PulsarClient pulsarClient;
-
-            final String topicName = "persistent://prop/usw/my-ns/newTopic";
+        final PulsarClient pulsarClient;
 
-            ClientConfiguration clientConf = new ClientConfiguration();
-            clientConf.setStatsInterval(0, TimeUnit.SECONDS);
-            String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
-            pulsarClient = PulsarClient.create(lookupUrl, clientConf);
+        final String topicName = "persistent://prop/usw/my-ns/newTopic";
 
-            ProducerImpl producer = (ProducerImpl) pulsarClient.createProducer(topicName);
-            ClientCnx cnx = producer.cnx();
-            assertTrue(cnx.channel().isActive());
-            // this will throw NPE at broker while authorizing and it will throw InternalServerError
-            pulsar.getConfiguration().setAuthorizationEnabled(true);
-            try {
-                pulsarClient.createProducer(topicName);
-                fail("it should have fail with lookup-exception:");
-            } catch (Exception e) {
-                // ok
-            }
-            // connection must be closed
-            assertFalse(cnx.channel().isActive());
-            pulsarClient.close();
-        } finally {
-            pulsar.getConfiguration().setAuthorizationEnabled(false);
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setStatsInterval(0, TimeUnit.SECONDS);
+        String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
+        pulsarClient = PulsarClient.create(lookupUrl, clientConf);
+
+        ProducerImpl producer = (ProducerImpl) pulsarClient.createProducer(topicName);
+        ClientCnx cnx = producer.cnx();
+        assertTrue(cnx.channel().isActive());
+        
+        // The broker must answer with InternalServerError, so make global-zk unavailable by nulling the cache field
+        // NOTE(review): the field is not restored in this method - confirm the per-test cleanup rebuilds the broker
+        Field globalZkCacheField = PulsarService.class.getDeclaredField("globalZkCache");
+        globalZkCacheField.setAccessible(true);
+        globalZkCacheField.set(pulsar, null);
+        
+        try {
+            pulsarClient.createProducer(topicName);
+            fail("it should have fail with lookup-exception:");
+        } catch (Exception e) {
+            // ok
         }
+        // connection must be closed
+        assertFalse(cnx.channel().isActive());
+        pulsarClient.close();
     }
 
     @Test
     public void testInvalidDynamicConfiguration() throws Exception {
 
+        // (1) try to update invalid loadManagerClass name
         try {
-            // (1) try to update invalid loadManagerClass name
-            try {
-                admin.brokers().updateDynamicConfiguration("loadManagerClassName",
-                        "org.apache.pulsar.invalid.loadmanager");
-                fail("it should have failed due to invalid argument");
-            } catch (PulsarAdminException e) {
-                // Ok: should have failed due to invalid config value
-            }
+            admin.brokers().updateDynamicConfiguration("loadManagerClassName",
+                    "org.apache.pulsar.invalid.loadmanager");
+            fail("it should have failed due to invalid argument");
+        } catch (PulsarAdminException e) {
+            // Ok: should have failed due to invalid config value
+        }
 
-            // (2) try to update with valid loadManagerClass name
-            try {
-                admin.brokers().updateDynamicConfiguration("loadManagerClassName",
-                        "org.apache.pulsar.broker.loadbalance.ModularLoadManager");
-            } catch (PulsarAdminException e) {
-                fail("it should have failed due to invalid argument", e);
-            }
+        // (2) try to update with valid loadManagerClass name
+        try {
+            admin.brokers().updateDynamicConfiguration("loadManagerClassName",
+                    "org.apache.pulsar.broker.loadbalance.ModularLoadManager");
+        } catch (PulsarAdminException e) {
+            fail("it should have failed due to invalid argument", e);
+        }
 
-            // (3) restart broker with invalid config value
+        // (3) restart broker with invalid config value
 
-            ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar.getBrokerService()
-                    .getDynamicConfigurationCache();
-            Map<String, String> configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
-                    .get();
-            configurationMap.put("loadManagerClassName", "org.apache.pulsar.invalid.loadmanager");
-            byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
-            dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
-            mockZookKeeper.setData(BROKER_SERVICE_CONFIGURATION_PATH, content, -1);
+        ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar.getBrokerService()
+                .getDynamicConfigurationCache();
+        Map<String, String> configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
+                .get();
+        configurationMap.put("loadManagerClassName", "org.apache.pulsar.invalid.loadmanager");
+        byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
+        dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
+        mockZookKeeper.setData(BROKER_SERVICE_CONFIGURATION_PATH, content, -1);
 
-            try {
-                stopBroker();
-                startBroker();
-                fail("it should have failed due to invalid argument");
-            } catch (Exception e) {
-                // Ok: should have failed due to invalid config value
-            }
-        } finally {
-            byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(Maps.newHashMap());
-            mockZookKeeper.setData(BROKER_SERVICE_CONFIGURATION_PATH, content, -1);
+        try {
+            stopBroker();
             startBroker();
+            fail("it should have failed due to invalid argument");
+        } catch (Exception e) {
+            // Ok: should have failed due to invalid config value
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index 302f2e4..943c62d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -27,7 +27,7 @@ import java.util.Set;
 
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.authorization.AuthorizationManager;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -78,9 +78,9 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void test() throws Exception {
-        AuthorizationManager auth = service.getAuthorizationManager();
+        AuthorizationService auth = service.getAuthorizationService();
 
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), false);
 
         admin.clusters().updateCluster(configClusterName, new ClusterData());
         admin.properties().createProperty("p1", new PropertyAdmin(Lists.newArrayList("role1"), Sets.newHashSet("c1")));
@@ -88,31 +88,31 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
         admin.namespaces().createNamespace("p1/c1/ns1");
         waitForChange();
 
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), false);
 
         admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "my-role", EnumSet.of(AuthAction.produce));
         waitForChange();
 
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
-        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
+        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
 
         admin.persistentTopics().grantPermission("persistent://p1/c1/ns1/ds2", "other-role",
                 EnumSet.of(AuthAction.consume));
         waitForChange();
 
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), true);
-        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
-        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role"), false);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null), true);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null), true);
+        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
+        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null), false);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "other-role", null, null), true);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds2"), "no-access-role", null, null), false);
 
-        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "no-access-role"), false);
+        assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "no-access-role", null), false);
 
         admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "my-role", EnumSet.allOf(AuthAction.class));
         waitForChange();
 
-        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), true);
-        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
+        assertEquals(auth.canProduce(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), true);
+        assertEquals(auth.canConsume(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role", null, null), true);
 
         admin.namespaces().deleteNamespace("p1/c1/ns1");
         admin.properties().deleteProperty("p1");
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index 1ef50e8..3ad4059 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -43,8 +43,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.client.admin.PulsarAdminException.ServerSideErrorException;
 import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 97cde46..b5afd0e 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -59,8 +59,8 @@ import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
 import org.apache.pulsar.client.util.ConsumerName;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.util.FutureUtil;
 
 public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener {
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
index 9e4aece..9ab7b32 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
@@ -145,6 +145,7 @@ public class ClientConfiguration implements Serializable {
         this.authentication = AuthenticationFactory.create(authPluginClassName, authParams);
     }
 
+    
     /**
      * @return the operation timeout in ms
      */
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 38e96ed..9ab379d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -129,8 +129,8 @@ public class ClientCnx extends PulsarHandler {
             authData = authentication.getAuthData().getCommandData();
         }
         // Send CONNECT command
-        ctx.writeAndFlush(Commands.newConnect(authentication.getAuthMethodName(), authData, getPulsarClientVersion(),
-                proxyToTargetBrokerAddress))
+        ctx.writeAndFlush(Commands.newConnect(authentication.getAuthMethodName(), authData,
+                getPulsarClientVersion(), proxyToTargetBrokerAddress))
                 .addListener(future -> {
                     if (future.isSuccess()) {
                         if (log.isDebugEnabled()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 6ef58b2..be5e658 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -36,9 +36,9 @@ import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.util.ConsumerName;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 
 import com.google.common.collect.Queues;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 671929c..b74bb13 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -45,7 +45,6 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.PulsarApi;
@@ -57,6 +56,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index d4e1f54..7ad6e1a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -34,6 +34,7 @@ import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.ssl.SslContext;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -55,15 +56,16 @@ public class HttpClient implements Closeable {
     protected final URL url;
     protected final Authentication authentication;
 
-    protected HttpClient(String serviceUrl, Authentication authentication, EventLoopGroup eventLoopGroup,
-            boolean tlsAllowInsecureConnection, String tlsTrustCertsFilePath) throws PulsarClientException {
-        this(serviceUrl, authentication, eventLoopGroup, tlsAllowInsecureConnection, tlsTrustCertsFilePath,
-                DEFAULT_CONNECT_TIMEOUT_IN_SECONDS, DEFAULT_READ_TIMEOUT_IN_SECONDS);
+    protected HttpClient(String serviceUrl, Authentication authentication,
+            EventLoopGroup eventLoopGroup, boolean tlsAllowInsecureConnection, String tlsTrustCertsFilePath)
+            throws PulsarClientException {
+        this(serviceUrl, authentication, eventLoopGroup, tlsAllowInsecureConnection,
+                tlsTrustCertsFilePath, DEFAULT_CONNECT_TIMEOUT_IN_SECONDS, DEFAULT_READ_TIMEOUT_IN_SECONDS);
     }
 
-    protected HttpClient(String serviceUrl, Authentication authentication, EventLoopGroup eventLoopGroup,
-            boolean tlsAllowInsecureConnection, String tlsTrustCertsFilePath, int connectTimeoutInSeconds,
-            int readTimeoutInSeconds) throws PulsarClientException {
+    protected HttpClient(String serviceUrl, Authentication authentication,
+            EventLoopGroup eventLoopGroup, boolean tlsAllowInsecureConnection, String tlsTrustCertsFilePath,
+            int connectTimeoutInSeconds, int readTimeoutInSeconds) throws PulsarClientException {
         this.authentication = authentication;
         try {
             // Ensure trailing "/" on url
@@ -129,7 +131,7 @@ public class HttpClient implements Closeable {
                     builder.setHeader(header.getKey(), header.getValue());
                 }
             }
-
+            
             final ListenableFuture<Response> responseFuture = builder.setHeader("Accept", "application/json")
                     .execute(new AsyncCompletionHandler<Response>() {
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 8c12ab1..208a9b1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.client.impl;
 
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.concurrent.CompletableFuture;
@@ -26,10 +25,10 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,8 +42,8 @@ class HttpLookupService implements LookupService {
 
     public HttpLookupService(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup)
             throws PulsarClientException {
-        this.httpClient = new HttpClient(serviceUrl, conf.getAuthentication(), eventLoopGroup,
-                conf.isTlsAllowInsecureConnection(), conf.getTlsTrustCertsFilePath());
+        this.httpClient = new HttpClient(serviceUrl, conf.getAuthentication(),
+                eventLoopGroup, conf.isTlsAllowInsecureConnection(), conf.getTlsTrustCertsFilePath());
         this.useTls = conf.isUseTls();
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 1dd3e4b..60e3928 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -40,9 +40,9 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index fe9ae6c..68fda50 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -35,8 +35,8 @@ import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TopicMetadata;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 362cf6e..f6bd759 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -42,10 +42,10 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderConfiguration;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.util.ExecutorProvider;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.DestinationDomain;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -471,7 +471,7 @@ public class PulsarClientImpl implements PulsarClient {
             DestinationName destinationName = DestinationName.get(topic);
             metadataFuture = lookup.getPartitionedTopicMetadata(destinationName);
         } catch (IllegalArgumentException e) {
-            return FutureUtil.failedFuture(e);
+            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage()));
         }
         return metadataFuture;
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 2c13125..6bbe77e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -279,13 +279,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements MessageIdDataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use MessageIdData.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<MessageIdData> handle;
-    private MessageIdData(io.netty.util.Recycler.Handle<MessageIdData> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private MessageIdData(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<MessageIdData> RECYCLER = new io.netty.util.Recycler<MessageIdData>() {
-            protected MessageIdData newObject(Handle<MessageIdData> handle) {
+            protected MessageIdData newObject(Handle handle) {
               return new MessageIdData(handle);
             }
           };
@@ -295,12 +295,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private MessageIdData(boolean noInit) {
-        this.handle = null;
-    }
+    private MessageIdData(boolean noInit) {}
     
     private static final MessageIdData defaultInstance;
     public static MessageIdData getDefaultInstance() {
@@ -508,20 +506,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.MessageIdDataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -780,13 +778,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements KeyValueOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use KeyValue.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<KeyValue> handle;
-    private KeyValue(io.netty.util.Recycler.Handle<KeyValue> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private KeyValue(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<KeyValue> RECYCLER = new io.netty.util.Recycler<KeyValue>() {
-            protected KeyValue newObject(Handle<KeyValue> handle) {
+            protected KeyValue newObject(Handle handle) {
               return new KeyValue(handle);
             }
           };
@@ -796,12 +794,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private KeyValue(boolean noInit) {
-        this.handle = null;
-    }
+    private KeyValue(boolean noInit) {}
     
     private static final KeyValue defaultInstance;
     public static KeyValue getDefaultInstance() {
@@ -1017,20 +1013,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.KeyValue, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.KeyValueOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -1249,13 +1245,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements KeyLongValueOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use KeyLongValue.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<KeyLongValue> handle;
-    private KeyLongValue(io.netty.util.Recycler.Handle<KeyLongValue> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private KeyLongValue(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<KeyLongValue> RECYCLER = new io.netty.util.Recycler<KeyLongValue>() {
-            protected KeyLongValue newObject(Handle<KeyLongValue> handle) {
+            protected KeyLongValue newObject(Handle handle) {
               return new KeyLongValue(handle);
             }
           };
@@ -1265,12 +1261,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private KeyLongValue(boolean noInit) {
-        this.handle = null;
-    }
+    private KeyLongValue(boolean noInit) {}
     
     private static final KeyLongValue defaultInstance;
     public static KeyLongValue getDefaultInstance() {
@@ -1464,20 +1458,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValueOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -1687,13 +1681,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements EncryptionKeysOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use EncryptionKeys.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<EncryptionKeys> handle;
-    private EncryptionKeys(io.netty.util.Recycler.Handle<EncryptionKeys> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private EncryptionKeys(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<EncryptionKeys> RECYCLER = new io.netty.util.Recycler<EncryptionKeys>() {
-            protected EncryptionKeys newObject(Handle<EncryptionKeys> handle) {
+            protected EncryptionKeys newObject(Handle handle) {
               return new EncryptionKeys(handle);
             }
           };
@@ -1703,12 +1697,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private EncryptionKeys(boolean noInit) {
-        this.handle = null;
-    }
+    private EncryptionKeys(boolean noInit) {}
     
     private static final EncryptionKeys defaultInstance;
     public static EncryptionKeys getDefaultInstance() {
@@ -1937,20 +1929,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeysOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -2328,13 +2320,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements MessageMetadataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use MessageMetadata.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<MessageMetadata> handle;
-    private MessageMetadata(io.netty.util.Recycler.Handle<MessageMetadata> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private MessageMetadata(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<MessageMetadata> RECYCLER = new io.netty.util.Recycler<MessageMetadata>() {
-            protected MessageMetadata newObject(Handle<MessageMetadata> handle) {
+            protected MessageMetadata newObject(Handle handle) {
               return new MessageMetadata(handle);
             }
           };
@@ -2344,12 +2336,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private MessageMetadata(boolean noInit) {
-        this.handle = null;
-    }
+    private MessageMetadata(boolean noInit) {}
     
     private static final MessageMetadata defaultInstance;
     public static MessageMetadata getDefaultInstance() {
@@ -2872,20 +2862,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -3784,13 +3774,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements SingleMessageMetadataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use SingleMessageMetadata.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<SingleMessageMetadata> handle;
-    private SingleMessageMetadata(io.netty.util.Recycler.Handle<SingleMessageMetadata> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private SingleMessageMetadata(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<SingleMessageMetadata> RECYCLER = new io.netty.util.Recycler<SingleMessageMetadata>() {
-            protected SingleMessageMetadata newObject(Handle<SingleMessageMetadata> handle) {
+            protected SingleMessageMetadata newObject(Handle handle) {
               return new SingleMessageMetadata(handle);
             }
           };
@@ -3800,12 +3790,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private SingleMessageMetadata(boolean noInit) {
-        this.handle = null;
-    }
+    private SingleMessageMetadata(boolean noInit) {}
     
     private static final SingleMessageMetadata defaultInstance;
     public static SingleMessageMetadata getDefaultInstance() {
@@ -4030,20 +4018,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -4389,13 +4377,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandConnectOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandConnect.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandConnect> handle;
-    private CommandConnect(io.netty.util.Recycler.Handle<CommandConnect> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandConnect(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandConnect> RECYCLER = new io.netty.util.Recycler<CommandConnect>() {
-            protected CommandConnect newObject(Handle<CommandConnect> handle) {
+            protected CommandConnect newObject(Handle handle) {
               return new CommandConnect(handle);
             }
           };
@@ -4405,12 +4393,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandConnect(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandConnect(boolean noInit) {}
     
     private static final CommandConnect defaultInstance;
     public static CommandConnect getDefaultInstance() {
@@ -4836,20 +4822,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandConnectOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -5379,13 +5365,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandConnectedOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandConnected.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandConnected> handle;
-    private CommandConnected(io.netty.util.Recycler.Handle<CommandConnected> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandConnected(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandConnected> RECYCLER = new io.netty.util.Recycler<CommandConnected>() {
-            protected CommandConnected newObject(Handle<CommandConnected> handle) {
+            protected CommandConnected newObject(Handle handle) {
               return new CommandConnected(handle);
             }
           };
@@ -5395,12 +5381,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandConnected(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandConnected(boolean noInit) {}
     
     private static final CommandConnected defaultInstance;
     public static CommandConnected getDefaultInstance() {
@@ -5590,20 +5574,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandConnectedOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -5841,13 +5825,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandSubscribeOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandSubscribe.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandSubscribe> handle;
-    private CommandSubscribe(io.netty.util.Recycler.Handle<CommandSubscribe> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandSubscribe(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandSubscribe> RECYCLER = new io.netty.util.Recycler<CommandSubscribe>() {
-            protected CommandSubscribe newObject(Handle<CommandSubscribe> handle) {
+            protected CommandSubscribe newObject(Handle handle) {
               return new CommandSubscribe(handle);
             }
           };
@@ -5857,12 +5841,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandSubscribe(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandSubscribe(boolean noInit) {}
     
     private static final CommandSubscribe defaultInstance;
     public static CommandSubscribe getDefaultInstance() {
@@ -6341,20 +6323,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribeOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -7050,13 +7032,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandPartitionedTopicMetadataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandPartitionedTopicMetadata.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandPartitionedTopicMetadata> handle;
-    private CommandPartitionedTopicMetadata(io.netty.util.Recycler.Handle<CommandPartitionedTopicMetadata> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandPartitionedTopicMetadata(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandPartitionedTopicMetadata> RECYCLER = new io.netty.util.Recycler<CommandPartitionedTopicMetadata>() {
-            protected CommandPartitionedTopicMetadata newObject(Handle<CommandPartitionedTopicMetadata> handle) {
+            protected CommandPartitionedTopicMetadata newObject(Handle handle) {
               return new CommandPartitionedTopicMetadata(handle);
             }
           };
@@ -7066,12 +7048,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandPartitionedTopicMetadata(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandPartitionedTopicMetadata(boolean noInit) {}
     
     private static final CommandPartitionedTopicMetadata defaultInstance;
     public static CommandPartitionedTopicMetadata getDefaultInstance() {
@@ -7385,20 +7365,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -7764,13 +7744,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandPartitionedTopicMetadataResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandPartitionedTopicMetadataResponse.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandPartitionedTopicMetadataResponse> handle;
-    private CommandPartitionedTopicMetadataResponse(io.netty.util.Recycler.Handle<CommandPartitionedTopicMetadataResponse> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandPartitionedTopicMetadataResponse(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandPartitionedTopicMetadataResponse> RECYCLER = new io.netty.util.Recycler<CommandPartitionedTopicMetadataResponse>() {
-            protected CommandPartitionedTopicMetadataResponse newObject(Handle<CommandPartitionedTopicMetadataResponse> handle) {
+            protected CommandPartitionedTopicMetadataResponse newObject(Handle handle) {
               return new CommandPartitionedTopicMetadataResponse(handle);
             }
           };
@@ -7780,12 +7760,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandPartitionedTopicMetadataResponse(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandPartitionedTopicMetadataResponse(boolean noInit) {}
     
     private static final CommandPartitionedTopicMetadataResponse defaultInstance;
     public static CommandPartitionedTopicMetadataResponse getDefaultInstance() {
@@ -8070,20 +8048,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataResponse, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataResponse.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -8418,13 +8396,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandLookupTopicOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandLookupTopic.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandLookupTopic> handle;
-    private CommandLookupTopic(io.netty.util.Recycler.Handle<CommandLookupTopic> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandLookupTopic(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandLookupTopic> RECYCLER = new io.netty.util.Recycler<CommandLookupTopic>() {
-            protected CommandLookupTopic newObject(Handle<CommandLookupTopic> handle) {
+            protected CommandLookupTopic newObject(Handle handle) {
               return new CommandLookupTopic(handle);
             }
           };
@@ -8434,12 +8412,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandLookupTopic(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandLookupTopic(boolean noInit) {}
     
     private static final CommandLookupTopic defaultInstance;
     public static CommandLookupTopic getDefaultInstance() {
@@ -8771,20 +8747,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -9197,13 +9173,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandLookupTopicResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandLookupTopicResponse.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandLookupTopicResponse> handle;
-    private CommandLookupTopicResponse(io.netty.util.Recycler.Handle<CommandLookupTopicResponse> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandLookupTopicResponse(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandLookupTopicResponse> RECYCLER = new io.netty.util.Recycler<CommandLookupTopicResponse>() {
-            protected CommandLookupTopicResponse newObject(Handle<CommandLookupTopicResponse> handle) {
+            protected CommandLookupTopicResponse newObject(Handle handle) {
               return new CommandLookupTopicResponse(handle);
             }
           };
@@ -9213,12 +9189,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandLookupTopicResponse(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandLookupTopicResponse(boolean noInit) {}
     
     private static final CommandLookupTopicResponse defaultInstance;
     public static CommandLookupTopicResponse getDefaultInstance() {
@@ -9604,20 +9578,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -10089,13 +10063,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandProducerOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandProducer.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandProducer> handle;
-    private CommandProducer(io.netty.util.Recycler.Handle<CommandProducer> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandProducer(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandProducer> RECYCLER = new io.netty.util.Recycler<CommandProducer>() {
-            protected CommandProducer newObject(Handle<CommandProducer> handle) {
+            protected CommandProducer newObject(Handle handle) {
               return new CommandProducer(handle);
             }
           };
@@ -10105,12 +10079,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandProducer(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandProducer(boolean noInit) {}
     
     private static final CommandProducer defaultInstance;
     public static CommandProducer getDefaultInstance() {
@@ -10419,20 +10391,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandProducerOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -10882,13 +10854,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandSendOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandSend.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandSend> handle;
-    private CommandSend(io.netty.util.Recycler.Handle<CommandSend> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandSend(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandSend> RECYCLER = new io.netty.util.Recycler<CommandSend>() {
-            protected CommandSend newObject(Handle<CommandSend> handle) {
+            protected CommandSend newObject(Handle handle) {
               return new CommandSend(handle);
             }
           };
@@ -10898,12 +10870,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandSend(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandSend(boolean noInit) {}
     
     private static final CommandSend defaultInstance;
     public static CommandSend getDefaultInstance() {
@@ -11093,20 +11063,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandSend, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandSendOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandSend.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -11334,13 +11304,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandSendReceiptOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandSendReceipt.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandSendReceipt> handle;
-    private CommandSendReceipt(io.netty.util.Recycler.Handle<CommandSendReceipt> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandSendReceipt(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandSendReceipt> RECYCLER = new io.netty.util.Recycler<CommandSendReceipt>() {
-            protected CommandSendReceipt newObject(Handle<CommandSendReceipt> handle) {
+            protected CommandSendReceipt newObject(Handle handle) {
               return new CommandSendReceipt(handle);
             }
           };
@@ -11350,12 +11320,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandSendReceipt(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandSendReceipt(boolean noInit) {}
     
     private static final CommandSendReceipt defaultInstance;
     public static CommandSendReceipt getDefaultInstance() {
@@ -11551,20 +11519,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceiptOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -11829,13 +11797,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandSendErrorOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandSendError.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandSendError> handle;
-    private CommandSendError(io.netty.util.Recycler.Handle<CommandSendError> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandSendError(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandSendError> RECYCLER = new io.netty.util.Recycler<CommandSendError>() {
-            protected CommandSendError newObject(Handle<CommandSendError> handle) {
+            protected CommandSendError newObject(Handle handle) {
               return new CommandSendError(handle);
             }
           };
@@ -11845,12 +11813,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandSendError(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandSendError(boolean noInit) {}
     
     private static final CommandSendError defaultInstance;
     public static CommandSendError getDefaultInstance() {
@@ -12088,20 +12054,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandSendError, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandSendErrorOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandSendError.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -12390,13 +12356,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandMessageOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandMessage.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandMessage> handle;
-    private CommandMessage(io.netty.util.Recycler.Handle<CommandMessage> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandMessage(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandMessage> RECYCLER = new io.netty.util.Recycler<CommandMessage>() {
-            protected CommandMessage newObject(Handle<CommandMessage> handle) {
+            protected CommandMessage newObject(Handle handle) {
               return new CommandMessage(handle);
             }
           };
@@ -12406,12 +12372,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandMessage(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandMessage(boolean noInit) {}
     
     private static final CommandMessage defaultInstance;
     public static CommandMessage getDefaultInstance() {
@@ -12587,20 +12551,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandMessageOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -12834,13 +12798,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandAckOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandAck.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandAck> handle;
-    private CommandAck(io.netty.util.Recycler.Handle<CommandAck> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandAck(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandAck> RECYCLER = new io.netty.util.Recycler<CommandAck>() {
-            protected CommandAck newObject(Handle<CommandAck> handle) {
+            protected CommandAck newObject(Handle handle) {
               return new CommandAck(handle);
             }
           };
@@ -12850,12 +12814,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandAck(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandAck(boolean noInit) {}
     
     private static final CommandAck defaultInstance;
     public static CommandAck getDefaultInstance() {
@@ -13197,20 +13159,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandAck, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandAckOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -13636,13 +13598,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandFlowOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandFlow.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandFlow> handle;
-    private CommandFlow(io.netty.util.Recycler.Handle<CommandFlow> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandFlow(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandFlow> RECYCLER = new io.netty.util.Recycler<CommandFlow>() {
-            protected CommandFlow newObject(Handle<CommandFlow> handle) {
+            protected CommandFlow newObject(Handle handle) {
               return new CommandFlow(handle);
             }
           };
@@ -13652,12 +13614,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandFlow(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandFlow(boolean noInit) {}
     
     private static final CommandFlow defaultInstance;
     public static CommandFlow getDefaultInstance() {
@@ -13829,20 +13789,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandFlowOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -14031,13 +13991,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandUnsubscribeOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandUnsubscribe.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandUnsubscribe> handle;
-    private CommandUnsubscribe(io.netty.util.Recycler.Handle<CommandUnsubscribe> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandUnsubscribe(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandUnsubscribe> RECYCLER = new io.netty.util.Recycler<CommandUnsubscribe>() {
-            protected CommandUnsubscribe newObject(Handle<CommandUnsubscribe> handle) {
+            protected CommandUnsubscribe newObject(Handle handle) {
               return new CommandUnsubscribe(handle);
             }
           };
@@ -14047,12 +14007,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandUnsubscribe(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandUnsubscribe(boolean noInit) {}
     
     private static final CommandUnsubscribe defaultInstance;
     public static CommandUnsubscribe getDefaultInstance() {
@@ -14224,20 +14182,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribeOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -14430,13 +14388,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandSeekOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandSeek.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandSeek> handle;
-    private CommandSeek(io.netty.util.Recycler.Handle<CommandSeek> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandSeek(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandSeek> RECYCLER = new io.netty.util.Recycler<CommandSeek>() {
-            protected CommandSeek newObject(Handle<CommandSeek> handle) {
+            protected CommandSeek newObject(Handle handle) {
               return new CommandSeek(handle);
             }
           };
@@ -14446,12 +14404,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandSeek(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandSeek(boolean noInit) {}
     
     private static final CommandSeek defaultInstance;
     public static CommandSeek getDefaultInstance() {
@@ -14647,20 +14603,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandSeekOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -14913,13 +14869,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandReachedEndOfTopicOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandReachedEndOfTopic.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandReachedEndOfTopic> handle;
-    private CommandReachedEndOfTopic(io.netty.util.Recycler.Handle<CommandReachedEndOfTopic> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandReachedEndOfTopic(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandReachedEndOfTopic> RECYCLER = new io.netty.util.Recycler<CommandReachedEndOfTopic>() {
-            protected CommandReachedEndOfTopic newObject(Handle<CommandReachedEndOfTopic> handle) {
+            protected CommandReachedEndOfTopic newObject(Handle handle) {
               return new CommandReachedEndOfTopic(handle);
             }
           };
@@ -14929,12 +14885,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandReachedEndOfTopic(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandReachedEndOfTopic(boolean noInit) {}
     
     private static final CommandReachedEndOfTopic defaultInstance;
     public static CommandReachedEndOfTopic getDefaultInstance() {
@@ -15084,20 +15038,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandReachedEndOfTopic, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandReachedEndOfTopicOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandReachedEndOfTopic.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -15247,13 +15201,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandCloseProducerOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandCloseProducer.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandCloseProducer> handle;
-    private CommandCloseProducer(io.netty.util.Recycler.Handle<CommandCloseProducer> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandCloseProducer(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandCloseProducer> RECYCLER = new io.netty.util.Recycler<CommandCloseProducer>() {
-            protected CommandCloseProducer newObject(Handle<CommandCloseProducer> handle) {
+            protected CommandCloseProducer newObject(Handle handle) {
               return new CommandCloseProducer(handle);
             }
           };
@@ -15263,12 +15217,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandCloseProducer(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandCloseProducer(boolean noInit) {}
     
     private static final CommandCloseProducer defaultInstance;
     public static CommandCloseProducer getDefaultInstance() {
@@ -15440,20 +15392,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducerOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -15642,13 +15594,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandCloseConsumerOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandCloseConsumer.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandCloseConsumer> handle;
-    private CommandCloseConsumer(io.netty.util.Recycler.Handle<CommandCloseConsumer> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandCloseConsumer(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandCloseConsumer> RECYCLER = new io.netty.util.Recycler<CommandCloseConsumer>() {
-            protected CommandCloseConsumer newObject(Handle<CommandCloseConsumer> handle) {
+            protected CommandCloseConsumer newObject(Handle handle) {
               return new CommandCloseConsumer(handle);
             }
           };
@@ -15658,12 +15610,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandCloseConsumer(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandCloseConsumer(boolean noInit) {}
     
     private static final CommandCloseConsumer defaultInstance;
     public static CommandCloseConsumer getDefaultInstance() {
@@ -15835,20 +15785,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumerOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -16039,13 +15989,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandRedeliverUnacknowledgedMessagesOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandRedeliverUnacknowledgedMessages.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandRedeliverUnacknowledgedMessages> handle;
-    private CommandRedeliverUnacknowledgedMessages(io.netty.util.Recycler.Handle<CommandRedeliverUnacknowledgedMessages> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandRedeliverUnacknowledgedMessages(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandRedeliverUnacknowledgedMessages> RECYCLER = new io.netty.util.Recycler<CommandRedeliverUnacknowledgedMessages>() {
-            protected CommandRedeliverUnacknowledgedMessages newObject(Handle<CommandRedeliverUnacknowledgedMessages> handle) {
+            protected CommandRedeliverUnacknowledgedMessages newObject(Handle handle) {
               return new CommandRedeliverUnacknowledgedMessages(handle);
             }
           };
@@ -16055,12 +16005,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandRedeliverUnacknowledgedMessages(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandRedeliverUnacknowledgedMessages(boolean noInit) {}
     
     private static final CommandRedeliverUnacknowledgedMessages defaultInstance;
     public static CommandRedeliverUnacknowledgedMessages getDefaultInstance() {
@@ -16245,20 +16193,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandRedeliverUnacknowledgedMessages, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandRedeliverUnacknowledgedMessagesOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandRedeliverUnacknowledgedMessages.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -16522,13 +16470,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandSuccessOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandSuccess.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandSuccess> handle;
-    private CommandSuccess(io.netty.util.Recycler.Handle<CommandSuccess> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandSuccess(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandSuccess> RECYCLER = new io.netty.util.Recycler<CommandSuccess>() {
-            protected CommandSuccess newObject(Handle<CommandSuccess> handle) {
+            protected CommandSuccess newObject(Handle handle) {
               return new CommandSuccess(handle);
             }
           };
@@ -16538,12 +16486,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandSuccess(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandSuccess(boolean noInit) {}
     
     private static final CommandSuccess defaultInstance;
     public static CommandSuccess getDefaultInstance() {
@@ -16693,20 +16639,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccessOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -16860,13 +16806,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandProducerSuccessOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandProducerSuccess.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandProducerSuccess> handle;
-    private CommandProducerSuccess(io.netty.util.Recycler.Handle<CommandProducerSuccess> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandProducerSuccess(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandProducerSuccess> RECYCLER = new io.netty.util.Recycler<CommandProducerSuccess>() {
-            protected CommandProducerSuccess newObject(Handle<CommandProducerSuccess> handle) {
+            protected CommandProducerSuccess newObject(Handle handle) {
               return new CommandProducerSuccess(handle);
             }
           };
@@ -16876,12 +16822,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandProducerSuccess(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandProducerSuccess(boolean noInit) {}
     
     private static final CommandProducerSuccess defaultInstance;
     public static CommandProducerSuccess getDefaultInstance() {
@@ -17093,20 +17037,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandProducerSuccess, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandProducerSuccessOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandProducerSuccess.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -17349,13 +17293,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandErrorOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandError.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandError> handle;
-    private CommandError(io.netty.util.Recycler.Handle<CommandError> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandError(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandError> RECYCLER = new io.netty.util.Recycler<CommandError>() {
-            protected CommandError newObject(Handle<CommandError> handle) {
+            protected CommandError newObject(Handle handle) {
               return new CommandError(handle);
             }
           };
@@ -17365,12 +17309,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandError(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandError(boolean noInit) {}
     
     private static final CommandError defaultInstance;
     public static CommandError getDefaultInstance() {
@@ -17586,20 +17528,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandError, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandErrorOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandError.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -17841,13 +17783,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandPingOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandPing.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandPing> handle;
-    private CommandPing(io.netty.util.Recycler.Handle<CommandPing> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandPing(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandPing> RECYCLER = new io.netty.util.Recycler<CommandPing>() {
-            protected CommandPing newObject(Handle<CommandPing> handle) {
+            protected CommandPing newObject(Handle handle) {
               return new CommandPing(handle);
             }
           };
@@ -17856,12 +17798,10 @@ public final class PulsarApi {
             this.initFields();
             this.memoizedIsInitialized = -1;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandPing(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandPing(boolean noInit) {}
     
     private static final CommandPing defaultInstance;
     public static CommandPing getDefaultInstance() {
@@ -17988,20 +17928,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandPing, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandPingOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandPing.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -18100,13 +18040,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandPongOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandPong.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandPong> handle;
-    private CommandPong(io.netty.util.Recycler.Handle<CommandPong> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandPong(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandPong> RECYCLER = new io.netty.util.Recycler<CommandPong>() {
-            protected CommandPong newObject(Handle<CommandPong> handle) {
+            protected CommandPong newObject(Handle handle) {
               return new CommandPong(handle);
             }
           };
@@ -18115,12 +18055,10 @@ public final class PulsarApi {
             this.initFields();
             this.memoizedIsInitialized = -1;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandPong(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandPong(boolean noInit) {}
     
     private static final CommandPong defaultInstance;
     public static CommandPong getDefaultInstance() {
@@ -18247,20 +18185,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandPong, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandPongOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandPong.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -18367,13 +18305,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandConsumerStatsOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandConsumerStats.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandConsumerStats> handle;
-    private CommandConsumerStats(io.netty.util.Recycler.Handle<CommandConsumerStats> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandConsumerStats(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandConsumerStats> RECYCLER = new io.netty.util.Recycler<CommandConsumerStats>() {
-            protected CommandConsumerStats newObject(Handle<CommandConsumerStats> handle) {
+            protected CommandConsumerStats newObject(Handle handle) {
               return new CommandConsumerStats(handle);
             }
           };
@@ -18383,12 +18321,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandConsumerStats(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandConsumerStats(boolean noInit) {}
     
     private static final CommandConsumerStats defaultInstance;
     public static CommandConsumerStats getDefaultInstance() {
@@ -18560,20 +18496,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -18814,13 +18750,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements CommandConsumerStatsResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use CommandConsumerStatsResponse.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<CommandConsumerStatsResponse> handle;
-    private CommandConsumerStatsResponse(io.netty.util.Recycler.Handle<CommandConsumerStatsResponse> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private CommandConsumerStatsResponse(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<CommandConsumerStatsResponse> RECYCLER = new io.netty.util.Recycler<CommandConsumerStatsResponse>() {
-            protected CommandConsumerStatsResponse newObject(Handle<CommandConsumerStatsResponse> handle) {
+            protected CommandConsumerStatsResponse newObject(Handle handle) {
               return new CommandConsumerStatsResponse(handle);
             }
           };
@@ -18830,12 +18766,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private CommandConsumerStatsResponse(boolean noInit) {
-        this.handle = null;
-    }
+    private CommandConsumerStatsResponse(boolean noInit) {}
     
     private static final CommandConsumerStatsResponse defaultInstance;
     public static CommandConsumerStatsResponse getDefaultInstance() {
@@ -19347,20 +19281,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
@@ -20186,13 +20120,13 @@ public final class PulsarApi {
       com.google.protobuf.GeneratedMessageLite
       implements BaseCommandOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
     // Use BaseCommand.newBuilder() to construct.
-    private final io.netty.util.Recycler.Handle<BaseCommand> handle;
-    private BaseCommand(io.netty.util.Recycler.Handle<BaseCommand> handle) {
+    private io.netty.util.Recycler.Handle handle;
+    private BaseCommand(io.netty.util.Recycler.Handle handle) {
       this.handle = handle;
     }
     
      private static final io.netty.util.Recycler<BaseCommand> RECYCLER = new io.netty.util.Recycler<BaseCommand>() {
-            protected BaseCommand newObject(Handle<BaseCommand> handle) {
+            protected BaseCommand newObject(Handle handle) {
               return new BaseCommand(handle);
             }
           };
@@ -20202,12 +20136,10 @@ public final class PulsarApi {
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
             this.memoizedSerializedSize = -1;
-            handle.recycle(this);
+            if (handle != null) { RECYCLER.recycle(this, handle); }
         }
          
-    private BaseCommand(boolean noInit) {
-        this.handle = null;
-    }
+    private BaseCommand(boolean noInit) {}
     
     private static final BaseCommand defaultInstance;
     public static BaseCommand getDefaultInstance() {
@@ -21109,20 +21041,20 @@ public final class PulsarApi {
           org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand, Builder>
         implements org.apache.pulsar.common.api.proto.PulsarApi.BaseCommandOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
       // Construct using org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.newBuilder()
-      private final io.netty.util.Recycler.Handle<Builder> handle;
-      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
         this.handle = handle;
         maybeForceBuilderInitialization();
       }
       private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
-         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
                return new Builder(handle);
              }
             };
       
        public void recycle() {
                 clear();
-                handle.recycle(this);
+                if (handle != null) {RECYCLER.recycle(this, handle);}
             }
       
       private void maybeForceBuilderInitialization() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
similarity index 89%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/util/FutureUtil.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 2f0490d..e07941a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -16,15 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.util;
+package org.apache.pulsar.common.util;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.pulsar.client.api.PulsarClientException;
-
 public class FutureUtil {
 
     /**
@@ -63,11 +61,7 @@ public class FutureUtil {
 
     public static <T> CompletableFuture<T> failedFuture(Throwable t) {
         CompletableFuture<T> future = new CompletableFuture<>();
-        if (t instanceof PulsarClientException) {
-            future.completeExceptionally(t);
-        } else {
-            future.completeExceptionally(new PulsarClientException(t));
-        }
+        future.completeExceptionally(t);
         return future;
     }
 }
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
index 63123f3..1951592 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -97,11 +98,11 @@ public class BrokerDiscoveryProvider implements Closeable {
     }
 
     CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(DiscoveryService service,
-            DestinationName destination, String role) {
+            DestinationName destination, String role, AuthenticationDataSource authenticationData) {
 
         CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
         try {
-            checkAuthorization(service, destination, role);
+            checkAuthorization(service, destination, role, authenticationData);
             final String path = path(PARTITIONED_TOPIC_PATH_ZNODE, destination.getProperty(), destination.getCluster(),
                     destination.getNamespacePortion(), "persistent", destination.getEncodedLocalName());
             // gets the number of partitions from the zk cache
@@ -126,7 +127,8 @@ public class BrokerDiscoveryProvider implements Closeable {
         return metadataFuture;
     }
 
-    protected static void checkAuthorization(DiscoveryService service, DestinationName destination, String role)
+    protected static void checkAuthorization(DiscoveryService service, DestinationName destination, String role,
+            AuthenticationDataSource authenticationData)
             throws Exception {
         if (!service.getConfiguration().isAuthorizationEnabled()
                 || service.getConfiguration().getSuperUserRoles().contains(role)) {
@@ -134,7 +136,7 @@ public class BrokerDiscoveryProvider implements Closeable {
             return;
         }
         // get zk policy manager
-        if (!service.getAuthorizationManager().canLookup(destination, role)) {
+        if (!service.getAuthorizationService().canLookup(destination, role, authenticationData)) {
             LOG.warn("[{}] Role {} is not allowed to lookup topic", destination, role);
             // check namespace authorization
             PropertyAdmin propertyAdmin;
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
index 387ba7a..38ae7eb 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
@@ -26,7 +26,7 @@ import java.net.InetAddress;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
-import org.apache.pulsar.broker.authorization.AuthorizationManager;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -55,7 +55,7 @@ public class DiscoveryService implements Closeable {
     private final String serviceUrlTls;
     private ConfigurationCacheService configurationCacheService;
     private AuthenticationService authenticationService;
-    private AuthorizationManager authorizationManager;
+    private AuthorizationService authorizationService;
     private ZooKeeperClientFactory zkClientFactory = null;
     private BrokerDiscoveryProvider discoveryProvider;
     private final EventLoopGroup acceptorGroup;
@@ -82,7 +82,7 @@ public class DiscoveryService implements Closeable {
         this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache);
         ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(config);
         authenticationService = new AuthenticationService(serviceConfiguration);
-        authorizationManager = new AuthorizationManager(serviceConfiguration, configurationCacheService);
+        authorizationService = new AuthorizationService(serviceConfiguration, configurationCacheService);
         startServer();
     }
 
@@ -181,8 +181,8 @@ public class DiscoveryService implements Closeable {
         return authenticationService;
     }
 
-    public AuthorizationManager getAuthorizationManager() {
-        return authorizationManager;
+    public AuthorizationService getAuthorizationService() {
+        return authorizationService;
     }
 
     public ConfigurationCacheService getConfigurationCacheService() {
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
index 4f694de..80e9f7e 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
@@ -28,6 +28,7 @@ import javax.net.ssl.SSLSession;
 
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarHandler;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
@@ -50,6 +51,7 @@ public class ServerConnection extends PulsarHandler {
 
     private DiscoveryService service;
     private String authRole = null;
+    private AuthenticationDataSource authenticationData = null;
     private State state;
     public static final String TLS_HANDLER = "tls";
 
@@ -87,8 +89,9 @@ public class ServerConnection extends PulsarHandler {
                 if (sslHandler != null) {
                     sslSession = ((SslHandler) sslHandler).engine().getSession();
                 }
+                this.authenticationData = new AuthenticationDataCommand(authData, remoteAddress, sslSession);
                 authRole = service.getAuthenticationService()
-                        .authenticate(new AuthenticationDataCommand(authData, remoteAddress, sslSession), authMethod);
+                        .authenticate(this.authenticationData, authMethod);
                 LOG.info("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod, authRole);
             } catch (AuthenticationException e) {
                 String msg = "Unable to authenticate";
@@ -144,7 +147,9 @@ public class ServerConnection extends PulsarHandler {
         final long requestId = partitionMetadata.getRequestId();
         DestinationName dn = DestinationName.get(partitionMetadata.getTopic());
 
-        service.getDiscoveryProvider().getPartitionedTopicMetadata(service, dn, authRole).thenAccept(metadata -> {
+        service.getDiscoveryProvider()
+                .getPartitionedTopicMetadata(service, dn, authRole, authenticationData)
+                .thenAccept(metadata -> {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("[{}] Total number of partitions for topic {} is {}", authRole, dn, metadata.partitions);
             }
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
index 8e9cc61..eb986f1 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.discovery.service.server;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
 import org.apache.pulsar.discovery.service.web.DiscoveryServiceServlet;
 
@@ -67,6 +68,8 @@ public class ServiceConfig implements PulsarConfiguration {
     private Set<String> authenticationProviders = Sets.newTreeSet();
     // Enforce authorization
     private boolean authorizationEnabled = false;
+    // Authorization provider fully qualified class-name
+    private String authorizationProvider = PulsarAuthorizationProvider.class.getName();
 
     /***** --- TLS --- ****/
     // Enable TLS
@@ -210,6 +213,14 @@ public class ServiceConfig implements PulsarConfiguration {
         this.authorizationEnabled = authorizationEnabled;
     }
 
+    public String getAuthorizationProvider() {
+        return authorizationProvider;
+    }
+
+    public void setAuthorizationProvider(String authorizationProvider) {
+        this.authorizationProvider = authorizationProvider;
+    }
+
     public Set<String> getSuperUserRoles() {
         return superUserRoles;
     }
diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
index bf83116..3351145 100644
--- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
+++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
@@ -101,7 +101,7 @@ public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {
     public void testGetPartitionsMetadata() throws Exception {
         DestinationName topic1 = DestinationName.get("persistent://test/local/ns/my-topic-1");
 
-        PartitionedTopicMetadata m = service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topic1, "role")
+        PartitionedTopicMetadata m = service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topic1, "role", null)
                 .get();
         assertEquals(m.partitions, 0);
 
@@ -109,7 +109,7 @@ public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {
         mockZookKeeper.failNow(Code.SESSIONEXPIRED);
         DestinationName topic2 = DestinationName.get("persistent://test/local/ns/my-topic-2");
         CompletableFuture<PartitionedTopicMetadata> future = service.getDiscoveryProvider()
-                .getPartitionedTopicMetadata(service, topic2, "role");
+                .getPartitionedTopicMetadata(service, topic2, "role", null);
         try {
             future.get();
             fail("Partition metadata lookup should have failed");
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
index c9444ff..8faa3da 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.proxy.server;
 
 import static org.apache.bookkeeper.util.MathUtils.signSafeMod;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal;
 
 import java.io.Closeable;
@@ -31,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
@@ -45,7 +47,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Joiner;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
 /**
  * Maintains available active broker list and returns next active broker in round-robin for discovery service.
@@ -96,11 +97,11 @@ public class BrokerDiscoveryProvider implements Closeable {
     }
 
     CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(ProxyService service,
-            DestinationName destination, String role) {
+            DestinationName destination, String role, AuthenticationDataSource authenticationData) {
 
         CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
         try {
-            checkAuthorization(service, destination, role);
+            checkAuthorization(service, destination, role, authenticationData);
             final String path = path(PARTITIONED_TOPIC_PATH_ZNODE, destination.getProperty(), destination.getCluster(),
                     destination.getNamespacePortion(), "persistent", destination.getEncodedLocalName());
             // gets the number of partitions from the zk cache
@@ -125,15 +126,15 @@ public class BrokerDiscoveryProvider implements Closeable {
         return metadataFuture;
     }
 
-    protected static void checkAuthorization(ProxyService service, DestinationName destination, String role)
-            throws Exception {
+    protected static void checkAuthorization(ProxyService service, DestinationName destination, String role,
+            AuthenticationDataSource authenticationData) throws Exception {
         if (!service.getConfiguration().isAuthorizationEnabled()
                 || service.getConfiguration().getSuperUserRoles().contains(role)) {
             // No enforcing of authorization policies
             return;
         }
         // get zk policy manager
-        if (!service.getAuthorizationManager().canLookup(destination, role)) {
+        if (!service.getAuthorizationService().canLookup(destination, role, authenticationData)) {
             LOG.warn("[{}] Role {} is not allowed to lookup topic", destination, role);
             // check namespace authorization
             PropertyAdmin propertyAdmin;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index f9ba3c1..7d4d683 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -161,7 +161,8 @@ public class LookupProxyHandler {
         final long clientRequestId = partitionMetadata.getRequestId();
         DestinationName dn = DestinationName.get(partitionMetadata.getTopic());
         if (isBlank(brokerServiceURL)) {
-            service.getDiscoveryProvider().getPartitionedTopicMetadata(service, dn, proxyConnection.clientAuthRole)
+            service.getDiscoveryProvider().getPartitionedTopicMetadata(service, dn, proxyConnection.clientAuthRole,
+                    proxyConnection.authenticationData)
                     .thenAccept(metadata -> {
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] Total number of partitions for topic {} is {}",
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index e5906ba..5dba6f8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.proxy.server;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
 
 import com.google.common.collect.Sets;
@@ -59,10 +60,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
     private Set<String> authenticationProviders = Sets.newTreeSet();
     // Enforce authorization
     private boolean authorizationEnabled = false;
+    // Authorization provider fully qualified class-name
+    private String authorizationProvider = PulsarAuthorizationProvider.class.getName();
     // Forward client authData to Broker for re authorization
     // make sure authentication is enabled for this to take effect
     private boolean forwardAuthorizationCredentials = false;
-            
+
     // Authentication settings of the proxy itself. Used to connect to brokers
     private String brokerClientAuthenticationPlugin;
     private String brokerClientAuthenticationParameters;
@@ -263,6 +266,14 @@ public class ProxyConfiguration implements PulsarConfiguration {
         this.authorizationEnabled = authorizationEnabled;
     }
 
+    public String getAuthorizationProvider() {
+        return authorizationProvider;
+    }
+
+    public void setAuthorizationProvider(String authorizationProvider) {
+        this.authorizationProvider = authorizationProvider;
+    }
+
     public Set<String> getSuperUserRoles() {
         return superUserRoles;
     }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 68bd022..89d737d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -27,6 +27,7 @@ import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarHandler;
 import org.apache.pulsar.common.api.proto.PulsarApi;
@@ -55,6 +56,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     String clientAuthRole = null;
     String clientAuthData = null;
     String clientAuthMethod = null;
+    AuthenticationDataSource authenticationData;
     private State state;
 
     private LookupProxyHandler lookupProxyHandler = null;
@@ -162,7 +164,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             close();
             return;
         }
-
+        
         if (connect.hasProxyToBrokerUrl()) {
             // Client already knows which broker to connect. Let's open a connection
             // there and just pass bytes in both directions
@@ -223,8 +225,9 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             if (sslHandler != null) {
                 sslSession = ((SslHandler) sslHandler).engine().getSession();
             }
+            authenticationData = new AuthenticationDataCommand(authData, remoteAddress, sslSession);
             clientAuthRole = service.getAuthenticationService()
-                    .authenticate(new AuthenticationDataCommand(authData, remoteAddress, sslSession), authMethod);
+                    .authenticate(authenticationData, authMethod);
             LOG.info("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod,
                     clientAuthRole);
             return true;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index e5c2e91..0ddee42 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -28,7 +28,7 @@ import java.net.UnknownHostException;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
-import org.apache.pulsar.broker.authorization.AuthorizationManager;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.ClientConfiguration;
@@ -60,7 +60,7 @@ public class ProxyService implements Closeable {
     private final String serviceUrlTls;
     private ConfigurationCacheService configurationCacheService;
     private AuthenticationService authenticationService;
-    private AuthorizationManager authorizationManager;
+    private AuthorizationService authorizationService;
     private ZooKeeperClientFactory zkClientFactory = null;
 
     private final EventLoopGroup acceptorGroup;
@@ -126,7 +126,7 @@ public class ProxyService implements Closeable {
             });
             discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, getZooKeeperClientFactory());
             this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache);
-            authorizationManager = new AuthorizationManager(serviceConfiguration, configurationCacheService);
+            authorizationService = new AuthorizationService(serviceConfiguration, configurationCacheService);
         }
 
         ServerBootstrap bootstrap = new ServerBootstrap();
@@ -200,8 +200,8 @@ public class ProxyService implements Closeable {
         return authenticationService;
     }
 
-    public AuthorizationManager getAuthorizationManager() {
-        return authorizationManager;
+    public AuthorizationService getAuthorizationService() {
+        return authorizationService;
     }
 
     public Authentication getClientAuthentication() {
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 9d15892..c308175 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -37,8 +37,8 @@ import org.apache.pulsar.client.api.ReaderConfiguration;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index 5a77a00..91e62b7 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -30,6 +30,8 @@ import javax.naming.AuthenticationException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.WebSocketAdapter;
@@ -63,6 +65,7 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
 
     private boolean checkAuth(ServletUpgradeResponse response) {
         String authRole = "<none>";
+        AuthenticationDataSource authenticationData = new AuthenticationDataHttps(request);
         if (service.isAuthenticationEnabled()) {
             try {
                 authRole = service.getAuthenticationService().authenticateHttpRequest(request);
@@ -84,7 +87,7 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
 
         if (service.isAuthorizationEnabled()) {
             try {
-                if (!isAuthorized(authRole)) {
+                if (!isAuthorized(authRole, authenticationData)) {
                     log.warn("[{}:{}] WebSocket Client [{}] is not authorized on topic {}", request.getRemoteAddr(),
                             request.getRemotePort(), authRole, topic);
                     response.sendError(HttpServletResponse.SC_FORBIDDEN, "Not authorized");
@@ -167,7 +170,7 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
         return dn.toString();
     }
 
-    protected abstract Boolean isAuthorized(String authRole) throws Exception;
+    protected abstract Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception;
 
     private static final Logger log = LoggerFactory.getLogger(AbstractWebSocketHandler.class);
 }
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 27f62b1..b392e0c 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.LongAdder;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.MessageId;
@@ -282,8 +283,9 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
     }
 
     @Override
-    protected Boolean isAuthorized(String authRole) throws Exception {
-        return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole, this.subscription);
+    protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
+        return service.getAuthorizationService().canConsume(DestinationName.get(topic), authRole, authenticationData,
+                this.subscription);
     }
 
     private static String extractSubscription(HttpServletRequest request) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index e551885..cb2e288 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -18,15 +18,14 @@
  */
 package org.apache.pulsar.websocket;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.lang.String.format;
-import static org.apache.pulsar.websocket.WebSocketError.FailedToCreateProducer;
 import static org.apache.pulsar.websocket.WebSocketError.FailedToDeserializeFromJSON;
 import static org.apache.pulsar.websocket.WebSocketError.PayloadEncodingError;
 import static org.apache.pulsar.websocket.WebSocketError.UnknownError;
 
 import java.io.IOException;
 import java.util.Base64;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
@@ -34,6 +33,7 @@ import java.util.concurrent.atomic.LongAdder;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
@@ -47,15 +47,12 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ProducerAck;
 import org.apache.pulsar.websocket.data.ProducerMessage;
 import org.apache.pulsar.websocket.stats.StatsBuckets;
-import org.eclipse.jetty.websocket.api.Session;
-import org.eclipse.jetty.websocket.api.UpgradeResponse;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Enums;
-import static com.google.common.base.Preconditions.checkArgument;
 
 
 /**
@@ -224,8 +221,8 @@ public class ProducerHandler extends AbstractWebSocketHandler {
     }
 
     @Override
-    protected Boolean isAuthorized(String authRole) throws Exception {
-        return service.getAuthorizationManager().canProduce(DestinationName.get(topic), authRole);
+    protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
+        return service.getAuthorizationService().canProduce(DestinationName.get(topic), authRole, authenticationData);
     }
 
     private void sendAckResponse(ProducerAck response) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index eae7717..4d6c271 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.LongAdder;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Reader;
@@ -246,8 +247,9 @@ public class ReaderHandler extends AbstractWebSocketHandler {
     }
 
     @Override
-    protected Boolean isAuthorized(String authRole) throws Exception {
-        return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole, this.subscription);
+    protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
+        return service.getAuthorizationService().canConsume(DestinationName.get(topic), authRole, authenticationData,
+                this.subscription);
     }
 
     private MessageId getMessageId() throws IOException {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 02a589f..d5a2c84 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -34,7 +34,7 @@ import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
-import org.apache.pulsar.broker.authorization.AuthorizationManager;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -64,7 +64,7 @@ public class WebSocketService implements Closeable {
     public static final int MaxTextFrameSize = 1024 * 1024;
 
     AuthenticationService authenticationService;
-    AuthorizationManager authorizationManager;
+    AuthorizationService authorizationService;
     PulsarClient pulsarClient;
 
     private final ScheduledExecutorService executor = Executors
@@ -112,13 +112,13 @@ public class WebSocketService implements Closeable {
             log.info("Global Zookeeper cache started");
         }
 
-        // start authorizationManager
+        // start authorizationService
         if (config.isAuthorizationEnabled()) {
             if (configurationCacheService == null) {
                 throw new PulsarServerException(
                         "Failed to initialize authorization manager due to empty GlobalZookeeperServers");
             }
-            authorizationManager = new AuthorizationManager(this.config, configurationCacheService);
+            authorizationService = new AuthorizationService(this.config, configurationCacheService);
         }
         // start authentication service
         authenticationService = new AuthenticationService(this.config);
@@ -147,8 +147,8 @@ public class WebSocketService implements Closeable {
         return authenticationService;
     }
 
-    public AuthorizationManager getAuthorizationManager() {
-        return authorizationManager;
+    public AuthorizationService getAuthorizationService() {
+        return authorizationService;
     }
 
     public ZooKeeperCache getGlobalZkCache() {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
index 122c12b..a634e18 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketWebResource.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.websocket.admin;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
-
 import javax.naming.AuthenticationException;
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
@@ -28,6 +27,7 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriInfo;
 
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.websocket.WebSocketService;
 import org.slf4j.Logger;
@@ -50,7 +50,8 @@ public class WebSocketWebResource {
     private WebSocketService socketService;
     
     private String clientId;
-
+    private AuthenticationDataHttps authData;
+    
     protected WebSocketService service() {
         if (socketService == null) {
             socketService = (WebSocketService) servletContext.getAttribute(ATTRIBUTE_PROXY_SERVICE_NAME);
@@ -79,8 +80,14 @@ public class WebSocketWebResource {
         }
         return clientId;
     }
-    
-    
+
+    public AuthenticationDataHttps authData() {
+        if (authData == null) {
+            authData = new AuthenticationDataHttps(httpRequest);
+        }
+        return authData;
+    }
+
     /**
      * Checks whether the user has Pulsar Super-User access to the system.
      *
@@ -128,8 +135,7 @@ public class WebSocketWebResource {
      */
     protected boolean isAuthorized(DestinationName topic) throws Exception {
         if (service().isAuthorizationEnabled()) {
-            String authRole = clientAppId();
-            return service().getAuthorizationManager().canLookup(topic, authRole);
+            return service().getAuthorizationService().canLookup(topic, clientAppId(), authData());
         }
         return true;
     }
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index 6f9536c..5cea3df 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.websocket.service;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
 
@@ -63,6 +64,10 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
     private Set<String> authenticationProviders = Sets.newTreeSet();
     // Enforce authorization
     private boolean authorizationEnabled;
+    // Authorization provider fully qualified class-name
+    private String authorizationProvider = PulsarAuthorizationProvider.class.getName();
+
+
     // Role names that are treated as "super-user", meaning they will be able to
     // do all admin operations and publish/consume from all topics
     private Set<String> superUserRoles = Sets.newTreeSet();
@@ -202,6 +207,14 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
         this.authorizationEnabled = authorizationEnabled;
     }
 
+    public String getAuthorizationProvider() {
+        return authorizationProvider;
+    }
+
+    public void setAuthorizationProvider(String authorizationProvider) {
+        this.authorizationProvider = authorizationProvider;
+    }
+    
     public boolean getAuthorizationAllowWildcardsMatching() {
         return authorizationAllowWildcardsMatching;
     }

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.