You are viewing a plain text version of this content. The canonical link was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/14 11:10:40 UTC
[pulsar] branch master updated: [refactor][java-client] Reduce code duplication in GET requests in admin client (#17023)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d14b3e39538 [refactor][java-client] Reduce code duplication in GET requests in admin client (#17023)
d14b3e39538 is described below
commit d14b3e395387a0bf872b1004bf5c569a6cd52499
Author: Andras Beni <an...@streamnative.io>
AuthorDate: Wed Sep 14 13:10:28 2022 +0200
[refactor][java-client] Reduce code duplication in GET requests in admin client (#17023)
---
.../pulsar/client/admin/internal/BaseResource.java | 35 ++++++
.../pulsar/client/admin/internal/BrokersImpl.java | 14 +--
.../pulsar/client/admin/internal/ClustersImpl.java | 53 +-------
.../client/admin/internal/FunctionsImpl.java | 137 ++-------------------
.../pulsar/client/admin/internal/LookupImpl.java | 21 +---
.../pulsar/client/admin/internal/SchemasImpl.java | 23 +---
.../pulsar/client/admin/internal/SinksImpl.java | 94 +-------------
.../pulsar/client/admin/internal/SourcesImpl.java | 98 +--------------
.../pulsar/client/admin/internal/TopicsImpl.java | 87 +++----------
.../pulsar/client/admin/internal/WorkerImpl.java | 104 +---------------
10 files changed, 88 insertions(+), 578 deletions(-)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index 8316aa8b4e5..55c47363cd4 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
import java.util.function.Supplier;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.ServerErrorException;
@@ -34,6 +35,7 @@ import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation.Builder;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -186,6 +188,39 @@ public abstract class BaseResource {
return callback.future();
}
+ protected <T> CompletableFuture<T> asyncGetRequest(final WebTarget target, Class<? extends T> type) {
+ return asyncGetRequest(target, response -> response.readEntity(type));
+ }
+
+ protected <T> CompletableFuture<T> asyncGetRequest(final WebTarget target, GenericType<T> type) {
+ return asyncGetRequest(target, response -> response.readEntity(type));
+ }
+
+ private <T> CompletableFuture<T> asyncGetRequest(final WebTarget target, Function<Response, T> readResponse) {
+ final CompletableFuture<T> future = new CompletableFuture<>();
+ asyncGetRequest(target,
+ new InvocationCallback<Response>() {
+ @Override
+ public void completed(Response response) {
+ if (response.getStatus() != Response.Status.OK.getStatusCode()) {
+ future.completeExceptionally(getApiException(response));
+ } else {
+ try {
+ future.complete(readResponse.apply(response));
+ } catch (Exception e) {
+ future.completeExceptionally(getApiException(e));
+ }
+ }
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
public CompletableFuture<Void> asyncDeleteRequest(final WebTarget target) {
final CompletableFuture<Void> future = new CompletableFuture<>();
try {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
index 81de4ed6e05..a9b89cd33fd 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
@@ -162,19 +162,7 @@ public class BrokersImpl extends BaseResource implements Brokers {
@Override
public CompletableFuture<Void> backlogQuotaCheckAsync() {
WebTarget path = adminBrokers.path("backlogQuotaCheck");
- final CompletableFuture<Void> future = new CompletableFuture<>();
- asyncGetRequest(path, new InvocationCallback<Void>() {
- @Override
- public void completed(Void unused) {
- future.complete(null);
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(throwable);
- }
- });
- return future;
+ return asyncGetRequest(path, new FutureCallback<Void>() {});
}
@Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index 96744ff490a..b32e3ea684c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -26,7 +26,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import org.apache.pulsar.client.admin.Clusters;
@@ -140,22 +139,9 @@ public class ClustersImpl extends BaseResource implements Clusters {
public CompletableFuture<Map<String, NamespaceIsolationData>> getNamespaceIsolationPoliciesAsync(
String cluster) {
WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies");
- final CompletableFuture<Map<String, NamespaceIsolationData>> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Map<String, NamespaceIsolationDataImpl>>() {
- @Override
- public void completed(Map<String, NamespaceIsolationDataImpl> stringNamespaceIsolationDataMap) {
- Map<String, NamespaceIsolationData> result = new HashMap<>();
- stringNamespaceIsolationDataMap.forEach(result::put);
- future.complete(result);
- }
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, new FutureCallback<Map<String, NamespaceIsolationDataImpl>>() {})
+ .thenApply(HashMap::new);
}
@Override
@@ -168,22 +154,8 @@ public class ClustersImpl extends BaseResource implements Clusters {
public CompletableFuture<List<BrokerNamespaceIsolationData>> getBrokersWithNamespaceIsolationPolicyAsync(
String cluster) {
WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers");
- final CompletableFuture<List<BrokerNamespaceIsolationData>> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<List<BrokerNamespaceIsolationDataImpl>>() {
- @Override
- public void completed(List<BrokerNamespaceIsolationDataImpl> brokerNamespaceIsolationData) {
- List<BrokerNamespaceIsolationData> data =
- new ArrayList<>(brokerNamespaceIsolationData);
- future.complete(data);
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, new FutureCallback<List<BrokerNamespaceIsolationDataImpl>>() {})
+ .thenApply(ArrayList::new);
}
@Override
@@ -303,21 +275,8 @@ public class ClustersImpl extends BaseResource implements Clusters {
@Override
public CompletableFuture<Map<String, FailureDomain>> getFailureDomainsAsync(String cluster) {
WebTarget path = adminClusters.path(cluster).path("failureDomains");
- final CompletableFuture<Map<String, FailureDomain>> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Map<String, FailureDomainImpl>>() {
- @Override
- public void completed(Map<String, FailureDomainImpl> failureDomains) {
- Map<String, FailureDomain> result = new HashMap<>(failureDomains);
- future.complete(result);
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, new FutureCallback<Map<String, FailureDomainImpl>>() {})
+ .thenApply(HashMap::new);
}
@Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 482846ad8ac..f358764c070 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -87,25 +87,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
@Override
public CompletableFuture<List<String>> getFunctionsAsync(String tenant, String namespace) {
WebTarget path = functions.path(tenant).path(namespace);
- final CompletableFuture<List<String>> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(getApiException(response));
- } else {
- List<String> functions = response.readEntity(new GenericType<List<String>>() {});
- future.complete(functions);
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, new GenericType<List<String>>() {});
}
@Override
@@ -116,24 +98,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
@Override
public CompletableFuture<FunctionConfig> getFunctionAsync(String tenant, String namespace, String function) {
WebTarget path = functions.path(tenant).path(namespace).path(function);
- final CompletableFuture<FunctionConfig> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(getApiException(response));
- } else {
- future.complete(response.readEntity(FunctionConfig.class));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, FunctionConfig.class);
}
@Override
@@ -145,24 +110,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
@Override
public CompletableFuture<FunctionStatus> getFunctionStatusAsync(String tenant, String namespace, String function) {
WebTarget path = functions.path(tenant).path(namespace).path(function).path("status");
- final CompletableFuture<FunctionStatus> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(getApiException(response));
- } else {
- future.complete(response.readEntity(FunctionStatus.class));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, FunctionStatus.class);
}
@Override
@@ -176,26 +124,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
String tenant, String namespace, String function, int id) {
WebTarget path =
functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("status");
- final CompletableFuture<FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> future =
- new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(getApiException(response));
- } else {
- future.complete(response.readEntity(
- FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class);
}
@Override
@@ -208,25 +137,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
public CompletableFuture<FunctionInstanceStatsData> getFunctionStatsAsync(
String tenant, String namespace, String function, int id) {
WebTarget path = functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("stats");
- final CompletableFuture<FunctionInstanceStatsData> future =
- new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(getApiException(response));
- } else {
- future.complete(response.readEntity(FunctionInstanceStatsDataImpl.class));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, FunctionInstanceStatsDataImpl.class);
}
@Override
@@ -239,24 +150,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
public CompletableFuture<FunctionStats> getFunctionStatsAsync(String tenant,
String namespace, String function) {
WebTarget path = functions.path(tenant).path(namespace).path(function).path("stats");
- final CompletableFuture<FunctionStats> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(getApiException(response));
- } else {
- future.complete(response.readEntity(FunctionStatsImpl.class));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, FunctionStatsImpl.class);
}
@Override
@@ -788,24 +682,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
public CompletableFuture<FunctionState> getFunctionStateAsync(
String tenant, String namespace, String function, String key) {
WebTarget path = functions.path(tenant).path(namespace).path(function).path("state").path(key);
- final CompletableFuture<FunctionState> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(getApiException(response));
- } else {
- future.complete(response.readEntity(FunctionState.class));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, FunctionState.class);
}
@Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
index d5a6471c794..fc5f7528cef 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
@@ -23,7 +23,6 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import org.apache.pulsar.client.admin.Lookup;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -57,24 +56,8 @@ public class LookupImpl extends BaseResource implements Lookup {
String prefix = topicName.isV2() ? "/topic" : "/destination";
WebTarget path = v2lookup.path(prefix).path(topicName.getLookupName());
- final CompletableFuture<String> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<LookupData>() {
- @Override
- public void completed(LookupData lookupData) {
- if (useTls) {
- future.complete(lookupData.getBrokerUrlTls());
- } else {
- future.complete(lookupData.getBrokerUrl());
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, new FutureCallback<LookupData>() {})
+ .thenApply(lookupData -> useTls ? lookupData.getBrokerUrlTls() : lookupData.getBrokerUrl());
}
@Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index 65417ec9e72..4920cadf608 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -269,24 +269,11 @@ public class SchemasImpl extends BaseResource implements Schemas {
public CompletableFuture<List<SchemaInfo>> getAllSchemasAsync(String topic) {
WebTarget path = schemasPath(TopicName.get(topic));
TopicName topicName = TopicName.get(topic);
- final CompletableFuture<List<SchemaInfo>> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<GetAllVersionsSchemaResponse>() {
- @Override
- public void completed(GetAllVersionsSchemaResponse response) {
- future.complete(
- response.getGetSchemaResponses().stream()
- .map(getSchemaResponse ->
- convertGetSchemaResponseToSchemaInfo(topicName, getSchemaResponse))
- .collect(Collectors.toList()));
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, new FutureCallback<GetAllVersionsSchemaResponse>() {})
+ .thenApply(response -> response.getGetSchemaResponses().stream()
+ .map(getSchemaResponse ->
+ convertGetSchemaResponseToSchemaInfo(topicName, getSchemaResponse))
+ .collect(Collectors.toList()));
}
private WebTarget schemaPath(TopicName topicName) {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
index f27071eb72b..9e110b018e2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
@@ -25,7 +25,6 @@ import java.io.File;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
@@ -73,23 +72,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
return future;
}
WebTarget path = sink.path(tenant).path(namespace);
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(getApiException(response));
- } else {
- future.complete(response.readEntity(new GenericType<List<String>>() {}));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, new GenericType<List<String>>() {});
}
@Override
@@ -104,23 +87,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
return future;
}
WebTarget path = sink.path(tenant).path(namespace).path(sinkName);
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(getApiException(response));
- } else {
- future.complete(response.readEntity(SinkConfig.class));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, SinkConfig.class);
}
@Override
@@ -136,23 +103,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
return future;
}
WebTarget path = sink.path(tenant).path(namespace).path(sinkName).path("status");
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(getApiException(response));
- } else {
- future.complete(response.readEntity(SinkStatus.class));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, SinkStatus.class);
}
@Override
@@ -170,24 +121,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
return future;
}
WebTarget path = sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(id)).path("status");
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(getApiException(response));
- } else {
- future.complete(response.readEntity(
- SinkStatus.SinkInstanceStatus.SinkInstanceStatusData.class));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, SinkStatus.SinkInstanceStatus.SinkInstanceStatusData.class);
}
@Override
@@ -484,25 +418,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
@Override
public CompletableFuture<List<ConnectorDefinition>> getBuiltInSinksAsync() {
WebTarget path = sink.path("builtinsinks");
- final CompletableFuture<List<ConnectorDefinition>> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(getApiException(response));
- } else {
- future.complete(response.readEntity(
- new GenericType<List<ConnectorDefinition>>() {}));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, new GenericType<List<ConnectorDefinition>>() {});
}
@Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
index 4996e8b6e94..ac1243f0b69 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
@@ -25,7 +25,6 @@ import java.io.File;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
@@ -68,24 +67,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
@Override
public CompletableFuture<List<String>> listSourcesAsync(String tenant, String namespace) {
WebTarget path = source.path(tenant).path(namespace);
- final CompletableFuture<List<String>> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(getApiException(response));
- } else {
- future.complete(response.readEntity(new GenericType<List<String>>() {}));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, new GenericType<List<String>>() {});
}
@Override
@@ -96,24 +78,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
@Override
public CompletableFuture<SourceConfig> getSourceAsync(String tenant, String namespace, String sourceName) {
WebTarget path = source.path(tenant).path(namespace).path(sourceName);
- final CompletableFuture<SourceConfig> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(getApiException(response));
- } else {
- future.complete(response.readEntity(SourceConfig.class));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, SourceConfig.class);
}
@Override
@@ -125,24 +90,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
@Override
public CompletableFuture<SourceStatus> getSourceStatusAsync(String tenant, String namespace, String sourceName) {
WebTarget path = source.path(tenant).path(namespace).path(sourceName).path("status");
- final CompletableFuture<SourceStatus> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(getApiException(response));
- } else {
- future.complete(response.readEntity(SourceStatus.class));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, SourceStatus.class);
}
@Override
@@ -155,26 +103,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
public CompletableFuture<SourceStatus.SourceInstanceStatus.SourceInstanceStatusData> getSourceStatusAsync(
String tenant, String namespace, String sourceName, int id) {
WebTarget path = source.path(tenant).path(namespace).path(sourceName).path(Integer.toString(id)).path("status");
- final CompletableFuture<SourceStatus.SourceInstanceStatus.SourceInstanceStatusData> future =
- new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(getApiException(response));
- } else {
- future.complete(response.readEntity(
- SourceStatus.SourceInstanceStatus.SourceInstanceStatusData.class));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, SourceStatus.SourceInstanceStatus.SourceInstanceStatusData.class);
}
@Override
@@ -432,24 +361,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
public CompletableFuture<List<ConnectorDefinition>> getBuiltInSourcesAsync() {
WebTarget path = source.path("builtinsources");
final CompletableFuture<List<ConnectorDefinition>> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(getApiException(response));
- } else {
- future.complete(response.readEntity(
- new GenericType<List<ConnectorDefinition>>() {}));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, new GenericType<List<ConnectorDefinition>>() {});
}
@Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 87be258dbec..e3b51accdfd 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -195,40 +195,18 @@ public class TopicsImpl extends BaseResource implements Topics {
nonPersistentPath = nonPersistentPath
.queryParam("bundle", options.getBundle())
.queryParam("includeSystemTopic", options.isIncludeSystemTopic());
- final CompletableFuture<List<String>> persistentList = new CompletableFuture<>();
- final CompletableFuture<List<String>> nonPersistentList = new CompletableFuture<>();
+ final CompletableFuture<List<String>> persistentList;
+ final CompletableFuture<List<String>> nonPersistentList;
if (topicDomain == null || TopicDomain.persistent.equals(topicDomain)) {
- asyncGetRequest(persistentPath,
- new InvocationCallback<List<String>>() {
- @Override
- public void completed(List<String> topics) {
- persistentList.complete(topics);
- }
-
- @Override
- public void failed(Throwable throwable) {
- persistentList.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
+ persistentList = asyncGetRequest(persistentPath, new FutureCallback<List<String>>() {});
} else {
- persistentList.complete(Collections.emptyList());
+ persistentList = CompletableFuture.completedFuture(Collections.emptyList());
}
if (topicDomain == null || TopicDomain.non_persistent.equals(topicDomain)) {
- asyncGetRequest(nonPersistentPath,
- new InvocationCallback<List<String>>() {
- @Override
- public void completed(List<String> a) {
- nonPersistentList.complete(a);
- }
-
- @Override
- public void failed(Throwable throwable) {
- nonPersistentList.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
+ nonPersistentList = asyncGetRequest(nonPersistentPath, new FutureCallback<List<String>>() {});
} else {
- nonPersistentList.complete(Collections.emptyList());
+ nonPersistentList = CompletableFuture.completedFuture(Collections.emptyList());
}
return persistentList.thenCombine(nonPersistentList,
@@ -258,32 +236,10 @@ public class TopicsImpl extends BaseResource implements Topics {
WebTarget nonPersistentPath = namespacePath("non-persistent", ns, "partitioned");
persistentPath = persistentPath.queryParam("includeSystemTopic", options.isIncludeSystemTopic());
nonPersistentPath = nonPersistentPath.queryParam("includeSystemTopic", options.isIncludeSystemTopic());
- final CompletableFuture<List<String>> persistentList = new CompletableFuture<>();
- final CompletableFuture<List<String>> nonPersistentList = new CompletableFuture<>();
- asyncGetRequest(persistentPath,
- new InvocationCallback<List<String>>() {
- @Override
- public void completed(List<String> topics) {
- persistentList.complete(topics);
- }
-
- @Override
- public void failed(Throwable throwable) {
- persistentList.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- asyncGetRequest(nonPersistentPath,
- new InvocationCallback<List<String>>() {
- @Override
- public void completed(List<String> topics) {
- nonPersistentList.complete(topics);
- }
-
- @Override
- public void failed(Throwable throwable) {
- nonPersistentList.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
+ final CompletableFuture<List<String>> persistentList =
+ asyncGetRequest(persistentPath, new FutureCallback<List<String>>() {});
+ final CompletableFuture<List<String>> nonPersistentList =
+ asyncGetRequest(nonPersistentPath, new FutureCallback<List<String>>() {});
return persistentList.thenCombine(nonPersistentList,
(l1, l2) -> new ArrayList<>(Stream.concat(l1.stream(), l2.stream()).collect(Collectors.toSet())));
@@ -1491,25 +1447,14 @@ public class TopicsImpl extends BaseResource implements Topics {
public CompletableFuture<MessageId> getLastMessageIdAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "lastMessageId");
- final CompletableFuture<MessageId> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<BatchMessageIdImpl>() {
-
- @Override
- public void completed(BatchMessageIdImpl response) {
- if (response.getBatchIndex() == -1) {
- future.complete(new MessageIdImpl(response.getLedgerId(),
- response.getEntryId(), response.getPartitionIndex()));
- }
- future.complete(response);
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
+ return asyncGetRequest(path, new FutureCallback<BatchMessageIdImpl>() {})
+ .thenApply(response -> {
+ if (response.getBatchIndex() == -1) {
+ return new MessageIdImpl(response.getLedgerId(),
+ response.getEntryId(), response.getPartitionIndex());
}
+ return response;
});
- return future;
}
@Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
index 551ffa29b4d..2e57ee9710a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
@@ -22,13 +22,10 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import javax.ws.rs.ClientErrorException;
import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Worker;
@@ -57,26 +54,7 @@ public class WorkerImpl extends BaseResource implements Worker {
@Override
public CompletableFuture<List<WorkerFunctionInstanceStats>> getFunctionsStatsAsync() {
WebTarget path = workerStats.path("functionsmetrics");
- final CompletableFuture<List<WorkerFunctionInstanceStats>> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(new ClientErrorException(response));
- } else {
- List<WorkerFunctionInstanceStats> metricsList =
- response.readEntity(new GenericType<List<WorkerFunctionInstanceStats>>() {});
- future.complete(metricsList);
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, new GenericType<List<WorkerFunctionInstanceStats>>() {});
}
@Override
@@ -87,25 +65,8 @@ public class WorkerImpl extends BaseResource implements Worker {
@Override
public CompletableFuture<Collection<Metrics>> getMetricsAsync() {
WebTarget path = workerStats.path("metrics");
- final CompletableFuture<Collection<Metrics>> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(new ClientErrorException(response));
- } else {
- future.complete(response.readEntity(
- new GenericType<List<Metrics>>() {}));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, new GenericType<List<Metrics>>() {})
+ .thenApply(list -> list);
}
@Override
@@ -116,25 +77,7 @@ public class WorkerImpl extends BaseResource implements Worker {
@Override
public CompletableFuture<List<WorkerInfo>> getClusterAsync() {
WebTarget path = worker.path("cluster");
- final CompletableFuture<List<WorkerInfo>> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
-
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(new ClientErrorException(response));
- } else {
- future.complete(response.readEntity(new GenericType<List<WorkerInfo>>() {}));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, new GenericType<List<WorkerInfo>>() {});
}
@Override
@@ -145,24 +88,7 @@ public class WorkerImpl extends BaseResource implements Worker {
@Override
public CompletableFuture<WorkerInfo> getClusterLeaderAsync() {
WebTarget path = worker.path("cluster").path("leader");
- final CompletableFuture<WorkerInfo> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(new ClientErrorException(response));
- } else {
- future.complete(response.readEntity(new GenericType<WorkerInfo>(){}));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, new GenericType<WorkerInfo>(){});
}
@Override
@@ -173,25 +99,7 @@ public class WorkerImpl extends BaseResource implements Worker {
@Override
public CompletableFuture<Map<String, Collection<String>>> getAssignmentsAsync() {
WebTarget path = worker.path("assignments");
- final CompletableFuture<Map<String, Collection<String>>> future = new CompletableFuture<>();
- asyncGetRequest(path,
- new InvocationCallback<Response>() {
- @Override
- public void completed(Response response) {
- if (response.getStatus() != Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(new ClientErrorException(response));
- } else {
- future.complete(response.readEntity(
- new GenericType<Map<String, Collection<String>>>() {}));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(getApiException(throwable.getCause()));
- }
- });
- return future;
+ return asyncGetRequest(path, new GenericType<Map<String, Collection<String>>>() {});
}
@Override