You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/03/23 05:14:11 UTC

[pulsar] branch master updated: [PulsarAdmin] Namespaces to async (#6590)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a7e210f  [PulsarAdmin] Namespaces to async  (#6590)
a7e210f is described below

commit a7e210f15bf0b27546c8b22c04c1b05cda8f3ee8
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Mon Mar 23 13:14:00 2020 +0800

    [PulsarAdmin] Namespaces to async  (#6590)
---
 .../org/apache/pulsar/client/admin/Namespaces.java | 1200 ++++++++++-
 .../client/admin/internal/NamespacesImpl.java      | 2168 +++++++++++++++-----
 2 files changed, 2853 insertions(+), 515 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index bfd4b7b..15b2ace 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -74,6 +74,24 @@ public interface Namespaces {
     List<String> getNamespaces(String tenant) throws PulsarAdminException;
 
     /**
+     * Get the list of namespaces asynchronously.
+     * <p>
+     * Get the list of all the namespaces for a certain tenant.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>["my-tenant/c1/namespace1",
+     *  "my-tenant/global/namespace2",
+     *  "my-tenant/c2/namespace3"]</code>
+     * </pre>
+     *
+     * @param tenant
+     *            Tenant name
+     */
+    CompletableFuture<List<String>> getNamespacesAsync(String tenant);
+
+    /**
      * Get the list of namespaces.
      * <p>
      * Get the list of all the namespaces for a certain tenant on single cluster.
@@ -124,6 +142,23 @@ public interface Namespaces {
     List<String> getTopics(String namespace) throws PulsarAdminException;
 
     /**
+     * Get the list of topics asynchronously.
+     * <p>
+     * Get the list of all the topics under a certain namespace.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>["persistent://my-tenant/use/namespace1/my-topic-1",
+     *  "persistent://my-tenant/use/namespace1/my-topic-2"]</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<List<String>> getTopicsAsync(String namespace);
+
+    /**
      * Get policies for a namespace.
      * <p>
      * Get the dump all the policies specified for a namespace.
@@ -163,6 +198,38 @@ public interface Namespaces {
     Policies getPolicies(String namespace) throws PulsarAdminException;
 
     /**
+     * Get policies for a namespace asynchronously.
+     * <p>
+     * Get the dump all the policies specified for a namespace.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>{
+     *   "auth_policies" : {
+     *     "namespace_auth" : {
+     *       "my-role" : [ "produce" ]
+     *     },
+     *     "destination_auth" : {
+     *       "persistent://prop/local/ns1/my-topic" : {
+     *         "role-1" : [ "produce" ],
+     *         "role-2" : [ "consume" ]
+     *       }
+     *     }
+     *   },
+     *   "replication_clusters" : ["use", "usw"],
+     *   "message_ttl_in_seconds" : 300
+     * }</code>
+     * </pre>
+     *
+     * @see Policies
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Policies> getPoliciesAsync(String namespace);
+
+    /**
      * Create a new namespace.
      * <p>
      * Creates a new empty namespace with no policies attached.
@@ -190,6 +257,18 @@ public interface Namespaces {
      *
      * @param namespace
      *            Namespace name
+     * @param numBundles
+     *            Number of bundles
+     */
+    CompletableFuture<Void> createNamespaceAsync(String namespace, int numBundles);
+
+    /**
+     * Create a new namespace.
+     * <p>
+     * Creates a new empty namespace with no policies attached.
+     *
+     * @param namespace
+     *            Namespace name
      * @param bundlesData
      *            Bundles Data
      *
@@ -205,6 +284,18 @@ public interface Namespaces {
     void createNamespace(String namespace, BundlesData bundlesData) throws PulsarAdminException;
 
     /**
+     * Create a new namespace asynchronously.
+     * <p>
+     * Creates a new empty namespace with no policies attached.
+     *
+     * @param namespace
+     *            Namespace name
+     * @param bundlesData
+     *            Bundles Data
+     */
+    CompletableFuture<Void> createNamespaceAsync(String namespace, BundlesData bundlesData);
+
+    /**
      * Create a new namespace.
      * <p>
      * Creates a new empty namespace with no policies attached.
@@ -224,6 +315,16 @@ public interface Namespaces {
     void createNamespace(String namespace) throws PulsarAdminException;
 
     /**
+     * Create a new namespace asynchronously.
+     * <p>
+     * Creates a new empty namespace with no policies attached.
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Void> createNamespaceAsync(String namespace);
+
+    /**
      * Create a new namespace.
      * <p>
      * Creates a new empty namespace with no policies attached.
@@ -246,6 +347,19 @@ public interface Namespaces {
     void createNamespace(String namespace, Set<String> clusters) throws PulsarAdminException;
 
     /**
+     * Create a new namespace asynchronously.
+     * <p>
+     * Creates a new empty namespace with no policies attached.
+     *
+     * @param namespace
+     *            Namespace name
+     * @param clusters
+     *            Clusters in which the namespace will be present. If more than one cluster is present, replication
+     *            across clusters will be enabled.
+     */
+    CompletableFuture<Void> createNamespaceAsync(String namespace, Set<String> clusters);
+
+    /**
      * Create a new namespace.
      * <p>
      * Creates a new namespace with the specified policies.
@@ -269,6 +383,18 @@ public interface Namespaces {
     void createNamespace(String namespace, Policies policies) throws PulsarAdminException;
 
     /**
+     * Create a new namespace asynchronously.
+     * <p>
+     * Creates a new namespace with the specified policies.
+     *
+     * @param namespace
+     *            Namespace name
+     * @param policies
+     *            Policies for the namespace
+     */
+    CompletableFuture<Void> createNamespaceAsync(String namespace, Policies policies);
+
+    /**
      * Delete an existing namespace.
      * <p>
      * The namespace needs to be empty.
@@ -288,6 +414,16 @@ public interface Namespaces {
     void deleteNamespace(String namespace) throws PulsarAdminException;
 
     /**
+     * Delete an existing namespace asynchronously.
+     * <p>
+     * The namespace needs to be empty.
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Void> deleteNamespaceAsync(String namespace);
+
+    /**
      * Delete an existing bundle in a namespace.
      * <p>
      * The bundle needs to be empty.
@@ -349,6 +485,25 @@ public interface Namespaces {
     Map<String, Set<AuthAction>> getPermissions(String namespace) throws PulsarAdminException;
 
     /**
+     * Get permissions on a namespace asynchronously.
+     * <p>
+     * Retrieve the permissions for a namespace.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>{
+     *   "my-role" : [ "produce" ]
+     *   "other-role" : [ "consume" ]
+     * }</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Map<String, Set<AuthAction>>> getPermissionsAsync(String namespace);
+
+    /**
      * Grant permission on a namespace.
      * <p>
      * Grant a new permission to a client role on a namespace.
@@ -378,6 +533,26 @@ public interface Namespaces {
     void grantPermissionOnNamespace(String namespace, String role, Set<AuthAction> actions) throws PulsarAdminException;
 
     /**
+     * Grant permission on a namespace asynchronously.
+     * <p>
+     * Grant a new permission to a client role on a namespace.
+     * <p>
+     * Request parameter example:
+     *
+     * <pre>
+     * <code>["produce", "consume"]</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param role
+     *            Client role to which grant permission
+     * @param actions
+     *            Auth actions (produce and consume)
+     */
+    CompletableFuture<Void> grantPermissionOnNamespaceAsync(String namespace, String role, Set<AuthAction> actions);
+
+    /**
      * Revoke permissions on a namespace.
      * <p>
      * Revoke all permissions to a client role on a namespace.
@@ -397,6 +572,18 @@ public interface Namespaces {
     void revokePermissionsOnNamespace(String namespace, String role) throws PulsarAdminException;
 
     /**
+     * Revoke permissions on a namespace asynchronously.
+     * <p>
+     * Revoke all permissions to a client role on a namespace.
+     *
+     * @param namespace
+     *            Namespace name
+     * @param role
+     *            Client role to which remove permissions
+     */
+    CompletableFuture<Void> revokePermissionsOnNamespaceAsync(String namespace, String role);
+
+    /**
      * Grant permission to role to access subscription's admin-api.
      * @param namespace
      * @param subscription
@@ -406,6 +593,14 @@ public interface Namespaces {
     void grantPermissionOnSubscription(String namespace, String subscription, Set<String> roles) throws PulsarAdminException;
 
     /**
+     * Grant permission to role to access subscription's admin-api asynchronously.
+     * @param namespace
+     * @param subscription
+     * @param roles
+     */
+    CompletableFuture<Void> grantPermissionOnSubscriptionAsync(String namespace, String subscription, Set<String> roles);
+
+    /**
      * Revoke permissions on a subscription's admin-api access.
      * @param namespace
      * @param subscription
@@ -415,6 +610,14 @@ public interface Namespaces {
     void revokePermissionOnSubscription(String namespace, String subscription, String role) throws PulsarAdminException;
 
     /**
+     * Revoke permissions on a subscription's admin-api access asynchronously.
+     * @param namespace
+     * @param subscription
+     * @param role
+     */
+    CompletableFuture<Void> revokePermissionOnSubscriptionAsync(String namespace, String subscription, String role);
+
+    /**
      * Get the replication clusters for a namespace.
      * <p>
      * Response example:
@@ -438,6 +641,20 @@ public interface Namespaces {
     List<String> getNamespaceReplicationClusters(String namespace) throws PulsarAdminException;
 
     /**
+     * Get the replication clusters for a namespace asynchronously.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>["use", "usw", "usc"]</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<List<String>> getNamespaceReplicationClustersAsync(String namespace);
+
+    /**
      * Set the replication clusters for a namespace.
      * <p>
      * Request example:
@@ -465,6 +682,22 @@ public interface Namespaces {
     void setNamespaceReplicationClusters(String namespace, Set<String> clusterIds) throws PulsarAdminException;
 
     /**
+     * Set the replication clusters for a namespace asynchronously.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>["us-west", "us-east", "us-cent"]</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param clusterIds
+     *            Pulsar Cluster Ids
+     */
+    CompletableFuture<Void> setNamespaceReplicationClustersAsync(String namespace, Set<String> clusterIds);
+
+    /**
      * Get the message TTL for a namespace.
      * <p>
      * Response example:
@@ -486,6 +719,20 @@ public interface Namespaces {
     int getNamespaceMessageTTL(String namespace) throws PulsarAdminException;
 
     /**
+     * Get the message TTL for a namespace asynchronously.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>60</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Integer> getNamespaceMessageTTLAsync(String namespace);
+
+    /**
      * Set the messages Time to Live for all the topics within a namespace.
      * <p>
      * Request example:
@@ -508,9 +755,24 @@ public interface Namespaces {
      */
     void setNamespaceMessageTTL(String namespace, int ttlInSeconds) throws PulsarAdminException;
 
+    /**
+     * Set the messages Time to Live for all the topics within a namespace asynchronously.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>60</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param ttlInSeconds
+     *            TTL values for all messages for all topics in this namespace
+     */
+    CompletableFuture<Void> setNamespaceMessageTTLAsync(String namespace, int ttlInSeconds);
 
     /**
-     * Set anti-affinity group name for a namespace
+     * Set anti-affinity group name for a namespace.
      * <p>
      * Request example:
      *
@@ -529,7 +791,19 @@ public interface Namespaces {
     void setNamespaceAntiAffinityGroup(String namespace, String namespaceAntiAffinityGroup) throws PulsarAdminException;
 
     /**
-     * Get all namespaces that grouped with given anti-affinity group
+     * Set anti-affinity group name for a namespace asynchronously.
+     * <p>
+     * Request example:
+     *
+     * @param namespace
+     *            Namespace name
+     * @param namespaceAntiAffinityGroup
+     *            anti-affinity group name for a namespace
+     */
+    CompletableFuture<Void> setNamespaceAntiAffinityGroupAsync(String namespace, String namespaceAntiAffinityGroup);
+
+    /**
+     * Get all namespaces that grouped with given anti-affinity group.
      *
      * @param tenant
      *            tenant is only used for authorization. Client has to be admin of any of the tenant to access this
@@ -545,7 +819,21 @@ public interface Namespaces {
             throws PulsarAdminException;
 
     /**
-     * Get anti-affinity group name for a namespace
+     * Get all namespaces that grouped with given anti-affinity group asynchronously.
+     *
+     * @param tenant
+     *            tenant is only used for authorization. Client has to be admin of any of the tenant to access this
+     *            api api.
+     * @param cluster
+     *            cluster name
+     * @param namespaceAntiAffinityGroup
+     *            Anti-affinity group name
+     * @return list of namespace grouped under a given anti-affinity group
+     */
+    CompletableFuture<List<String>> getAntiAffinityNamespacesAsync(String tenant, String cluster, String namespaceAntiAffinityGroup);
+
+    /**
+     * Get anti-affinity group name for a namespace.
      * <p>
      * Response example:
      *
@@ -566,7 +854,21 @@ public interface Namespaces {
     String getNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException;
 
     /**
-     * Delete anti-affinity group name for a namespace.
+     * Get anti-affinity group name for a namespace asynchronously.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>60</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<String> getNamespaceAntiAffinityGroupAsync(String namespace);
+
+    /**
+     * Delete anti-affinity group name for a namespace.
      *
      * @param namespace
      *            Namespace name
@@ -581,6 +883,14 @@ public interface Namespaces {
     void deleteNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException;
 
     /**
+     * Delete anti-affinity group name for a namespace.
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Void> deleteNamespaceAntiAffinityGroupAsync(String namespace);
+
+    /**
      * Set the deduplication status for all topics within a namespace.
      * <p>
      * When deduplication is enabled, the broker will prevent to store the same message multiple times.
@@ -606,6 +916,24 @@ public interface Namespaces {
     void setDeduplicationStatus(String namespace, boolean enableDeduplication) throws PulsarAdminException;
 
     /**
+     * Set the deduplication status for all topics within a namespace asynchronously.
+     * <p>
+     * When deduplication is enabled, the broker will prevent to store the same message multiple times.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>true</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param enableDeduplication
+     *            wether to enable or disable deduplication feature
+     */
+    CompletableFuture<Void> setDeduplicationStatusAsync(String namespace, boolean enableDeduplication);
+
+    /**
      * Get the bundles split data.
      *
      * @param namespace
@@ -653,6 +981,33 @@ public interface Namespaces {
     Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String namespace) throws PulsarAdminException;
 
     /**
+     * Get backlog quota map on a namespace asynchronously.
+     * <p>
+     * Get backlog quota map on a namespace.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>
+     *  {
+     *      "namespace_memory" : {
+     *          "limit" : "134217728",
+     *          "policy" : "consumer_backlog_eviction"
+     *      },
+     *      "destination_storage" : {
+     *          "limit" : "-1",
+     *          "policy" : "producer_exception"
+     *      }
+     *  }
+     * </code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Map<BacklogQuota.BacklogQuotaType, BacklogQuota>> getBacklogQuotaMapAsync(String namespace);
+
+    /**
      * Set a backlog quota for all the topics on a namespace.
      * <p>
      * Set a backlog quota on a namespace.
@@ -685,6 +1040,31 @@ public interface Namespaces {
     void setBacklogQuota(String namespace, BacklogQuota backlogQuota) throws PulsarAdminException;
 
     /**
+     * Set a backlog quota for all the topics on a namespace asynchronously.
+     * <p>
+     * Set a backlog quota on a namespace.
+     * <p>
+     * The backlog quota can be set on this resource:
+     * <p>
+     * Request parameter example:
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "limit" : "134217728",
+     *     "policy" : "consumer_backlog_eviction"
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param backlogQuota
+     *            the new BacklogQuota
+     */
+    CompletableFuture<Void> setBacklogQuotaAsync(String namespace, BacklogQuota backlogQuota);
+
+    /**
      * Remove a backlog quota policy from a namespace.
      * <p>
      * Remove a backlog quota policy from a namespace.
@@ -704,6 +1084,18 @@ public interface Namespaces {
     void removeBacklogQuota(String namespace) throws PulsarAdminException;
 
     /**
+     * Remove a backlog quota policy from a namespace asynchronously.
+     * <p>
+     * Remove a backlog quota policy from a namespace.
+     * <p>
+     * The backlog retention policy will fall back to the default.
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Void> removeBacklogQuotaAsync(String namespace);
+
+    /**
      * Set the persistence configuration for all the topics on a namespace.
      * <p>
      * Set the persistence configuration on a namespace.
@@ -723,10 +1115,6 @@ public interface Namespaces {
      * </code>
      * </pre>
      *
-     * @param tenant
-     *            Tenant name
-     * @param cluster
-     *            Cluster name
      * @param namespace
      *            Namespace name
      * @param persistence
@@ -744,6 +1132,33 @@ public interface Namespaces {
     void setPersistence(String namespace, PersistencePolicies persistence) throws PulsarAdminException;
 
     /**
+     * Set the persistence configuration for all the topics on a namespace asynchronously.
+     * <p>
+     * Set the persistence configuration on a namespace.
+     * <p>
+     * Request parameter example:
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "bookkeeperEnsemble" : 3,                 // Number of bookies to use for a topic
+     *     "bookkeeperWriteQuorum" : 2,              // How many writes to make of each entry
+     *     "bookkeeperAckQuorum" : 2,                // Number of acks (guaranteed copies) to wait for each entry
+     *     "managedLedgerMaxMarkDeleteRate" : 10.0,  // Throttling rate of mark-delete operation
+     *                                               // to avoid high number of updates for each
+     *                                               // consumer
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param persistence
+     *            Persistence policies object
+     */
+    CompletableFuture<Void> setPersistenceAsync(String namespace, PersistencePolicies persistence);
+
+    /**
      * Get the persistence configuration for a namespace.
      * <p>
      * Get the persistence configuration for a namespace.
@@ -763,10 +1178,6 @@ public interface Namespaces {
      * </code>
      * </pre>
      *
-     * @param tenant
-     *            Tenant name
-     * @param cluster
-     *            Cluster name
      * @param namespace
      *            Namespace name
      *
@@ -782,6 +1193,31 @@ public interface Namespaces {
     PersistencePolicies getPersistence(String namespace) throws PulsarAdminException;
 
     /**
+     * Get the persistence configuration for a namespace asynchronously.
+     * <p>
+     * Get the persistence configuration for a namespace.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "bookkeeperEnsemble" : 3,                 // Number of bookies to use for a topic
+     *     "bookkeeperWriteQuorum" : 2,              // How many writes to make of each entry
+     *     "bookkeeperAckQuorum" : 2,                // Number of acks (guaranteed copies) to wait for each entry
+     *     "managedLedgerMaxMarkDeleteRate" : 10.0,  // Throttling rate of mark-delete operation
+     *                                               // to avoid high number of updates for each
+     *                                               // consumer
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<PersistencePolicies> getPersistenceAsync(String namespace);
+
+    /**
      * Set bookie affinity group for a namespace to isolate namespace write to bookies that are part of given affinity
      * group.
      * 
@@ -793,6 +1229,17 @@ public interface Namespaces {
      */
     void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffinityGroup)
             throws PulsarAdminException;
+
+    /**
+     * Set bookie affinity group for a namespace to isolate namespace write to bookies that are part of given affinity
+     * group asynchronously.
+     *
+     * @param namespace
+     *            namespace name
+     * @param bookieAffinityGroup
+     *            bookie affinity group
+     */
+    CompletableFuture<Void> setBookieAffinityGroupAsync(String namespace, BookieAffinityGroupData bookieAffinityGroup);
     
     /**
      * Delete bookie affinity group configured for a namespace.
@@ -803,6 +1250,13 @@ public interface Namespaces {
     void deleteBookieAffinityGroup(String namespace) throws PulsarAdminException;
 
     /**
+     * Delete bookie affinity group configured for a namespace asynchronously.
+     *
+     * @param namespace
+     */
+    CompletableFuture<Void> deleteBookieAffinityGroupAsync(String namespace);
+
+    /**
      * Get bookie affinity group configured for a namespace.
      * 
      * @param namespace
@@ -812,6 +1266,14 @@ public interface Namespaces {
     BookieAffinityGroupData getBookieAffinityGroup(String namespace) throws PulsarAdminException;
 
     /**
+     * Get bookie affinity group configured for a namespace asynchronously.
+     *
+     * @param namespace
+     * @return
+     */
+    CompletableFuture<BookieAffinityGroupData> getBookieAffinityGroupAsync(String namespace);
+
+    /**
      * Set the retention configuration for all the topics on a namespace.
      * <p/>
      * Set the retention configuration on a namespace. This operation requires Pulsar super-user access.
@@ -843,6 +1305,28 @@ public interface Namespaces {
     void setRetention(String namespace, RetentionPolicies retention) throws PulsarAdminException;
 
     /**
+     * Set the retention configuration for all the topics on a namespace asynchronously.
+     * <p/>
+     * Set the retention configuration on a namespace. This operation requires Pulsar super-user access.
+     * <p/>
+     * Request parameter example:
+     * <p/>
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "retentionTimeInMinutes" : 60,            // how long to retain messages
+     *     "retentionSizeInMB" : 1024,              // retention backlog limit
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Void> setRetentionAsync(String namespace, RetentionPolicies retention);
+
+    /**
      * Get the retention configuration for a namespace.
      * <p/>
      * Get the retention configuration for a namespace.
@@ -873,6 +1357,28 @@ public interface Namespaces {
     RetentionPolicies getRetention(String namespace) throws PulsarAdminException;
 
     /**
+     * Get the retention configuration for a namespace asynchronously.
+     * <p/>
+     * Get the retention configuration for a namespace.
+     * <p/>
+     * Response example:
+     * <p/>
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "retentionTimeInMinutes" : 60,            // how long to retain messages
+     *     "retentionSizeInMB" : 1024,              // retention backlog limit
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<RetentionPolicies> getRetentionAsync(String namespace);
+
+    /**
      * Unload a namespace from the current serving broker.
      *
      * @param namespace
@@ -890,7 +1396,15 @@ public interface Namespaces {
     void unload(String namespace) throws PulsarAdminException;
 
     /**
-     * Get the replication configuration version for a given namespace
+     * Unload a namespace from the current serving broker asynchronously.
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Void> unloadAsync(String namespace);
+
+    /**
+     * Get the replication configuration version for a given namespace.
      *
      * @param namespace
      * @return Replication configuration version
@@ -900,6 +1414,14 @@ public interface Namespaces {
     String getReplicationConfigVersion(String namespace) throws PulsarAdminException;
 
     /**
+     * Get the replication configuration version for a given namespace asynchronously.
+     *
+     * @param namespace
+     * @return Replication configuration version
+     */
+    CompletableFuture<String> getReplicationConfigVersionAsync(String namespace);
+
+    /**
      * Unload namespace bundle
      *
      * @param namespace
@@ -922,18 +1444,28 @@ public interface Namespaces {
     CompletableFuture<Void> unloadNamespaceBundleAsync(String namespace, String bundle);
 
     /**
-     * Split namespace bundle
+     * Split namespace bundle.
      *
      * @param namespace
-     * @param range of bundle to split
-     * @param unload newly split bundles from the broker
+     * @param bundle range of bundle to split
+     * @param unloadSplitBundles
+     * @param splitAlgorithmName
      * @throws PulsarAdminException
-     *             Unexpected error
      */
     void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName) throws PulsarAdminException;
 
     /**
-     * Set message-publish-rate (topics under this namespace can publish this many messages per second)
+     * Split namespace bundle asynchronously.
+     *
+     * @param namespace
+     * @param bundle range of bundle to split
+     * @param unloadSplitBundles
+     * @param splitAlgorithmName
+     */
+    CompletableFuture<Void> splitNamespaceBundleAsync(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName);
+
+    /**
+     * Set message-publish-rate (topics under this namespace can publish this many messages per second).
      *
      * @param namespace
      * @param publishMsgRate
@@ -943,17 +1475,33 @@ public interface Namespaces {
      */
     void setPublishRate(String namespace, PublishRate publishMsgRate) throws PulsarAdminException;
 
-    /** Get message-publish-rate (topics under this namespace can publish this many messages per second)
-    *
-    * @param namespace
-    * @returns messageRate
-    *            number of messages per second
-    * @throws PulsarAdminException
-    *             Unexpected error
-    */
+    /**
+     * Set message-publish-rate (topics under this namespace can publish this many messages per second) asynchronously.
+     *
+     * @param namespace
+     * @param publishMsgRate
+     *            number of messages per second
+     */
+    CompletableFuture<Void> setPublishRateAsync(String namespace, PublishRate publishMsgRate);
+
+    /**
+     * Get message-publish-rate (topics under this namespace can publish this many messages per second)
+     *
+     * @param namespace
+     * @return number of messages per second
+     * @throws PulsarAdminException Unexpected error
+     */
     PublishRate getPublishRate(String namespace) throws PulsarAdminException;
 
     /**
+     * Get message-publish-rate (topics under this namespace can publish this many messages per second) asynchronously.
+     *
+     * @param namespace
+     * @return number of messages per second
+     */
+    CompletableFuture<PublishRate> getPublishRateAsync(String namespace);
+
+    /**
      * Set message-dispatch-rate (topics under this namespace can dispatch this many messages per second)
      *
      * @param namespace
@@ -964,17 +1512,36 @@ public interface Namespaces {
      */
     void setDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException;
 
-    /** Get message-dispatch-rate (topics under this namespace can dispatch this many messages per second)
-    *
-    * @param namespace
-    * @returns messageRate
-    *            number of messages per second
-    * @throws PulsarAdminException
-    *             Unexpected error
-    */
+    /**
+     * Set message-dispatch-rate (topics under this namespace can dispatch this many messages per second) asynchronously.
+     *
+     * @param namespace
+     * @param dispatchRate
+     *            number of messages per second
+     */
+    CompletableFuture<Void> setDispatchRateAsync(String namespace, DispatchRate dispatchRate);
+
+    /**
+     * Get message-dispatch-rate (topics under this namespace can dispatch this many messages per second)
+     *
+     * @param namespace
+     * @returns messageRate
+     *            number of messages per second
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
     DispatchRate getDispatchRate(String namespace) throws PulsarAdminException;
 
     /**
+     * Get message-dispatch-rate (topics under this namespace can dispatch this many messages per second) asynchronously.
+     *
+     * @param namespace
+     * @returns messageRate
+     *            number of messages per second
+     */
+    CompletableFuture<DispatchRate> getDispatchRateAsync(String namespace);
+
+    /**
      * Set namespace-subscribe-rate (topics under this namespace will limit by subscribeRate)
      *
      * @param namespace
@@ -985,7 +1552,17 @@ public interface Namespaces {
      */
     void setSubscribeRate(String namespace, SubscribeRate subscribeRate) throws PulsarAdminException;
 
-    /** Get namespace-subscribe-rate (topics under this namespace allow subscribe times per consumer in a period)
+    /**
+     * Set namespace-subscribe-rate (topics under this namespace will limit by subscribeRate) asynchronously.
+     *
+     * @param namespace
+     * @param subscribeRate
+     *            consumer subscribe limit by this subscribeRate
+     */
+    CompletableFuture<Void> setSubscribeRateAsync(String namespace, SubscribeRate subscribeRate);
+
+    /**
+     * Get namespace-subscribe-rate (topics under this namespace allow subscribe times per consumer in a period)
      *
      * @param namespace
      * @returns subscribeRate
@@ -995,6 +1572,14 @@ public interface Namespaces {
     SubscribeRate getSubscribeRate(String namespace) throws PulsarAdminException;
 
     /**
+     * Get namespace-subscribe-rate (topics under this namespace allow subscribe times per consumer in a period) asynchronously.
+     *
+     * @param namespace
+     * @returns subscribeRate
+     */
+    CompletableFuture<SubscribeRate> getSubscribeRateAsync(String namespace);
+
+    /**
      * Set subscription-message-dispatch-rate (subscriptions under this namespace can dispatch this many messages per second)
      *
      * @param namespace
@@ -1005,7 +1590,17 @@ public interface Namespaces {
      */
     void setSubscriptionDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException;
 
-    /** Get subscription-message-dispatch-rate (subscriptions under this namespace can dispatch this many messages per second)
+    /**
+     * Set subscription-message-dispatch-rate (subscriptions under this namespace can dispatch this many messages per second) asynchronously.
+     *
+     * @param namespace
+     * @param dispatchRate
+     *            number of messages per second
+     */
+    CompletableFuture<Void> setSubscriptionDispatchRateAsync(String namespace, DispatchRate dispatchRate);
+
+    /**
+     * Get subscription-message-dispatch-rate (subscriptions under this namespace can dispatch this many messages per second)
      *
      * @param namespace
      * @returns DispatchRate
@@ -1016,6 +1611,15 @@ public interface Namespaces {
     DispatchRate getSubscriptionDispatchRate(String namespace) throws PulsarAdminException;
 
     /**
+     * Get subscription-message-dispatch-rate (subscriptions under this namespace can dispatch this many messages per second) asynchronously.
+     *
+     * @param namespace
+     * @returns DispatchRate
+     *            number of messages per second
+     */
+    CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String namespace);
+
+    /**
      * Set replicator-message-dispatch-rate (Replicators under this namespace can dispatch this many messages per second)
      *
      * @param namespace
@@ -1026,7 +1630,17 @@ public interface Namespaces {
      */
     void setReplicatorDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException;
 
