You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/03/20 17:10:10 UTC

[pulsar] branch master updated: [PulsarAdmin] Clusters to async (#6568)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fef14b  [PulsarAdmin] Clusters to async (#6568)
7fef14b is described below

commit 7fef14b6b2cc85424a15d253146e7afe3f423fbf
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Sat Mar 21 01:09:57 2020 +0800

    [PulsarAdmin] Clusters to async (#6568)
---
 .../org/apache/pulsar/client/admin/Clusters.java   | 318 +++++++++++++--
 .../pulsar/client/admin/internal/ClustersImpl.java | 454 ++++++++++++++++++---
 2 files changed, 676 insertions(+), 96 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
index 54c9c86..5d3b31d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
@@ -22,6 +22,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
@@ -55,6 +56,20 @@ public interface Clusters {
     List<String> getClusters() throws PulsarAdminException;
 
     /**
+     * Get the list of clusters asynchronously.
+     * <p>
+     * Get the list of all the Pulsar clusters.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>["c1", "c2", "c3"]</code>
+     * </pre>
+     *
+     */
+    CompletableFuture<List<String>> getClustersAsync();
+
+    /**
      * Get the configuration data for the specified cluster.
      * <p>
      * Response Example:
@@ -78,6 +93,23 @@ public interface Clusters {
     ClusterData getCluster(String cluster) throws PulsarAdminException;
 
     /**
+     * Get the configuration data for the specified cluster asynchronously.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>{ serviceUrl : "http://my-broker.example.com:8080/" }</code>
+     * </pre>
+     *
+     * @param cluster
+     *            Cluster name
+     *
+     * @return the cluster configuration
+     *
+     */
+    CompletableFuture<ClusterData> getClusterAsync(String cluster);
+
+    /**
      * Create a new cluster.
      * <p>
      * Provisions a new cluster. This operation requires Pulsar super-user privileges.
@@ -89,7 +121,7 @@ public interface Clusters {
      * @param clusterData
      *            the cluster configuration object
      *
-     * @throws NotAuthorized
+     * @throws NotAuthorizedException
      *             You don't have admin permission to create the cluster
      * @throws ConflictException
      *             Cluster already exists
@@ -99,6 +131,21 @@ public interface Clusters {
     void createCluster(String cluster, ClusterData clusterData) throws PulsarAdminException;
 
     /**
+     * Create a new cluster asynchronously.
+     * <p>
+     * Provisions a new cluster. This operation requires Pulsar super-user privileges.
+     * <p>
+     * The name cannot contain '/' characters.
+     *
+     * @param cluster
+     *            Cluster name
+     * @param clusterData
+     *            the cluster configuration object
+     *
+     */
+    CompletableFuture<Void> createClusterAsync(String cluster, ClusterData clusterData);
+
+    /**
      * Update the configuration for a cluster.
      * <p>
      * This operation requires Pulsar super-user privileges.
@@ -116,7 +163,20 @@ public interface Clusters {
      *             Unexpected error
      */
     void updateCluster(String cluster, ClusterData clusterData) throws PulsarAdminException;
-    
+
+    /**
+     * Update the configuration for a cluster asynchronously.
+     * <p>
+     * This operation requires Pulsar super-user privileges.
+     *
+     * @param cluster
+     *            Cluster name
+     * @param clusterData
+     *            the cluster configuration object
+     *
+     */
+    CompletableFuture<Void> updateClusterAsync(String cluster, ClusterData clusterData);
+
     /**
      * Update peer cluster names.
      * <p>
@@ -135,9 +195,22 @@ public interface Clusters {
      *             Unexpected error
      */
     void updatePeerClusterNames(String cluster, LinkedHashSet<String> peerClusterNames) throws PulsarAdminException;
-    
+
     /**
-     * Get peer-cluster names
+     * Update peer cluster names asynchronously.
+     * <p>
+     * This operation requires Pulsar super-user privileges.
+     *
+     * @param cluster
+     *            Cluster name
+     * @param peerClusterNames
+     *            list of peer cluster names
+     *
+     */
+    CompletableFuture<Void> updatePeerClusterNamesAsync(String cluster, LinkedHashSet<String> peerClusterNames);
+
+    /**
+     * Get peer-cluster names.
      * <p>
      *
      * @param cluster
@@ -156,10 +229,20 @@ public interface Clusters {
      *             Unexpected error
      */
     Set<String> getPeerClusterNames(String cluster) throws PulsarAdminException;
-    
 
     /**
-     * Delete an existing cluster
+     * Get peer-cluster names asynchronously.
+     * <p>
+     *
+     * @param cluster
+     *            Cluster name
+     * @return a future that completes with the peer cluster names of the given cluster
+     *
+     */
+    CompletableFuture<Set<String>> getPeerClusterNamesAsync(String cluster);
+
+    /**
+     * Delete an existing cluster.
      * <p>
      * Delete a cluster
      *
@@ -178,6 +261,17 @@ public interface Clusters {
     void deleteCluster(String cluster) throws PulsarAdminException;
 
     /**
+     * Delete an existing cluster asynchronously.
+     * <p>
+     * Delete a cluster
+     *
+     * @param cluster
+     *            Cluster name
+     *
+     */
+    CompletableFuture<Void> deleteClusterAsync(String cluster);
+
+    /**
      * Get the namespace isolation policies of a cluster
      * <p>
      *
@@ -198,9 +292,29 @@ public interface Clusters {
      */
     Map<String, NamespaceIsolationData> getNamespaceIsolationPolicies(String cluster) throws PulsarAdminException;
 
+    /**
+     * Get the namespace isolation policies of a cluster asynchronously.
+     * <p>
+     *
+     * @param cluster
+     *            Cluster name
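+     * @return a future that completes with the namespace isolation policies of the cluster.
+     *         No checked exception is declared; on failure the returned future completes
+     *         exceptionally with one of the following:
+     *         <ul>
+     *         <li>{@code NotAuthorizedException}:
+     *             you don't have admin permission on the cluster</li>
+     *         <li>{@code NotFoundException}: policies don't exist</li>
+     *         <li>{@code PreconditionFailedException}:
+     *             cluster doesn't exist</li>
+     *         <li>{@code PulsarAdminException}: unexpected error</li>
+     *         </ul>
+     *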
+     */
+    CompletableFuture<Map<String, NamespaceIsolationData>> getNamespaceIsolationPoliciesAsync(String cluster);
 
     /**
-     * Create a namespace isolation policy for a cluster
+     * Create a namespace isolation policy for a cluster.
      * <p>
      *
      * @param cluster
@@ -228,10 +342,27 @@ public interface Clusters {
     void createNamespaceIsolationPolicy(String cluster, String policyName, NamespaceIsolationData namespaceIsolationData)
             throws PulsarAdminException;
 
-    
+    /**
+     * Create a namespace isolation policy for a cluster asynchronously.
+     * <p>
+     *
+     * @param cluster
+     *          Cluster name
+     *
+     * @param policyName
+     *          Policy name
+     *
+     * @param namespaceIsolationData
+     *          Namespace isolation policy configuration
+     *
+     * @return a future tracking completion of the create request
+     */
+    CompletableFuture<Void> createNamespaceIsolationPolicyAsync(
+            String cluster, String policyName, NamespaceIsolationData namespaceIsolationData);
+
     /**
      * Returns list of active brokers with namespace-isolation policies attached to it.
-     * 
+     *
      * @param cluster
      * @return
      * @throws PulsarAdminException
@@ -240,8 +371,16 @@ public interface Clusters {
             throws PulsarAdminException;
 
     /**
+     * Returns list of active brokers with namespace-isolation policies attached to it asynchronously.
+     *
+     * @param cluster Cluster name
+     * @return a future that completes with the list of brokers and their isolation policies
+     */
+    CompletableFuture<List<BrokerNamespaceIsolationData>> getBrokersWithNamespaceIsolationPolicyAsync(String cluster);
+
+    /**
      * Returns active broker with namespace-isolation policies attached to it.
-     * 
+     *
      * @param cluster
      * @param broker
      * @return
@@ -249,7 +388,15 @@ public interface Clusters {
      */
     BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(String cluster, String broker)
             throws PulsarAdminException;
-    
+
+    /**
+     * Returns active broker with namespace-isolation policies attached to it asynchronously.
+     *
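+     * @param cluster Cluster name
+     * @param broker broker identifier appended to the request path
+     * @return a future that completes with the broker and its isolation policies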
+     */
+    CompletableFuture<BrokerNamespaceIsolationData> getBrokerWithNamespaceIsolationPolicyAsync(String cluster, String broker);
 
     /**
      * Update a namespace isolation policy for a cluster
@@ -280,6 +427,23 @@ public interface Clusters {
     void updateNamespaceIsolationPolicy(String cluster, String policyName, NamespaceIsolationData namespaceIsolationData)
             throws PulsarAdminException;
 
+    /**
+     * Update a namespace isolation policy for a cluster asynchronously.
+     * <p>
+     *
+     * @param cluster
+     *          Cluster name
+     *
+     * @param policyName
+     *          Policy name
+     *
+     * @param namespaceIsolationData
+     *          Namespace isolation policy configuration
+     *
+     * @return a future tracking completion of the update request
+     *
+     */
+    CompletableFuture<Void> updateNamespaceIsolationPolicyAsync(String cluster, String policyName, NamespaceIsolationData namespaceIsolationData);
 
     /**
      * Delete a namespace isolation policy for a cluster
@@ -308,7 +472,22 @@ public interface Clusters {
     void deleteNamespaceIsolationPolicy(String cluster, String policyName) throws PulsarAdminException;
 
     /**
-     * Get a single namespace isolation policy for a cluster
+     * Delete a namespace isolation policy for a cluster asynchronously.
+     * <p>
+     *
+     * @param cluster
+     *          Cluster name
+     *
+     * @param policyName
+     *          Policy name
+     *
+     * @return a future tracking completion of the delete request
+     */
+
+    CompletableFuture<Void> deleteNamespaceIsolationPolicyAsync(String cluster, String policyName);
+
+    /**
+     * Get a single namespace isolation policy for a cluster.
      * <p>
      *
      * @param cluster
@@ -332,7 +511,20 @@ public interface Clusters {
     NamespaceIsolationData getNamespaceIsolationPolicy(String cluster, String policyName) throws PulsarAdminException;
 
     /**
-     * Create a domain into cluster
+     * Get a single namespace isolation policy for a cluster asynchronously.
+     * <p>
+     *
+     * @param cluster
+     *          Cluster name
+     *
+     * @param policyName
+     *          Policy name
+     *
+     */
+    CompletableFuture<NamespaceIsolationData> getNamespaceIsolationPolicyAsync(String cluster, String policyName);
+
+    /**
+     * Create a domain into cluster.
      * <p>
      *
      * @param cluster
@@ -341,7 +533,7 @@ public interface Clusters {
      * @param domainName
      *          domain name
      *
-     * @param FailureDomain
+     * @param domain
      *          Domain configurations
      *
      * @return
@@ -350,7 +542,7 @@ public interface Clusters {
      *
      * @throws ConflictException
      *             Broker already exist into other domain
-     *             
+     *
      * @throws NotFoundException
      *             Cluster doesn't exist
      *
@@ -362,10 +554,9 @@ public interface Clusters {
      */
     void createFailureDomain(String cluster, String domainName, FailureDomain domain)
             throws PulsarAdminException;
-    
-    
+
     /**
-     * Update a domain into cluster
+     * Create a domain into cluster asynchronously.
      * <p>
      *
      * @param cluster
@@ -374,7 +565,25 @@ public interface Clusters {
      * @param domainName
      *          domain name
      *
-     * @param FailureDomain
+     * @param domain
+     *          Domain configurations
+     *
+     * @return a future tracking completion of the create request
+     *
+     */
+    CompletableFuture<Void> createFailureDomainAsync(String cluster, String domainName, FailureDomain domain);
+
+    /**
+     * Update a domain into cluster.
+     * <p>
+     *
+     * @param cluster
+     *          Cluster name
+     *
+     * @param domainName
+     *          domain name
+     *
+     * @param domain
      *          Domain configurations
      *
      * @return
@@ -383,7 +592,7 @@ public interface Clusters {
      *
      * @throws ConflictException
      *             Broker already exist into other domain
-     *             
+     *
      * @throws NotFoundException
      *             Cluster doesn't exist
      *
@@ -395,10 +604,27 @@ public interface Clusters {
      */
     void updateFailureDomain(String cluster, String domainName, FailureDomain domain)
             throws PulsarAdminException;
-    
-    
+
+    /**
+     * Update a domain into cluster asynchronously.
+     * <p>
+     *
+     * @param cluster
+     *          Cluster name
+     *
+     * @param domainName
+     *          domain name
+     *
+     * @param domain
+     *          Domain configurations
+     *
+     * @return a future tracking completion of the update request
+     *
+     */
+    CompletableFuture<Void> updateFailureDomainAsync(String cluster, String domainName, FailureDomain domain);
+
     /**
-     * Delete a domain in cluster
+     * Delete a domain in cluster.
      * <p>
      *
      * @param cluster
@@ -420,11 +646,25 @@ public interface Clusters {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-
     void deleteFailureDomain(String cluster, String domainName) throws PulsarAdminException;
 
     /**
-     * Get all registered domains in cluster
+     * Delete a domain in cluster asynchronously.
+     * <p>
+     *
+     * @param cluster
+     *          Cluster name
+     *
+     * @param domainName
+     *          Domain name
+     *
+     * @return a future tracking completion of the delete request
+     *
+     */
+    CompletableFuture<Void> deleteFailureDomainAsync(String cluster, String domainName);
+
+    /**
+     * Get all registered domains in cluster.
      * <p>
      *
      * @param cluster
@@ -440,9 +680,20 @@ public interface Clusters {
      *             Unexpected error
      */
     Map<String, FailureDomain> getFailureDomains(String cluster) throws PulsarAdminException;
-    
+
     /**
-     * Get the domain registered into a cluster
+     * Get all registered domains in cluster asynchronously.
+     * <p>
+     *
+     * @param cluster
+     *            Cluster name
+     * @return a future that completes with the failure domains registered in the cluster
+     *
+     */
+    CompletableFuture<Map<String, FailureDomain>> getFailureDomainsAsync(String cluster);
+
+    /**
+     * Get the domain registered into a cluster.
      * <p>
      *
      * @param cluster
@@ -461,5 +712,16 @@ public interface Clusters {
      *             Unexpected error
      */
     FailureDomain getFailureDomain(String cluster, String domainName) throws PulsarAdminException;
-    
+
+    /**
+     * Get the domain registered into a cluster asynchronously.
+     * <p>
+     *
+     * @param cluster
+     *            Cluster name
+     * @param domainName Domain name
+     * @return a future that completes with the requested failure domain
+     */
+    CompletableFuture<FailureDomain> getFailureDomainAsync(String cluster, String domainName);
+
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index fcc598e..8be02ca 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -22,10 +22,14 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
 
 import org.apache.pulsar.client.admin.Clusters;
@@ -34,7 +38,6 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.FailureDomain;
-import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 
 public class ClustersImpl extends BaseResource implements Clusters {
@@ -49,189 +52,504 @@ public class ClustersImpl extends BaseResource implements Clusters {
     @Override
     public List<String> getClusters() throws PulsarAdminException {
         try {
-            return request(adminClusters).get(new GenericType<List<String>>() {
-            });
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getClustersAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<List<String>> getClustersAsync() {
+        final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        asyncGetRequest(adminClusters,
+                new InvocationCallback<List<String>>() {
+                    @Override
+                    public void completed(List<String> clusters) {
+                        future.complete(clusters);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public ClusterData getCluster(String cluster) throws PulsarAdminException {
         try {
-            return request(adminClusters.path(cluster)).get(ClusterData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getClusterAsync(cluster).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<ClusterData> getClusterAsync(String cluster) {
+        WebTarget path = adminClusters.path(cluster);
+        final CompletableFuture<ClusterData> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<ClusterData>() {
+                    @Override
+                    public void completed(ClusterData clusterData) {
+                        future.complete(clusterData);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void createCluster(String cluster, ClusterData clusterData) throws PulsarAdminException {
         try {
-            request(adminClusters.path(cluster))
-                    .put(Entity.entity(clusterData, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            createClusterAsync(cluster, clusterData).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> createClusterAsync(String cluster, ClusterData clusterData) {
+        WebTarget path = adminClusters.path(cluster);
+        return asyncPutRequest(path, Entity.entity(clusterData, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void updateCluster(String cluster, ClusterData clusterData) throws PulsarAdminException {
         try {
-            request(adminClusters.path(cluster)).post(Entity.entity(clusterData, MediaType.APPLICATION_JSON),
-                    ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            updateClusterAsync(cluster, clusterData).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> updateClusterAsync(String cluster, ClusterData clusterData) {
+        WebTarget path = adminClusters.path(cluster);
+        return asyncPostRequest(path, Entity.entity(clusterData, MediaType.APPLICATION_JSON_TYPE));
+    }
+
+    @Override
     public void updatePeerClusterNames(String cluster, LinkedHashSet<String> peerClusterNames) throws PulsarAdminException {
         try {
-            request(adminClusters.path(cluster).path("peers")).post(Entity.entity(peerClusterNames, MediaType.APPLICATION_JSON),
-                    ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            updatePeerClusterNamesAsync(cluster, peerClusterNames).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
+    }
 
+    @Override
+    public CompletableFuture<Void> updatePeerClusterNamesAsync(String cluster, LinkedHashSet<String> peerClusterNames) {
+        WebTarget path = adminClusters.path(cluster).path("peers");
+        return asyncPostRequest(path, Entity.entity(peerClusterNames, MediaType.APPLICATION_JSON));
     }
 
-	@Override
+    @Override
     @SuppressWarnings("unchecked")
 	public Set<String> getPeerClusterNames(String cluster) throws PulsarAdminException {
-		try {
-			return request(adminClusters.path(cluster).path("peers")).get(LinkedHashSet.class);
-		} catch (Exception e) {
-			throw getApiException(e);
-		}
+        try {
+            return getPeerClusterNamesAsync(cluster).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
 	}
 
     @Override
+    public CompletableFuture<Set<String>> getPeerClusterNamesAsync(String cluster) {
+        WebTarget path = adminClusters.path(cluster).path("peers");
+        final CompletableFuture<Set<String>> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Set<String>>() {
+                    @Override
+                    public void completed(Set<String> clusterNames) {
+                        future.complete(clusterNames);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void deleteCluster(String cluster) throws PulsarAdminException {
         try {
-            request(adminClusters.path(cluster)).delete(ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            deleteClusterAsync(cluster).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> deleteClusterAsync(String cluster) {
+        WebTarget path = adminClusters.path(cluster);
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public Map<String, NamespaceIsolationData> getNamespaceIsolationPolicies(String cluster) throws PulsarAdminException {
         try {
-            return request(adminClusters.path(cluster).path("namespaceIsolationPolicies")).get(
-                    new GenericType<Map<String, NamespaceIsolationData>>() {
-                    });
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getNamespaceIsolationPoliciesAsync(cluster).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
+    @Override
+    public CompletableFuture<Map<String, NamespaceIsolationData>> getNamespaceIsolationPoliciesAsync(String cluster) {
+        WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies");
+        final CompletableFuture<Map<String, NamespaceIsolationData>> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Map<String, NamespaceIsolationData>>() {
+                    @Override
+                    public void completed(Map<String, NamespaceIsolationData> stringNamespaceIsolationDataMap) {
+                        future.complete(stringNamespaceIsolationDataMap);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
 
     @Override
     public List<BrokerNamespaceIsolationData> getBrokersWithNamespaceIsolationPolicy(String cluster)
             throws PulsarAdminException {
         try {
-            return request(adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers"))
-                    .get(new GenericType<List<BrokerNamespaceIsolationData>>() {
-                    });
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getBrokersWithNamespaceIsolationPolicyAsync(cluster).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<List<BrokerNamespaceIsolationData>> getBrokersWithNamespaceIsolationPolicyAsync(String cluster) {
+        WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers");
+        final CompletableFuture<List<BrokerNamespaceIsolationData>> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<List<BrokerNamespaceIsolationData>>() {
+                    @Override
+                    public void completed(List<BrokerNamespaceIsolationData> brokerNamespaceIsolationData) {
+                        future.complete(brokerNamespaceIsolationData);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(String cluster, String broker)
             throws PulsarAdminException {
         try {
-            return request(adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers").path(broker))
-                    .get(BrokerNamespaceIsolationData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getBrokerWithNamespaceIsolationPolicyAsync(cluster, broker).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<BrokerNamespaceIsolationData> getBrokerWithNamespaceIsolationPolicyAsync(String cluster, String broker) {
+        WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers").path(broker);
+        final CompletableFuture<BrokerNamespaceIsolationData> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<BrokerNamespaceIsolationData>() {
+                    @Override
+                    public void completed(BrokerNamespaceIsolationData brokerNamespaceIsolationData) {
+                        future.complete(brokerNamespaceIsolationData);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void createNamespaceIsolationPolicy(String cluster, String policyName,
             NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException {
         setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData);
     }
 
     @Override
+    public CompletableFuture<Void> createNamespaceIsolationPolicyAsync(String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) {
+        return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData); // create delegates to the shared private set helper
+    }
+
+    @Override
     public void updateNamespaceIsolationPolicy(String cluster, String policyName,
             NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException {
         setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData);
     }
 
     @Override
+    public CompletableFuture<Void> updateNamespaceIsolationPolicyAsync(String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) {
+        return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData); // update delegates to the shared private set helper
+    }
+
+    @Override
     public void deleteNamespaceIsolationPolicy(String cluster, String policyName) throws PulsarAdminException {
-        request(adminClusters.path(cluster)
-                .path("namespaceIsolationPolicies").path(policyName)).delete(ErrorData.class);
+        try {
+            deleteNamespaceIsolationPolicyAsync(cluster, policyName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // block on the async delete, bounded by readTimeoutMs
+        } catch (ExecutionException e) { // async call failed: rethrow an admin cause as-is, wrap anything else instead of a ClassCastException
+            throw e.getCause() instanceof PulsarAdminException ? (PulsarAdminException) e.getCause() : new PulsarAdminException(e.getCause());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag before reporting
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) { // no result within readTimeoutMs
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteNamespaceIsolationPolicyAsync(String cluster, String policyName) {
+        WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName); // <cluster>/namespaceIsolationPolicies/<policyName> under adminClusters
+        return asyncDeleteRequest(path); // NOTE(review): helper is defined outside this hunk; presumably issues the DELETE - confirm
     }
 
     private void setNamespaceIsolationPolicy(String cluster, String policyName,
             NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException {
         try {
-            request(adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName)).post(
-                    Entity.entity(namespaceIsolationData, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // block on the async set, bounded by readTimeoutMs
+        } catch (ExecutionException e) { // async call failed: rethrow an admin cause as-is, wrap anything else instead of a ClassCastException
+            throw e.getCause() instanceof PulsarAdminException ? (PulsarAdminException) e.getCause() : new PulsarAdminException(e.getCause());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag before reporting
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) { // no result within readTimeoutMs
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
+    private CompletableFuture<Void> setNamespaceIsolationPolicyAsync(String cluster, String policyName,
+                                             NamespaceIsolationData namespaceIsolationData) {
+        WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName); // <cluster>/namespaceIsolationPolicies/<policyName> under adminClusters
+        return asyncPostRequest(path, Entity.entity(namespaceIsolationData, MediaType.APPLICATION_JSON)); // policy is sent as a JSON entity; shared by the create and update async variants
+    }
+
     @Override
     public NamespaceIsolationData getNamespaceIsolationPolicy(String cluster, String policyName)
             throws PulsarAdminException {
         try {
-            return request(adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName)).get(
-                    NamespaceIsolationData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getNamespaceIsolationPolicyAsync(cluster, policyName)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // block on the async get, bounded by readTimeoutMs
+        } catch (ExecutionException e) { // async call failed: rethrow an admin cause as-is, wrap anything else instead of a ClassCastException
+            throw e.getCause() instanceof PulsarAdminException ? (PulsarAdminException) e.getCause() : new PulsarAdminException(e.getCause());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag before reporting
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) { // no result within readTimeoutMs
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<NamespaceIsolationData> getNamespaceIsolationPolicyAsync(String cluster, String policyName) {
+        WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName); // <cluster>/namespaceIsolationPolicies/<policyName> under adminClusters
+        final CompletableFuture<NamespaceIsolationData> future = new CompletableFuture<>(); // completed by the callback below
+        asyncGetRequest(path,
+                new InvocationCallback<NamespaceIsolationData>() {
+                    @Override
+                    public void completed(NamespaceIsolationData namespaceIsolationData) {
+                        future.complete(namespaceIsolationData); // success: forward the result unchanged
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause())); // failure: translate the cause via getApiException
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void createFailureDomain(String cluster, String domainName, FailureDomain domain) throws PulsarAdminException {
         setDomain(cluster, domainName, domain);
     }
 
     @Override
+    public CompletableFuture<Void> createFailureDomainAsync(String cluster, String domainName, FailureDomain domain) {
+        return setDomainAsync(cluster, domainName, domain); // create delegates to the shared private set helper
+    }
+
+    @Override
     public void updateFailureDomain(String cluster, String domainName, FailureDomain domain) throws PulsarAdminException {
         setDomain(cluster, domainName, domain);
     }
 
     @Override
+    public CompletableFuture<Void> updateFailureDomainAsync(String cluster, String domainName, FailureDomain domain) {
+        return setDomainAsync(cluster, domainName, domain); // update delegates to the shared private set helper
+    }
+
+    @Override
     public void deleteFailureDomain(String cluster, String domainName) throws PulsarAdminException {
-        request(adminClusters.path(cluster).path("failureDomains").path(domainName)).delete(ErrorData.class);
+        try {
+            deleteFailureDomainAsync(cluster, domainName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // block on the async delete, bounded by readTimeoutMs
+        } catch (ExecutionException e) { // async call failed: rethrow an admin cause as-is, wrap anything else instead of a ClassCastException
+            throw e.getCause() instanceof PulsarAdminException ? (PulsarAdminException) e.getCause() : new PulsarAdminException(e.getCause());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag before reporting
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) { // no result within readTimeoutMs
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteFailureDomainAsync(String cluster, String domainName) {
+        WebTarget path = adminClusters.path(cluster).path("failureDomains").path(domainName); // <cluster>/failureDomains/<domainName> under adminClusters
+        return asyncDeleteRequest(path); // NOTE(review): helper is defined outside this hunk; presumably issues the DELETE - confirm
     }
 
     @Override
     public Map<String, FailureDomain> getFailureDomains(String cluster) throws PulsarAdminException {
         try {
-            return request(adminClusters.path(cluster).path("failureDomains"))
-                    .get(new GenericType<Map<String, FailureDomain>>() {
-                    });
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getFailureDomainsAsync(cluster).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // block on the async get, bounded by readTimeoutMs
+        } catch (ExecutionException e) { // async call failed: rethrow an admin cause as-is, wrap anything else instead of a ClassCastException
+            throw e.getCause() instanceof PulsarAdminException ? (PulsarAdminException) e.getCause() : new PulsarAdminException(e.getCause());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag before reporting
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) { // no result within readTimeoutMs
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Map<String, FailureDomain>> getFailureDomainsAsync(String cluster) {
+        WebTarget path = adminClusters.path(cluster).path("failureDomains"); // <cluster>/failureDomains under adminClusters
+        final CompletableFuture<Map<String, FailureDomain>> future = new CompletableFuture<>(); // completed by the callback below
+        asyncGetRequest(path,
+                new InvocationCallback<Map<String, FailureDomain>>() {
+                    @Override
+                    public void completed(Map<String, FailureDomain> failureDomains) {
+                        future.complete(failureDomains); // success: forward the map unchanged
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause())); // failure: translate the cause via getApiException
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public FailureDomain getFailureDomain(String cluster, String domainName) throws PulsarAdminException {
         try {
-            return request(adminClusters.path(cluster).path("failureDomains")
-                    .path(domainName)).get(FailureDomain.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getFailureDomainAsync(cluster, domainName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // block on the async get, bounded by readTimeoutMs
+        } catch (ExecutionException e) { // async call failed: rethrow an admin cause as-is, wrap anything else instead of a ClassCastException
+            throw e.getCause() instanceof PulsarAdminException ? (PulsarAdminException) e.getCause() : new PulsarAdminException(e.getCause());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag before reporting
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) { // no result within readTimeoutMs
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
+    @Override
+    public CompletableFuture<FailureDomain> getFailureDomainAsync(String cluster, String domainName) {
+        WebTarget path = adminClusters.path(cluster).path("failureDomains").path(domainName); // <cluster>/failureDomains/<domainName> under adminClusters
+        final CompletableFuture<FailureDomain> future = new CompletableFuture<>(); // completed by the callback below
+        asyncGetRequest(path,
+                new InvocationCallback<FailureDomain>() {
+                    @Override
+                    public void completed(FailureDomain failureDomain) {
+                        future.complete(failureDomain); // success: forward the result unchanged
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause())); // failure: translate the cause via getApiException
+                    }
+                });
+        return future;
+    }
+
     private void setDomain(String cluster, String domainName,
             FailureDomain domain) throws PulsarAdminException {
         try {
-            request(adminClusters.path(cluster).path("failureDomains").path(domainName)).post(
-                    Entity.entity(domain, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setDomainAsync(cluster, domainName, domain).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); // block on the async set, bounded by readTimeoutMs
+        } catch (ExecutionException e) { // async call failed: rethrow an admin cause as-is, wrap anything else instead of a ClassCastException
+            throw e.getCause() instanceof PulsarAdminException ? (PulsarAdminException) e.getCause() : new PulsarAdminException(e.getCause());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag before reporting
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) { // no result within readTimeoutMs
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
+
+    private CompletableFuture<Void> setDomainAsync(String cluster, String domainName,
+                           FailureDomain domain) {
+        WebTarget path = adminClusters.path(cluster).path("failureDomains").path(domainName); // <cluster>/failureDomains/<domainName> under adminClusters
+        return asyncPostRequest(path, Entity.entity(domain, MediaType.APPLICATION_JSON)); // domain is sent as a JSON entity; shared by the create and update async variants
+    }
 }