You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/03/21 03:29:07 UTC
[pulsar] branch master updated: [PulsarAdmin] Worker to async
(#6570)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ac5a957 [PulsarAdmin] Worker to async (#6570)
ac5a957 is described below
commit ac5a957830ed4aed1e20b81ea27db680d41e9cd2
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Sat Mar 21 11:28:57 2020 +0800
[PulsarAdmin] Worker to async (#6570)
---
.../org/apache/pulsar/client/admin/Worker.java | 43 ++++-
.../pulsar/client/admin/internal/WorkerImpl.java | 209 +++++++++++++++++----
2 files changed, 205 insertions(+), 47 deletions(-)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
index ddeb1df..59adce0 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
@@ -21,48 +21,79 @@ package org.apache.pulsar.client.admin;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
+import org.apache.pulsar.common.stats.Metrics;
/**
* Admin interface for worker stats management.
*/
public interface Worker {
-
/**
- * Get all functions stats on a worker
+ * Get all functions stats on a worker.
* @return
* @throws PulsarAdminException
*/
List<WorkerFunctionInstanceStats> getFunctionsStats() throws PulsarAdminException;
/**
+ * Get all functions stats on a worker asynchronously.
+ * @return
+ */
+ CompletableFuture<List<WorkerFunctionInstanceStats>> getFunctionsStatsAsync();
+
+ /**
* Get worker metrics.
* @return
* @throws PulsarAdminException
*/
- Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws PulsarAdminException;
+ Collection<Metrics> getMetrics() throws PulsarAdminException;
/**
- * Get List of all workers belonging to this cluster
+ * Get worker metrics asynchronously.
+ * @return
+ */
+ CompletableFuture<Collection<Metrics>> getMetricsAsync();
+
+ /**
+ * Get List of all workers belonging to this cluster.
* @return
* @throws PulsarAdminException
*/
List<WorkerInfo> getCluster() throws PulsarAdminException;
/**
- * Get the worker who is the leader of the cluster
+ * Get List of all workers belonging to this cluster asynchronously.
+ * @return
+ */
+ CompletableFuture<List<WorkerInfo>> getClusterAsync();
+
+ /**
+ * Get the worker who is the leader of the cluster.
* @return
* @throws PulsarAdminException
*/
WorkerInfo getClusterLeader() throws PulsarAdminException;
/**
- * Get the function assignment among the cluster
+ * Get the worker who is the leader of the cluster asynchronously.
+ * @return
+ */
+ CompletableFuture<WorkerInfo> getClusterLeaderAsync();
+
+ /**
+ * Get the function assignment among the cluster.
* @return
* @throws PulsarAdminException
*/
Map<String, Collection<String>> getAssignments() throws PulsarAdminException;
+
+ /**
+ * Get the function assignment among the cluster asynchronously.
+ * @return
+ */
+ CompletableFuture<Map<String, Collection<String>>> getAssignmentsAsync();
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
index 3dbd66a..9619eb1 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
@@ -24,14 +24,20 @@ import org.apache.pulsar.client.admin.Worker;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
+import org.apache.pulsar.common.stats.Metrics;
import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
@Slf4j
public class WorkerImpl extends BaseResource implements Worker {
@@ -48,69 +54,190 @@ public class WorkerImpl extends BaseResource implements Worker {
@Override
public List<WorkerFunctionInstanceStats> getFunctionsStats() throws PulsarAdminException {
try {
- Response response = request(workerStats.path("functionsmetrics")).get();
- if (!response.getStatusInfo().equals(Response.Status.OK)) {
- throw new ClientErrorException(response);
- }
- List<WorkerFunctionInstanceStats> metricsList
- = response.readEntity(new GenericType<List<WorkerFunctionInstanceStats>>() {});
- return metricsList;
- } catch (Exception e) {
- throw getApiException(e);
+ return getFunctionsStatsAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
- public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws PulsarAdminException {
+ public CompletableFuture<List<WorkerFunctionInstanceStats>> getFunctionsStatsAsync() {
+ WebTarget path = workerStats.path("functionsmetrics");
+ final CompletableFuture<List<WorkerFunctionInstanceStats>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Response>() {
+ @Override
+ public void completed(Response response) {
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ future.completeExceptionally(new ClientErrorException(response));
+ } else {
+ List<WorkerFunctionInstanceStats> metricsList
+ = response.readEntity(new GenericType<List<WorkerFunctionInstanceStats>>() {});
+ future.complete(metricsList);
+ }
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public Collection<Metrics> getMetrics() throws PulsarAdminException {
try {
- Response response = request(workerStats.path("metrics")).get();
- if (!response.getStatusInfo().equals(Response.Status.OK)) {
- throw new ClientErrorException(response);
- }
- return response.readEntity(new GenericType<List<org.apache.pulsar.common.stats.Metrics>>() {});
- } catch (Exception e) {
- throw getApiException(e);
+ return getMetricsAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Collection<Metrics>> getMetricsAsync() {
+ WebTarget path = workerStats.path("metrics");
+ final CompletableFuture<Collection<Metrics>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Response>() {
+ @Override
+ public void completed(Response response) {
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ future.completeExceptionally(new ClientErrorException(response));
+ } else {
+ future.complete(response.readEntity(
+ new GenericType<List<Metrics>>() {}));
+ }
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public List<WorkerInfo> getCluster() throws PulsarAdminException {
try {
- Response response = request(worker.path("cluster")).get();
- if (!response.getStatusInfo().equals(Response.Status.OK)) {
- throw new ClientErrorException(response);
- }
- return response.readEntity(new GenericType<List<WorkerInfo>>() {});
- } catch (Exception e) {
- throw getApiException(e);
+ return getClusterAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<List<WorkerInfo>> getClusterAsync() {
+ WebTarget path = worker.path("cluster");
+ final CompletableFuture<List<WorkerInfo>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Response>() {
+
+ @Override
+ public void completed(Response response) {
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ future.completeExceptionally(new ClientErrorException(response));
+ } else {
+ future.complete(response.readEntity(new GenericType<List<WorkerInfo>>() {}));
+ }
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public WorkerInfo getClusterLeader() throws PulsarAdminException {
try {
- Response response = request(worker.path("cluster").path("leader")).get();
- if (!response.getStatusInfo().equals(Response.Status.OK)) {
- throw new ClientErrorException(response);
- }
- return response.readEntity(new GenericType<WorkerInfo>(){});
- } catch (Exception e) {
- throw getApiException(e);
+ return getClusterLeaderAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<WorkerInfo> getClusterLeaderAsync() {
+ WebTarget path = worker.path("cluster").path("leader");
+ final CompletableFuture<WorkerInfo> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Response>() {
+ @Override
+ public void completed(Response response) {
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ future.completeExceptionally(new ClientErrorException(response));
+ } else {
+ future.complete(response.readEntity(new GenericType<WorkerInfo>(){}));
+ }
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public Map<String, Collection<String>> getAssignments() throws PulsarAdminException {
try {
- Response response = request(worker.path("assignments")).get();
- if (!response.getStatusInfo().equals(Response.Status.OK)) {
- throw new ClientErrorException(response);
- }
- Map<String, Collection<String>> assignments
- = response.readEntity(new GenericType<Map<String, Collection<String>>>() {});
- return assignments;
- } catch (Exception e) {
- throw getApiException(e);
+ return getAssignmentsAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
-}
\ No newline at end of file
+
+ @Override
+ public CompletableFuture<Map<String, Collection<String>>> getAssignmentsAsync() {
+ WebTarget path = worker.path("assignments");
+ final CompletableFuture<Map<String, Collection<String>>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Response>() {
+ @Override
+ public void completed(Response response) {
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ future.completeExceptionally(new ClientErrorException(response));
+ } else {
+ future.complete(response.readEntity(
+ new GenericType<Map<String, Collection<String>>>() {}));
+ }
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+}