-    /** Get replicator-message-dispatch-rate (Replicators under this namespace can dispatch this many messages per second)
+    /**
+     * Set replicator-message-dispatch-rate (Replicators under this namespace can dispatch this many messages per second) asynchronously.
+     *
+     * @param namespace
+     * @param dispatchRate
+     *            number of messages per second
+     */
+    CompletableFuture<Void> setReplicatorDispatchRateAsync(String namespace, DispatchRate dispatchRate);
+
+    /**
+     * Get replicator-message-dispatch-rate (Replicators under this namespace can dispatch this many messages per second)
      *
      * @param namespace
      * @returns DispatchRate
@@ -1037,7 +1651,16 @@ public interface Namespaces {
     DispatchRate getReplicatorDispatchRate(String namespace) throws PulsarAdminException;
 
     /**
-     * Clear backlog for all topics on a namespace
+     * Get replicator-message-dispatch-rate (Replicators under this namespace can dispatch this many messages per second) asynchronously.
+     *
+     * @param namespace
+     * @returns DispatchRate
+     *            number of messages per second
+     */
+    CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String namespace);
+
+    /**
+     * Clear backlog for all topics on a namespace.
      *
      * @param namespace
      * @throws PulsarAdminException
@@ -1046,6 +1669,13 @@ public interface Namespaces {
     void clearNamespaceBacklog(String namespace) throws PulsarAdminException;
 
     /**
+     * Clear backlog for all topics on a namespace asynchronously.
+     *
+     * @param namespace
+     */
+    CompletableFuture<Void> clearNamespaceBacklogAsync(String namespace);
+
+    /**
      * Clear backlog for a given subscription on all topics on a namespace
      *
      * @param namespace
@@ -1056,6 +1686,14 @@ public interface Namespaces {
     void clearNamespaceBacklogForSubscription(String namespace, String subscription) throws PulsarAdminException;
 
     /**
+     * Clear backlog for a given subscription on all topics on a namespace asynchronously.
+     *
+     * @param namespace
+     * @param subscription
+     */
+    CompletableFuture<Void> clearNamespaceBacklogForSubscriptionAsync(String namespace, String subscription);
+
+    /**
      * Clear backlog for all topics on a namespace bundle
      *
      * @param namespace
@@ -1100,7 +1738,7 @@ public interface Namespaces {
             String subscription);
 
     /**
-     * Unsubscribes the given subscription on all topics on a namespace
+     * Unsubscribe the given subscription on all topics on a namespace.
      *
      * @param namespace
      * @param subscription
@@ -1109,7 +1747,15 @@ public interface Namespaces {
     void unsubscribeNamespace(String namespace, String subscription) throws PulsarAdminException;
 
     /**
-     * Unsubscribes the given subscription on all topics on a namespace bundle
+     * Unsubscribe the given subscription on all topics on a namespace asynchronously.
+     *
+     * @param namespace
+     * @param subscription
+     */
+    CompletableFuture<Void> unsubscribeNamespaceAsync(String namespace, String subscription);
+
+    /**
+     * Unsubscribe the given subscription on all topics on a namespace bundle
      *
      * @param namespace
      * @param bundle
@@ -1119,7 +1765,7 @@ public interface Namespaces {
     void unsubscribeNamespaceBundle(String namespace, String bundle, String subscription) throws PulsarAdminException;
 
     /**
-     * Unsubscribes the given subscription on all topics on a namespace bundle asynchronously
+     * Unsubscribe the given subscription on all topics on a namespace bundle asynchronously
      *
      * @param namespace
      * @param bundle
@@ -1155,6 +1801,24 @@ public interface Namespaces {
     void setEncryptionRequiredStatus(String namespace, boolean encryptionRequired) throws PulsarAdminException;
 
     /**
+     * Set the encryption required status for all topics within a namespace asynchronously.
+     * <p>
+     * When encryption required is true, the broker will prevent to store unencrypted messages.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>true</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param encryptionRequired
+     *            whether message encryption is required or not
+     */
+    CompletableFuture<Void> setEncryptionRequiredStatusAsync(String namespace, boolean encryptionRequired);
+
+    /**
      * Get the delayed delivery messages for all topics within a namespace.
      * <p>
      * If disabled, messages will be immediately delivered and there will
@@ -1186,6 +1850,30 @@ public interface Namespaces {
     DelayedDeliveryPolicies getDelayedDelivery(String namespace) throws PulsarAdminException;
 
     /**
+     * Get the delayed delivery messages for all topics within a namespace asynchronously.
+     * <p>
+     * If disabled, messages will be immediately delivered and there will
+     * be no tracking overhead.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "active" : true,   // Enable or disable delayed delivery for messages on a namespace
+     *     "tickTime" : 1000, // The tick time for when retrying on delayed delivery messages
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @return delayedDeliveryPolicies
+     *            Whether to enable the delayed delivery for messages.
+     */
+    CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryAsync(String namespace);
+
+    /**
      * Set the delayed delivery messages for all topics within a namespace.
      * <p>
      * If disabled, messages will be immediately delivered and there will
@@ -1217,7 +1905,31 @@ public interface Namespaces {
     void setDelayedDeliveryMessages(String namespace, DelayedDeliveryPolicies delayedDeliveryPolicies) throws PulsarAdminException;
 
     /**
-     * Set the given subscription auth mode on all topics on a namespace
+     * Set the delayed delivery messages for all topics within a namespace asynchronously.
+     * <p>
+     * If disabled, messages will be immediately delivered and there will
+     * be no tracking overhead.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "tickTime" : 1000, // Enable or disable delayed delivery for messages on a namespace
+     *     "active" : true,   // The tick time for when retrying on delayed delivery messages
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param delayedDeliveryPolicies
+     *            Whether to enable the delayed delivery for messages.
+     */
+    CompletableFuture<Void> setDelayedDeliveryMessagesAsync(String namespace, DelayedDeliveryPolicies delayedDeliveryPolicies);
+
+    /**
+     * Set the given subscription auth mode on all topics on a namespace.
      *
      * @param namespace
      * @param subscriptionAuthMode
@@ -1226,6 +1938,14 @@ public interface Namespaces {
     void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscriptionAuthMode) throws PulsarAdminException;
 
     /**
+     * Set the given subscription auth mode on all topics on a namespace asynchronously.
+     *
+     * @param namespace
+     * @param subscriptionAuthMode
+     */
+    CompletableFuture<Void> setSubscriptionAuthModeAsync(String namespace, SubscriptionAuthMode subscriptionAuthMode);
+
+    /**
      * Get the maxProducersPerTopic for a namespace.
      * <p>
      * Response example:
@@ -1247,6 +1967,20 @@ public interface Namespaces {
     int getMaxProducersPerTopic(String namespace) throws PulsarAdminException;
 
     /**
+     * Get the maxProducersPerTopic for a namespace asynchronously.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>0</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Integer> getMaxProducersPerTopicAsync(String namespace);
+
+    /**
      * Set maxProducersPerTopic for a namespace.
      * <p>
      * Request example:
@@ -1270,6 +2004,22 @@ public interface Namespaces {
     void setMaxProducersPerTopic(String namespace, int maxProducersPerTopic) throws PulsarAdminException;
 
     /**
+     * Set maxProducersPerTopic for a namespace asynchronously.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>10</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param maxProducersPerTopic
+     *            maxProducersPerTopic value for a namespace
+     */
+    CompletableFuture<Void> setMaxProducersPerTopicAsync(String namespace, int maxProducersPerTopic);
+
+    /**
      * Get the maxProducersPerTopic for a namespace.
      * <p>
      * Response example:
@@ -1291,6 +2041,20 @@ public interface Namespaces {
     int getMaxConsumersPerTopic(String namespace) throws PulsarAdminException;
 
     /**
+     * Get the maxProducersPerTopic for a namespace asynchronously.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>0</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Integer> getMaxConsumersPerTopicAsync(String namespace);
+
+    /**
      * Set maxConsumersPerTopic for a namespace.
      * <p>
      * Request example:
@@ -1314,6 +2078,22 @@ public interface Namespaces {
     void setMaxConsumersPerTopic(String namespace, int maxConsumersPerTopic) throws PulsarAdminException;
 
     /**
+     * Set maxConsumersPerTopic for a namespace asynchronously.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>10</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param maxConsumersPerTopic
+     *            maxConsumersPerTopic value for a namespace
+     */
+    CompletableFuture<Void> setMaxConsumersPerTopicAsync(String namespace, int maxConsumersPerTopic);
+
+    /**
      * Get the maxConsumersPerSubscription for a namespace.
      * <p>
      * Response example:
@@ -1332,10 +2112,47 @@ public interface Namespaces {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException;
+    int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException;
+
+    /**
+     * Get the maxConsumersPerSubscription for a namespace asynchronously.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>0</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Integer> getMaxConsumersPerSubscriptionAsync(String namespace);
+
+    /**
+     * Set maxConsumersPerSubscription for a namespace.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>10</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param maxConsumersPerSubscription
+     *            maxConsumersPerSubscription value for a namespace
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setMaxConsumersPerSubscription(String namespace, int maxConsumersPerSubscription) throws PulsarAdminException;
 
     /**
-     * Set maxConsumersPerSubscription for a namespace.
+     * Set maxConsumersPerSubscription for a namespace asynchronously.
      * <p>
      * Request example:
      *
@@ -1347,15 +2164,8 @@ public interface Namespaces {
      *            Namespace name
      * @param maxConsumersPerSubscription
      *            maxConsumersPerSubscription value for a namespace
-     *
-     * @throws NotAuthorizedException
-     *             Don't have admin permission
-     * @throws NotFoundException
-     *             Namespace does not exist
-     * @throws PulsarAdminException
-     *             Unexpected error
      */
