You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/03/21 03:30:13 UTC

[pulsar] branch master updated: [PulsarAdmin] Tenants to async (#6573)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 804ec61  [PulsarAdmin] Tenants to async (#6573)
804ec61 is described below

commit 804ec618bc31f6b42ca0fc59bc9aa295c6bb23eb
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Sat Mar 21 11:30:02 2020 +0800

    [PulsarAdmin] Tenants to async (#6573)
---
 .../org/apache/pulsar/client/admin/Tenants.java    |  59 ++++++++++
 .../pulsar/client/admin/internal/TenantsImpl.java  | 119 +++++++++++++++++----
 2 files changed, 160 insertions(+), 18 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Tenants.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Tenants.java
index 902dd1c..e896760 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Tenants.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Tenants.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.admin;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
@@ -48,6 +49,19 @@ public interface Tenants {
     List<String> getTenants() throws PulsarAdminException;
 
     /**
+     * Get the list of tenants asynchronously.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>["my-tenant", "other-tenant", "third-tenant"]</code>
+     * </pre>
+     *
+     * @return the list of Pulsar tenants
+     */
+    CompletableFuture<List<String>> getTenantsAsync();
+
+    /**
      * Get the config of the tenant.
      * <p>
      * Get the admin configuration for a given tenant.
@@ -66,6 +80,17 @@ public interface Tenants {
     TenantInfo getTenantInfo(String tenant) throws PulsarAdminException;
 
     /**
+     * Get the config of the tenant asynchronously.
+     * <p>
+     * Get the admin configuration for a given tenant.
+     *
+     * @param tenant
+     *            Tenant name
+     * @return the tenant configuration
+     */
+    CompletableFuture<TenantInfo> getTenantInfoAsync(String tenant);
+
+    /**
      * Create a new tenant.
      * <p>
      * Provisions a new tenant. This operation requires Pulsar super-user privileges.
@@ -87,6 +112,18 @@ public interface Tenants {
     void createTenant(String tenant, TenantInfo config) throws PulsarAdminException;
 
     /**
+     * Create a new tenant asynchronously.
+     * <p>
+     * Provisions a new tenant. This operation requires Pulsar super-user privileges.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param config
+     *            Config data
+     */
+    CompletableFuture<Void> createTenantAsync(String tenant, TenantInfo config);
+
+    /**
      * Update the admins for a tenant.
      * <p>
      * This operation requires Pulsar super-user privileges.
@@ -106,6 +143,18 @@ public interface Tenants {
     void updateTenant(String tenant, TenantInfo config) throws PulsarAdminException;
 
     /**
+     * Update the admins for a tenant asynchronously.
+     * <p>
+     * This operation requires Pulsar super-user privileges.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param config
+     *            Config data
+     */
+    CompletableFuture<Void> updateTenantAsync(String tenant, TenantInfo config);
+
+    /**
      * Delete an existing tenant.
      * <p>
      * Delete a tenant and all namespaces and topics under it.
@@ -123,4 +172,14 @@ public interface Tenants {
      *             Unexpected error
      */
     void deleteTenant(String tenant) throws PulsarAdminException;
+
+    /**
+     * Delete an existing tenant asynchronously.
+     * <p>
+     * Delete a tenant and all namespaces and topics under it.
+     *
+     * @param tenant
+     *            Tenant name
+     */
+    CompletableFuture<Void> deleteTenantAsync(String tenant);
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
index 6a0d1aa..077fad7 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
@@ -19,8 +19,13 @@
 package org.apache.pulsar.client.admin.internal;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
@@ -44,51 +49,129 @@ public class TenantsImpl extends BaseResource implements Tenants, Properties {
     @Override
     public List<String> getTenants() throws PulsarAdminException {
         try {
-            return request(adminTenants).get(new GenericType<List<String>>() {
-            });
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getTenantsAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<List<String>> getTenantsAsync() {
+        final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        asyncGetRequest(adminTenants,
+                new InvocationCallback<List<String>>() {
+                    @Override
+                    public void completed(List<String> tenants) {
+                        future.complete(tenants);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public TenantInfo getTenantInfo(String tenant) throws PulsarAdminException {
         try {
-            return request(adminTenants.path(tenant)).get(TenantInfo.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getTenantInfoAsync(tenant).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<TenantInfo> getTenantInfoAsync(String tenant) {
+        WebTarget path = adminTenants.path(tenant);
+        final CompletableFuture<TenantInfo> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<TenantInfo>() {
+                    @Override
+                    public void completed(TenantInfo tenantInfo) {
+                        future.complete(tenantInfo);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void createTenant(String tenant, TenantInfo config) throws PulsarAdminException {
         try {
-            request(adminTenants.path(tenant)).put(Entity.entity(config, MediaType.APPLICATION_JSON),
-                    ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            createTenantAsync(tenant, config)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> createTenantAsync(String tenant, TenantInfo config) {
+        WebTarget path = adminTenants.path(tenant);
+        return asyncPutRequest(path, Entity.entity(config, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void updateTenant(String tenant, TenantInfo config) throws PulsarAdminException {
         try {
-            request(adminTenants.path(tenant)).post(Entity.entity(config, MediaType.APPLICATION_JSON),
-                    ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            updateTenantAsync(tenant, config).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> updateTenantAsync(String tenant, TenantInfo config) {
+        WebTarget path = adminTenants.path(tenant);
+        return asyncPostRequest(path, Entity.entity(config, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void deleteTenant(String tenant) throws PulsarAdminException {
         try {
-            request(adminTenants.path(tenant)).delete(ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            deleteTenantAsync(tenant).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
+    @Override
+    public CompletableFuture<Void> deleteTenantAsync(String tenant) {
+        WebTarget path = adminTenants.path(tenant);
+        return asyncDeleteRequest(path);
+    }
+
     // Compat method names
 
     @Override