You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/03/21 04:16:35 UTC
[pulsar] branch master updated: [PulsarAdmin] ResourceQuotas to async (#6572)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 63c6a58 [PulsarAdmin] ResourceQuotas to async (#6572)
63c6a58 is described below
commit 63c6a586ba744f677ec9216765896a39d00cfcf2
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Sat Mar 21 12:16:23 2020 +0800
[PulsarAdmin] ResourceQuotas to async (#6572)
---
.../apache/pulsar/client/admin/ResourceQuotas.java | 147 +++++++++++++++++++++
.../client/admin/internal/ResourceQuotasImpl.java | 132 +++++++++++++++---
2 files changed, 258 insertions(+), 21 deletions(-)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/ResourceQuotas.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/ResourceQuotas.java
index f83ec54..98c4cb0 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/ResourceQuotas.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/ResourceQuotas.java
@@ -22,6 +22,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedExceptio
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.common.policies.data.ResourceQuota;
+import java.util.concurrent.CompletableFuture;
+
public interface ResourceQuotas {
/**
@@ -52,6 +54,29 @@ public interface ResourceQuotas {
ResourceQuota getDefaultResourceQuota() throws PulsarAdminException;
/**
+ * Get default resource quota for new resource bundles asynchronously.
+ * <p>
+ * Get default resource quota for new resource bundles.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>
+ * {
+ * "msgRateIn" : 10,
+ * "msgRateOut" : 30,
+ * "bandwidthIn" : 10000,
+ * "bandwidthOut" : 30000,
+ * "memory" : 100,
+ * "dynamic" : true
+ * }
+ * </code>
+ * </pre>
+ *
+ */
+ CompletableFuture<ResourceQuota> getDefaultResourceQuotaAsync();
+
+ /**
* Set default resource quota for new namespace bundles.
* <p>
* Set default resource quota for new namespace bundles.
@@ -94,6 +119,43 @@ public interface ResourceQuotas {
void setDefaultResourceQuota(ResourceQuota quota) throws PulsarAdminException;
/**
+ * Set default resource quota for new namespace bundles asynchronously.
+ * <p>
+ * Set default resource quota for new namespace bundles.
+ * <p>
+ * The resource quota can be set with these properties:
+ * <ul>
+ * <li><code>msgRateIn</code> : The maximum incoming messages per second.
+ * <li><code>msgRateOut</code> : The maximum outgoing messages per second.
+ * <li><code>bandwidthIn</code> : The maximum inbound bandwidth used.
+ * <li><code>bandwidthOut</code> : The maximum outbound bandwidth used.
+ * <li><code>memory</code> : The maximum memory used.
+ * <li><code>dynamic</code> : allow the quota to be dynamically re-calculated.
+ * </li>
+ * </ul>
+ *
+ * <p>
+ * Request parameter example:
+ *
+ * <pre>
+ * <code>
+ * {
+ * "msgRateIn" : 10,
+ * "msgRateOut" : 30,
+ * "bandwidthIn" : 10000,
+ * "bandwidthOut" : 30000,
+ * "memory" : 100,
+ * "dynamic" : false
+ * }
+ * </code>
+ * </pre>
+ *
+ * @param quota
+ * The new ResourceQuota
+ */
+ CompletableFuture<Void> setDefaultResourceQuotaAsync(ResourceQuota quota);
+
+ /**
* Get resource quota of a namespace bundle.
* <p>
* Get resource quota of a namespace bundle.
@@ -128,6 +190,34 @@ public interface ResourceQuotas {
ResourceQuota getNamespaceBundleResourceQuota(String namespace, String bundle) throws PulsarAdminException;
/**
+ * Get resource quota of a namespace bundle asynchronously.
+ * <p>
+ * Get resource quota of a namespace bundle.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>
+ * {
+ * "msgRateIn" : 10,
+ * "msgRateOut" : 30,
+ * "bandwidthIn" : 10000,
+ * "bandwidthOut" : 30000,
+ * "memory" : 100,
+ * "dynamic" : true
+ * }
+ * </code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param bundle
+ * Range of bundle {start}_{end}
+ *
+ */
+ CompletableFuture<ResourceQuota> getNamespaceBundleResourceQuotaAsync(String namespace, String bundle);
+
+ /**
* Set resource quota for a namespace bundle.
* <p>
* Set resource quota for a namespace bundle.
@@ -177,6 +267,48 @@ public interface ResourceQuotas {
throws PulsarAdminException;
/**
+ * Set resource quota for a namespace bundle asynchronously.
+ * <p>
+ * Set resource quota for a namespace bundle.
+ * <p>
+ * The resource quota can be set with these properties:
+ * <ul>
+ * <li><code>msgRateIn</code> : The maximum incoming messages per second.
+ * <li><code>msgRateOut</code> : The maximum outgoing messages per second.
+ * <li><code>bandwidthIn</code> : The maximum inbound bandwidth used.
+ * <li><code>bandwidthOut</code> : The maximum outbound bandwidth used.
+ * <li><code>memory</code> : The maximum memory used.
+ * <li><code>dynamic</code> : allow the quota to be dynamically re-calculated.
+ * </li>
+ * </ul>
+ *
+ * <p>
+ * Request parameter example:
+ *
+ * <pre>
+ * <code>
+ * {
+ * "msgRateIn" : 10,
+ * "msgRateOut" : 30,
+ * "bandwidthIn" : 10000,
+ * "bandwidthOut" : 30000,
+ * "memory" : 100,
+ * "dynamic" : false
+ * }
+ * </code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param bundle
+ * Bundle range {start}_{end}
+ * @param quota
+ * The new ResourceQuota
+ *
+ */
+ CompletableFuture<Void> setNamespaceBundleResourceQuotaAsync(String namespace, String bundle, ResourceQuota quota);
+
+ /**
* Reset resource quota for a namespace bundle to default value.
* <p>
* Reset resource quota for a namespace bundle to default value.
@@ -196,5 +328,20 @@ public interface ResourceQuotas {
* Unexpected error
*/
void resetNamespaceBundleResourceQuota(String namespace, String bundle) throws PulsarAdminException;
+
+ /**
+ * Reset resource quota for a namespace bundle to default value asynchronously.
+ * <p>
+ * Reset resource quota for a namespace bundle to default value.
+ * <p>
+ * The resource quota policy will fall back to the default.
+ *
+ * @param namespace
+ * Namespace name
+ * @param bundle
+ * Bundle range {start}_{end}
+ *
+ */
+ CompletableFuture<Void> resetNamespaceBundleResourceQuotaAsync(String namespace, String bundle);
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
index 54d89d3..d41f994 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.admin.internal;
import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
@@ -29,6 +30,11 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.ResourceQuota;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
public class ResourceQuotasImpl extends BaseResource implements ResourceQuotas {
private final WebTarget adminQuotas;
@@ -40,53 +46,137 @@ public class ResourceQuotasImpl extends BaseResource implements ResourceQuotas {
adminV2Quotas = web.path("/admin/v2/resource-quotas");
}
+ @Override
public ResourceQuota getDefaultResourceQuota() throws PulsarAdminException {
try {
- return request(adminV2Quotas).get(ResourceQuota.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getDefaultResourceQuotaAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
+ @Override
+ public CompletableFuture<ResourceQuota> getDefaultResourceQuotaAsync() {
+ final CompletableFuture<ResourceQuota> future = new CompletableFuture<>();
+ asyncGetRequest(adminV2Quotas,
+ new InvocationCallback<ResourceQuota>() {
+ @Override
+ public void completed(ResourceQuota resourceQuota) {
+ future.complete(resourceQuota);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setDefaultResourceQuota(ResourceQuota quota) throws PulsarAdminException {
try {
- request(adminV2Quotas).post(Entity.entity(quota, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setDefaultResourceQuotaAsync(quota).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
+ @Override
+ public CompletableFuture<Void> setDefaultResourceQuotaAsync(ResourceQuota quota) {
+ return asyncPostRequest(adminV2Quotas, Entity.entity(quota, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public ResourceQuota getNamespaceBundleResourceQuota(String namespace, String bundle) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, bundle);
- return request(path).get(ResourceQuota.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getNamespaceBundleResourceQuotaAsync(namespace, bundle)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
+ @Override
+ public CompletableFuture<ResourceQuota> getNamespaceBundleResourceQuotaAsync(String namespace, String bundle) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, bundle);
+ final CompletableFuture<ResourceQuota> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<ResourceQuota>() {
+ @Override
+ public void completed(ResourceQuota resourceQuota) {
+ future.complete(resourceQuota);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setNamespaceBundleResourceQuota(String namespace, String bundle, ResourceQuota quota)
throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, bundle);
- request(path).post(Entity.entity(quota, MediaType.APPLICATION_JSON), ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ setNamespaceBundleResourceQuotaAsync(namespace, bundle, quota)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
+ @Override
+ public CompletableFuture<Void> setNamespaceBundleResourceQuotaAsync(String namespace, String bundle, ResourceQuota quota) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, bundle);
+ return asyncPostRequest(path, Entity.entity(quota, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void resetNamespaceBundleResourceQuota(String namespace, String bundle) throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, bundle);
- request(path).delete();
- } catch (Exception e) {
- throw getApiException(e);
+ resetNamespaceBundleResourceQuotaAsync(namespace, bundle)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
+ @Override
+ public CompletableFuture<Void> resetNamespaceBundleResourceQuotaAsync(String namespace, String bundle) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, bundle);
+ return asyncDeleteRequest(path);
+ }
+
private WebTarget namespacePath(NamespaceName namespace, String... parts) {
final WebTarget base = namespace.isV2() ? adminV2Quotas : adminQuotas;
WebTarget namespacePath = base.path(namespace.toString());