-    void setMaxConsumersPerSubscription(String namespace, int maxConsumersPerSubscription) throws PulsarAdminException;
+    CompletableFuture<Void> setMaxConsumersPerSubscriptionAsync(String namespace, int maxConsumersPerSubscription);
 
     /**
      * Get the maxUnackedMessagesPerConsumer for a namespace.
@@ -1379,6 +2189,20 @@ public interface Namespaces {
     int getMaxUnackedMessagesPerConsumer(String namespace) throws PulsarAdminException;
 
     /**
+     * Get the maxUnackedMessagesPerConsumer for a namespace asynchronously.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>0</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Integer> getMaxUnackedMessagesPerConsumerAsync(String namespace);
+
+    /**
      * Set maxUnackedMessagesPerConsumer for a namespace.
      * <p>
      * Request example:
@@ -1402,6 +2226,22 @@ public interface Namespaces {
     void setMaxUnackedMessagesPerConsumer(String namespace, int maxUnackedMessagesPerConsumer) throws PulsarAdminException;
 
     /**
+     * Set maxUnackedMessagesPerConsumer for a namespace asynchronously.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>10</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param maxUnackedMessagesPerConsumer
+     *            maxUnackedMessagesPerConsumer value for a namespace
+     */
+    CompletableFuture<Void> setMaxUnackedMessagesPerConsumerAsync(String namespace, int maxUnackedMessagesPerConsumer);
+
+    /**
      * Get the maxUnackedMessagesPerSubscription for a namespace.
      * <p>
      * Response example:
@@ -1423,6 +2263,20 @@ public interface Namespaces {
     int getMaxUnackedMessagesPerSubscription(String namespace) throws PulsarAdminException;
 
     /**
+     * Get the maxUnackedMessagesPerSubscription for a namespace asynchronously.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>0</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Integer> getMaxUnackedMessagesPerSubscriptionAsync(String namespace);
+
+    /**
      * Set maxUnackedMessagesPerSubscription for a namespace.
      * <p>
      * Request example:
@@ -1446,6 +2300,22 @@ public interface Namespaces {
     void setMaxUnackedMessagesPerSubscription(String namespace, int maxUnackedMessagesPerSubscription) throws PulsarAdminException;
 
     /**
+     * Set maxUnackedMessagesPerSubscription for a namespace asynchronously.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>10</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param maxUnackedMessagesPerSubscription
+     *            Max number of unacknowledged messages allowed per shared subscription.
+     */
+    CompletableFuture<Void> setMaxUnackedMessagesPerSubscriptionAsync(String namespace, int maxUnackedMessagesPerSubscription);
+
+    /**
      * Get the compactionThreshold for a namespace. The maximum number of bytes topics in the namespace
      * can have before compaction is triggered. 0 disables.
      * <p>
@@ -1468,6 +2338,21 @@ public interface Namespaces {
     long getCompactionThreshold(String namespace) throws PulsarAdminException;
 
     /**
+     * Get the compactionThreshold for a namespace asynchronously. The maximum number of bytes topics in the namespace
+     * can have before compaction is triggered. 0 disables.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>10000000</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Long> getCompactionThresholdAsync(String namespace);
+
+    /**
      * Set the compactionThreshold for a namespace. The maximum number of bytes topics in the namespace
      * can have before compaction is triggered. 0 disables.
      * <p>
@@ -1492,6 +2377,23 @@ public interface Namespaces {
     void setCompactionThreshold(String namespace, long compactionThreshold) throws PulsarAdminException;
 
     /**
+     * Set the compactionThreshold for a namespace asynchronously. The maximum number of bytes topics in the namespace
+     * can have before compaction is triggered. 0 disables.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>10000000</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param compactionThreshold
+     *            maximum number of backlog bytes before compaction is triggered
+     */
+    CompletableFuture<Void> setCompactionThresholdAsync(String namespace, long compactionThreshold);
+
+    /**
      * Get the offloadThreshold for a namespace. The maximum number of bytes stored on the pulsar cluster for topics
      * in the namespace before data starts being offloaded to longterm storage.
      *
@@ -1515,6 +2417,22 @@ public interface Namespaces {
     long getOffloadThreshold(String namespace) throws PulsarAdminException;
 
     /**
+     * Get the offloadThreshold for a namespace asynchronously. The maximum number of bytes stored on the pulsar cluster for topics
+     * in the namespace before data starts being offloaded to longterm storage.
+     *
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>10000000</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Long> getOffloadThresholdAsync(String namespace);
+
+    /**
      * Set the offloadThreshold for a namespace. The maximum number of bytes stored on the pulsar cluster for topics
      * in the namespace before data starts being offloaded to longterm storage.
      *
@@ -1538,7 +2456,26 @@ public interface Namespaces {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    void setOffloadThreshold(String namespace, long compactionThreshold) throws PulsarAdminException;
+    void setOffloadThreshold(String namespace, long offloadThreshold) throws PulsarAdminException;
+
+    /**
+     * Set the offloadThreshold for a namespace asynchronously. The maximum number of bytes stored on the pulsar cluster for topics
+     * in the namespace before data starts being offloaded to longterm storage.
+     *
+     * Negative values disabled automatic offloading. Setting a threshold of 0 will offload data as soon as possible.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>10000000</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param offloadThreshold
+     *            maximum number of bytes stored before offloading is triggered
+     */
+    CompletableFuture<Void> setOffloadThresholdAsync(String namespace, long offloadThreshold);
 
     /**
      * Get the offload deletion lag for a namespace, in milliseconds.
@@ -1572,6 +2509,30 @@ public interface Namespaces {
     Long getOffloadDeleteLagMs(String namespace) throws PulsarAdminException;
 
     /**
+     * Get the offload deletion lag asynchronously for a namespace, in milliseconds.
+     * The number of milliseconds to wait before deleting a ledger segment which has been offloaded from
+     * the Pulsar cluster's local storage (i.e. BookKeeper).
+     *
+     * If the offload deletion lag has not been set for the namespace, the method returns 'null'
+     * and the namespace will use the configured default of the pulsar broker.
+     *
+     * A negative value disables deletion of the local ledger completely, though it will still be deleted
+     * if it exceeds the topics retention policy, along with the offloaded copy.
+     *
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>3600000</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @return the offload deletion lag for the namespace in milliseconds, or null if not set
+     */
+    CompletableFuture<Long> getOffloadDeleteLagMsAsync(String namespace);
+
+    /**
      * Set the offload deletion lag for a namespace.
      *
      * The offload deletion lag is the amount of time to wait after offloading a ledger segment to long term storage,
@@ -1595,6 +2556,22 @@ public interface Namespaces {
     void setOffloadDeleteLag(String namespace, long lag, TimeUnit unit) throws PulsarAdminException;
 
     /**
+     * Set the offload deletion lag for a namespace asynchronously.
+     *
+     * The offload deletion lag is the amount of time to wait after offloading a ledger segment to long term storage,
+     * before deleting its copy stored on the Pulsar cluster's local storage (i.e. BookKeeper).
+     *
+     * A negative value disables deletion of the local ledger completely, though it will still be deleted
+     * if it exceeds the topics retention policy, along with the offloaded copy.
+     *
+     * @param namespace
+     *            Namespace name
+     * @param lag the duration to wait before deleting the local copy
+     * @param unit the timeunit of the duration
+     */
+    CompletableFuture<Void> setOffloadDeleteLagAsync(String namespace, long lag, TimeUnit unit);
+
+    /**
      * Clear the offload deletion lag for a namespace.
      *
      * The namespace will fall back to using the configured default of the pulsar broker.
@@ -1609,6 +2586,13 @@ public interface Namespaces {
     void clearOffloadDeleteLag(String namespace) throws PulsarAdminException;
 
     /**
+     * Clear the offload deletion lag for a namespace asynchronously.
+     *
+     * The namespace will fall back to using the configured default of the pulsar broker.
+     */
+    CompletableFuture<Void> clearOffloadDeleteLagAsync(String namespace);
+
+    /**
      * Get the strategy used to check the a new schema provided by a producer is compatible with the current schema
      * before it is installed.
      *
@@ -1662,6 +2646,14 @@ public interface Namespaces {
      */
     boolean getSchemaValidationEnforced(String namespace)
             throws PulsarAdminException;
