You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/03/20 17:10:10 UTC
[pulsar] branch master updated: [PulsarAdmin] Clusters to async
(#6568)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7fef14b [PulsarAdmin] Clusters to async (#6568)
7fef14b is described below
commit 7fef14b6b2cc85424a15d253146e7afe3f423fbf
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Sat Mar 21 01:09:57 2020 +0800
[PulsarAdmin] Clusters to async (#6568)
---
.../org/apache/pulsar/client/admin/Clusters.java | 318 +++++++++++++--
.../pulsar/client/admin/internal/ClustersImpl.java | 454 ++++++++++++++++++---
2 files changed, 676 insertions(+), 96 deletions(-)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
index 54c9c86..5d3b31d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
@@ -22,6 +22,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
@@ -55,6 +56,20 @@ public interface Clusters {
List<String> getClusters() throws PulsarAdminException;
/**
+ * Get the list of clusters asynchronously.
+ * <p>
+ * Get the list of all the Pulsar clusters.
+ * <p>
+ * Response Example:
+ *
+ * <pre>
+ * <code>["c1", "c2", "c3"]</code>
+ * </pre>
+ *
+ */
+ CompletableFuture<List<String>> getClustersAsync();
+
+ /**
* Get the configuration data for the specified cluster.
* <p>
* Response Example:
@@ -78,6 +93,23 @@ public interface Clusters {
ClusterData getCluster(String cluster) throws PulsarAdminException;
/**
+ * Get the configuration data for the specified cluster asynchronously.
+ * <p>
+ * Response Example:
+ *
+ * <pre>
+ * <code>{ serviceUrl : "http://my-broker.example.com:8080/" }</code>
+ * </pre>
+ *
+ * @param cluster
+ * Cluster name
+ *
+ * @return the cluster configuration
+ *
+ */
+ CompletableFuture<ClusterData> getClusterAsync(String cluster);
+
+ /**
* Create a new cluster.
* <p>
* Provisions a new cluster. This operation requires Pulsar super-user privileges.
@@ -89,7 +121,7 @@ public interface Clusters {
* @param clusterData
* the cluster configuration object
*
- * @throws NotAuthorized
+ * @throws NotAuthorizedException
* You don't have admin permission to create the cluster
* @throws ConflictException
* Cluster already exists
@@ -99,6 +131,21 @@ public interface Clusters {
void createCluster(String cluster, ClusterData clusterData) throws PulsarAdminException;
/**
+ * Create a new cluster asynchronously.
+ * <p>
+ * Provisions a new cluster. This operation requires Pulsar super-user privileges.
+ * <p>
+ * The name cannot contain '/' characters.
+ *
+ * @param cluster
+ * Cluster name
+ * @param clusterData
+ * the cluster configuration object
+ *
+ */
+ CompletableFuture<Void> createClusterAsync(String cluster, ClusterData clusterData);
+
+ /**
* Update the configuration for a cluster.
* <p>
* This operation requires Pulsar super-user privileges.
@@ -116,7 +163,20 @@ public interface Clusters {
* Unexpected error
*/
void updateCluster(String cluster, ClusterData clusterData) throws PulsarAdminException;
-
+
+ /**
+ * Update the configuration for a cluster asynchronously.
+ * <p>
+ * This operation requires Pulsar super-user privileges.
+ *
+ * @param cluster
+ * Cluster name
+ * @param clusterData
+ * the cluster configuration object
+ *
+ */
+ CompletableFuture<Void> updateClusterAsync(String cluster, ClusterData clusterData);
+
/**
* Update peer cluster names.
* <p>
@@ -135,9 +195,22 @@ public interface Clusters {
* Unexpected error
*/
void updatePeerClusterNames(String cluster, LinkedHashSet<String> peerClusterNames) throws PulsarAdminException;
-
+
/**
- * Get peer-cluster names
+ * Update peer cluster names asynchronously.
+ * <p>
+ * This operation requires Pulsar super-user privileges.
+ *
+ * @param cluster
+ * Cluster name
+ * @param peerClusterNames
+ * list of peer cluster names
+ *
+ */
+ CompletableFuture<Void> updatePeerClusterNamesAsync(String cluster, LinkedHashSet<String> peerClusterNames);
+
+ /**
+ * Get peer-cluster names.
* <p>
*
* @param cluster
@@ -156,10 +229,20 @@ public interface Clusters {
* Unexpected error
*/
Set<String> getPeerClusterNames(String cluster) throws PulsarAdminException;
-
/**
- * Delete an existing cluster
+ * Get peer-cluster names asynchronously.
+ * <p>
+ *
+ * @param cluster
+ * Cluster name
+ * @return a future that completes with the set of peer cluster names
+ *
+ */
+ CompletableFuture<Set<String>> getPeerClusterNamesAsync(String cluster);
+
+ /**
+ * Delete an existing cluster.
* <p>
* Delete a cluster
*
@@ -178,6 +261,17 @@ public interface Clusters {
void deleteCluster(String cluster) throws PulsarAdminException;
/**
+ * Delete an existing cluster asynchronously.
+ * <p>
+ * Delete a cluster
+ *
+ * @param cluster
+ * Cluster name
+ *
+ */
+ CompletableFuture<Void> deleteClusterAsync(String cluster);
+
+ /**
* Get the namespace isolation policies of a cluster
* <p>
*
@@ -198,9 +292,29 @@ public interface Clusters {
*/
Map<String, NamespaceIsolationData> getNamespaceIsolationPolicies(String cluster) throws PulsarAdminException;
+ /**
+ * Get the namespace isolation policies of a cluster asynchronously.
+ * <p>
+ *
+ * @param cluster
+ * Cluster name
+ * @return a future that completes with the namespace isolation policies of the cluster
+ * @throws NotAuthorizedException
+ * You don't have admin permission to read the cluster's policies
+ *
+ * @throws NotFoundException
+ * Policies don't exist
+ *
+ * @throws PreconditionFailedException
+ * Cluster doesn't exist
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ CompletableFuture<Map<String, NamespaceIsolationData>> getNamespaceIsolationPoliciesAsync(String cluster);
/**
- * Create a namespace isolation policy for a cluster
+ * Create a namespace isolation policy for a cluster.
* <p>
*
* @param cluster
@@ -228,10 +342,27 @@ public interface Clusters {
void createNamespaceIsolationPolicy(String cluster, String policyName, NamespaceIsolationData namespaceIsolationData)
throws PulsarAdminException;
-
+ /**
+ * Create a namespace isolation policy for a cluster asynchronously.
+ * <p>
+ *
+ * @param cluster
+ * Cluster name
+ *
+ * @param policyName
+ * Policy name
+ *
+ * @param namespaceIsolationData
+ * Namespace isolation policy configuration
+ *
+ * @return a future that completes when the create request has finished
+ */
+ CompletableFuture<Void> createNamespaceIsolationPolicyAsync(
+ String cluster, String policyName, NamespaceIsolationData namespaceIsolationData);
+
/**
* Returns list of active brokers with namespace-isolation policies attached to it.
- *
+ *
* @param cluster
* @return
* @throws PulsarAdminException
@@ -240,8 +371,16 @@ public interface Clusters {
throws PulsarAdminException;
/**
+ * Returns list of active brokers with namespace-isolation policies attached to it asynchronously.
+ *
+ * @param cluster
+ * @return a future that completes with the list of active brokers and their namespace-isolation data
+ */
+ CompletableFuture<List<BrokerNamespaceIsolationData>> getBrokersWithNamespaceIsolationPolicyAsync(String cluster);
+
+ /**
* Returns active broker with namespace-isolation policies attached to it.
- *
+ *
* @param cluster
* @param broker
* @return
@@ -249,7 +388,15 @@ public interface Clusters {
*/
BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(String cluster, String broker)
throws PulsarAdminException;
-
+
+ /**
+ * Returns active broker with namespace-isolation policies attached to it asynchronously.
+ *
+ * @param cluster
+ * @param broker
+ * @return a future that completes with the namespace-isolation data of the given broker
+ */
+ CompletableFuture<BrokerNamespaceIsolationData> getBrokerWithNamespaceIsolationPolicyAsync(String cluster, String broker);
/**
* Update a namespace isolation policy for a cluster
@@ -280,6 +427,23 @@ public interface Clusters {
void updateNamespaceIsolationPolicy(String cluster, String policyName, NamespaceIsolationData namespaceIsolationData)
throws PulsarAdminException;
+ /**
+ * Update a namespace isolation policy for a cluster asynchronously.
+ * <p>
+ *
+ * @param cluster
+ * Cluster name
+ *
+ * @param policyName
+ * Policy name
+ *
+ * @param namespaceIsolationData
+ * Namespace isolation policy configuration
+ *
+ * @return a future that completes when the update request has finished
+ *
+ */
+ CompletableFuture<Void> updateNamespaceIsolationPolicyAsync(String cluster, String policyName, NamespaceIsolationData namespaceIsolationData);
/**
* Delete a namespace isolation policy for a cluster
@@ -308,7 +472,22 @@ public interface Clusters {
void deleteNamespaceIsolationPolicy(String cluster, String policyName) throws PulsarAdminException;
/**
- * Get a single namespace isolation policy for a cluster
+ * Delete a namespace isolation policy for a cluster asynchronously.
+ * <p>
+ *
+ * @param cluster
+ * Cluster name
+ *
+ * @param policyName
+ * Policy name
+ *
+ * @return a future that completes when the delete request has finished
+ */
+
+ CompletableFuture<Void> deleteNamespaceIsolationPolicyAsync(String cluster, String policyName);
+
+ /**
+ * Get a single namespace isolation policy for a cluster.
* <p>
*
* @param cluster
@@ -332,7 +511,20 @@ public interface Clusters {
NamespaceIsolationData getNamespaceIsolationPolicy(String cluster, String policyName) throws PulsarAdminException;
/**
- * Create a domain into cluster
+ * Get a single namespace isolation policy for a cluster asynchronously.
+ * <p>
+ *
+ * @param cluster
+ * Cluster name
+ *
+ * @param policyName
+ * Policy name
+ *
+ */
+ CompletableFuture<NamespaceIsolationData> getNamespaceIsolationPolicyAsync(String cluster, String policyName);
+
+ /**
+ * Create a domain into cluster.
* <p>
*
* @param cluster
@@ -341,7 +533,7 @@ public interface Clusters {
* @param domainName
* domain name
*
- * @param FailureDomain
+ * @param domain
* Domain configurations
*
* @return
@@ -350,7 +542,7 @@ public interface Clusters {
*
* @throws ConflictException
* Broker already exist into other domain
- *
+ *
* @throws NotFoundException
* Cluster doesn't exist
*
@@ -362,10 +554,9 @@ public interface Clusters {
*/
void createFailureDomain(String cluster, String domainName, FailureDomain domain)
throws PulsarAdminException;
-
-
+
/**
- * Update a domain into cluster
+ * Create a failure domain in a cluster asynchronously.
* <p>
*
* @param cluster
@@ -374,7 +565,25 @@ public interface Clusters {
* @param domainName
* domain name
*
- * @param FailureDomain
+ * @param domain
+ * Domain configurations
+ *
+ * @return a future that completes when the create request has finished
+ *
+ */
+ CompletableFuture<Void> createFailureDomainAsync(String cluster, String domainName, FailureDomain domain);
+
+ /**
+ * Update a domain into cluster.
+ * <p>
+ *
+ * @param cluster
+ * Cluster name
+ *
+ * @param domainName
+ * domain name
+ *
+ * @param domain
* Domain configurations
*
* @return
@@ -383,7 +592,7 @@ public interface Clusters {
*
* @throws ConflictException
* Broker already exist into other domain
- *
+ *
* @throws NotFoundException
* Cluster doesn't exist
*
@@ -395,10 +604,27 @@ public interface Clusters {
*/
void updateFailureDomain(String cluster, String domainName, FailureDomain domain)
throws PulsarAdminException;
-
-
+
+ /**
+ * Update a failure domain in a cluster asynchronously.
+ * <p>
+ *
+ * @param cluster
+ * Cluster name
+ *
+ * @param domainName
+ * domain name
+ *
+ * @param domain
+ * Domain configurations
+ *
+ * @return a future that completes when the update request has finished
+ *
+ */
+ CompletableFuture<Void> updateFailureDomainAsync(String cluster, String domainName, FailureDomain domain);
+
/**
- * Delete a domain in cluster
+ * Delete a domain in cluster.
* <p>
*
* @param cluster
@@ -420,11 +646,25 @@ public interface Clusters {
* @throws PulsarAdminException
* Unexpected error
*/
-
void deleteFailureDomain(String cluster, String domainName) throws PulsarAdminException;
/**
- * Get all registered domains in cluster
+ * Delete a domain in cluster asynchronously.
+ * <p>
+ *
+ * @param cluster
+ * Cluster name
+ *
+ * @param domainName
+ * Domain name
+ *
+ * @return a future that completes when the delete request has finished
+ *
+ */
+ CompletableFuture<Void> deleteFailureDomainAsync(String cluster, String domainName);
+
+ /**
+ * Get all registered domains in cluster.
* <p>
*
* @param cluster
@@ -440,9 +680,20 @@ public interface Clusters {
* Unexpected error
*/
Map<String, FailureDomain> getFailureDomains(String cluster) throws PulsarAdminException;
-
+
/**
- * Get the domain registered into a cluster
+ * Get all registered domains in cluster asynchronously.
+ * <p>
+ *
+ * @param cluster
+ * Cluster name
+ * @return a future that completes with the failure domains registered in the cluster
+ *
+ */
+ CompletableFuture<Map<String, FailureDomain>> getFailureDomainsAsync(String cluster);
+
+ /**
+ * Get the domain registered into a cluster.
* <p>
*
* @param cluster
@@ -461,5 +712,16 @@ public interface Clusters {
* Unexpected error
*/
FailureDomain getFailureDomain(String cluster, String domainName) throws PulsarAdminException;
-
+
+ /**
+ * Get a failure domain registered in a cluster asynchronously.
+ * <p>
+ *
+ * @param cluster
+ * Cluster name
+ * @param domainName Domain name
+ * @return a future that completes with the requested failure domain
+ */
+ CompletableFuture<FailureDomain> getFailureDomainAsync(String cluster, String domainName);
+
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index fcc598e..8be02ca 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -22,10 +22,14 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import org.apache.pulsar.client.admin.Clusters;
@@ -34,7 +38,6 @@ import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;
-import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
public class ClustersImpl extends BaseResource implements Clusters {
@@ -49,189 +52,504 @@ public class ClustersImpl extends BaseResource implements Clusters {
@Override
public List<String> getClusters() throws PulsarAdminException {
try {
- return request(adminClusters).get(new GenericType<List<String>>() {
- });
- } catch (Exception e) {
- throw getApiException(e);
+ return getClustersAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<List<String>> getClustersAsync() {
+ final CompletableFuture<List<String>> future = new CompletableFuture<>();
+ asyncGetRequest(adminClusters,
+ new InvocationCallback<List<String>>() {
+ @Override
+ public void completed(List<String> clusters) {
+ future.complete(clusters);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public ClusterData getCluster(String cluster) throws PulsarAdminException {
try {
- return request(adminClusters.path(cluster)).get(ClusterData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getClusterAsync(cluster).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<ClusterData> getClusterAsync(String cluster) {
+ WebTarget path = adminClusters.path(cluster);
+ final CompletableFuture<ClusterData> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<ClusterData>() {
+ @Override
+ public void completed(ClusterData clusterData) {
+ future.complete(clusterData);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void createCluster(String cluster, ClusterData clusterData) throws PulsarAdminException {
try {
- request(adminClusters.path(cluster))
- .put(Entity.entity(clusterData, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ createClusterAsync(cluster, clusterData).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> createClusterAsync(String cluster, ClusterData clusterData) {
+ WebTarget path = adminClusters.path(cluster);
+ return asyncPutRequest(path, Entity.entity(clusterData, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void updateCluster(String cluster, ClusterData clusterData) throws PulsarAdminException {
try {
- request(adminClusters.path(cluster)).post(Entity.entity(clusterData, MediaType.APPLICATION_JSON),
- ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ updateClusterAsync(cluster, clusterData).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> updateClusterAsync(String cluster, ClusterData clusterData) {
+ WebTarget path = adminClusters.path(cluster);
+ return asyncPostRequest(path, Entity.entity(clusterData, MediaType.APPLICATION_JSON_TYPE));
+ }
+
+ @Override
public void updatePeerClusterNames(String cluster, LinkedHashSet<String> peerClusterNames) throws PulsarAdminException {
try {
- request(adminClusters.path(cluster).path("peers")).post(Entity.entity(peerClusterNames, MediaType.APPLICATION_JSON),
- ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ updatePeerClusterNamesAsync(cluster, peerClusterNames).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
+ }
+ @Override
+ public CompletableFuture<Void> updatePeerClusterNamesAsync(String cluster, LinkedHashSet<String> peerClusterNames) {
+ WebTarget path = adminClusters.path(cluster).path("peers");
+ return asyncPostRequest(path, Entity.entity(peerClusterNames, MediaType.APPLICATION_JSON));
}
- @Override
+ @Override
@SuppressWarnings("unchecked")
public Set<String> getPeerClusterNames(String cluster) throws PulsarAdminException {
- try {
- return request(adminClusters.path(cluster).path("peers")).get(LinkedHashSet.class);
- } catch (Exception e) {
- throw getApiException(e);
- }
+ try {
+ return getPeerClusterNamesAsync(cluster).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
}
@Override
+ public CompletableFuture<Set<String>> getPeerClusterNamesAsync(String cluster) {
+ WebTarget path = adminClusters.path(cluster).path("peers");
+ final CompletableFuture<Set<String>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Set<String>>() {
+ @Override
+ public void completed(Set<String> clusterNames) {
+ future.complete(clusterNames);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void deleteCluster(String cluster) throws PulsarAdminException {
try {
- request(adminClusters.path(cluster)).delete(ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ deleteClusterAsync(cluster).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> deleteClusterAsync(String cluster) {
+ WebTarget path = adminClusters.path(cluster);
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public Map<String, NamespaceIsolationData> getNamespaceIsolationPolicies(String cluster) throws PulsarAdminException {
try {
- return request(adminClusters.path(cluster).path("namespaceIsolationPolicies")).get(
- new GenericType<Map<String, NamespaceIsolationData>>() {
- });
- } catch (Exception e) {
- throw getApiException(e);
+ return getNamespaceIsolationPoliciesAsync(cluster).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
+ @Override
+ public CompletableFuture<Map<String, NamespaceIsolationData>> getNamespaceIsolationPoliciesAsync(String cluster) {
+ WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies");
+ final CompletableFuture<Map<String, NamespaceIsolationData>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Map<String, NamespaceIsolationData>>() {
+ @Override
+ public void completed(Map<String, NamespaceIsolationData> stringNamespaceIsolationDataMap) {
+ future.complete(stringNamespaceIsolationDataMap);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
@Override
public List<BrokerNamespaceIsolationData> getBrokersWithNamespaceIsolationPolicy(String cluster)
throws PulsarAdminException {
try {
- return request(adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers"))
- .get(new GenericType<List<BrokerNamespaceIsolationData>>() {
- });
- } catch (Exception e) {
- throw getApiException(e);
+ return getBrokersWithNamespaceIsolationPolicyAsync(cluster).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<List<BrokerNamespaceIsolationData>> getBrokersWithNamespaceIsolationPolicyAsync(String cluster) {
+ WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers");
+ final CompletableFuture<List<BrokerNamespaceIsolationData>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<List<BrokerNamespaceIsolationData>>() {
+ @Override
+ public void completed(List<BrokerNamespaceIsolationData> brokerNamespaceIsolationData) {
+ future.complete(brokerNamespaceIsolationData);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(String cluster, String broker)
throws PulsarAdminException {
try {
- return request(adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers").path(broker))
- .get(BrokerNamespaceIsolationData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getBrokerWithNamespaceIsolationPolicyAsync(cluster, broker).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<BrokerNamespaceIsolationData> getBrokerWithNamespaceIsolationPolicyAsync(String cluster, String broker) {
+ WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers").path(broker);
+ final CompletableFuture<BrokerNamespaceIsolationData> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<BrokerNamespaceIsolationData>() {
+ @Override
+ public void completed(BrokerNamespaceIsolationData brokerNamespaceIsolationData) {
+ future.complete(brokerNamespaceIsolationData);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void createNamespaceIsolationPolicy(String cluster, String policyName,
NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException {
setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData);
}
@Override
+ public CompletableFuture<Void> createNamespaceIsolationPolicyAsync(String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) {
+ return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData);
+ }
+
+ @Override
public void updateNamespaceIsolationPolicy(String cluster, String policyName,
NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException {
setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData);
}
@Override
+ public CompletableFuture<Void> updateNamespaceIsolationPolicyAsync(String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) {
+ return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData);
+ }
+
+ @Override
public void deleteNamespaceIsolationPolicy(String cluster, String policyName) throws PulsarAdminException {
- request(adminClusters.path(cluster)
- .path("namespaceIsolationPolicies").path(policyName)).delete(ErrorData.class);
+ try {
+ deleteNamespaceIsolationPolicyAsync(cluster, policyName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteNamespaceIsolationPolicyAsync(String cluster, String policyName) {
+ WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName);
+ return asyncDeleteRequest(path);
}
private void setNamespaceIsolationPolicy(String cluster, String policyName,
NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException {
try {
- request(adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName)).post(
- Entity.entity(namespaceIsolationData, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
+ private CompletableFuture<Void> setNamespaceIsolationPolicyAsync(String cluster, String policyName,
+ NamespaceIsolationData namespaceIsolationData) {
+ WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName);
+ return asyncPostRequest(path, Entity.entity(namespaceIsolationData, MediaType.APPLICATION_JSON));
+ }
+
@Override
public NamespaceIsolationData getNamespaceIsolationPolicy(String cluster, String policyName)
throws PulsarAdminException {
try {
- return request(adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName)).get(
- NamespaceIsolationData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getNamespaceIsolationPolicyAsync(cluster, policyName)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<NamespaceIsolationData> getNamespaceIsolationPolicyAsync(String cluster, String policyName) {
+ WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName);
+ final CompletableFuture<NamespaceIsolationData> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<NamespaceIsolationData>() {
+ @Override
+ public void completed(NamespaceIsolationData namespaceIsolationData) {
+ future.complete(namespaceIsolationData);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void createFailureDomain(String cluster, String domainName, FailureDomain domain) throws PulsarAdminException {
setDomain(cluster, domainName, domain);
}
@Override
+ public CompletableFuture<Void> createFailureDomainAsync(String cluster, String domainName, FailureDomain domain) {
+ return setDomainAsync(cluster, domainName, domain);
+ }
+
+ @Override
public void updateFailureDomain(String cluster, String domainName, FailureDomain domain) throws PulsarAdminException {
setDomain(cluster, domainName, domain);
}
@Override
+ public CompletableFuture<Void> updateFailureDomainAsync(String cluster, String domainName, FailureDomain domain) {
+ return setDomainAsync(cluster, domainName, domain);
+ }
+
+ @Override
public void deleteFailureDomain(String cluster, String domainName) throws PulsarAdminException {
- request(adminClusters.path(cluster).path("failureDomains").path(domainName)).delete(ErrorData.class);
+ try {
+ deleteFailureDomainAsync(cluster, domainName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteFailureDomainAsync(String cluster, String domainName) {
+ WebTarget path = adminClusters.path(cluster).path("failureDomains").path(domainName);
+ return asyncDeleteRequest(path);
}
@Override
public Map<String, FailureDomain> getFailureDomains(String cluster) throws PulsarAdminException {
try {
- return request(adminClusters.path(cluster).path("failureDomains"))
- .get(new GenericType<Map<String, FailureDomain>>() {
- });
- } catch (Exception e) {
- throw getApiException(e);
+ return getFailureDomainsAsync(cluster).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Map<String, FailureDomain>> getFailureDomainsAsync(String cluster) {
+ WebTarget path = adminClusters.path(cluster).path("failureDomains");
+ final CompletableFuture<Map<String, FailureDomain>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Map<String, FailureDomain>>() {
+ @Override
+ public void completed(Map<String, FailureDomain> failureDomains) {
+ future.complete(failureDomains);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public FailureDomain getFailureDomain(String cluster, String domainName) throws PulsarAdminException {
try {
- return request(adminClusters.path(cluster).path("failureDomains")
- .path(domainName)).get(FailureDomain.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getFailureDomainAsync(cluster, domainName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
+ @Override
+ public CompletableFuture<FailureDomain> getFailureDomainAsync(String cluster, String domainName) {
+ WebTarget path = adminClusters.path(cluster).path("failureDomains").path(domainName);
+ final CompletableFuture<FailureDomain> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<FailureDomain>() {
+ @Override
+ public void completed(FailureDomain failureDomain) {
+ future.complete(failureDomain);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
private void setDomain(String cluster, String domainName,
FailureDomain domain) throws PulsarAdminException {
try {
- request(adminClusters.path(cluster).path("failureDomains").path(domainName)).post(
- Entity.entity(domain, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setDomainAsync(cluster, domainName, domain).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
+
+ private CompletableFuture<Void> setDomainAsync(String cluster, String domainName,
+ FailureDomain domain) {
+ WebTarget path = adminClusters.path(cluster).path("failureDomains").path(domainName);
+ return asyncPostRequest(path, Entity.entity(domain, MediaType.APPLICATION_JSON));
+ }
}