You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/03/21 04:16:35 UTC

[pulsar] branch master updated: [PulsarAdmin] ResourceQuotas to async (#6572)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 63c6a58  [PulsarAdmin] ResourceQuotas to async (#6572)
63c6a58 is described below

commit 63c6a586ba744f677ec9216765896a39d00cfcf2
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Sat Mar 21 12:16:23 2020 +0800

    [PulsarAdmin] ResourceQuotas to async (#6572)
---
 .../apache/pulsar/client/admin/ResourceQuotas.java | 147 +++++++++++++++++++++
 .../client/admin/internal/ResourceQuotasImpl.java  | 132 +++++++++++++++---
 2 files changed, 258 insertions(+), 21 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/ResourceQuotas.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/ResourceQuotas.java
index f83ec54..98c4cb0 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/ResourceQuotas.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/ResourceQuotas.java
@@ -22,6 +22,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedExceptio
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
 
+import java.util.concurrent.CompletableFuture;
+
 public interface ResourceQuotas {
 
     /**
@@ -52,6 +54,29 @@ public interface ResourceQuotas {
     ResourceQuota getDefaultResourceQuota() throws PulsarAdminException;
 
     /**
+     * Get default resource quota for new namespace bundles asynchronously.
+     * <p>
+     * Get default resource quota for new namespace bundles.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>
+     *  {
+     *      "msgRateIn" : 10,
+     *      "msgRateOut" : 30,
+     *      "bandwidthIn" : 10000,
+     *      "bandwidthOut" : 30000,
+     *      "memory" : 100,
+     *      "dynamic" : true
+     *  }
+     * </code>
+     * </pre>
+     * @return a future that completes with the default resource quota
+     */
+    CompletableFuture<ResourceQuota> getDefaultResourceQuotaAsync();
+
+    /**
      * Set default resource quota for new namespace bundles.
      * <p>
      * Set default resource quota for new namespace bundles.
@@ -94,6 +119,43 @@ public interface ResourceQuotas {
     void setDefaultResourceQuota(ResourceQuota quota) throws PulsarAdminException;
 
     /**
+     * Set default resource quota for new namespace bundles asynchronously.
+     * <p>
+     * Set default resource quota for new namespace bundles.
+     * <p>
+     * The resource quota can be set with these properties:
+     * <ul>
+     * <li><code>msgRateIn</code> : The maximum incoming messages per second.
+     * <li><code>msgRateOut</code> : The maximum outgoing messages per second.
+     * <li><code>bandwidthIn</code> : The maximum inbound bandwidth used.
+     * <li><code>bandwidthOut</code> : The maximum outbound bandwidth used.
+     * <li><code>memory</code> : The maximum memory used.
+     * <li><code>dynamic</code> : allow the quota to be dynamically re-calculated.
+     * </li>
+     * </ul>
+     *
+     * <p>
+     * Request parameter example:
+     *
+     * <pre>
+     * <code>
+     *  {
+     *      "msgRateIn" : 10,
+     *      "msgRateOut" : 30,
+     *      "bandwidthIn" : 10000,
+     *      "bandwidthOut" : 30000,
+     *      "memory" : 100,
+     *      "dynamic" : false
+     *  }
+     * </code>
+     * </pre>
+     * @param quota
+     *             The new ResourceQuota
+     * @return a future that completes when the request to set the default quota has finished
+     */
+    CompletableFuture<Void> setDefaultResourceQuotaAsync(ResourceQuota quota);
+
+    /**
      * Get resource quota of a namespace bundle.
      * <p>
      * Get resource quota of a namespace bundle.
@@ -128,6 +190,34 @@ public interface ResourceQuotas {
     ResourceQuota getNamespaceBundleResourceQuota(String namespace, String bundle) throws PulsarAdminException;
 
     /**
+     * Get resource quota of a namespace bundle asynchronously.
+     * <p>
+     * Get resource quota of a namespace bundle.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>
+     *  {
+     *      "msgRateIn" : 10,
+     *      "msgRateOut" : 30,
+     *      "bandwidthIn" : 10000,
+     *      "bandwidthOut" : 30000,
+     *      "memory" : 100,
+     *      "dynamic" : true
+     *  }
+     * </code>
+     * </pre>
+     *
+     * @param namespace
+     *             Namespace name
+     * @param bundle
+     *             Bundle range {start}_{end}
+     * @return a future that completes with the resource quota of the given bundle
+     */
+    CompletableFuture<ResourceQuota> getNamespaceBundleResourceQuotaAsync(String namespace, String bundle);
+
+    /**
      * Set resource quota for a namespace bundle.
      * <p>
      * Set resource quota for a namespace bundle.
@@ -177,6 +267,48 @@ public interface ResourceQuotas {
             throws PulsarAdminException;
 
     /**
+     * Set resource quota for a namespace bundle asynchronously.
+     * <p>
+     * Set resource quota for a namespace bundle.
+     * <p>
+     * The resource quota can be set with these properties:
+     * <ul>
+     * <li><code>msgRateIn</code> : The maximum incoming messages per second.
+     * <li><code>msgRateOut</code> : The maximum outgoing messages per second.
+     * <li><code>bandwidthIn</code> : The maximum inbound bandwidth used.
+     * <li><code>bandwidthOut</code> : The maximum outbound bandwidth used.
+     * <li><code>memory</code> : The maximum memory used.
+     * <li><code>dynamic</code> : allow the quota to be dynamically re-calculated.
+     * </li>
+     * </ul>
+     *
+     * <p>
+     * Request parameter example:
+     *
+     * <pre>
+     * <code>
+     *  {
+     *      "msgRateIn" : 10,
+     *      "msgRateOut" : 30,
+     *      "bandwidthIn" : 10000,
+     *      "bandwidthOut" : 30000,
+     *      "memory" : 100,
+     *      "dynamic" : false
+     *  }
+     * </code>
+     * </pre>
+     *
+     * @param namespace
+     *             Namespace name
+     * @param bundle
+     *             Bundle range {start}_{end}
+     * @param quota
+     *             The new ResourceQuota
+     * @return a future that completes when the request to set the bundle's quota has finished
+     */
+    CompletableFuture<Void> setNamespaceBundleResourceQuotaAsync(String namespace, String bundle, ResourceQuota quota);
+
+    /**
      * Reset resource quota for a namespace bundle to default value.
      * <p>
      * Reset resource quota for a namespace bundle to default value.
@@ -196,5 +328,20 @@ public interface ResourceQuotas {
      *             Unexpected error
      */
     void resetNamespaceBundleResourceQuota(String namespace, String bundle) throws PulsarAdminException;
+
+    /**
+     * Reset resource quota for a namespace bundle to default value asynchronously.
+     * <p>
+     * Reset resource quota for a namespace bundle to default value.
+     * <p>
+     * The resource quota policy will fall back to the default.
+     *
+     * @param namespace
+     *             Namespace name
+     * @param bundle
+     *             Bundle range {start}_{end}
+     * @return a future that completes when the request to reset the bundle's quota has finished
+     */
+    CompletableFuture<Void> resetNamespaceBundleResourceQuotaAsync(String namespace, String bundle);
 }
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
index 54d89d3..d41f994 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.admin.internal;
 
 import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 
@@ -29,6 +30,11 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 public class ResourceQuotasImpl extends BaseResource implements ResourceQuotas {
 
     private final WebTarget adminQuotas;
@@ -40,53 +46,137 @@ public class ResourceQuotasImpl extends BaseResource implements ResourceQuotas {
         adminV2Quotas = web.path("/admin/v2/resource-quotas");
     }
 
+    @Override
     public ResourceQuota getDefaultResourceQuota() throws PulsarAdminException {
         try {
-            return request(adminV2Quotas).get(ResourceQuota.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getDefaultResourceQuotaAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
+    @Override
+    public CompletableFuture<ResourceQuota> getDefaultResourceQuotaAsync() {
+        final CompletableFuture<ResourceQuota> future = new CompletableFuture<>();
+        asyncGetRequest(adminV2Quotas,
+                new InvocationCallback<ResourceQuota>() {
+                    @Override
+                    public void completed(ResourceQuota resourceQuota) {
+                        future.complete(resourceQuota);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void setDefaultResourceQuota(ResourceQuota quota) throws PulsarAdminException {
         try {
-            request(adminV2Quotas).post(Entity.entity(quota, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setDefaultResourceQuotaAsync(quota).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
+    @Override
+    public CompletableFuture<Void> setDefaultResourceQuotaAsync(ResourceQuota quota) {
+        return asyncPostRequest(adminV2Quotas, Entity.entity(quota, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public ResourceQuota getNamespaceBundleResourceQuota(String namespace, String bundle) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, bundle);
-            return request(path).get(ResourceQuota.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getNamespaceBundleResourceQuotaAsync(namespace, bundle)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
+    @Override
+    public CompletableFuture<ResourceQuota> getNamespaceBundleResourceQuotaAsync(String namespace, String bundle) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, bundle);
+        final CompletableFuture<ResourceQuota> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<ResourceQuota>() {
+                    @Override
+                    public void completed(ResourceQuota resourceQuota) {
+                        future.complete(resourceQuota);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void setNamespaceBundleResourceQuota(String namespace, String bundle, ResourceQuota quota)
             throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, bundle);
-            request(path).post(Entity.entity(quota, MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            setNamespaceBundleResourceQuotaAsync(namespace, bundle, quota)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
+    @Override
+    public CompletableFuture<Void> setNamespaceBundleResourceQuotaAsync(String namespace, String bundle, ResourceQuota quota) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, bundle);
+        return asyncPostRequest(path, Entity.entity(quota, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void resetNamespaceBundleResourceQuota(String namespace, String bundle) throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, bundle);
-            request(path).delete();
-        } catch (Exception e) {
-            throw getApiException(e);
+            resetNamespaceBundleResourceQuotaAsync(namespace, bundle)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
+    @Override
+    public CompletableFuture<Void> resetNamespaceBundleResourceQuotaAsync(String namespace, String bundle) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, bundle);
+        return asyncDeleteRequest(path);
+    }
+
     private WebTarget namespacePath(NamespaceName namespace, String... parts) {
         final WebTarget base = namespace.isV2() ? adminV2Quotas : adminQuotas;
         WebTarget namespacePath = base.path(namespace.toString());