+
+    /**
+     * Get schema validation enforced for namespace asynchronously.
+     *
+     * @return the schema validation enforced flag
+     */
+    CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String namespace);
+
     /**
      * Set schema validation enforced for namespace.
      * if a producer without a schema attempts to produce to a topic with schema in this the namespace, the
@@ -1681,6 +2673,17 @@ public interface Namespaces {
             throws PulsarAdminException;
 
     /**
+     * Set schema validation enforced for namespace asynchronously.
+     * if a producer without a schema attempts to produce to a topic with schema in this the namespace, the
+     * producer will be failed to connect. PLEASE be carefully on using this, since non-java clients don't
+     * support schema. if you enable this setting, it will cause non-java clients failed to produce.
+     *
+     * @param namespace pulsar namespace name
+     * @param schemaValidationEnforced flag to enable or disable schema validation for the given namespace
+     */
+    CompletableFuture<Void> setSchemaValidationEnforcedAsync(String namespace, boolean schemaValidationEnforced);
+
+    /**
      * Get the strategy used to check the a new schema provided by a producer is compatible with the current schema
      * before it is installed.
      *
@@ -1694,7 +2697,16 @@ public interface Namespaces {
      *             Unexpected error
      */
     SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String namespace)
-            throws PulsarAdminException;;
+            throws PulsarAdminException;
+
+    /**
+     * Get the strategy used to check the a new schema provided by a producer is compatible with the current schema
+     * before it is installed asynchronously.
+     *
+     * @param namespace The namespace in whose policy we are interested
+     * @return the strategy used to check compatibility
+     */
+    CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync(String namespace);
 
     /**
      * Set the strategy used to check the a new schema provided by a producer is compatible with the current schema
@@ -1714,7 +2726,17 @@ public interface Namespaces {
             throws PulsarAdminException;
 
     /**
-     * Get whether allow auto update schema
+     * Set the strategy used to check the a new schema provided by a producer is compatible with the current schema
+     * before it is installed asynchronously.
+     *
+     * @param namespace The namespace in whose policy should be set
+     * @param strategy The schema compatibility strategy
+     */
+    CompletableFuture<Void> setSchemaCompatibilityStrategyAsync(String namespace,
+                                        SchemaCompatibilityStrategy strategy);
+
+    /**
+     * Get whether allow auto update schema.
      *
      * @param namespace pulsar namespace name
      * @return the schema validation enforced flag
@@ -1729,6 +2751,14 @@ public interface Namespaces {
             throws PulsarAdminException;
 
     /**
+     * Get whether allow auto update schema asynchronously.
+     *
+     * @param namespace pulsar namespace name
+     * @return the schema validation enforced flag
+     */
+    CompletableFuture<Boolean> getIsAllowAutoUpdateSchemaAsync(String namespace);
+
+    /**
      * The flag is when producer bring a new schema and the schema pass compatibility check
      * whether allow schema auto registered
      *
@@ -1745,6 +2775,15 @@ public interface Namespaces {
             throws PulsarAdminException;
 
     /**
+     * The flag is when producer bring a new schema and the schema pass compatibility check
+     * whether allow schema auto registered
+     *
+     * @param namespace pulsar namespace name
+     * @param isAllowAutoUpdateSchema flag to enable or disable auto update schema
+     */
+    CompletableFuture<Void> setIsAllowAutoUpdateSchemaAsync(String namespace, boolean isAllowAutoUpdateSchema);
+
+    /**
      * Set the offload configuration for all the topics in a namespace.
      * <p/>
      * Set the offload configuration in a namespace. This operation requires pulsar tenant access.
@@ -1781,6 +2820,33 @@ public interface Namespaces {
     void setOffloadPolicies(String namespace, OffloadPolicies offloadPolicies) throws PulsarAdminException;
 
     /**
+     * Set the offload configuration for all the topics in a namespace asynchronously.
+     * <p/>
+     * Set the offload configuration in a namespace. This operation requires pulsar tenant access.
+     * <p/>
+     * Request parameter example:
+     * <p/>
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "region" : "us-east-2",                   // The long term storage region
+     *     "bucket" : "bucket",                      // Bucket to place offloaded ledger into
+     *     "endpoint" : "endpoint",                  // Alternative endpoint to connect to
+     *     "maxBlockSize" : 1024,                    // Max Block Size, default 64MB
+     *     "readBufferSize" : 1024,                  // Read Buffer Size, default 1MB
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param offloadPolicies
+     *            Offload configuration
+     */
+    CompletableFuture<Void> setOffloadPoliciesAsync(String namespace, OffloadPolicies offloadPolicies);
+
+    /**
      * Get the offload configuration for a namespace.
      * <p/>
      * Get the offload configuration for a namespace.
@@ -1813,4 +2879,28 @@ public interface Namespaces {
      */
     OffloadPolicies getOffloadPolicies(String namespace) throws PulsarAdminException;
 
