You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/03/23 05:14:11 UTC
[pulsar] branch master updated: [PulsarAdmin] Namespaces to async
(#6590)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a7e210f [PulsarAdmin] Namespaces to async (#6590)
a7e210f is described below
commit a7e210f15bf0b27546c8b22c04c1b05cda8f3ee8
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Mon Mar 23 13:14:00 2020 +0800
[PulsarAdmin] Namespaces to async (#6590)
---
.../org/apache/pulsar/client/admin/Namespaces.java | 1200 ++++++++++-
.../client/admin/internal/NamespacesImpl.java | 2168 +++++++++++++++-----
2 files changed, 2853 insertions(+), 515 deletions(-)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index bfd4b7b..15b2ace 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -74,6 +74,24 @@ public interface Namespaces {
List<String> getNamespaces(String tenant) throws PulsarAdminException;
/**
+ * Get the list of namespaces asynchronously.
+ * <p>
+ * Get the list of all the namespaces for a certain tenant.
+ * <p>
+ * Response Example:
+ *
+ * <pre>
+ * <code>["my-tenant/c1/namespace1",
+ * "my-tenant/global/namespace2",
+ * "my-tenant/c2/namespace3"]</code>
+ * </pre>
+ *
+ * @param tenant
+ * Tenant name
+ */
+ CompletableFuture<List<String>> getNamespacesAsync(String tenant);
+
+ /**
* Get the list of namespaces.
* <p>
* Get the list of all the namespaces for a certain tenant on single cluster.
@@ -124,6 +142,23 @@ public interface Namespaces {
List<String> getTopics(String namespace) throws PulsarAdminException;
/**
+ * Get the list of topics asynchronously.
+ * <p>
+ * Get the list of all the topics under a certain namespace.
+ * <p>
+ * Response Example:
+ *
+ * <pre>
+ * <code>["persistent://my-tenant/use/namespace1/my-topic-1",
+ * "persistent://my-tenant/use/namespace1/my-topic-2"]</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<List<String>> getTopicsAsync(String namespace);
+
+ /**
* Get policies for a namespace.
* <p>
* Get the dump all the policies specified for a namespace.
@@ -163,6 +198,38 @@ public interface Namespaces {
Policies getPolicies(String namespace) throws PulsarAdminException;
/**
+ * Get policies for a namespace asynchronously.
+ * <p>
+ * Get the dump all the policies specified for a namespace.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>{
+ * "auth_policies" : {
+ * "namespace_auth" : {
+ * "my-role" : [ "produce" ]
+ * },
+ * "destination_auth" : {
+ * "persistent://prop/local/ns1/my-topic" : {
+ * "role-1" : [ "produce" ],
+ * "role-2" : [ "consume" ]
+ * }
+ * }
+ * },
+ * "replication_clusters" : ["use", "usw"],
+ * "message_ttl_in_seconds" : 300
+ * }</code>
+ * </pre>
+ *
+ * @see Policies
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Policies> getPoliciesAsync(String namespace);
+
+ /**
* Create a new namespace.
* <p>
* Creates a new empty namespace with no policies attached.
@@ -190,6 +257,18 @@ public interface Namespaces {
*
* @param namespace
* Namespace name
+ * @param numBundles
+ * Number of bundles
+ */
+ CompletableFuture<Void> createNamespaceAsync(String namespace, int numBundles);
+
+ /**
+ * Create a new namespace.
+ * <p>
+ * Creates a new empty namespace with no policies attached.
+ *
+ * @param namespace
+ * Namespace name
* @param bundlesData
* Bundles Data
*
@@ -205,6 +284,18 @@ public interface Namespaces {
void createNamespace(String namespace, BundlesData bundlesData) throws PulsarAdminException;
/**
+ * Create a new namespace asynchronously.
+ * <p>
+ * Creates a new empty namespace with no policies attached.
+ *
+ * @param namespace
+ * Namespace name
+ * @param bundlesData
+ * Bundles Data
+ */
+ CompletableFuture<Void> createNamespaceAsync(String namespace, BundlesData bundlesData);
+
+ /**
* Create a new namespace.
* <p>
* Creates a new empty namespace with no policies attached.
@@ -224,6 +315,16 @@ public interface Namespaces {
void createNamespace(String namespace) throws PulsarAdminException;
/**
+ * Create a new namespace asynchronously.
+ * <p>
+ * Creates a new empty namespace with no policies attached.
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Void> createNamespaceAsync(String namespace);
+
+ /**
* Create a new namespace.
* <p>
* Creates a new empty namespace with no policies attached.
@@ -246,6 +347,19 @@ public interface Namespaces {
void createNamespace(String namespace, Set<String> clusters) throws PulsarAdminException;
/**
+ * Create a new namespace asynchronously.
+ * <p>
+ * Creates a new empty namespace with no policies attached.
+ *
+ * @param namespace
+ * Namespace name
+ * @param clusters
+ * Clusters in which the namespace will be present. If more than one cluster is present, replication
+ * across clusters will be enabled.
+ */
+ CompletableFuture<Void> createNamespaceAsync(String namespace, Set<String> clusters);
+
+ /**
* Create a new namespace.
* <p>
* Creates a new namespace with the specified policies.
@@ -269,6 +383,18 @@ public interface Namespaces {
void createNamespace(String namespace, Policies policies) throws PulsarAdminException;
/**
+ * Create a new namespace asynchronously.
+ * <p>
+ * Creates a new namespace with the specified policies.
+ *
+ * @param namespace
+ * Namespace name
+ * @param policies
+ * Policies for the namespace
+ */
+ CompletableFuture<Void> createNamespaceAsync(String namespace, Policies policies);
+
+ /**
* Delete an existing namespace.
* <p>
* The namespace needs to be empty.
@@ -288,6 +414,16 @@ public interface Namespaces {
void deleteNamespace(String namespace) throws PulsarAdminException;
/**
+ * Delete an existing namespace asynchronously.
+ * <p>
+ * The namespace needs to be empty.
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Void> deleteNamespaceAsync(String namespace);
+
+ /**
* Delete an existing bundle in a namespace.
* <p>
* The bundle needs to be empty.
@@ -349,6 +485,25 @@ public interface Namespaces {
Map<String, Set<AuthAction>> getPermissions(String namespace) throws PulsarAdminException;
/**
+ * Get permissions on a namespace asynchronously.
+ * <p>
+ * Retrieve the permissions for a namespace.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>{
+ * "my-role" : [ "produce" ]
+ * "other-role" : [ "consume" ]
+ * }</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Map<String, Set<AuthAction>>> getPermissionsAsync(String namespace);
+
+ /**
* Grant permission on a namespace.
* <p>
* Grant a new permission to a client role on a namespace.
@@ -378,6 +533,26 @@ public interface Namespaces {
void grantPermissionOnNamespace(String namespace, String role, Set<AuthAction> actions) throws PulsarAdminException;
/**
+ * Grant permission on a namespace asynchronously.
+ * <p>
+ * Grant a new permission to a client role on a namespace.
+ * <p>
+ * Request parameter example:
+ *
+ * <pre>
+ * <code>["produce", "consume"]</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param role
+ * Client role to which grant permission
+ * @param actions
+ * Auth actions (produce and consume)
+ */
+ CompletableFuture<Void> grantPermissionOnNamespaceAsync(String namespace, String role, Set<AuthAction> actions);
+
+ /**
* Revoke permissions on a namespace.
* <p>
* Revoke all permissions to a client role on a namespace.
@@ -397,6 +572,18 @@ public interface Namespaces {
void revokePermissionsOnNamespace(String namespace, String role) throws PulsarAdminException;
/**
+ * Revoke permissions on a namespace asynchronously.
+ * <p>
+ * Revoke all permissions to a client role on a namespace.
+ *
+ * @param namespace
+ * Namespace name
+ * @param role
+ * Client role to which remove permissions
+ */
+ CompletableFuture<Void> revokePermissionsOnNamespaceAsync(String namespace, String role);
+
+ /**
* Grant permission to role to access subscription's admin-api.
* @param namespace
* @param subscription
@@ -406,6 +593,14 @@ public interface Namespaces {
void grantPermissionOnSubscription(String namespace, String subscription, Set<String> roles) throws PulsarAdminException;
/**
+ * Grant permission to role to access subscription's admin-api asynchronously.
+ * @param namespace
+ * @param subscription
+ * @param roles
+ */
+ CompletableFuture<Void> grantPermissionOnSubscriptionAsync(String namespace, String subscription, Set<String> roles);
+
+ /**
* Revoke permissions on a subscription's admin-api access.
* @param namespace
* @param subscription
@@ -415,6 +610,14 @@ public interface Namespaces {
void revokePermissionOnSubscription(String namespace, String subscription, String role) throws PulsarAdminException;
/**
+ * Revoke permissions on a subscription's admin-api access asynchronously.
+ * @param namespace
+ * @param subscription
+ * @param role
+ */
+ CompletableFuture<Void> revokePermissionOnSubscriptionAsync(String namespace, String subscription, String role);
+
+ /**
* Get the replication clusters for a namespace.
* <p>
* Response example:
@@ -438,6 +641,20 @@ public interface Namespaces {
List<String> getNamespaceReplicationClusters(String namespace) throws PulsarAdminException;
/**
+ * Get the replication clusters for a namespace asynchronously.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>["use", "usw", "usc"]</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<List<String>> getNamespaceReplicationClustersAsync(String namespace);
+
+ /**
* Set the replication clusters for a namespace.
* <p>
* Request example:
@@ -465,6 +682,22 @@ public interface Namespaces {
void setNamespaceReplicationClusters(String namespace, Set<String> clusterIds) throws PulsarAdminException;
/**
+ * Set the replication clusters for a namespace asynchronously.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>["us-west", "us-east", "us-cent"]</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param clusterIds
+ * Pulsar Cluster Ids
+ */
+ CompletableFuture<Void> setNamespaceReplicationClustersAsync(String namespace, Set<String> clusterIds);
+
+ /**
* Get the message TTL for a namespace.
* <p>
* Response example:
@@ -486,6 +719,20 @@ public interface Namespaces {
int getNamespaceMessageTTL(String namespace) throws PulsarAdminException;
/**
+ * Get the message TTL for a namespace asynchronously.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>60</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Integer> getNamespaceMessageTTLAsync(String namespace);
+
+ /**
* Set the messages Time to Live for all the topics within a namespace.
* <p>
* Request example:
@@ -508,9 +755,24 @@ public interface Namespaces {
*/
void setNamespaceMessageTTL(String namespace, int ttlInSeconds) throws PulsarAdminException;
+ /**
+ * Set the messages Time to Live for all the topics within a namespace asynchronously.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>60</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param ttlInSeconds
+ * TTL values for all messages for all topics in this namespace
+ */
+ CompletableFuture<Void> setNamespaceMessageTTLAsync(String namespace, int ttlInSeconds);
/**
- * Set anti-affinity group name for a namespace
+ * Set anti-affinity group name for a namespace.
* <p>
* Request example:
*
@@ -529,7 +791,19 @@ public interface Namespaces {
void setNamespaceAntiAffinityGroup(String namespace, String namespaceAntiAffinityGroup) throws PulsarAdminException;
/**
- * Get all namespaces that grouped with given anti-affinity group
+ * Set anti-affinity group name for a namespace asynchronously.
+ * <p>
+ * Request example:
+ *
+ * @param namespace
+ * Namespace name
+ * @param namespaceAntiAffinityGroup
+ * anti-affinity group name for a namespace
+ */
+ CompletableFuture<Void> setNamespaceAntiAffinityGroupAsync(String namespace, String namespaceAntiAffinityGroup);
+
+ /**
+ * Get all namespaces that grouped with given anti-affinity group.
*
* @param tenant
* tenant is only used for authorization. Client has to be admin of any of the tenant to access this
@@ -545,7 +819,21 @@ public interface Namespaces {
throws PulsarAdminException;
/**
- * Get anti-affinity group name for a namespace
+ * Get all namespaces that grouped with given anti-affinity group asynchronously.
+ *
+ * @param tenant
+ * tenant is only used for authorization. Client has to be admin of any of the tenant to access this
+ * api api.
+ * @param cluster
+ * cluster name
+ * @param namespaceAntiAffinityGroup
+ * Anti-affinity group name
+ * @return list of namespace grouped under a given anti-affinity group
+ */
+ CompletableFuture<List<String>> getAntiAffinityNamespacesAsync(String tenant, String cluster, String namespaceAntiAffinityGroup);
+
+ /**
+ * Get anti-affinity group name for a namespace.
* <p>
* Response example:
*
@@ -566,7 +854,21 @@ public interface Namespaces {
String getNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException;
/**
- * Delete anti-affinity group name for a namespace.
+ * Get anti-affinity group name for a namespace asynchronously.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>60</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<String> getNamespaceAntiAffinityGroupAsync(String namespace);
+
+ /**
+ * Delete anti-affinity group name for a namespace.
*
* @param namespace
* Namespace name
@@ -581,6 +883,14 @@ public interface Namespaces {
void deleteNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException;
/**
+ * Delete anti-affinity group name for a namespace.
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Void> deleteNamespaceAntiAffinityGroupAsync(String namespace);
+
+ /**
* Set the deduplication status for all topics within a namespace.
* <p>
* When deduplication is enabled, the broker will prevent to store the same message multiple times.
@@ -606,6 +916,24 @@ public interface Namespaces {
void setDeduplicationStatus(String namespace, boolean enableDeduplication) throws PulsarAdminException;
/**
+ * Set the deduplication status for all topics within a namespace asynchronously.
+ * <p>
+ * When deduplication is enabled, the broker will prevent to store the same message multiple times.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>true</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param enableDeduplication
+ * wether to enable or disable deduplication feature
+ */
+ CompletableFuture<Void> setDeduplicationStatusAsync(String namespace, boolean enableDeduplication);
+
+ /**
* Get the bundles split data.
*
* @param namespace
@@ -653,6 +981,33 @@ public interface Namespaces {
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String namespace) throws PulsarAdminException;
/**
+ * Get backlog quota map on a namespace asynchronously.
+ * <p>
+ * Get backlog quota map on a namespace.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>
+ * {
+ * "namespace_memory" : {
+ * "limit" : "134217728",
+ * "policy" : "consumer_backlog_eviction"
+ * },
+ * "destination_storage" : {
+ * "limit" : "-1",
+ * "policy" : "producer_exception"
+ * }
+ * }
+ * </code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Map<BacklogQuota.BacklogQuotaType, BacklogQuota>> getBacklogQuotaMapAsync(String namespace);
+
+ /**
* Set a backlog quota for all the topics on a namespace.
* <p>
* Set a backlog quota on a namespace.
@@ -685,6 +1040,31 @@ public interface Namespaces {
void setBacklogQuota(String namespace, BacklogQuota backlogQuota) throws PulsarAdminException;
/**
+ * Set a backlog quota for all the topics on a namespace asynchronously.
+ * <p>
+ * Set a backlog quota on a namespace.
+ * <p>
+ * The backlog quota can be set on this resource:
+ * <p>
+ * Request parameter example:
+ *
+ * <pre>
+ * <code>
+ * {
+ * "limit" : "134217728",
+ * "policy" : "consumer_backlog_eviction"
+ * }
+ * </code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param backlogQuota
+ * the new BacklogQuota
+ */
+ CompletableFuture<Void> setBacklogQuotaAsync(String namespace, BacklogQuota backlogQuota);
+
+ /**
* Remove a backlog quota policy from a namespace.
* <p>
* Remove a backlog quota policy from a namespace.
@@ -704,6 +1084,18 @@ public interface Namespaces {
void removeBacklogQuota(String namespace) throws PulsarAdminException;
/**
+ * Remove a backlog quota policy from a namespace asynchronously.
+ * <p>
+ * Remove a backlog quota policy from a namespace.
+ * <p>
+ * The backlog retention policy will fall back to the default.
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Void> removeBacklogQuotaAsync(String namespace);
+
+ /**
* Set the persistence configuration for all the topics on a namespace.
* <p>
* Set the persistence configuration on a namespace.
@@ -723,10 +1115,6 @@ public interface Namespaces {
* </code>
* </pre>
*
- * @param tenant
- * Tenant name
- * @param cluster
- * Cluster name
* @param namespace
* Namespace name
* @param persistence
@@ -744,6 +1132,33 @@ public interface Namespaces {
void setPersistence(String namespace, PersistencePolicies persistence) throws PulsarAdminException;
/**
+ * Set the persistence configuration for all the topics on a namespace asynchronously.
+ * <p>
+ * Set the persistence configuration on a namespace.
+ * <p>
+ * Request parameter example:
+ *
+ * <pre>
+ * <code>
+ * {
+ * "bookkeeperEnsemble" : 3, // Number of bookies to use for a topic
+ * "bookkeeperWriteQuorum" : 2, // How many writes to make of each entry
+ * "bookkeeperAckQuorum" : 2, // Number of acks (guaranteed copies) to wait for each entry
+ * "managedLedgerMaxMarkDeleteRate" : 10.0, // Throttling rate of mark-delete operation
+ * // to avoid high number of updates for each
+ * // consumer
+ * }
+ * </code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param persistence
+ * Persistence policies object
+ */
+ CompletableFuture<Void> setPersistenceAsync(String namespace, PersistencePolicies persistence);
+
+ /**
* Get the persistence configuration for a namespace.
* <p>
* Get the persistence configuration for a namespace.
@@ -763,10 +1178,6 @@ public interface Namespaces {
* </code>
* </pre>
*
- * @param tenant
- * Tenant name
- * @param cluster
- * Cluster name
* @param namespace
* Namespace name
*
@@ -782,6 +1193,31 @@ public interface Namespaces {
PersistencePolicies getPersistence(String namespace) throws PulsarAdminException;
/**
+ * Get the persistence configuration for a namespace asynchronously.
+ * <p>
+ * Get the persistence configuration for a namespace.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>
+ * {
+ * "bookkeeperEnsemble" : 3, // Number of bookies to use for a topic
+ * "bookkeeperWriteQuorum" : 2, // How many writes to make of each entry
+ * "bookkeeperAckQuorum" : 2, // Number of acks (guaranteed copies) to wait for each entry
+ * "managedLedgerMaxMarkDeleteRate" : 10.0, // Throttling rate of mark-delete operation
+ * // to avoid high number of updates for each
+ * // consumer
+ * }
+ * </code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<PersistencePolicies> getPersistenceAsync(String namespace);
+
+ /**
* Set bookie affinity group for a namespace to isolate namespace write to bookies that are part of given affinity
* group.
*
@@ -793,6 +1229,17 @@ public interface Namespaces {
*/
void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffinityGroup)
throws PulsarAdminException;
+
+ /**
+ * Set bookie affinity group for a namespace to isolate namespace write to bookies that are part of given affinity
+ * group asynchronously.
+ *
+ * @param namespace
+ * namespace name
+ * @param bookieAffinityGroup
+ * bookie affinity group
+ */
+ CompletableFuture<Void> setBookieAffinityGroupAsync(String namespace, BookieAffinityGroupData bookieAffinityGroup);
/**
* Delete bookie affinity group configured for a namespace.
@@ -803,6 +1250,13 @@ public interface Namespaces {
void deleteBookieAffinityGroup(String namespace) throws PulsarAdminException;
/**
+ * Delete bookie affinity group configured for a namespace asynchronously.
+ *
+ * @param namespace
+ */
+ CompletableFuture<Void> deleteBookieAffinityGroupAsync(String namespace);
+
+ /**
* Get bookie affinity group configured for a namespace.
*
* @param namespace
@@ -812,6 +1266,14 @@ public interface Namespaces {
BookieAffinityGroupData getBookieAffinityGroup(String namespace) throws PulsarAdminException;
/**
+ * Get bookie affinity group configured for a namespace asynchronously.
+ *
+ * @param namespace
+ * @return
+ */
+ CompletableFuture<BookieAffinityGroupData> getBookieAffinityGroupAsync(String namespace);
+
+ /**
* Set the retention configuration for all the topics on a namespace.
* <p/>
* Set the retention configuration on a namespace. This operation requires Pulsar super-user access.
@@ -843,6 +1305,28 @@ public interface Namespaces {
void setRetention(String namespace, RetentionPolicies retention) throws PulsarAdminException;
/**
+ * Set the retention configuration for all the topics on a namespace asynchronously.
+ * <p/>
+ * Set the retention configuration on a namespace. This operation requires Pulsar super-user access.
+ * <p/>
+ * Request parameter example:
+ * <p/>
+ *
+ * <pre>
+ * <code>
+ * {
+ * "retentionTimeInMinutes" : 60, // how long to retain messages
+ * "retentionSizeInMB" : 1024, // retention backlog limit
+ * }
+ * </code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Void> setRetentionAsync(String namespace, RetentionPolicies retention);
+
+ /**
* Get the retention configuration for a namespace.
* <p/>
* Get the retention configuration for a namespace.
@@ -873,6 +1357,28 @@ public interface Namespaces {
RetentionPolicies getRetention(String namespace) throws PulsarAdminException;
/**
+ * Get the retention configuration for a namespace asynchronously.
+ * <p/>
+ * Get the retention configuration for a namespace.
+ * <p/>
+ * Response example:
+ * <p/>
+ *
+ * <pre>
+ * <code>
+ * {
+ * "retentionTimeInMinutes" : 60, // how long to retain messages
+ * "retentionSizeInMB" : 1024, // retention backlog limit
+ * }
+ * </code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<RetentionPolicies> getRetentionAsync(String namespace);
+
+ /**
* Unload a namespace from the current serving broker.
*
* @param namespace
@@ -890,7 +1396,15 @@ public interface Namespaces {
void unload(String namespace) throws PulsarAdminException;
/**
- * Get the replication configuration version for a given namespace
+ * Unload a namespace from the current serving broker asynchronously.
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Void> unloadAsync(String namespace);
+
+ /**
+ * Get the replication configuration version for a given namespace.
*
* @param namespace
* @return Replication configuration version
@@ -900,6 +1414,14 @@ public interface Namespaces {
String getReplicationConfigVersion(String namespace) throws PulsarAdminException;
/**
+ * Get the replication configuration version for a given namespace asynchronously.
+ *
+ * @param namespace
+ * @return Replication configuration version
+ */
+ CompletableFuture<String> getReplicationConfigVersionAsync(String namespace);
+
+ /**
* Unload namespace bundle
*
* @param namespace
@@ -922,18 +1444,28 @@ public interface Namespaces {
CompletableFuture<Void> unloadNamespaceBundleAsync(String namespace, String bundle);
/**
- * Split namespace bundle
+ * Split namespace bundle.
*
* @param namespace
- * @param range of bundle to split
- * @param unload newly split bundles from the broker
+ * @param bundle range of bundle to split
+ * @param unloadSplitBundles
+ * @param splitAlgorithmName
* @throws PulsarAdminException
- * Unexpected error
*/
void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName) throws PulsarAdminException;
/**
- * Set message-publish-rate (topics under this namespace can publish this many messages per second)
+ * Split namespace bundle asynchronously.
+ *
+ * @param namespace
+ * @param bundle range of bundle to split
+ * @param unloadSplitBundles
+ * @param splitAlgorithmName
+ */
+ CompletableFuture<Void> splitNamespaceBundleAsync(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName);
+
+ /**
+ * Set message-publish-rate (topics under this namespace can publish this many messages per second).
*
* @param namespace
* @param publishMsgRate
@@ -943,17 +1475,33 @@ public interface Namespaces {
*/
void setPublishRate(String namespace, PublishRate publishMsgRate) throws PulsarAdminException;
- /** Get message-publish-rate (topics under this namespace can publish this many messages per second)
- *
- * @param namespace
- * @returns messageRate
- * number of messages per second
- * @throws PulsarAdminException
- * Unexpected error
- */
+ /**
+ * Set message-publish-rate (topics under this namespace can publish this many messages per second) asynchronously.
+ *
+ * @param namespace
+ * @param publishMsgRate
+ * number of messages per second
+ */
+ CompletableFuture<Void> setPublishRateAsync(String namespace, PublishRate publishMsgRate);
+
+ /**
+ * Get message-publish-rate (topics under this namespace can publish this many messages per second)
+ *
+ * @param namespace
+ * @return number of messages per second
+ * @throws PulsarAdminException Unexpected error
+ */
PublishRate getPublishRate(String namespace) throws PulsarAdminException;
/**
+ * Get message-publish-rate (topics under this namespace can publish this many messages per second) asynchronously.
+ *
+ * @param namespace
+ * @return number of messages per second
+ */
+ CompletableFuture<PublishRate> getPublishRateAsync(String namespace);
+
+ /**
* Set message-dispatch-rate (topics under this namespace can dispatch this many messages per second)
*
* @param namespace
@@ -964,17 +1512,36 @@ public interface Namespaces {
*/
void setDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException;
- /** Get message-dispatch-rate (topics under this namespace can dispatch this many messages per second)
- *
- * @param namespace
- * @returns messageRate
- * number of messages per second
- * @throws PulsarAdminException
- * Unexpected error
- */
+ /**
+ * Set message-dispatch-rate (topics under this namespace can dispatch this many messages per second) asynchronously.
+ *
+ * @param namespace
+ * @param dispatchRate
+ * number of messages per second
+ */
+ CompletableFuture<Void> setDispatchRateAsync(String namespace, DispatchRate dispatchRate);
+
+ /**
+ * Get message-dispatch-rate (topics under this namespace can dispatch this many messages per second)
+ *
+ * @param namespace
+ * @returns messageRate
+ * number of messages per second
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
DispatchRate getDispatchRate(String namespace) throws PulsarAdminException;
/**
+ * Get message-dispatch-rate (topics under this namespace can dispatch this many messages per second) asynchronously.
+ *
+ * @param namespace
+ * @returns messageRate
+ * number of messages per second
+ */
+ CompletableFuture<DispatchRate> getDispatchRateAsync(String namespace);
+
+ /**
* Set namespace-subscribe-rate (topics under this namespace will limit by subscribeRate)
*
* @param namespace
@@ -985,7 +1552,17 @@ public interface Namespaces {
*/
void setSubscribeRate(String namespace, SubscribeRate subscribeRate) throws PulsarAdminException;
- /** Get namespace-subscribe-rate (topics under this namespace allow subscribe times per consumer in a period)
+ /**
+ * Set namespace-subscribe-rate (topics under this namespace will limit by subscribeRate) asynchronously.
+ *
+ * @param namespace
+ * @param subscribeRate
+ * consumer subscribe limit by this subscribeRate
+ */
+ CompletableFuture<Void> setSubscribeRateAsync(String namespace, SubscribeRate subscribeRate);
+
+ /**
+ * Get namespace-subscribe-rate (topics under this namespace allow subscribe times per consumer in a period)
*
* @param namespace
* @returns subscribeRate
@@ -995,6 +1572,14 @@ public interface Namespaces {
SubscribeRate getSubscribeRate(String namespace) throws PulsarAdminException;
/**
+ * Get namespace-subscribe-rate (topics under this namespace allow subscribe times per consumer in a period) asynchronously.
+ *
+ * @param namespace
+ * @returns subscribeRate
+ */
+ CompletableFuture<SubscribeRate> getSubscribeRateAsync(String namespace);
+
+ /**
* Set subscription-message-dispatch-rate (subscriptions under this namespace can dispatch this many messages per second)
*
* @param namespace
@@ -1005,7 +1590,17 @@ public interface Namespaces {
*/
void setSubscriptionDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException;
- /** Get subscription-message-dispatch-rate (subscriptions under this namespace can dispatch this many messages per second)
+ /**
+ * Set subscription-message-dispatch-rate (subscriptions under this namespace can dispatch this many messages per second) asynchronously.
+ *
+ * @param namespace
+ * @param dispatchRate
+ * number of messages per second
+ */
+ CompletableFuture<Void> setSubscriptionDispatchRateAsync(String namespace, DispatchRate dispatchRate);
+
+ /**
+ * Get subscription-message-dispatch-rate (subscriptions under this namespace can dispatch this many messages per second)
*
* @param namespace
* @returns DispatchRate
@@ -1016,6 +1611,15 @@ public interface Namespaces {
DispatchRate getSubscriptionDispatchRate(String namespace) throws PulsarAdminException;
/**
+ * Get subscription-message-dispatch-rate (subscriptions under this namespace can dispatch this many messages per second) asynchronously.
+ *
+ * @param namespace
+ * @returns DispatchRate
+ * number of messages per second
+ */
+ CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String namespace);
+
+ /**
* Set replicator-message-dispatch-rate (Replicators under this namespace can dispatch this many messages per second)
*
* @param namespace
@@ -1026,7 +1630,17 @@ public interface Namespaces {
*/
void setReplicatorDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException;
- /** Get replicator-message-dispatch-rate (Replicators under this namespace can dispatch this many messages per second)
+ /**
+ * Set replicator-message-dispatch-rate (Replicators under this namespace can dispatch this many messages per second) asynchronously.
+ *
+ * @param namespace
+ * @param dispatchRate
+ * number of messages per second
+ */
+ CompletableFuture<Void> setReplicatorDispatchRateAsync(String namespace, DispatchRate dispatchRate);
+
+ /**
+ * Get replicator-message-dispatch-rate (Replicators under this namespace can dispatch this many messages per second)
*
* @param namespace
* @returns DispatchRate
@@ -1037,7 +1651,16 @@ public interface Namespaces {
DispatchRate getReplicatorDispatchRate(String namespace) throws PulsarAdminException;
/**
- * Clear backlog for all topics on a namespace
+ * Get replicator-message-dispatch-rate (Replicators under this namespace can dispatch this many messages per second) asynchronously.
+ *
+ * @param namespace
+ * @returns DispatchRate
+ * number of messages per second
+ */
+ CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String namespace);
+
+ /**
+ * Clear backlog for all topics on a namespace.
*
* @param namespace
* @throws PulsarAdminException
@@ -1046,6 +1669,13 @@ public interface Namespaces {
void clearNamespaceBacklog(String namespace) throws PulsarAdminException;
/**
+ * Clear backlog for all topics on a namespace asynchronously.
+ *
+ * @param namespace
+ */
+ CompletableFuture<Void> clearNamespaceBacklogAsync(String namespace);
+
+ /**
* Clear backlog for a given subscription on all topics on a namespace
*
* @param namespace
@@ -1056,6 +1686,14 @@ public interface Namespaces {
void clearNamespaceBacklogForSubscription(String namespace, String subscription) throws PulsarAdminException;
/**
+ * Clear backlog for a given subscription on all topics on a namespace asynchronously.
+ *
+ * @param namespace
+ * @param subscription
+ */
+ CompletableFuture<Void> clearNamespaceBacklogForSubscriptionAsync(String namespace, String subscription);
+
+ /**
* Clear backlog for all topics on a namespace bundle
*
* @param namespace
@@ -1100,7 +1738,7 @@ public interface Namespaces {
String subscription);
/**
- * Unsubscribes the given subscription on all topics on a namespace
+ * Unsubscribe the given subscription on all topics on a namespace.
*
* @param namespace
* @param subscription
@@ -1109,7 +1747,15 @@ public interface Namespaces {
void unsubscribeNamespace(String namespace, String subscription) throws PulsarAdminException;
/**
- * Unsubscribes the given subscription on all topics on a namespace bundle
+ * Unsubscribe the given subscription on all topics on a namespace asynchronously.
+ *
+ * @param namespace
+ * @param subscription
+ */
+ CompletableFuture<Void> unsubscribeNamespaceAsync(String namespace, String subscription);
+
+ /**
+ * Unsubscribe the given subscription on all topics on a namespace bundle
*
* @param namespace
* @param bundle
@@ -1119,7 +1765,7 @@ public interface Namespaces {
void unsubscribeNamespaceBundle(String namespace, String bundle, String subscription) throws PulsarAdminException;
/**
- * Unsubscribes the given subscription on all topics on a namespace bundle asynchronously
+ * Unsubscribe the given subscription on all topics on a namespace bundle asynchronously
*
* @param namespace
* @param bundle
@@ -1155,6 +1801,24 @@ public interface Namespaces {
void setEncryptionRequiredStatus(String namespace, boolean encryptionRequired) throws PulsarAdminException;
/**
+ * Set the encryption required status for all topics within a namespace asynchronously.
+ * <p>
+ * When encryption required is true, the broker will prevent to store unencrypted messages.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>true</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param encryptionRequired
+ * whether message encryption is required or not
+ */
+ CompletableFuture<Void> setEncryptionRequiredStatusAsync(String namespace, boolean encryptionRequired);
+
+ /**
* Get the delayed delivery messages for all topics within a namespace.
* <p>
* If disabled, messages will be immediately delivered and there will
@@ -1186,6 +1850,30 @@ public interface Namespaces {
DelayedDeliveryPolicies getDelayedDelivery(String namespace) throws PulsarAdminException;
/**
+ * Get the delayed delivery messages for all topics within a namespace asynchronously.
+ * <p>
+ * If disabled, messages will be immediately delivered and there will
+ * be no tracking overhead.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>
+ * {
+ * "active" : true, // Enable or disable delayed delivery for messages on a namespace
+ * "tickTime" : 1000, // The tick time for when retrying on delayed delivery messages
+ * }
+ * </code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @return delayedDeliveryPolicies
+ * Whether to enable the delayed delivery for messages.
+ */
+ CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryAsync(String namespace);
+
+ /**
* Set the delayed delivery messages for all topics within a namespace.
* <p>
* If disabled, messages will be immediately delivered and there will
@@ -1217,7 +1905,31 @@ public interface Namespaces {
void setDelayedDeliveryMessages(String namespace, DelayedDeliveryPolicies delayedDeliveryPolicies) throws PulsarAdminException;
/**
- * Set the given subscription auth mode on all topics on a namespace
+ * Set the delayed delivery messages for all topics within a namespace asynchronously.
+ * <p>
+ * If disabled, messages will be immediately delivered and there will
+ * be no tracking overhead.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>
+ * {
+ * "tickTime" : 1000, // Enable or disable delayed delivery for messages on a namespace
+ * "active" : true, // The tick time for when retrying on delayed delivery messages
+ * }
+ * </code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param delayedDeliveryPolicies
+ * Whether to enable the delayed delivery for messages.
+ */
+ CompletableFuture<Void> setDelayedDeliveryMessagesAsync(String namespace, DelayedDeliveryPolicies delayedDeliveryPolicies);
+
+ /**
+ * Set the given subscription auth mode on all topics on a namespace.
*
* @param namespace
* @param subscriptionAuthMode
@@ -1226,6 +1938,14 @@ public interface Namespaces {
void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscriptionAuthMode) throws PulsarAdminException;
/**
+ * Set the given subscription auth mode on all topics on a namespace asynchronously.
+ *
+ * @param namespace
+ * @param subscriptionAuthMode
+ */
+ CompletableFuture<Void> setSubscriptionAuthModeAsync(String namespace, SubscriptionAuthMode subscriptionAuthMode);
+
+ /**
* Get the maxProducersPerTopic for a namespace.
* <p>
* Response example:
@@ -1247,6 +1967,20 @@ public interface Namespaces {
int getMaxProducersPerTopic(String namespace) throws PulsarAdminException;
/**
+ * Get the maxProducersPerTopic for a namespace asynchronously.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>0</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Integer> getMaxProducersPerTopicAsync(String namespace);
+
+ /**
* Set maxProducersPerTopic for a namespace.
* <p>
* Request example:
@@ -1270,6 +2004,22 @@ public interface Namespaces {
void setMaxProducersPerTopic(String namespace, int maxProducersPerTopic) throws PulsarAdminException;
/**
+ * Set maxProducersPerTopic for a namespace asynchronously.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>10</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param maxProducersPerTopic
+ * maxProducersPerTopic value for a namespace
+ */
+ CompletableFuture<Void> setMaxProducersPerTopicAsync(String namespace, int maxProducersPerTopic);
+
+ /**
* Get the maxProducersPerTopic for a namespace.
* <p>
* Response example:
@@ -1291,6 +2041,20 @@ public interface Namespaces {
int getMaxConsumersPerTopic(String namespace) throws PulsarAdminException;
/**
+ * Get the maxProducersPerTopic for a namespace asynchronously.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>0</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Integer> getMaxConsumersPerTopicAsync(String namespace);
+
+ /**
* Set maxConsumersPerTopic for a namespace.
* <p>
* Request example:
@@ -1314,6 +2078,22 @@ public interface Namespaces {
void setMaxConsumersPerTopic(String namespace, int maxConsumersPerTopic) throws PulsarAdminException;
/**
+ * Set maxConsumersPerTopic for a namespace asynchronously.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>10</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param maxConsumersPerTopic
+ * maxConsumersPerTopic value for a namespace
+ */
+ CompletableFuture<Void> setMaxConsumersPerTopicAsync(String namespace, int maxConsumersPerTopic);
+
+ /**
* Get the maxConsumersPerSubscription for a namespace.
* <p>
* Response example:
@@ -1332,10 +2112,47 @@ public interface Namespaces {
* @throws PulsarAdminException
* Unexpected error
*/
- int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException;
+ int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException;
+
+ /**
+ * Get the maxConsumersPerSubscription for a namespace asynchronously.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>0</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Integer> getMaxConsumersPerSubscriptionAsync(String namespace);
+
+ /**
+ * Set maxConsumersPerSubscription for a namespace.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>10</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param maxConsumersPerSubscription
+ * maxConsumersPerSubscription value for a namespace
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setMaxConsumersPerSubscription(String namespace, int maxConsumersPerSubscription) throws PulsarAdminException;
/**
- * Set maxConsumersPerSubscription for a namespace.
+ * Set maxConsumersPerSubscription for a namespace asynchronously.
* <p>
* Request example:
*
@@ -1347,15 +2164,8 @@ public interface Namespaces {
* Namespace name
* @param maxConsumersPerSubscription
* maxConsumersPerSubscription value for a namespace
- *
- * @throws NotAuthorizedException
- * Don't have admin permission
- * @throws NotFoundException
- * Namespace does not exist
- * @throws PulsarAdminException
- * Unexpected error
*/
- void setMaxConsumersPerSubscription(String namespace, int maxConsumersPerSubscription) throws PulsarAdminException;
+ CompletableFuture<Void> setMaxConsumersPerSubscriptionAsync(String namespace, int maxConsumersPerSubscription);
/**
* Get the maxUnackedMessagesPerConsumer for a namespace.
@@ -1379,6 +2189,20 @@ public interface Namespaces {
int getMaxUnackedMessagesPerConsumer(String namespace) throws PulsarAdminException;
/**
+ * Get the maxUnackedMessagesPerConsumer for a namespace asynchronously.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>0</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Integer> getMaxUnackedMessagesPerConsumerAsync(String namespace);
+
+ /**
* Set maxUnackedMessagesPerConsumer for a namespace.
* <p>
* Request example:
@@ -1402,6 +2226,22 @@ public interface Namespaces {
void setMaxUnackedMessagesPerConsumer(String namespace, int maxUnackedMessagesPerConsumer) throws PulsarAdminException;
/**
+ * Set maxUnackedMessagesPerConsumer for a namespace asynchronously.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>10</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param maxUnackedMessagesPerConsumer
+ * maxUnackedMessagesPerConsumer value for a namespace
+ */
+ CompletableFuture<Void> setMaxUnackedMessagesPerConsumerAsync(String namespace, int maxUnackedMessagesPerConsumer);
+
+ /**
* Get the maxUnackedMessagesPerSubscription for a namespace.
* <p>
* Response example:
@@ -1423,6 +2263,20 @@ public interface Namespaces {
int getMaxUnackedMessagesPerSubscription(String namespace) throws PulsarAdminException;
/**
+ * Get the maxUnackedMessagesPerSubscription for a namespace asynchronously.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>0</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Integer> getMaxUnackedMessagesPerSubscriptionAsync(String namespace);
+
+ /**
* Set maxUnackedMessagesPerSubscription for a namespace.
* <p>
* Request example:
@@ -1446,6 +2300,22 @@ public interface Namespaces {
void setMaxUnackedMessagesPerSubscription(String namespace, int maxUnackedMessagesPerSubscription) throws PulsarAdminException;
/**
+ * Set maxUnackedMessagesPerSubscription for a namespace asynchronously.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>10</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param maxUnackedMessagesPerSubscription
+ * Max number of unacknowledged messages allowed per shared subscription.
+ */
+ CompletableFuture<Void> setMaxUnackedMessagesPerSubscriptionAsync(String namespace, int maxUnackedMessagesPerSubscription);
+
+ /**
* Get the compactionThreshold for a namespace. The maximum number of bytes topics in the namespace
* can have before compaction is triggered. 0 disables.
* <p>
@@ -1468,6 +2338,21 @@ public interface Namespaces {
long getCompactionThreshold(String namespace) throws PulsarAdminException;
/**
+ * Get the compactionThreshold for a namespace asynchronously. The maximum number of bytes topics in the namespace
+ * can have before compaction is triggered. 0 disables.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>10000000</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Long> getCompactionThresholdAsync(String namespace);
+
+ /**
* Set the compactionThreshold for a namespace. The maximum number of bytes topics in the namespace
* can have before compaction is triggered. 0 disables.
* <p>
@@ -1492,6 +2377,23 @@ public interface Namespaces {
void setCompactionThreshold(String namespace, long compactionThreshold) throws PulsarAdminException;
/**
+ * Set the compactionThreshold for a namespace asynchronously. The maximum number of bytes topics in the namespace
+ * can have before compaction is triggered. 0 disables.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>10000000</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param compactionThreshold
+ * maximum number of backlog bytes before compaction is triggered
+ */
+ CompletableFuture<Void> setCompactionThresholdAsync(String namespace, long compactionThreshold);
+
+ /**
* Get the offloadThreshold for a namespace. The maximum number of bytes stored on the pulsar cluster for topics
* in the namespace before data starts being offloaded to longterm storage.
*
@@ -1515,6 +2417,22 @@ public interface Namespaces {
long getOffloadThreshold(String namespace) throws PulsarAdminException;
/**
+ * Get the offloadThreshold for a namespace asynchronously. The maximum number of bytes stored on the pulsar cluster for topics
+ * in the namespace before data starts being offloaded to longterm storage.
+ *
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>10000000</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Long> getOffloadThresholdAsync(String namespace);
+
+ /**
* Set the offloadThreshold for a namespace. The maximum number of bytes stored on the pulsar cluster for topics
* in the namespace before data starts being offloaded to longterm storage.
*
@@ -1538,7 +2456,26 @@ public interface Namespaces {
* @throws PulsarAdminException
* Unexpected error
*/
- void setOffloadThreshold(String namespace, long compactionThreshold) throws PulsarAdminException;
+ void setOffloadThreshold(String namespace, long offloadThreshold) throws PulsarAdminException;
+
+ /**
+ * Set the offloadThreshold for a namespace asynchronously. The maximum number of bytes stored on the pulsar cluster for topics
+ * in the namespace before data starts being offloaded to longterm storage.
+ *
+ * Negative values disabled automatic offloading. Setting a threshold of 0 will offload data as soon as possible.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>10000000</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param offloadThreshold
+ * maximum number of bytes stored before offloading is triggered
+ */
+ CompletableFuture<Void> setOffloadThresholdAsync(String namespace, long offloadThreshold);
/**
* Get the offload deletion lag for a namespace, in milliseconds.
@@ -1572,6 +2509,30 @@ public interface Namespaces {
Long getOffloadDeleteLagMs(String namespace) throws PulsarAdminException;
/**
+ * Get the offload deletion lag asynchronously for a namespace, in milliseconds.
+ * The number of milliseconds to wait before deleting a ledger segment which has been offloaded from
+ * the Pulsar cluster's local storage (i.e. BookKeeper).
+ *
+ * If the offload deletion lag has not been set for the namespace, the method returns 'null'
+ * and the namespace will use the configured default of the pulsar broker.
+ *
+ * A negative value disables deletion of the local ledger completely, though it will still be deleted
+ * if it exceeds the topics retention policy, along with the offloaded copy.
+ *
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>3600000</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @return the offload deletion lag for the namespace in milliseconds, or null if not set
+ */
+ CompletableFuture<Long> getOffloadDeleteLagMsAsync(String namespace);
+
+ /**
* Set the offload deletion lag for a namespace.
*
* The offload deletion lag is the amount of time to wait after offloading a ledger segment to long term storage,
@@ -1595,6 +2556,22 @@ public interface Namespaces {
void setOffloadDeleteLag(String namespace, long lag, TimeUnit unit) throws PulsarAdminException;
/**
+ * Set the offload deletion lag for a namespace asynchronously.
+ *
+ * The offload deletion lag is the amount of time to wait after offloading a ledger segment to long term storage,
+ * before deleting its copy stored on the Pulsar cluster's local storage (i.e. BookKeeper).
+ *
+ * A negative value disables deletion of the local ledger completely, though it will still be deleted
+ * if it exceeds the topics retention policy, along with the offloaded copy.
+ *
+ * @param namespace
+ * Namespace name
+ * @param lag the duration to wait before deleting the local copy
+ * @param unit the timeunit of the duration
+ */
+ CompletableFuture<Void> setOffloadDeleteLagAsync(String namespace, long lag, TimeUnit unit);
+
+ /**
* Clear the offload deletion lag for a namespace.
*
* The namespace will fall back to using the configured default of the pulsar broker.
@@ -1609,6 +2586,13 @@ public interface Namespaces {
void clearOffloadDeleteLag(String namespace) throws PulsarAdminException;
/**
+ * Clear the offload deletion lag for a namespace asynchronously.
+ *
+ * The namespace will fall back to using the configured default of the pulsar broker.
+ */
+ CompletableFuture<Void> clearOffloadDeleteLagAsync(String namespace);
+
+ /**
* Get the strategy used to check the a new schema provided by a producer is compatible with the current schema
* before it is installed.
*
@@ -1662,6 +2646,14 @@ public interface Namespaces {
*/
boolean getSchemaValidationEnforced(String namespace)
throws PulsarAdminException;
+
+ /**
+ * Get schema validation enforced for namespace asynchronously.
+ *
+ * @return the schema validation enforced flag
+ */
+ CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String namespace);
+
/**
* Set schema validation enforced for namespace.
* if a producer without a schema attempts to produce to a topic with schema in this the namespace, the
@@ -1681,6 +2673,17 @@ public interface Namespaces {
throws PulsarAdminException;
/**
+ * Set schema validation enforced for namespace asynchronously.
+ * if a producer without a schema attempts to produce to a topic with schema in this the namespace, the
+ * producer will be failed to connect. PLEASE be carefully on using this, since non-java clients don't
+ * support schema. if you enable this setting, it will cause non-java clients failed to produce.
+ *
+ * @param namespace pulsar namespace name
+ * @param schemaValidationEnforced flag to enable or disable schema validation for the given namespace
+ */
+ CompletableFuture<Void> setSchemaValidationEnforcedAsync(String namespace, boolean schemaValidationEnforced);
+
+ /**
* Get the strategy used to check the a new schema provided by a producer is compatible with the current schema
* before it is installed.
*
@@ -1694,7 +2697,16 @@ public interface Namespaces {
* Unexpected error
*/
SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String namespace)
- throws PulsarAdminException;;
+ throws PulsarAdminException;
+
+ /**
+ * Get the strategy used to check the a new schema provided by a producer is compatible with the current schema
+ * before it is installed asynchronously.
+ *
+ * @param namespace The namespace in whose policy we are interested
+ * @return the strategy used to check compatibility
+ */
+ CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync(String namespace);
/**
* Set the strategy used to check the a new schema provided by a producer is compatible with the current schema
@@ -1714,7 +2726,17 @@ public interface Namespaces {
throws PulsarAdminException;
/**
- * Get whether allow auto update schema
+ * Set the strategy used to check the a new schema provided by a producer is compatible with the current schema
+ * before it is installed asynchronously.
+ *
+ * @param namespace The namespace in whose policy should be set
+ * @param strategy The schema compatibility strategy
+ */
+ CompletableFuture<Void> setSchemaCompatibilityStrategyAsync(String namespace,
+ SchemaCompatibilityStrategy strategy);
+
+ /**
+ * Get whether allow auto update schema.
*
* @param namespace pulsar namespace name
* @return the schema validation enforced flag
@@ -1729,6 +2751,14 @@ public interface Namespaces {
throws PulsarAdminException;
/**
+ * Get whether allow auto update schema asynchronously.
+ *
+ * @param namespace pulsar namespace name
+ * @return the schema validation enforced flag
+ */
+ CompletableFuture<Boolean> getIsAllowAutoUpdateSchemaAsync(String namespace);
+
+ /**
* The flag is when producer bring a new schema and the schema pass compatibility check
* whether allow schema auto registered
*
@@ -1745,6 +2775,15 @@ public interface Namespaces {
throws PulsarAdminException;
/**
+ * The flag is when producer bring a new schema and the schema pass compatibility check
+ * whether allow schema auto registered
+ *
+ * @param namespace pulsar namespace name
+ * @param isAllowAutoUpdateSchema flag to enable or disable auto update schema
+ */
+ CompletableFuture<Void> setIsAllowAutoUpdateSchemaAsync(String namespace, boolean isAllowAutoUpdateSchema);
+
+ /**
* Set the offload configuration for all the topics in a namespace.
* <p/>
* Set the offload configuration in a namespace. This operation requires pulsar tenant access.
@@ -1781,6 +2820,33 @@ public interface Namespaces {
void setOffloadPolicies(String namespace, OffloadPolicies offloadPolicies) throws PulsarAdminException;
/**
+ * Set the offload configuration for all the topics in a namespace asynchronously.
+ * <p/>
+ * Set the offload configuration in a namespace. This operation requires pulsar tenant access.
+ * <p/>
+ * Request parameter example:
+ * <p/>
+ *
+ * <pre>
+ * <code>
+ * {
+ * "region" : "us-east-2", // The long term storage region
+ * "bucket" : "bucket", // Bucket to place offloaded ledger into
+ * "endpoint" : "endpoint", // Alternative endpoint to connect to
+ * "maxBlockSize" : 1024, // Max Block Size, default 64MB
+ * "readBufferSize" : 1024, // Read Buffer Size, default 1MB
+ * }
+ * </code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param offloadPolicies
+ * Offload configuration
+ */
+ CompletableFuture<Void> setOffloadPoliciesAsync(String namespace, OffloadPolicies offloadPolicies);
+
+ /**
* Get the offload configuration for a namespace.
* <p/>
* Get the offload configuration for a namespace.
@@ -1813,4 +2879,28 @@ public interface Namespaces {
*/
OffloadPolicies getOffloadPolicies(String namespace) throws PulsarAdminException;
+ /**
+ * Get the offload configuration for a namespace asynchronously.
+ * <p/>
+ * Get the offload configuration for a namespace.
+ * <p/>
+ * Response example:
+ * <p/>
+ *
+ * <pre>
+ * <code>
+ * {
+ * "region" : "us-east-2", // The long term storage region
+ * "bucket" : "bucket", // Bucket to place offloaded ledger into
+ * "endpoint" : "endpoint", // Alternative endpoint to connect to
+ * "maxBlockSize" : 1024, // Max Block Size, default 64MB
+ * "readBufferSize" : 1024, // Read Buffer Size, default 1MB
+ * }
+ * </code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String namespace);
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 3b60fa2..7c5b79a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -26,8 +26,10 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
@@ -68,68 +70,164 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
@Override
public List<String> getNamespaces(String tenant) throws PulsarAdminException {
try {
- WebTarget path = adminV2Namespaces.path(tenant);
- return request(path).get(new GenericType<List<String>>() {
- });
- } catch (Exception e) {
- throw getApiException(e);
+ return getNamespacesAsync(tenant).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<List<String>> getNamespacesAsync(String tenant) {
+ WebTarget path = adminV2Namespaces.path(tenant);
+ final CompletableFuture<List<String>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<List<String>>() {
+ @Override
+ public void completed(List<String> namespaces) {
+ future.complete(namespaces);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public List<String> getNamespaces(String tenant, String cluster) throws PulsarAdminException {
- try {
- WebTarget path = adminNamespaces.path(tenant).path(cluster);
- return request(path).get(new GenericType<List<String>>() {
- });
- } catch (Exception e) {
- throw getApiException(e);
+ WebTarget path = adminNamespaces.path(tenant).path(cluster);
+ final CompletableFuture<List<String>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<List<String>>() {
+
+ @Override
+ public void completed(List<String> namespaces) {
+ future.complete(namespaces);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ try {
+ return future.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public List<String> getTopics(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- String action = ns.isV2() ? "topics" : "destinations";
- WebTarget path = namespacePath(ns, action);
- return request(path).get(new GenericType<List<String>>() {
- });
- } catch (Exception e) {
- throw getApiException(e);
+ return getTopicsAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<List<String>> getTopicsAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ String action = ns.isV2() ? "topics" : "destinations";
+ WebTarget path = namespacePath(ns, action);
+ final CompletableFuture<List<String>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<List<String>>() {
+ @Override
+ public void completed(List<String> topics) {
+ future.complete(topics);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public Policies getPolicies(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns);
- return request(path).get(Policies.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getPoliciesAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Policies> getPoliciesAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns);
+ final CompletableFuture<Policies> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Policies>() {
+ @Override
+ public void completed(Policies policies) {
+ future.complete(policies);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void createNamespace(String namespace, Set<String> clusters) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns);
-
- if (ns.isV2()) {
- // For V2 API we pass full Policy class instance
- Policies policies = new Policies();
- policies.replication_clusters = clusters;
- request(path).put(Entity.entity(policies, MediaType.APPLICATION_JSON), ErrorData.class);
- } else {
- // For V1 API, we pass the BundlesData on creation
- request(path).put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+ createNamespaceAsync(namespace, clusters).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> createNamespaceAsync(String namespace, Set<String> clusters) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns);
+
+ if (ns.isV2()) {
+ // For V2 API we pass full Policy class instance
+ Policies policies = new Policies();
+ policies.replication_clusters = clusters;
+ return asyncPutRequest(path, Entity.entity(policies, MediaType.APPLICATION_JSON));
+ } else {
+ // For V1 API, we pass the BundlesData on creation
+ return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)).thenAccept(ignore -> {
// For V1, we need to do it in 2 steps
- setNamespaceReplicationClusters(namespace, clusters);
- }
- } catch (Exception e) {
- throw getApiException(e);
+ setNamespaceReplicationClustersAsync(namespace, clusters);
+ });
}
}
@@ -139,71 +237,116 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
}
@Override
+ public CompletableFuture<Void> createNamespaceAsync(String namespace, int numBundles) {
+ return createNamespaceAsync(namespace, new BundlesData(numBundles));
+ }
+
+ @Override
public void createNamespace(String namespace, Policies policies) throws PulsarAdminException {
+ try {
+ createNamespaceAsync(namespace, policies).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> createNamespaceAsync(String namespace, Policies policies) {
NamespaceName ns = NamespaceName.get(namespace);
checkArgument(ns.isV2(), "Create namespace with policies is only supported on newer namespaces");
+ WebTarget path = namespacePath(ns);
+ // For V2 API we pass full Policy class instance
+ return asyncPutRequest(path, Entity.entity(policies, MediaType.APPLICATION_JSON));
+ }
+ @Override
+ public void createNamespace(String namespace, BundlesData bundlesData) throws PulsarAdminException {
try {
- WebTarget path = namespacePath(ns);
-
- // For V2 API we pass full Policy class instance
- request(path).put(Entity.entity(policies, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ createNamespaceAsync(namespace, bundlesData).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
- public void createNamespace(String namespace, BundlesData bundlesData) throws PulsarAdminException {
- try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns);
-
- if (ns.isV2()) {
- // For V2 API we pass full Policy class instance
- Policies policies = new Policies();
- policies.bundles = bundlesData;
- request(path).put(Entity.entity(policies, MediaType.APPLICATION_JSON), ErrorData.class);
- } else {
- // For V1 API, we pass the BundlesData on creation
- request(path).put(Entity.entity(bundlesData, MediaType.APPLICATION_JSON), ErrorData.class);
- }
- } catch (Exception e) {
- throw getApiException(e);
+ public CompletableFuture<Void> createNamespaceAsync(String namespace, BundlesData bundlesData) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns);
+
+ if (ns.isV2()) {
+ // For V2 API we pass full Policy class instance
+ Policies policies = new Policies();
+ policies.bundles = bundlesData;
+ return asyncPutRequest(path, Entity.entity(policies, MediaType.APPLICATION_JSON));
+ } else {
+ // For V1 API, we pass the BundlesData on creation
+ return asyncPutRequest(path, Entity.entity(bundlesData, MediaType.APPLICATION_JSON));
}
}
@Override
public void createNamespace(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns);
- request(path).put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ createNamespaceAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> createNamespaceAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns);
+ return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void deleteNamespace(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns);
- request(path).delete(ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ deleteNamespaceAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> deleteNamespaceAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns);
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public void deleteNamespaceBundle(String namespace, String bundleRange) throws PulsarAdminException {
try {
- deleteNamespaceBundleAsync(namespace, bundleRange).get();
+ deleteNamespaceBundleAsync(namespace, bundleRange).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -217,473 +360,1079 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
@Override
public Map<String, Set<AuthAction>> getPermissions(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "permissions");
- return request(path).get(new GenericType<Map<String, Set<AuthAction>>>() {});
- } catch (Exception e) {
- throw getApiException(e);
+ return getPermissionsAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Map<String, Set<AuthAction>>> getPermissionsAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "permissions");
+ final CompletableFuture<Map<String, Set<AuthAction>>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Map<String, Set<AuthAction>>>() {
+ @Override
+ public void completed(Map<String, Set<AuthAction>> permissions) {
+ future.complete(permissions);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void grantPermissionOnNamespace(String namespace, String role, Set<AuthAction> actions)
throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "permissions", role);
- request(path).post(Entity.entity(actions, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ grantPermissionOnNamespaceAsync(namespace, role, actions)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> grantPermissionOnNamespaceAsync(String namespace, String role, Set<AuthAction> actions) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "permissions", role);
+ return asyncPostRequest(path, Entity.entity(actions, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void revokePermissionsOnNamespace(String namespace, String role) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "permissions", role);
- request(path).delete(ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ revokePermissionsOnNamespaceAsync(namespace, role).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
+ @Override
+ public CompletableFuture<Void> revokePermissionsOnNamespaceAsync(String namespace, String role) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "permissions", role);
+ return asyncDeleteRequest(path);
+ }
+
@Override
public void grantPermissionOnSubscription(String namespace, String subscription, Set<String> roles)
throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "permissions", "subscription", subscription);
- request(path).post(Entity.entity(roles, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ grantPermissionOnSubscriptionAsync(namespace, subscription, roles)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> grantPermissionOnSubscriptionAsync(String namespace, String subscription, Set<String> roles) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "permissions", "subscription", subscription);
+ return asyncPostRequest(path, Entity.entity(roles, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void revokePermissionOnSubscription(String namespace, String subscription, String role) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "permissions", subscription, role);
- request(path).delete(ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ revokePermissionOnSubscriptionAsync(namespace, subscription, role)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> revokePermissionOnSubscriptionAsync(String namespace, String subscription, String role) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "permissions", subscription, role);
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public List<String> getNamespaceReplicationClusters(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "replication");
- return request(path).get(new GenericType<List<String>>() {});
- } catch (Exception e) {
- throw getApiException(e);
+ return getNamespaceReplicationClustersAsync(namespace)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<List<String>> getNamespaceReplicationClustersAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "replication");
+ final CompletableFuture<List<String>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<List<String>>() {
+ @Override
+ public void completed(List<String> clusters) {
+ future.complete(clusters);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setNamespaceReplicationClusters(String namespace, Set<String> clusterIds) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "replication");
- request(path).post(Entity.entity(clusterIds, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setNamespaceReplicationClustersAsync(namespace, clusterIds).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setNamespaceReplicationClustersAsync(String namespace, Set<String> clusterIds) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "replication");
+ return asyncPostRequest(path, Entity.entity(clusterIds, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public int getNamespaceMessageTTL(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "messageTTL");
- return request(path).get(new GenericType<Integer>() {});
- } catch (Exception e) {
- throw getApiException(e);
+ return getNamespaceMessageTTLAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Integer> getNamespaceMessageTTLAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "messageTTL");
+ final CompletableFuture<Integer> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Integer>() {
+ @Override
+ public void completed(Integer ttl) {
+ future.complete(ttl);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setNamespaceMessageTTL(String namespace, int ttlInSeconds) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "messageTTL");
- request(path).post(Entity.entity(ttlInSeconds, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setNamespaceMessageTTLAsync(namespace, ttlInSeconds)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setNamespaceMessageTTLAsync(String namespace, int ttlInSeconds) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "messageTTL");
+ return asyncPostRequest(path, Entity.entity(ttlInSeconds, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void setNamespaceAntiAffinityGroup(String namespace, String namespaceAntiAffinityGroup)
throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "antiAffinity");
- request(path).post(Entity.entity(namespaceAntiAffinityGroup, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setNamespaceAntiAffinityGroupAsync(namespace, namespaceAntiAffinityGroup)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setNamespaceAntiAffinityGroupAsync(String namespace, String namespaceAntiAffinityGroup) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "antiAffinity");
+ return asyncPostRequest(path, Entity.entity(namespaceAntiAffinityGroup, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public String getNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "antiAffinity");
- return request(path).get(new GenericType<String>() {});
- } catch (Exception e) {
- throw getApiException(e);
+ return getNamespaceAntiAffinityGroupAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<String> getNamespaceAntiAffinityGroupAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "antiAffinity");
+ final CompletableFuture<String> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<String>() {
+ @Override
+ public void completed(String s) {
+ future.complete(s);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public List<String> getAntiAffinityNamespaces(String tenant, String cluster, String namespaceAntiAffinityGroup)
throws PulsarAdminException {
try {
- WebTarget path = adminNamespaces.path(cluster).path("antiAffinity").path(namespaceAntiAffinityGroup);
- return request(path.queryParam("property", tenant)).get(new GenericType<List<String>>() {});
- } catch (Exception e) {
- throw getApiException(e);
+ return getAntiAffinityNamespacesAsync(tenant, cluster, namespaceAntiAffinityGroup)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<List<String>> getAntiAffinityNamespacesAsync(String tenant, String cluster, String namespaceAntiAffinityGroup) {
+ WebTarget path = adminNamespaces.path(cluster)
+ .path("antiAffinity").path(namespaceAntiAffinityGroup).queryParam("property", tenant);
+ final CompletableFuture<List<String>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<List<String>>() {
+ @Override
+ public void completed(List<String> antiNamespaces) {
+ future.complete(antiNamespaces);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void deleteNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "antiAffinity");
- request(path).delete(ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ deleteNamespaceAntiAffinityGroupAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> deleteNamespaceAntiAffinityGroupAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "antiAffinity");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public void setDeduplicationStatus(String namespace, boolean enableDeduplication) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "deduplication");
- request(path).post(Entity.entity(enableDeduplication, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setDeduplicationStatusAsync(namespace, enableDeduplication)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setDeduplicationStatusAsync(String namespace, boolean enableDeduplication) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "deduplication");
+ return asyncPostRequest(path, Entity.entity(enableDeduplication, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "backlogQuotaMap");
- return request(path).get(new GenericType<Map<BacklogQuotaType, BacklogQuota>>() {
- });
- } catch (Exception e) {
- throw getApiException(e);
+ return getBacklogQuotaMapAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Map<BacklogQuotaType, BacklogQuota>> getBacklogQuotaMapAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "backlogQuotaMap");
+ final CompletableFuture<Map<BacklogQuotaType, BacklogQuota>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Map<BacklogQuotaType, BacklogQuota>>() {
+ @Override
+ public void completed(Map<BacklogQuotaType, BacklogQuota> quotaMap) {
+ future.complete(quotaMap);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setBacklogQuota(String namespace, BacklogQuota backlogQuota) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "backlogQuota");
- request(path).post(Entity.entity(backlogQuota, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setBacklogQuotaAsync(namespace, backlogQuota).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setBacklogQuotaAsync(String namespace, BacklogQuota backlogQuota) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "backlogQuota");
+ return asyncPostRequest(path, Entity.entity(backlogQuota, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void removeBacklogQuota(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "backlogQuota");
- request(path.queryParam("backlogQuotaType", BacklogQuotaType.destination_storage.toString()))
- .delete(ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ removeBacklogQuotaAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> removeBacklogQuotaAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "backlogQuota")
+ .queryParam("backlogQuotaType", BacklogQuotaType.destination_storage.toString());
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public void setPersistence(String namespace, PersistencePolicies persistence) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "persistence");
- request(path).post(Entity.entity(persistence, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setPersistenceAsync(namespace, persistence).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setPersistenceAsync(String namespace, PersistencePolicies persistence) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "persistence");
+ return asyncPostRequest(path, Entity.entity(persistence, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffinityGroup) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "persistence", "bookieAffinity");
- request(path).post(Entity.entity(bookieAffinityGroup, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setBookieAffinityGroupAsync(namespace, bookieAffinityGroup).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setBookieAffinityGroupAsync(String namespace, BookieAffinityGroupData bookieAffinityGroup) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "persistence", "bookieAffinity");
+ return asyncPostRequest(path, Entity.entity(bookieAffinityGroup, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void deleteBookieAffinityGroup(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "persistence", "bookieAffinity");
- request(path).delete(ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ deleteBookieAffinityGroupAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> deleteBookieAffinityGroupAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "persistence", "bookieAffinity");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public BookieAffinityGroupData getBookieAffinityGroup(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "persistence", "bookieAffinity");
- return request(path).get(BookieAffinityGroupData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getBookieAffinityGroupAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<BookieAffinityGroupData> getBookieAffinityGroupAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "persistence", "bookieAffinity");
+ final CompletableFuture<BookieAffinityGroupData> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<BookieAffinityGroupData>() {
+ @Override
+ public void completed(BookieAffinityGroupData bookieAffinityGroupData) {
+ future.complete(bookieAffinityGroupData);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public PersistencePolicies getPersistence(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "persistence");
- return request(path).get(PersistencePolicies.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getPersistenceAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<PersistencePolicies> getPersistenceAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "persistence");
+ final CompletableFuture<PersistencePolicies> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<PersistencePolicies>() {
+ @Override
+ public void completed(PersistencePolicies persistencePolicies) {
+ future.complete(persistencePolicies);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setRetention(String namespace, RetentionPolicies retention) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "retention");
- request(path).post(Entity.entity(retention, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setRetentionAsync(namespace, retention).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
+ }
+ @Override
+ public CompletableFuture<Void> setRetentionAsync(String namespace, RetentionPolicies retention) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "retention");
+ return asyncPostRequest(path, Entity.entity(retention, MediaType.APPLICATION_JSON));
}
@Override
public RetentionPolicies getRetention(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "retention");
- return request(path).get(RetentionPolicies.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getRetentionAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<RetentionPolicies> getRetentionAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "retention");
+ final CompletableFuture<RetentionPolicies> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<RetentionPolicies>() {
+ @Override
+ public void completed(RetentionPolicies retentionPolicies) {
+ future.complete(retentionPolicies);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void unload(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "unload");
- request(path).put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ unloadAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> unloadAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "unload");
+ return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public String getReplicationConfigVersion(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "configversion");
- return request(path).get(String.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getReplicationConfigVersionAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<String> getReplicationConfigVersionAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "configversion");
+ final CompletableFuture<String> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<String>() {
+ @Override
+ public void completed(String s) {
+ future.complete(s);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void unloadNamespaceBundle(String namespace, String bundle) throws PulsarAdminException {
+ try {
+ unloadNamespaceBundleAsync(namespace, bundle).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
- public void unloadNamespaceBundle(String namespace, String bundle) throws PulsarAdminException {
+ public CompletableFuture<Void> unloadNamespaceBundleAsync(String namespace, String bundle) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, bundle, "unload");
+ return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName)
+ throws PulsarAdminException {
+ try {
+ splitNamespaceBundleAsync(namespace, bundle, unloadSplitBundles, splitAlgorithmName)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> splitNamespaceBundleAsync(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, bundle, "split")
+ .queryParam("unload", Boolean.toString(unloadSplitBundles))
+ .queryParam("splitAlgorithmName", splitAlgorithmName);
+ return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void setPublishRate(String namespace, PublishRate publishMsgRate) throws PulsarAdminException {
+ try {
+ setPublishRateAsync(namespace, publishMsgRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setPublishRateAsync(String namespace, PublishRate publishMsgRate) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "publishRate");
+ return asyncPostRequest(path, Entity.entity(publishMsgRate, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public PublishRate getPublishRate(String namespace) throws PulsarAdminException {
+ try {
+ return getPublishRateAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<PublishRate> getPublishRateAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "publishRate");
+ final CompletableFuture<PublishRate> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<PublishRate>() {
+ @Override
+ public void completed(PublishRate publishRate) {
+ future.complete(publishRate);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void setDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
+ try {
+ setDispatchRateAsync(namespace, dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setDispatchRateAsync(String namespace, DispatchRate dispatchRate) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "dispatchRate");
+ return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public DispatchRate getDispatchRate(String namespace) throws PulsarAdminException {
+ try {
+ return getDispatchRateAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<DispatchRate> getDispatchRateAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "dispatchRate");
+ final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<DispatchRate>() {
+ @Override
+ public void completed(DispatchRate dispatchRate) {
+ future.complete(dispatchRate);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void setSubscribeRate(String namespace, SubscribeRate subscribeRate) throws PulsarAdminException {
try {
- unloadNamespaceBundleAsync(namespace, bundle).get();
+ setSubscribeRateAsync(namespace, subscribeRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
- public CompletableFuture<Void> unloadNamespaceBundleAsync(String namespace, String bundle) {
+ public CompletableFuture<Void> setSubscribeRateAsync(String namespace, SubscribeRate subscribeRate) {
NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, bundle, "unload");
- return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+ WebTarget path = namespacePath(ns, "subscribeRate");
+ return asyncPostRequest(path, Entity.entity(subscribeRate, MediaType.APPLICATION_JSON));
}
@Override
- public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName)
- throws PulsarAdminException {
+ public SubscribeRate getSubscribeRate(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, bundle, "split");
- request(path.queryParam("unload", Boolean.toString(unloadSplitBundles))
- .queryParam("splitAlgorithmName", splitAlgorithmName))
- .put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getSubscribeRateAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
- public void setPublishRate(String namespace, PublishRate publishMsgRate) throws PulsarAdminException {
+ public CompletableFuture<SubscribeRate> getSubscribeRateAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "subscribeRate");
+ final CompletableFuture<SubscribeRate> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<SubscribeRate>() {
+ @Override
+ public void completed(SubscribeRate subscribeRate) {
+ future.complete(subscribeRate);
+ }
- try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "publishRate");
- request(path).post(Entity.entity(publishMsgRate, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
- }
-
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
}
@Override
- public PublishRate getPublishRate(String namespace) throws PulsarAdminException {
+ public void setSubscriptionDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "publishRate");
- return request(path).get(PublishRate.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setSubscriptionDispatchRateAsync(namespace, dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
-
+
@Override
- public void setDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
- try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "dispatchRate");
- request(path).post(Entity.entity(dispatchRate, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
- }
+ public CompletableFuture<Void> setSubscriptionDispatchRateAsync(String namespace, DispatchRate dispatchRate) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "subscriptionDispatchRate");
+ return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON));
}
@Override
- public DispatchRate getDispatchRate(String namespace) throws PulsarAdminException {
+ public DispatchRate getSubscriptionDispatchRate(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "dispatchRate");
- return request(path).get(DispatchRate.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getSubscriptionDispatchRateAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
- public void setSubscribeRate(String namespace, SubscribeRate subscribeRate) throws PulsarAdminException {
- try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "subscribeRate");
- request(path).post(Entity.entity(subscribeRate, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
- }
+ public CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "subscriptionDispatchRate");
+ final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<DispatchRate>() {
+ @Override
+ public void completed(DispatchRate dispatchRate) {
+ future.complete(dispatchRate);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
}
@Override
- public SubscribeRate getSubscribeRate(String namespace) throws PulsarAdminException {
+ public void setReplicatorDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "subscribeRate");
- return request(path).get(SubscribeRate.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setReplicatorDispatchRateAsync(namespace, dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
- public void setSubscriptionDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
- try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "subscriptionDispatchRate");
- request(path).post(Entity.entity(dispatchRate, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
- }
+ public CompletableFuture<Void> setReplicatorDispatchRateAsync(String namespace, DispatchRate dispatchRate) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "replicatorDispatchRate");
+ return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON));
}
@Override
- public DispatchRate getSubscriptionDispatchRate(String namespace) throws PulsarAdminException {
+ public DispatchRate getReplicatorDispatchRate(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "subscriptionDispatchRate");
- return request(path).get(DispatchRate.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getReplicatorDispatchRateAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
- public void setReplicatorDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
- try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "replicatorDispatchRate");
- request(path).post(Entity.entity(dispatchRate, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
- }
+ public CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "replicatorDispatchRate");
+ final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<DispatchRate>() {
+ @Override
+ public void completed(DispatchRate dispatchRate) {
+ future.complete(dispatchRate);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
}
@Override
- public DispatchRate getReplicatorDispatchRate(String namespace) throws PulsarAdminException {
+ public void clearNamespaceBacklog(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "replicatorDispatchRate");
- return request(path).get(DispatchRate.class);
- } catch (Exception e) {
- throw getApiException(e);
+ clearNamespaceBacklogAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
- public void clearNamespaceBacklog(String namespace) throws PulsarAdminException {
- try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "clearBacklog");
- request(path).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
- }
+ public CompletableFuture<Void> clearNamespaceBacklogAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "clearBacklog");
+ return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}
@Override
public void clearNamespaceBacklogForSubscription(String namespace, String subscription)
throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "clearBacklog", subscription);
- request(path).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ clearNamespaceBacklogForSubscriptionAsync(namespace, subscription).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> clearNamespaceBacklogForSubscriptionAsync(String namespace, String subscription) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "clearBacklog", subscription);
+ return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void clearNamespaceBundleBacklog(String namespace, String bundle) throws PulsarAdminException {
try {
- clearNamespaceBundleBacklogAsync(namespace, bundle).get();
+ clearNamespaceBundleBacklogAsync(namespace, bundle).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -698,12 +1447,15 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
public void clearNamespaceBundleBacklogForSubscription(String namespace, String bundle, String subscription)
throws PulsarAdminException {
try {
- clearNamespaceBundleBacklogForSubscriptionAsync(namespace, bundle, subscription).get();
+ clearNamespaceBundleBacklogForSubscriptionAsync(namespace, bundle, subscription)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -718,24 +1470,37 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
@Override
public void unsubscribeNamespace(String namespace, String subscription) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "unsubscribe", subscription);
- request(path).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ unsubscribeNamespaceAsync(namespace, subscription).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> unsubscribeNamespaceAsync(String namespace, String subscription) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "unsubscribe", subscription);
+ return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void unsubscribeNamespaceBundle(String namespace, String bundle, String subscription)
throws PulsarAdminException {
try {
- unsubscribeNamespaceBundleAsync(namespace, bundle, subscription).get();
+ unsubscribeNamespaceBundleAsync(namespace, bundle, subscription)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -750,236 +1515,581 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
@Override
public void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode subscriptionAuthMode) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "subscriptionAuthMode");
- request(path).post(Entity.entity(subscriptionAuthMode, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setSubscriptionAuthModeAsync(namespace, subscriptionAuthMode).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setSubscriptionAuthModeAsync(String namespace, SubscriptionAuthMode subscriptionAuthMode) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "subscriptionAuthMode");
+ return asyncPostRequest(path, Entity.entity(subscriptionAuthMode, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void setEncryptionRequiredStatus(String namespace, boolean encryptionRequired) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "encryptionRequired");
- request(path).post(Entity.entity(encryptionRequired, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setEncryptionRequiredStatusAsync(namespace, encryptionRequired)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setEncryptionRequiredStatusAsync(String namespace, boolean encryptionRequired) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "encryptionRequired");
+ return asyncPostRequest(path, Entity.entity(encryptionRequired, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public DelayedDeliveryPolicies getDelayedDelivery(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "delayedDelivery");
- return request(path).get(DelayedDeliveryPolicies.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getDelayedDeliveryAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "delayedDelivery");
+ final CompletableFuture<DelayedDeliveryPolicies> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<DelayedDeliveryPolicies>() {
+ @Override
+ public void completed(DelayedDeliveryPolicies delayedDeliveryPolicies) {
+ future.complete(delayedDeliveryPolicies);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setDelayedDeliveryMessages(String namespace, DelayedDeliveryPolicies delayedDeliveryPolicies) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "delayedDelivery");
- request(path).post(Entity.entity(delayedDeliveryPolicies, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setDelayedDeliveryMessagesAsync(namespace, delayedDeliveryPolicies)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setDelayedDeliveryMessagesAsync(String namespace, DelayedDeliveryPolicies delayedDeliveryPolicies) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "delayedDelivery");
+ return asyncPostRequest(path, Entity.entity(delayedDeliveryPolicies, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public int getMaxProducersPerTopic(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "maxProducersPerTopic");
- return request(path).get(Integer.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getMaxProducersPerTopicAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Integer> getMaxProducersPerTopicAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxProducersPerTopic");
+ final CompletableFuture<Integer> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Integer>() {
+ @Override
+ public void completed(Integer max) {
+ future.complete(max);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setMaxProducersPerTopic(String namespace, int maxProducersPerTopic) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "maxProducersPerTopic");
- request(path).post(Entity.entity(maxProducersPerTopic, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setMaxProducersPerTopicAsync(namespace, maxProducersPerTopic).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setMaxProducersPerTopicAsync(String namespace, int maxProducersPerTopic) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxProducersPerTopic");
+ return asyncPostRequest(path, Entity.entity(maxProducersPerTopic, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public int getMaxConsumersPerTopic(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "maxConsumersPerTopic");
- return request(path).get(Integer.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getMaxConsumersPerTopicAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Integer> getMaxConsumersPerTopicAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxConsumersPerTopic");
+ final CompletableFuture<Integer> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Integer>() {
+ @Override
+ public void completed(Integer max) {
+ future.complete(max);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setMaxConsumersPerTopic(String namespace, int maxConsumersPerTopic) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "maxConsumersPerTopic");
- request(path).post(Entity.entity(maxConsumersPerTopic, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setMaxConsumersPerTopicAsync(namespace, maxConsumersPerTopic)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setMaxConsumersPerTopicAsync(String namespace, int maxConsumersPerTopic) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxConsumersPerTopic");
+ return asyncPostRequest(path, Entity.entity(maxConsumersPerTopic, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException {
+ try {
+ return getMaxConsumersPerSubscriptionAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
- public int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException {
+ public CompletableFuture<Integer> getMaxConsumersPerSubscriptionAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxConsumersPerSubscription");
+ final CompletableFuture<Integer> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Integer>() {
+ @Override
+ public void completed(Integer max) {
+ future.complete(max);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void setMaxConsumersPerSubscription(String namespace, int maxConsumersPerSubscription) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "maxConsumersPerSubscription");
- return request(path).get(Integer.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setMaxConsumersPerSubscriptionAsync(namespace, maxConsumersPerSubscription)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
- public void setMaxConsumersPerSubscription(String namespace, int maxConsumersPerSubscription) throws PulsarAdminException {
- try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "maxConsumersPerSubscription");
- request(path).post(Entity.entity(maxConsumersPerSubscription, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
- }
+ public CompletableFuture<Void> setMaxConsumersPerSubscriptionAsync(String namespace, int maxConsumersPerSubscription) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxConsumersPerSubscription");
+ return asyncPostRequest(path, Entity.entity(maxConsumersPerSubscription, MediaType.APPLICATION_JSON));
}
@Override
public int getMaxUnackedMessagesPerConsumer(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "maxUnackedMessagesPerConsumer");
- return request(path).get(Integer.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getMaxUnackedMessagesPerConsumerAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Integer> getMaxUnackedMessagesPerConsumerAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxUnackedMessagesPerConsumer");
+ final CompletableFuture<Integer> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Integer>() {
+ @Override
+ public void completed(Integer max) {
+ future.complete(max);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setMaxUnackedMessagesPerConsumer(String namespace, int maxUnackedMessagesPerConsumer) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "maxUnackedMessagesPerConsumer");
- request(path).post(Entity.entity(maxUnackedMessagesPerConsumer, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setMaxUnackedMessagesPerConsumerAsync(namespace, maxUnackedMessagesPerConsumer).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setMaxUnackedMessagesPerConsumerAsync(String namespace, int maxUnackedMessagesPerConsumer) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxUnackedMessagesPerConsumer");
+ return asyncPostRequest(path, Entity.entity(maxUnackedMessagesPerConsumer, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public int getMaxUnackedMessagesPerSubscription(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "maxUnackedMessagesPerSubscription");
- return request(path).get(Integer.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getMaxUnackedMessagesPerSubscriptionAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Integer> getMaxUnackedMessagesPerSubscriptionAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxUnackedMessagesPerSubscription");
+ final CompletableFuture<Integer> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Integer>() {
+ @Override
+ public void completed(Integer max) {
+ future.complete(max);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setMaxUnackedMessagesPerSubscription(String namespace, int maxUnackedMessagesPerSubscription) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "maxUnackedMessagesPerSubscription");
- request(path).post(Entity.entity(maxUnackedMessagesPerSubscription, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setMaxUnackedMessagesPerSubscriptionAsync(namespace, maxUnackedMessagesPerSubscription)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setMaxUnackedMessagesPerSubscriptionAsync(String namespace, int maxUnackedMessagesPerSubscription) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxUnackedMessagesPerSubscription");
+ return asyncPostRequest(path, Entity.entity(maxUnackedMessagesPerSubscription, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public long getCompactionThreshold(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "compactionThreshold");
- return request(path).get(Long.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getCompactionThresholdAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Long> getCompactionThresholdAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "compactionThreshold");
+ final CompletableFuture<Long> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Long>() {
+ @Override
+ public void completed(Long threshold) {
+ future.complete(threshold);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setCompactionThreshold(String namespace, long compactionThreshold) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "compactionThreshold");
- request(path).put(Entity.entity(compactionThreshold, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setCompactionThresholdAsync(namespace, compactionThreshold)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setCompactionThresholdAsync(String namespace, long compactionThreshold) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "compactionThreshold");
+ return asyncPutRequest(path, Entity.entity(compactionThreshold, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public long getOffloadThreshold(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "offloadThreshold");
- return request(path).get(Long.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getOffloadThresholdAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Long> getOffloadThresholdAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "offloadThreshold");
+ final CompletableFuture<Long> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Long>() {
+ @Override
+ public void completed(Long threshold) {
+ future.complete(threshold);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setOffloadThreshold(String namespace, long offloadThreshold) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "offloadThreshold");
- request(path).put(Entity.entity(offloadThreshold, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setOffloadThresholdAsync(namespace, offloadThreshold).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setOffloadThresholdAsync(String namespace, long offloadThreshold) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "offloadThreshold");
+ return asyncPutRequest(path, Entity.entity(offloadThreshold, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public Long getOffloadDeleteLagMs(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
- return request(path).get(Long.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getOffloadDeleteLagMsAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Long> getOffloadDeleteLagMsAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+ final CompletableFuture<Long> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Long>() {
+ @Override
+ public void completed(Long lag) {
+ future.complete(lag);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setOffloadDeleteLag(String namespace, long lag, TimeUnit unit) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
- request(path).put(Entity.entity(TimeUnit.MILLISECONDS.convert(lag, unit), MediaType.APPLICATION_JSON),
- ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setOffloadDeleteLagAsync(namespace, lag, unit).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setOffloadDeleteLagAsync(String namespace, long lag, TimeUnit unit) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+ return asyncPutRequest(path, Entity.entity(TimeUnit.MILLISECONDS.convert(lag, unit), MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void clearOffloadDeleteLag(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
- request(path).delete(ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ clearOffloadDeleteLagAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> clearOffloadDeleteLagAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public SchemaAutoUpdateCompatibilityStrategy getSchemaAutoUpdateCompatibilityStrategy(String namespace)
throws PulsarAdminException {
try {
@@ -1009,94 +2119,232 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
public boolean getSchemaValidationEnforced(String namespace)
throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "schemaValidationEnforced");
- return request(path).get(Boolean.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getSchemaValidationEnforcedAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "schemaValidationEnforced");
+ final CompletableFuture<Boolean> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Boolean>() {
+ @Override
+ public void completed(Boolean enforced) {
+ future.complete(enforced);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setSchemaValidationEnforced(String namespace, boolean schemaValidationEnforced)
throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "schemaValidationEnforced");
- request(path).post(Entity.entity(schemaValidationEnforced, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setSchemaValidationEnforcedAsync(namespace, schemaValidationEnforced)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setSchemaValidationEnforcedAsync(String namespace, boolean schemaValidationEnforced) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "schemaValidationEnforced");
+ return asyncPostRequest(path, Entity.entity(schemaValidationEnforced, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "schemaCompatibilityStrategy");
- return request(path).get(SchemaCompatibilityStrategy.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getSchemaCompatibilityStrategyAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "schemaCompatibilityStrategy");
+ final CompletableFuture<SchemaCompatibilityStrategy> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<SchemaCompatibilityStrategy>() {
+ @Override
+ public void completed(SchemaCompatibilityStrategy schemaCompatibilityStrategy) {
+ future.complete(schemaCompatibilityStrategy);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setSchemaCompatibilityStrategy(String namespace, SchemaCompatibilityStrategy strategy) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "schemaCompatibilityStrategy");
- request(path).put(Entity.entity(strategy, MediaType.APPLICATION_JSON),
- ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setSchemaCompatibilityStrategyAsync(namespace, strategy).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
+ }
+ @Override
+ public CompletableFuture<Void> setSchemaCompatibilityStrategyAsync(String namespace, SchemaCompatibilityStrategy strategy) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "schemaCompatibilityStrategy");
+ return asyncPutRequest(path, Entity.entity(strategy, MediaType.APPLICATION_JSON));
}
@Override
public boolean getIsAllowAutoUpdateSchema(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "isAllowAutoUpdateSchema");
- return request(path).get(Boolean.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getIsAllowAutoUpdateSchemaAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Boolean> getIsAllowAutoUpdateSchemaAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "isAllowAutoUpdateSchema");
+ final CompletableFuture<Boolean> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Boolean>() {
+ @Override
+ public void completed(Boolean allowAutoUpdate) {
+ future.complete(allowAutoUpdate);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchema) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "isAllowAutoUpdateSchema");
- request(path).post(Entity.entity(isAllowAutoUpdateSchema, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setIsAllowAutoUpdateSchemaAsync(namespace, isAllowAutoUpdateSchema).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setIsAllowAutoUpdateSchemaAsync(String namespace, boolean isAllowAutoUpdateSchema) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "isAllowAutoUpdateSchema");
+ return asyncPostRequest(path, Entity.entity(isAllowAutoUpdateSchema, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void setOffloadPolicies(String namespace, OffloadPolicies offloadPolicies) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "offloadPolicies");
- request(path).post(Entity.entity(offloadPolicies, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setOffloadPoliciesAsync(namespace, offloadPolicies)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> setOffloadPoliciesAsync(String namespace, OffloadPolicies offloadPolicies) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "offloadPolicies");
+ return asyncPostRequest(path, Entity.entity(offloadPolicies, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public OffloadPolicies getOffloadPolicies(String namespace) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, "offloadPolicies");
- return request(path).get(OffloadPolicies.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getOffloadPoliciesAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
+ @Override
+ public CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "offloadPolicies");
+ final CompletableFuture<OffloadPolicies> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<OffloadPolicies>() {
+ @Override
+ public void completed(OffloadPolicies offloadPolicies) {
+ future.complete(offloadPolicies);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
private WebTarget namespacePath(NamespaceName namespace, String... parts) {
final WebTarget base = namespace.isV2() ? adminV2Namespaces : adminNamespaces;
WebTarget namespacePath = base.path(namespace.toString());