You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/03/21 03:30:13 UTC
[pulsar] branch master updated: [PulsarAdmin] Tenants to async (#6573)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 804ec61 [PulsarAdmin] Tenants to async (#6573)
804ec61 is described below
commit 804ec618bc31f6b42ca0fc59bc9aa295c6bb23eb
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Sat Mar 21 11:30:02 2020 +0800
[PulsarAdmin] Tenants to async (#6573)
---
.../org/apache/pulsar/client/admin/Tenants.java | 59 ++++++++++
.../pulsar/client/admin/internal/TenantsImpl.java | 119 +++++++++++++++++----
2 files changed, 160 insertions(+), 18 deletions(-)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Tenants.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Tenants.java
index 902dd1c..e896760 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Tenants.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Tenants.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.admin;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
@@ -48,6 +49,19 @@ public interface Tenants {
List<String> getTenants() throws PulsarAdminException;
/**
+ * Get the list of tenants asynchronously.
+ * <p>
+ * Response Example:
+ *
+ * <pre>
+ * <code>["my-tenant", "other-tenant", "third-tenant"]</code>
+ * </pre>
+ *
+ * @return the list of Pulsar tenants
+ */
+ CompletableFuture<List<String>> getTenantsAsync();
+
+ /**
* Get the config of the tenant.
* <p>
* Get the admin configuration for a given tenant.
@@ -66,6 +80,17 @@ public interface Tenants {
TenantInfo getTenantInfo(String tenant) throws PulsarAdminException;
/**
+ * Get the config of the tenant asynchronously.
+ * <p>
+ * Get the admin configuration for a given tenant.
+ *
+ * @param tenant
+ * Tenant name
+ * @return the tenant configuration
+ */
+ CompletableFuture<TenantInfo> getTenantInfoAsync(String tenant);
+
+ /**
* Create a new tenant.
* <p>
* Provisions a new tenant. This operation requires Pulsar super-user privileges.
@@ -87,6 +112,18 @@ public interface Tenants {
void createTenant(String tenant, TenantInfo config) throws PulsarAdminException;
/**
+ * Create a new tenant asynchronously.
+ * <p>
+ * Provisions a new tenant. This operation requires Pulsar super-user privileges.
+ *
+ * @param tenant
+ * Tenant name
+ * @param config
+ * Config data
+ */
+ CompletableFuture<Void> createTenantAsync(String tenant, TenantInfo config);
+
+ /**
* Update the admins for a tenant.
* <p>
* This operation requires Pulsar super-user privileges.
@@ -106,6 +143,18 @@ public interface Tenants {
void updateTenant(String tenant, TenantInfo config) throws PulsarAdminException;
/**
+ * Update the admins for a tenant asynchronously.
+ * <p>
+ * This operation requires Pulsar super-user privileges.
+ *
+ * @param tenant
+ * Tenant name
+ * @param config
+ * Config data
+ */
+ CompletableFuture<Void> updateTenantAsync(String tenant, TenantInfo config);
+
+ /**
* Delete an existing tenant.
* <p>
* Delete a tenant and all namespaces and topics under it.
@@ -123,4 +172,14 @@ public interface Tenants {
* Unexpected error
*/
void deleteTenant(String tenant) throws PulsarAdminException;
+
+ /**
+ * Delete an existing tenant asynchronously.
+ * <p>
+ * Delete a tenant and all namespaces and topics under it.
+ *
+ * @param tenant
+ * Tenant name
+ */
+ CompletableFuture<Void> deleteTenantAsync(String tenant);
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
index 6a0d1aa..077fad7 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
@@ -19,8 +19,13 @@
package org.apache.pulsar.client.admin.internal;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
@@ -44,51 +49,129 @@ public class TenantsImpl extends BaseResource implements Tenants, Properties {
@Override
public List<String> getTenants() throws PulsarAdminException {
try {
- return request(adminTenants).get(new GenericType<List<String>>() {
- });
- } catch (Exception e) {
- throw getApiException(e);
+ return getTenantsAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<List<String>> getTenantsAsync() {
+ final CompletableFuture<List<String>> future = new CompletableFuture<>();
+ asyncGetRequest(adminTenants,
+ new InvocationCallback<List<String>>() {
+ @Override
+ public void completed(List<String> tenants) {
+ future.complete(tenants);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public TenantInfo getTenantInfo(String tenant) throws PulsarAdminException {
try {
- return request(adminTenants.path(tenant)).get(TenantInfo.class);
- } catch (Exception e) {
- throw getApiException(e);
+ return getTenantInfoAsync(tenant).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<TenantInfo> getTenantInfoAsync(String tenant) {
+ WebTarget path = adminTenants.path(tenant);
+ final CompletableFuture<TenantInfo> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<TenantInfo>() {
+ @Override
+ public void completed(TenantInfo tenantInfo) {
+ future.complete(tenantInfo);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void createTenant(String tenant, TenantInfo config) throws PulsarAdminException {
try {
- request(adminTenants.path(tenant)).put(Entity.entity(config, MediaType.APPLICATION_JSON),
- ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ createTenantAsync(tenant, config)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> createTenantAsync(String tenant, TenantInfo config) {
+ WebTarget path = adminTenants.path(tenant);
+ return asyncPutRequest(path, Entity.entity(config, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void updateTenant(String tenant, TenantInfo config) throws PulsarAdminException {
try {
- request(adminTenants.path(tenant)).post(Entity.entity(config, MediaType.APPLICATION_JSON),
- ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ updateTenantAsync(tenant, config).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> updateTenantAsync(String tenant, TenantInfo config) {
+ WebTarget path = adminTenants.path(tenant);
+ return asyncPostRequest(path, Entity.entity(config, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void deleteTenant(String tenant) throws PulsarAdminException {
try {
- request(adminTenants.path(tenant)).delete(ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ deleteTenantAsync(tenant).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
+ @Override
+ public CompletableFuture<Void> deleteTenantAsync(String tenant) {
+ WebTarget path = adminTenants.path(tenant);
+ return asyncDeleteRequest(path);
+ }
+
// Compat method names
@Override