+    /**
+     * Get the offload configuration for a namespace asynchronously.
+     * <p/>
+     * Get the offload configuration for a namespace.
+     * <p/>
+     * Response example:
+     * <p/>
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "region" : "us-east-2",                   // The long term storage region
+     *     "bucket" : "bucket",                      // Bucket to place offloaded ledger into
+     *     "endpoint" : "endpoint",                  // Alternative endpoint to connect to
+     *     "maxBlockSize" : 1024,                    // Max Block Size, default 64MB
+     *     "readBufferSize" : 1024,                  // Read Buffer Size, default 1MB
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String namespace);
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 3b60fa2..7c5b79a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -26,8 +26,10 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
@@ -68,68 +70,164 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     @Override
     public List<String> getNamespaces(String tenant) throws PulsarAdminException {
         try {
-            WebTarget path = adminV2Namespaces.path(tenant);
-            return request(path).get(new GenericType<List<String>>() {
-            });
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getNamespacesAsync(tenant).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<List<String>> getNamespacesAsync(String tenant) {
+        WebTarget path = adminV2Namespaces.path(tenant);
+        final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<List<String>>() {
+                    @Override
+                    public void completed(List<String> namespaces) {
+                        future.complete(namespaces);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public List<String> getNamespaces(String tenant, String cluster) throws PulsarAdminException {
-        try {
-            WebTarget path = adminNamespaces.path(tenant).path(cluster);
-            return request(path).get(new GenericType<List<String>>() {
-            });
-        } catch (Exception e) {
-            throw getApiException(e);
+        WebTarget path = adminNamespaces.path(tenant).path(cluster);
+        final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<List<String>>() {
+
+                    @Override
+                    public void completed(List<String> namespaces) {
+                        future.complete(namespaces);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        try {
+            return future.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
     public List<String> getTopics(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            String action = ns.isV2() ? "topics" : "destinations";
-            WebTarget path = namespacePath(ns, action);
-            return request(path).get(new GenericType<List<String>>() {
-            });
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getTopicsAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<List<String>> getTopicsAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        String action = ns.isV2() ? "topics" : "destinations";
+        WebTarget path = namespacePath(ns, action);
+        final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<List<String>>() {
+                    @Override
+                    public void completed(List<String> topics) {
+                        future.complete(topics);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public Policies getPolicies(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns);
-            return request(path).get(Policies.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getPoliciesAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Policies> getPoliciesAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns);
+        final CompletableFuture<Policies> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Policies>() {
+                    @Override
+                    public void completed(Policies policies) {
+                        future.complete(policies);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void createNamespace(String namespace, Set<String> clusters) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns);
-
-            if (ns.isV2()) {
-                // For V2 API we pass full Policy class instance
-                Policies policies = new Policies();
-                policies.replication_clusters = clusters;
-                request(path).put(Entity.entity(policies, MediaType.APPLICATION_JSON), ErrorData.class);
-            } else {
-                // For V1 API, we pass the BundlesData on creation
-                request(path).put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+            createNamespaceAsync(namespace, clusters).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> createNamespaceAsync(String namespace, Set<String> clusters) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns);
+
+        if (ns.isV2()) {
+            // For V2 API we pass full Policy class instance
+            Policies policies = new Policies();
+            policies.replication_clusters = clusters;
+            return asyncPutRequest(path, Entity.entity(policies, MediaType.APPLICATION_JSON));
+        } else {
+            // For V1 API, we pass the BundlesData on creation
+            return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)).thenAccept(ignore -> {
                 // For V1, we need to do it in 2 steps
-                setNamespaceReplicationClusters(namespace, clusters);
-            }
-        } catch (Exception e) {
-            throw getApiException(e);
+                setNamespaceReplicationClustersAsync(namespace, clusters);
+            });
         }
     }
 
@@ -139,71 +237,116 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     }
 
     @Override
+    public CompletableFuture<Void> createNamespaceAsync(String namespace, int numBundles) {
+        return createNamespaceAsync(namespace, new BundlesData(numBundles));
+    }
+
+    @Override
     public void createNamespace(String namespace, Policies policies) throws PulsarAdminException {
+        try {
+            createNamespaceAsync(namespace, policies).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> createNamespaceAsync(String namespace, Policies policies) {
         NamespaceName ns = NamespaceName.get(namespace);
         checkArgument(ns.isV2(), "Create namespace with policies is only supported on newer namespaces");
+        WebTarget path = namespacePath(ns);
+        // For V2 API we pass full Policy class instance
+        return asyncPutRequest(path, Entity.entity(policies, MediaType.APPLICATION_JSON));
+    }
 
+    @Override
+    public void createNamespace(String namespace, BundlesData bundlesData) throws PulsarAdminException {
         try {
-            WebTarget path = namespacePath(ns);
-
-            // For V2 API we pass full Policy class instance
-            request(path).put(Entity.entity(policies, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            createNamespaceAsync(namespace, bundlesData).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
-    public void createNamespace(String namespace, BundlesData bundlesData) throws PulsarAdminException {
-        try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns);
-
-            if (ns.isV2()) {
-                // For V2 API we pass full Policy class instance
-                Policies policies = new Policies();
-                policies.bundles = bundlesData;
-                request(path).put(Entity.entity(policies, MediaType.APPLICATION_JSON), ErrorData.class);
-            } else {
-                // For V1 API, we pass the BundlesData on creation
-                request(path).put(Entity.entity(bundlesData, MediaType.APPLICATION_JSON), ErrorData.class);
-            }
-        } catch (Exception e) {
-            throw getApiException(e);
+    public CompletableFuture<Void> createNamespaceAsync(String namespace, BundlesData bundlesData) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns);
+
+        if (ns.isV2()) {
+            // For V2 API we pass full Policy class instance
+            Policies policies = new Policies();
+            policies.bundles = bundlesData;
+            return asyncPutRequest(path, Entity.entity(policies, MediaType.APPLICATION_JSON));
+        } else {
+            // For V1 API, we pass the BundlesData on creation
+            return asyncPutRequest(path, Entity.entity(bundlesData, MediaType.APPLICATION_JSON));
         }
     }
 
     @Override
     public void createNamespace(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns);
-            request(path).put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            createNamespaceAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> createNamespaceAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns);
+        return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void deleteNamespace(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns);
-            request(path).delete(ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            deleteNamespaceAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> deleteNamespaceAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns);
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public void deleteNamespaceBundle(String namespace, String bundleRange) throws PulsarAdminException {
         try {
-            deleteNamespaceBundleAsync(namespace, bundleRange).get();
+            deleteNamespaceBundleAsync(namespace, bundleRange).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -217,473 +360,1079 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     @Override
     public Map<String, Set<AuthAction>> getPermissions(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "permissions");
-            return request(path).get(new GenericType<Map<String, Set<AuthAction>>>() {});
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getPermissionsAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Map<String, Set<AuthAction>>> getPermissionsAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "permissions");
+        final CompletableFuture<Map<String, Set<AuthAction>>> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Map<String, Set<AuthAction>>>() {
+                    @Override
+                    public void completed(Map<String, Set<AuthAction>> permissions) {
+                        future.complete(permissions);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void grantPermissionOnNamespace(String namespace, String role, Set<AuthAction> actions)
             throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "permissions", role);
-            request(path).post(Entity.entity(actions, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            grantPermissionOnNamespaceAsync(namespace, role, actions)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> grantPermissionOnNamespaceAsync(String namespace, String role, Set<AuthAction> actions) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "permissions", role);
+        return asyncPostRequest(path, Entity.entity(actions, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void revokePermissionsOnNamespace(String namespace, String role) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "permissions", role);
-            request(path).delete(ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            revokePermissionsOnNamespaceAsync(namespace, role).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
+    @Override
+    public CompletableFuture<Void> revokePermissionsOnNamespaceAsync(String namespace, String role) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "permissions", role);
+        return asyncDeleteRequest(path);
+    }
+
 
     @Override
     public void grantPermissionOnSubscription(String namespace, String subscription, Set<String> roles)
             throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "permissions", "subscription", subscription);
-            request(path).post(Entity.entity(roles, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            grantPermissionOnSubscriptionAsync(namespace, subscription, roles)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> grantPermissionOnSubscriptionAsync(String namespace, String subscription, Set<String> roles) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "permissions", "subscription", subscription);
+        return asyncPostRequest(path, Entity.entity(roles, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void revokePermissionOnSubscription(String namespace, String subscription, String role) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "permissions", subscription, role);
-            request(path).delete(ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            revokePermissionOnSubscriptionAsync(namespace, subscription, role)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> revokePermissionOnSubscriptionAsync(String namespace, String subscription, String role) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "permissions", subscription, role);
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public List<String> getNamespaceReplicationClusters(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "replication");
-            return request(path).get(new GenericType<List<String>>() {});
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getNamespaceReplicationClustersAsync(namespace)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<List<String>> getNamespaceReplicationClustersAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "replication");
+        final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<List<String>>() {
+                    @Override
+                    public void completed(List<String> clusters) {
+                        future.complete(clusters);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void setNamespaceReplicationClusters(String namespace, Set<String> clusterIds) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "replication");
-            request(path).post(Entity.entity(clusterIds, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setNamespaceReplicationClustersAsync(namespace, clusterIds).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setNamespaceReplicationClustersAsync(String namespace, Set<String> clusterIds) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "replication");
+        return asyncPostRequest(path, Entity.entity(clusterIds, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public int getNamespaceMessageTTL(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "messageTTL");
-            return request(path).get(new GenericType<Integer>() {});
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getNamespaceMessageTTLAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Integer> getNamespaceMessageTTLAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "messageTTL");
+        final CompletableFuture<Integer> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Integer>() {
+                    @Override
+                    public void completed(Integer ttl) {
+                        future.complete(ttl);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void setNamespaceMessageTTL(String namespace, int ttlInSeconds) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "messageTTL");
-            request(path).post(Entity.entity(ttlInSeconds, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setNamespaceMessageTTLAsync(namespace, ttlInSeconds)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setNamespaceMessageTTLAsync(String namespace, int ttlInSeconds) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "messageTTL");
+        return asyncPostRequest(path, Entity.entity(ttlInSeconds, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void setNamespaceAntiAffinityGroup(String namespace, String namespaceAntiAffinityGroup)
             throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "antiAffinity");
-            request(path).post(Entity.entity(namespaceAntiAffinityGroup, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setNamespaceAntiAffinityGroupAsync(namespace, namespaceAntiAffinityGroup)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setNamespaceAntiAffinityGroupAsync(String namespace, String namespaceAntiAffinityGroup) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "antiAffinity");
+        return asyncPostRequest(path, Entity.entity(namespaceAntiAffinityGroup, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public String getNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "antiAffinity");
-            return request(path).get(new GenericType<String>() {});
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getNamespaceAntiAffinityGroupAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<String> getNamespaceAntiAffinityGroupAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "antiAffinity");
+        final CompletableFuture<String> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<String>() {
+                    @Override
+                    public void completed(String s) {
+                        future.complete(s);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public List<String> getAntiAffinityNamespaces(String tenant, String cluster, String namespaceAntiAffinityGroup)
             throws PulsarAdminException {
         try {
-            WebTarget path = adminNamespaces.path(cluster).path("antiAffinity").path(namespaceAntiAffinityGroup);
-            return request(path.queryParam("property", tenant)).get(new GenericType<List<String>>() {});
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getAntiAffinityNamespacesAsync(tenant, cluster, namespaceAntiAffinityGroup)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<List<String>> getAntiAffinityNamespacesAsync(String tenant, String cluster, String namespaceAntiAffinityGroup) {
+        WebTarget path = adminNamespaces.path(cluster)
+                .path("antiAffinity").path(namespaceAntiAffinityGroup).queryParam("property", tenant);
+        final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<List<String>>() {
+                    @Override
+                    public void completed(List<String> antiNamespaces) {
+                        future.complete(antiNamespaces);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void deleteNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "antiAffinity");
-            request(path).delete(ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            deleteNamespaceAntiAffinityGroupAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> deleteNamespaceAntiAffinityGroupAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "antiAffinity");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public void setDeduplicationStatus(String namespace, boolean enableDeduplication) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "deduplication");
-            request(path).post(Entity.entity(enableDeduplication, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setDeduplicationStatusAsync(namespace, enableDeduplication)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setDeduplicationStatusAsync(String namespace, boolean enableDeduplication) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "deduplication");
+        return asyncPostRequest(path, Entity.entity(enableDeduplication, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "backlogQuotaMap");
-            return request(path).get(new GenericType<Map<BacklogQuotaType, BacklogQuota>>() {
-            });
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getBacklogQuotaMapAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Map<BacklogQuotaType, BacklogQuota>> getBacklogQuotaMapAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "backlogQuotaMap");
+        final CompletableFuture<Map<BacklogQuotaType, BacklogQuota>> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Map<BacklogQuotaType, BacklogQuota>>() {
+                    @Override
+                    public void completed(Map<BacklogQuotaType, BacklogQuota> quotaMap) {
+                        future.complete(quotaMap);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void setBacklogQuota(String namespace, BacklogQuota backlogQuota) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "backlogQuota");
-            request(path).post(Entity.entity(backlogQuota, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setBacklogQuotaAsync(namespace, backlogQuota).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setBacklogQuotaAsync(String namespace, BacklogQuota backlogQuota) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "backlogQuota");
+        return asyncPostRequest(path, Entity.entity(backlogQuota, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void removeBacklogQuota(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "backlogQuota");
-            request(path.queryParam("backlogQuotaType", BacklogQuotaType.destination_storage.toString()))
-                    .delete(ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            removeBacklogQuotaAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> removeBacklogQuotaAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "backlogQuota")
+                .queryParam("backlogQuotaType", BacklogQuotaType.destination_storage.toString());
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public void setPersistence(String namespace, PersistencePolicies persistence) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "persistence");
-            request(path).post(Entity.entity(persistence, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setPersistenceAsync(namespace, persistence).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setPersistenceAsync(String namespace, PersistencePolicies persistence) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "persistence");
+        return asyncPostRequest(path, Entity.entity(persistence, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffinityGroup) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "persistence", "bookieAffinity");
-            request(path).post(Entity.entity(bookieAffinityGroup, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setBookieAffinityGroupAsync(namespace, bookieAffinityGroup).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setBookieAffinityGroupAsync(String namespace, BookieAffinityGroupData bookieAffinityGroup) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "persistence", "bookieAffinity");
+        return asyncPostRequest(path, Entity.entity(bookieAffinityGroup, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void deleteBookieAffinityGroup(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "persistence", "bookieAffinity");
-            request(path).delete(ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            deleteBookieAffinityGroupAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> deleteBookieAffinityGroupAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "persistence", "bookieAffinity");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public BookieAffinityGroupData getBookieAffinityGroup(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "persistence", "bookieAffinity");
-            return request(path).get(BookieAffinityGroupData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getBookieAffinityGroupAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<BookieAffinityGroupData> getBookieAffinityGroupAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "persistence", "bookieAffinity");
+        final CompletableFuture<BookieAffinityGroupData> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<BookieAffinityGroupData>() {
+                    @Override
+                    public void completed(BookieAffinityGroupData bookieAffinityGroupData) {
+                        future.complete(bookieAffinityGroupData);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public PersistencePolicies getPersistence(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "persistence");
-            return request(path).get(PersistencePolicies.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getPersistenceAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<PersistencePolicies> getPersistenceAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "persistence");
+        final CompletableFuture<PersistencePolicies> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<PersistencePolicies>() {
+                    @Override
+                    public void completed(PersistencePolicies persistencePolicies) {
+                        future.complete(persistencePolicies);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void setRetention(String namespace, RetentionPolicies retention) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "retention");
-            request(path).post(Entity.entity(retention, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setRetentionAsync(namespace, retention).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
+    }
 
+    @Override
+    public CompletableFuture<Void> setRetentionAsync(String namespace, RetentionPolicies retention) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "retention");
+        return asyncPostRequest(path, Entity.entity(retention, MediaType.APPLICATION_JSON));
     }
 
     @Override
     public RetentionPolicies getRetention(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "retention");
-            return request(path).get(RetentionPolicies.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getRetentionAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<RetentionPolicies> getRetentionAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "retention");
+        final CompletableFuture<RetentionPolicies> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<RetentionPolicies>() {
+                    @Override
+                    public void completed(RetentionPolicies retentionPolicies) {
+                        future.complete(retentionPolicies);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void unload(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "unload");
-            request(path).put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            unloadAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> unloadAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "unload");
+        return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public String getReplicationConfigVersion(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "configversion");
-            return request(path).get(String.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getReplicationConfigVersionAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<String> getReplicationConfigVersionAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "configversion");
+        final CompletableFuture<String> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<String>() {
+                    @Override
+                    public void completed(String s) {
+                        future.complete(s);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public void unloadNamespaceBundle(String namespace, String bundle) throws PulsarAdminException {
+        try {
+            unloadNamespaceBundleAsync(namespace, bundle).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
-    public void unloadNamespaceBundle(String namespace, String bundle) throws PulsarAdminException {
+    public CompletableFuture<Void> unloadNamespaceBundleAsync(String namespace, String bundle) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, bundle, "unload");
+        return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName)
+            throws PulsarAdminException {
+        try {
+            splitNamespaceBundleAsync(namespace, bundle, unloadSplitBundles, splitAlgorithmName)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> splitNamespaceBundleAsync(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, bundle, "split")
+                .queryParam("unload", Boolean.toString(unloadSplitBundles))
+                .queryParam("splitAlgorithmName", splitAlgorithmName);
+        return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void setPublishRate(String namespace, PublishRate publishMsgRate) throws PulsarAdminException {
+        try {
+            setPublishRateAsync(namespace, publishMsgRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setPublishRateAsync(String namespace, PublishRate publishMsgRate) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "publishRate");
+        return asyncPostRequest(path, Entity.entity(publishMsgRate, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public PublishRate getPublishRate(String namespace) throws PulsarAdminException {
+        try {
+            return getPublishRateAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<PublishRate> getPublishRateAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "publishRate");
+        final CompletableFuture<PublishRate> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<PublishRate>() {
+                    @Override
+                    public void completed(PublishRate publishRate) {
+                        future.complete(publishRate);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public void setDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
+        try {
+            setDispatchRateAsync(namespace, dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setDispatchRateAsync(String namespace, DispatchRate dispatchRate) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "dispatchRate");
+        return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public DispatchRate getDispatchRate(String namespace) throws PulsarAdminException {
+        try {
+            return getDispatchRateAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<DispatchRate> getDispatchRateAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "dispatchRate");
+        final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<DispatchRate>() {
+                    @Override
+                    public void completed(DispatchRate dispatchRate) {
+                        future.complete(dispatchRate);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public void setSubscribeRate(String namespace, SubscribeRate subscribeRate) throws PulsarAdminException {
         try {
-            unloadNamespaceBundleAsync(namespace, bundle).get();
+            setSubscribeRateAsync(namespace, subscribeRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
-    public CompletableFuture<Void> unloadNamespaceBundleAsync(String namespace, String bundle) {
+    public CompletableFuture<Void> setSubscribeRateAsync(String namespace, SubscribeRate subscribeRate) {
         NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, bundle, "unload");
-        return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+        WebTarget path = namespacePath(ns, "subscribeRate");
+        return asyncPostRequest(path, Entity.entity(subscribeRate, MediaType.APPLICATION_JSON));
     }
 
     @Override
-    public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName)
-            throws PulsarAdminException {
+    public SubscribeRate getSubscribeRate(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, bundle, "split");
-            request(path.queryParam("unload", Boolean.toString(unloadSplitBundles))
-                    .queryParam("splitAlgorithmName", splitAlgorithmName))
-                    .put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getSubscribeRateAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
-    public void setPublishRate(String namespace, PublishRate publishMsgRate) throws PulsarAdminException {
+    public CompletableFuture<SubscribeRate> getSubscribeRateAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "subscribeRate");
+        final CompletableFuture<SubscribeRate> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<SubscribeRate>() {
+                    @Override
+                    public void completed(SubscribeRate subscribeRate) {
+                        future.complete(subscribeRate);
+                    }
 
-        try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "publishRate");
-            request(path).post(Entity.entity(publishMsgRate, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
-        }
-    
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
     }
 
     @Override
-    public PublishRate getPublishRate(String namespace) throws PulsarAdminException {
+    public void setSubscriptionDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "publishRate");
-            return request(path).get(PublishRate.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setSubscriptionDispatchRateAsync(namespace, dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
-    
+
     @Override
-    public void setDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
-        try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "dispatchRate");
-            request(path).post(Entity.entity(dispatchRate, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
-        }
+    public CompletableFuture<Void> setSubscriptionDispatchRateAsync(String namespace, DispatchRate dispatchRate) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "subscriptionDispatchRate");
+        return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON));
     }
 
     @Override
-    public DispatchRate getDispatchRate(String namespace) throws PulsarAdminException {
+    public DispatchRate getSubscriptionDispatchRate(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "dispatchRate");
-            return request(path).get(DispatchRate.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getSubscriptionDispatchRateAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
-    public void setSubscribeRate(String namespace, SubscribeRate subscribeRate) throws PulsarAdminException {
-        try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "subscribeRate");
-            request(path).post(Entity.entity(subscribeRate, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
-        }
+    public CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "subscriptionDispatchRate");
+        final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<DispatchRate>() {
+                    @Override
+                    public void completed(DispatchRate dispatchRate) {
+                        future.complete(dispatchRate);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
     }
 
     @Override
-    public SubscribeRate getSubscribeRate(String namespace) throws PulsarAdminException {
+    public void setReplicatorDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "subscribeRate");
-            return request(path).get(SubscribeRate.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setReplicatorDispatchRateAsync(namespace, dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
-    public void setSubscriptionDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
-        try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "subscriptionDispatchRate");
-            request(path).post(Entity.entity(dispatchRate, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
-        }
+    public CompletableFuture<Void> setReplicatorDispatchRateAsync(String namespace, DispatchRate dispatchRate) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "replicatorDispatchRate");
+        return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON));
     }
 
     @Override
-    public DispatchRate getSubscriptionDispatchRate(String namespace) throws PulsarAdminException {
+    public DispatchRate getReplicatorDispatchRate(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "subscriptionDispatchRate");
-            return request(path).get(DispatchRate.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getReplicatorDispatchRateAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
-    public void setReplicatorDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
-        try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "replicatorDispatchRate");
-            request(path).post(Entity.entity(dispatchRate, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
-        }
+    public CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "replicatorDispatchRate");
+        final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<DispatchRate>() {
+                    @Override
+                    public void completed(DispatchRate dispatchRate) {
+                        future.complete(dispatchRate);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
     }
 
     @Override
-    public DispatchRate getReplicatorDispatchRate(String namespace) throws PulsarAdminException {
+    public void clearNamespaceBacklog(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "replicatorDispatchRate");
-            return request(path).get(DispatchRate.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            clearNamespaceBacklogAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
-    public void clearNamespaceBacklog(String namespace) throws PulsarAdminException {
-        try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "clearBacklog");
-            request(path).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
-        }
+    public CompletableFuture<Void> clearNamespaceBacklogAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "clearBacklog");
+        return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
     }
 
     @Override
     public void clearNamespaceBacklogForSubscription(String namespace, String subscription)
             throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "clearBacklog", subscription);
-            request(path).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            clearNamespaceBacklogForSubscriptionAsync(namespace, subscription).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> clearNamespaceBacklogForSubscriptionAsync(String namespace, String subscription) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "clearBacklog", subscription);
+        return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void clearNamespaceBundleBacklog(String namespace, String bundle) throws PulsarAdminException {
         try {
-            clearNamespaceBundleBacklogAsync(namespace, bundle).get();
+            clearNamespaceBundleBacklogAsync(namespace, bundle).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -698,12 +1447,15 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     public void clearNamespaceBundleBacklogForSubscription(String namespace, String bundle, String subscription)
             throws PulsarAdminException {
         try {
-            clearNamespaceBundleBacklogForSubscriptionAsync(namespace, bundle, subscription).get();
+            clearNamespaceBundleBacklogForSubscriptionAsync(namespace, bundle, subscription)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -718,24 +1470,37 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     @Override
     public void unsubscribeNamespace(String namespace, String subscription) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "unsubscribe", subscription);
-            request(path).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            unsubscribeNamespaceAsync(namespace, subscription).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> unsubscribeNamespaceAsync(String namespace, String subscription) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "unsubscribe", subscription);
+        return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void unsubscribeNamespaceBundle(String namespace, String bundle, String subscription)
             throws PulsarAdminException {
         try {
-            unsubscribeNamespaceBundleAsync(namespace, bundle, subscription).get();
+            unsubscribeNamespaceBundleAsync(namespace, bundle, subscription)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -750,236 +1515,581 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     @Override
     public void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscriptionAuthMode) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "subscriptionAuthMode");
-            request(path).post(Entity.entity(subscriptionAuthMode, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setSubscriptionAuthModeAsync(namespace, subscriptionAuthMode).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setSubscriptionAuthModeAsync(String namespace, SubscriptionAuthMode subscriptionAuthMode) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "subscriptionAuthMode");
+        return asyncPostRequest(path, Entity.entity(subscriptionAuthMode, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void setEncryptionRequiredStatus(String namespace, boolean encryptionRequired) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "encryptionRequired");
-            request(path).post(Entity.entity(encryptionRequired, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setEncryptionRequiredStatusAsync(namespace, encryptionRequired)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setEncryptionRequiredStatusAsync(String namespace, boolean encryptionRequired) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "encryptionRequired");
+        return asyncPostRequest(path, Entity.entity(encryptionRequired, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public DelayedDeliveryPolicies getDelayedDelivery(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "delayedDelivery");
-            return request(path).get(DelayedDeliveryPolicies.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getDelayedDeliveryAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "delayedDelivery");
+        final CompletableFuture<DelayedDeliveryPolicies> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<DelayedDeliveryPolicies>() {
+                    @Override
+                    public void completed(DelayedDeliveryPolicies delayedDeliveryPolicies) {
+                        future.complete(delayedDeliveryPolicies);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void setDelayedDeliveryMessages(String namespace, DelayedDeliveryPolicies delayedDeliveryPolicies) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "delayedDelivery");
-            request(path).post(Entity.entity(delayedDeliveryPolicies, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setDelayedDeliveryMessagesAsync(namespace, delayedDeliveryPolicies)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setDelayedDeliveryMessagesAsync(String namespace, DelayedDeliveryPolicies delayedDeliveryPolicies) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "delayedDelivery");
+        return asyncPostRequest(path, Entity.entity(delayedDeliveryPolicies, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public int getMaxProducersPerTopic(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "maxProducersPerTopic");
-            return request(path).get(Integer.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getMaxProducersPerTopicAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Integer> getMaxProducersPerTopicAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "maxProducersPerTopic");
+        final CompletableFuture<Integer> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Integer>() {
+                    @Override
+                    public void completed(Integer max) {
+                        future.complete(max);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void setMaxProducersPerTopic(String namespace, int maxProducersPerTopic) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "maxProducersPerTopic");
-            request(path).post(Entity.entity(maxProducersPerTopic, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setMaxProducersPerTopicAsync(namespace, maxProducersPerTopic).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setMaxProducersPerTopicAsync(String namespace, int maxProducersPerTopic) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "maxProducersPerTopic");
+        return asyncPostRequest(path, Entity.entity(maxProducersPerTopic, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public int getMaxConsumersPerTopic(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "maxConsumersPerTopic");
-            return request(path).get(Integer.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getMaxConsumersPerTopicAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Integer> getMaxConsumersPerTopicAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "maxConsumersPerTopic");
+        final CompletableFuture<Integer> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Integer>() {
+                    @Override
+                    public void completed(Integer max) {
+                        future.complete(max);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void setMaxConsumersPerTopic(String namespace, int maxConsumersPerTopic) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "maxConsumersPerTopic");
-            request(path).post(Entity.entity(maxConsumersPerTopic, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setMaxConsumersPerTopicAsync(namespace, maxConsumersPerTopic)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setMaxConsumersPerTopicAsync(String namespace, int maxConsumersPerTopic) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "maxConsumersPerTopic");
+        return asyncPostRequest(path, Entity.entity(maxConsumersPerTopic, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException {
+        try {
+            return getMaxConsumersPerSubscriptionAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
-    public int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException {
+    public CompletableFuture<Integer> getMaxConsumersPerSubscriptionAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "maxConsumersPerSubscription");
+        final CompletableFuture<Integer> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Integer>() {
+                    @Override
+                    public void completed(Integer max) {
+                        future.complete(max);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public void setMaxConsumersPerSubscription(String namespace, int maxConsumersPerSubscription) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "maxConsumersPerSubscription");
-            return request(path).get(Integer.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setMaxConsumersPerSubscriptionAsync(namespace, maxConsumersPerSubscription)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
-    public void setMaxConsumersPerSubscription(String namespace, int maxConsumersPerSubscription) throws PulsarAdminException {
-        try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "maxConsumersPerSubscription");
-            request(path).post(Entity.entity(maxConsumersPerSubscription, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
-        }
+    public CompletableFuture<Void> setMaxConsumersPerSubscriptionAsync(String namespace, int maxConsumersPerSubscription) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "maxConsumersPerSubscription");
+        return asyncPostRequest(path, Entity.entity(maxConsumersPerSubscription, MediaType.APPLICATION_JSON));
     }
 
     @Override
     public int getMaxUnackedMessagesPerConsumer(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "maxUnackedMessagesPerConsumer");
-            return request(path).get(Integer.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getMaxUnackedMessagesPerConsumerAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Integer> getMaxUnackedMessagesPerConsumerAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "maxUnackedMessagesPerConsumer");
+        final CompletableFuture<Integer> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Integer>() {
+                    @Override
+                    public void completed(Integer max) {
+                        future.complete(max);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void setMaxUnackedMessagesPerConsumer(String namespace, int maxUnackedMessagesPerConsumer) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "maxUnackedMessagesPerConsumer");
-            request(path).post(Entity.entity(maxUnackedMessagesPerConsumer, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setMaxUnackedMessagesPerConsumerAsync(namespace, maxUnackedMessagesPerConsumer).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setMaxUnackedMessagesPerConsumerAsync(String namespace, int maxUnackedMessagesPerConsumer) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "maxUnackedMessagesPerConsumer");
+        return asyncPostRequest(path, Entity.entity(maxUnackedMessagesPerConsumer, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public int getMaxUnackedMessagesPerSubscription(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "maxUnackedMessagesPerSubscription");
-            return request(path).get(Integer.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getMaxUnackedMessagesPerSubscriptionAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Integer> getMaxUnackedMessagesPerSubscriptionAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "maxUnackedMessagesPerSubscription");
+        final CompletableFuture<Integer> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Integer>() {
+                    @Override
+                    public void completed(Integer max) {
+                        future.complete(max);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void setMaxUnackedMessagesPerSubscription(String namespace, int maxUnackedMessagesPerSubscription) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "maxUnackedMessagesPerSubscription");
-            request(path).post(Entity.entity(maxUnackedMessagesPerSubscription, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setMaxUnackedMessagesPerSubscriptionAsync(namespace, maxUnackedMessagesPerSubscription)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setMaxUnackedMessagesPerSubscriptionAsync(String namespace, int maxUnackedMessagesPerSubscription) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "maxUnackedMessagesPerSubscription");
+        return asyncPostRequest(path, Entity.entity(maxUnackedMessagesPerSubscription, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public long getCompactionThreshold(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "compactionThreshold");
-            return request(path).get(Long.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getCompactionThresholdAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Long> getCompactionThresholdAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "compactionThreshold");
+        final CompletableFuture<Long> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Long>() {
+                    @Override
+                    public void completed(Long threshold) {
+                        future.complete(threshold);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void setCompactionThreshold(String namespace, long compactionThreshold) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "compactionThreshold");
-            request(path).put(Entity.entity(compactionThreshold, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setCompactionThresholdAsync(namespace, compactionThreshold)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setCompactionThresholdAsync(String namespace, long compactionThreshold) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "compactionThreshold");
+        return asyncPutRequest(path, Entity.entity(compactionThreshold, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public long getOffloadThreshold(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "offloadThreshold");
-            return request(path).get(Long.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getOffloadThresholdAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Long> getOffloadThresholdAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "offloadThreshold");
+        final CompletableFuture<Long> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Long>() {
+                    @Override
+                    public void completed(Long threshold) {
+                        future.complete(threshold);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void setOffloadThreshold(String namespace, long offloadThreshold) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "offloadThreshold");
-            request(path).put(Entity.entity(offloadThreshold, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setOffloadThresholdAsync(namespace, offloadThreshold).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setOffloadThresholdAsync(String namespace, long offloadThreshold) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "offloadThreshold");
+        return asyncPutRequest(path, Entity.entity(offloadThreshold, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public Long getOffloadDeleteLagMs(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
-            return request(path).get(Long.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getOffloadDeleteLagMsAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Long> getOffloadDeleteLagMsAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+        final CompletableFuture<Long> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Long>() {
+                    @Override
+                    public void completed(Long lag) {
+                        future.complete(lag);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void setOffloadDeleteLag(String namespace, long lag, TimeUnit unit) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
-            request(path).put(Entity.entity(TimeUnit.MILLISECONDS.convert(lag, unit), MediaType.APPLICATION_JSON),
-                              ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setOffloadDeleteLagAsync(namespace, lag, unit).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setOffloadDeleteLagAsync(String namespace, long lag, TimeUnit unit) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+        return asyncPutRequest(path, Entity.entity(TimeUnit.MILLISECONDS.convert(lag, unit), MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void clearOffloadDeleteLag(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
-            request(path).delete(ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            clearOffloadDeleteLagAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> clearOffloadDeleteLagAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public SchemaAutoUpdateCompatibilityStrategy getSchemaAutoUpdateCompatibilityStrategy(String namespace)
             throws PulsarAdminException {
         try {
@@ -1009,94 +2119,232 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     public boolean getSchemaValidationEnforced(String namespace)
             throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "schemaValidationEnforced");
-            return request(path).get(Boolean.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getSchemaValidationEnforcedAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "schemaValidationEnforced");
+        final CompletableFuture<Boolean> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Boolean>() {
+                    @Override
+                    public void completed(Boolean enforced) {
+                        future.complete(enforced);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void setSchemaValidationEnforced(String namespace, boolean schemaValidationEnforced)
             throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "schemaValidationEnforced");
-            request(path).post(Entity.entity(schemaValidationEnforced, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setSchemaValidationEnforcedAsync(namespace, schemaValidationEnforced)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setSchemaValidationEnforcedAsync(String namespace, boolean schemaValidationEnforced) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "schemaValidationEnforced");
+        return asyncPostRequest(path, Entity.entity(schemaValidationEnforced, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "schemaCompatibilityStrategy");
-            return request(path).get(SchemaCompatibilityStrategy.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getSchemaCompatibilityStrategyAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "schemaCompatibilityStrategy");
+        final CompletableFuture<SchemaCompatibilityStrategy> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<SchemaCompatibilityStrategy>() {
+                    @Override
+                    public void completed(SchemaCompatibilityStrategy schemaCompatibilityStrategy) {
+                        future.complete(schemaCompatibilityStrategy);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void setSchemaCompatibilityStrategy(String namespace, SchemaCompatibilityStrategy strategy) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "schemaCompatibilityStrategy");
-            request(path).put(Entity.entity(strategy, MediaType.APPLICATION_JSON),
-                    ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setSchemaCompatibilityStrategyAsync(namespace, strategy).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
+    }
 
+    @Override
+    public CompletableFuture<Void> setSchemaCompatibilityStrategyAsync(String namespace, SchemaCompatibilityStrategy strategy) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "schemaCompatibilityStrategy");
+        return asyncPutRequest(path, Entity.entity(strategy, MediaType.APPLICATION_JSON));
     }
 
     @Override
     public boolean getIsAllowAutoUpdateSchema(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "isAllowAutoUpdateSchema");
-            return request(path).get(Boolean.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getIsAllowAutoUpdateSchemaAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Boolean> getIsAllowAutoUpdateSchemaAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "isAllowAutoUpdateSchema");
+        final CompletableFuture<Boolean> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Boolean>() {
+                    @Override
+                    public void completed(Boolean allowAutoUpdate) {
+                        future.complete(allowAutoUpdate);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchema) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "isAllowAutoUpdateSchema");
-            request(path).post(Entity.entity(isAllowAutoUpdateSchema, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setIsAllowAutoUpdateSchemaAsync(namespace, isAllowAutoUpdateSchema).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setIsAllowAutoUpdateSchemaAsync(String namespace, boolean isAllowAutoUpdateSchema) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "isAllowAutoUpdateSchema");
+        return asyncPostRequest(path, Entity.entity(isAllowAutoUpdateSchema, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void setOffloadPolicies(String namespace, OffloadPolicies offloadPolicies) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "offloadPolicies");
-            request(path).post(Entity.entity(offloadPolicies, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setOffloadPoliciesAsync(namespace, offloadPolicies)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> setOffloadPoliciesAsync(String namespace, OffloadPolicies offloadPolicies) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "offloadPolicies");
+        return asyncPostRequest(path, Entity.entity(offloadPolicies, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public OffloadPolicies getOffloadPolicies(String namespace) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, "offloadPolicies");
-            return request(path).get(OffloadPolicies.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getOffloadPoliciesAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
+    @Override
+    public CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "offloadPolicies");
+        final CompletableFuture<OffloadPolicies> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<OffloadPolicies>() {
+                    @Override
+                    public void completed(OffloadPolicies offloadPolicies) {
+                        future.complete(offloadPolicies);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
     private WebTarget namespacePath(NamespaceName namespace, String... parts) {
         final WebTarget base = namespace.isV2() ? adminV2Namespaces : adminNamespaces;
         WebTarget namespacePath = base.path(namespace.toString());