You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/14 11:10:40 UTC

[pulsar] branch master updated: [refactor][java-client] Reduce code duplication in GET requests in admin client (#17023)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d14b3e39538 [refactor][java-client] Reduce code duplication in GET requests in admin client (#17023)
d14b3e39538 is described below

commit d14b3e395387a0bf872b1004bf5c569a6cd52499
Author: Andras Beni <an...@streamnative.io>
AuthorDate: Wed Sep 14 13:10:28 2022 +0200

    [refactor][java-client] Reduce code duplication in GET requests in admin client (#17023)
---
 .../pulsar/client/admin/internal/BaseResource.java |  35 ++++++
 .../pulsar/client/admin/internal/BrokersImpl.java  |  14 +--
 .../pulsar/client/admin/internal/ClustersImpl.java |  53 +-------
 .../client/admin/internal/FunctionsImpl.java       | 137 ++-------------------
 .../pulsar/client/admin/internal/LookupImpl.java   |  21 +---
 .../pulsar/client/admin/internal/SchemasImpl.java  |  23 +---
 .../pulsar/client/admin/internal/SinksImpl.java    |  94 +-------------
 .../pulsar/client/admin/internal/SourcesImpl.java  |  98 +--------------
 .../pulsar/client/admin/internal/TopicsImpl.java   |  87 +++----------
 .../pulsar/client/admin/internal/WorkerImpl.java   | 104 +---------------
 10 files changed, 88 insertions(+), 578 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index 8316aa8b4e5..55c47363cd4 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.ServerErrorException;
@@ -34,6 +35,7 @@ import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.Invocation.Builder;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -186,6 +188,39 @@ public abstract class BaseResource {
         return callback.future();
     }
 
+    protected <T> CompletableFuture<T> asyncGetRequest(final WebTarget target, Class<? extends T> type) {
+        return asyncGetRequest(target, response -> response.readEntity(type));
+    }
+
+    protected <T> CompletableFuture<T> asyncGetRequest(final WebTarget target, GenericType<T> type) {
+        return asyncGetRequest(target, response -> response.readEntity(type));
+    }
+
+    private <T> CompletableFuture<T> asyncGetRequest(final WebTarget target, Function<Response, T> readResponse) {
+        final CompletableFuture<T> future = new CompletableFuture<>();
+        asyncGetRequest(target,
+                new InvocationCallback<Response>() {
+                    @Override
+                    public void completed(Response response) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
+                            future.completeExceptionally(getApiException(response));
+                        } else {
+                            try {
+                                future.complete(readResponse.apply(response));
+                            } catch (Exception e) {
+                                future.completeExceptionally(getApiException(e));
+                            }
+                        }
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
     public CompletableFuture<Void> asyncDeleteRequest(final WebTarget target) {
         final CompletableFuture<Void> future = new CompletableFuture<>();
         try {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
index 81de4ed6e05..a9b89cd33fd 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
@@ -162,19 +162,7 @@ public class BrokersImpl extends BaseResource implements Brokers {
     @Override
     public CompletableFuture<Void> backlogQuotaCheckAsync() {
         WebTarget path = adminBrokers.path("backlogQuotaCheck");
-        final CompletableFuture<Void> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<Void>() {
-            @Override
-            public void completed(Void unused) {
-                future.complete(null);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(throwable);
-            }
-        });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Void>() {});
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index 96744ff490a..b32e3ea684c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 import org.apache.pulsar.client.admin.Clusters;
@@ -140,22 +139,9 @@ public class ClustersImpl extends BaseResource implements Clusters {
     public CompletableFuture<Map<String, NamespaceIsolationData>> getNamespaceIsolationPoliciesAsync(
             String cluster) {
         WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies");
-        final CompletableFuture<Map<String, NamespaceIsolationData>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Map<String, NamespaceIsolationDataImpl>>() {
-                    @Override
-                    public void completed(Map<String, NamespaceIsolationDataImpl> stringNamespaceIsolationDataMap) {
-                        Map<String, NamespaceIsolationData> result = new HashMap<>();
-                        stringNamespaceIsolationDataMap.forEach(result::put);
-                        future.complete(result);
-                    }
 
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Map<String, NamespaceIsolationDataImpl>>() {})
+                .thenApply(HashMap::new);
     }
 
     @Override
@@ -168,22 +154,8 @@ public class ClustersImpl extends BaseResource implements Clusters {
     public CompletableFuture<List<BrokerNamespaceIsolationData>> getBrokersWithNamespaceIsolationPolicyAsync(
             String cluster) {
         WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers");
-        final CompletableFuture<List<BrokerNamespaceIsolationData>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<List<BrokerNamespaceIsolationDataImpl>>() {
-                    @Override
-                    public void completed(List<BrokerNamespaceIsolationDataImpl> brokerNamespaceIsolationData) {
-                        List<BrokerNamespaceIsolationData> data =
-                                new ArrayList<>(brokerNamespaceIsolationData);
-                        future.complete(data);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<List<BrokerNamespaceIsolationDataImpl>>() {})
+                .thenApply(ArrayList::new);
     }
 
     @Override
@@ -303,21 +275,8 @@ public class ClustersImpl extends BaseResource implements Clusters {
     @Override
     public CompletableFuture<Map<String, FailureDomain>> getFailureDomainsAsync(String cluster) {
         WebTarget path = adminClusters.path(cluster).path("failureDomains");
-        final CompletableFuture<Map<String, FailureDomain>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Map<String, FailureDomainImpl>>() {
-                    @Override
-                    public void completed(Map<String, FailureDomainImpl> failureDomains) {
-                        Map<String, FailureDomain> result = new HashMap<>(failureDomains);
-                        future.complete(result);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Map<String, FailureDomainImpl>>() {})
+                .thenApply(HashMap::new);
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 482846ad8ac..f358764c070 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -87,25 +87,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
     @Override
     public CompletableFuture<List<String>> getFunctionsAsync(String tenant, String namespace) {
         WebTarget path = functions.path(tenant).path(namespace);
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(getApiException(response));
-                        } else {
-                            List<String> functions = response.readEntity(new GenericType<List<String>>() {});
-                            future.complete(functions);
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new GenericType<List<String>>() {});
     }
 
     @Override
@@ -116,24 +98,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
     @Override
     public CompletableFuture<FunctionConfig> getFunctionAsync(String tenant, String namespace, String function) {
         WebTarget path = functions.path(tenant).path(namespace).path(function);
-        final CompletableFuture<FunctionConfig> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(getApiException(response));
-                        } else {
-                            future.complete(response.readEntity(FunctionConfig.class));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, FunctionConfig.class);
     }
 
     @Override
@@ -145,24 +110,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
     @Override
     public CompletableFuture<FunctionStatus> getFunctionStatusAsync(String tenant, String namespace, String function) {
         WebTarget path = functions.path(tenant).path(namespace).path(function).path("status");
-        final CompletableFuture<FunctionStatus> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(getApiException(response));
-                        } else {
-                            future.complete(response.readEntity(FunctionStatus.class));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, FunctionStatus.class);
     }
 
     @Override
@@ -176,26 +124,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
             String tenant, String namespace, String function, int id) {
         WebTarget path =
                 functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("status");
-        final CompletableFuture<FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> future =
-                new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(getApiException(response));
-                        } else {
-                            future.complete(response.readEntity(
-                                    FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class);
     }
 
     @Override
@@ -208,25 +137,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
     public CompletableFuture<FunctionInstanceStatsData> getFunctionStatsAsync(
             String tenant, String namespace, String function, int id) {
         WebTarget path = functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("stats");
-        final CompletableFuture<FunctionInstanceStatsData> future =
-                new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(getApiException(response));
-                        } else {
-                            future.complete(response.readEntity(FunctionInstanceStatsDataImpl.class));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, FunctionInstanceStatsDataImpl.class);
     }
 
     @Override
@@ -239,24 +150,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
     public CompletableFuture<FunctionStats> getFunctionStatsAsync(String tenant,
                                                                   String namespace, String function) {
         WebTarget path = functions.path(tenant).path(namespace).path(function).path("stats");
-        final CompletableFuture<FunctionStats> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(getApiException(response));
-                        } else {
-                            future.complete(response.readEntity(FunctionStatsImpl.class));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, FunctionStatsImpl.class);
     }
 
     @Override
@@ -788,24 +682,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
     public CompletableFuture<FunctionState> getFunctionStateAsync(
             String tenant, String namespace, String function, String key) {
         WebTarget path = functions.path(tenant).path(namespace).path(function).path("state").path(key);
-        final CompletableFuture<FunctionState> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(getApiException(response));
-                        } else {
-                            future.complete(response.readEntity(FunctionState.class));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, FunctionState.class);
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
index d5a6471c794..fc5f7528cef 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
@@ -23,7 +23,6 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import org.apache.pulsar.client.admin.Lookup;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -57,24 +56,8 @@ public class LookupImpl extends BaseResource implements Lookup {
         String prefix = topicName.isV2() ? "/topic" : "/destination";
         WebTarget path = v2lookup.path(prefix).path(topicName.getLookupName());
 
-        final CompletableFuture<String> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<LookupData>() {
-                    @Override
-                    public void completed(LookupData lookupData) {
-                        if (useTls) {
-                            future.complete(lookupData.getBrokerUrlTls());
-                        } else {
-                            future.complete(lookupData.getBrokerUrl());
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<LookupData>() {})
+                .thenApply(lookupData -> useTls ? lookupData.getBrokerUrlTls() : lookupData.getBrokerUrl());
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index 65417ec9e72..4920cadf608 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -269,24 +269,11 @@ public class SchemasImpl extends BaseResource implements Schemas {
     public CompletableFuture<List<SchemaInfo>> getAllSchemasAsync(String topic) {
         WebTarget path = schemasPath(TopicName.get(topic));
         TopicName topicName = TopicName.get(topic);
-        final CompletableFuture<List<SchemaInfo>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<GetAllVersionsSchemaResponse>() {
-                    @Override
-                    public void completed(GetAllVersionsSchemaResponse response) {
-                        future.complete(
-                                response.getGetSchemaResponses().stream()
-                                        .map(getSchemaResponse ->
-                                                convertGetSchemaResponseToSchemaInfo(topicName, getSchemaResponse))
-                                        .collect(Collectors.toList()));
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<GetAllVersionsSchemaResponse>() {})
+                .thenApply(response -> response.getGetSchemaResponses().stream()
+                        .map(getSchemaResponse ->
+                                convertGetSchemaResponseToSchemaInfo(topicName, getSchemaResponse))
+                        .collect(Collectors.toList()));
     }
 
     private WebTarget schemaPath(TopicName topicName) {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
index f27071eb72b..9e110b018e2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
@@ -25,7 +25,6 @@ import java.io.File;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
@@ -73,23 +72,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
             return future;
         }
         WebTarget path = sink.path(tenant).path(namespace);
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(getApiException(response));
-                        } else {
-                            future.complete(response.readEntity(new GenericType<List<String>>() {}));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new GenericType<List<String>>() {});
     }
 
     @Override
@@ -104,23 +87,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
             return future;
         }
         WebTarget path = sink.path(tenant).path(namespace).path(sinkName);
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(getApiException(response));
-                        } else {
-                            future.complete(response.readEntity(SinkConfig.class));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, SinkConfig.class);
     }
 
     @Override
@@ -136,23 +103,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
             return future;
         }
         WebTarget path = sink.path(tenant).path(namespace).path(sinkName).path("status");
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(getApiException(response));
-                        } else {
-                            future.complete(response.readEntity(SinkStatus.class));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, SinkStatus.class);
     }
 
     @Override
@@ -170,24 +121,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
             return future;
         }
         WebTarget path = sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(id)).path("status");
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(getApiException(response));
-                        } else {
-                            future.complete(response.readEntity(
-                                    SinkStatus.SinkInstanceStatus.SinkInstanceStatusData.class));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, SinkStatus.SinkInstanceStatus.SinkInstanceStatusData.class);
     }
 
     @Override
@@ -484,25 +418,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
     @Override
     public CompletableFuture<List<ConnectorDefinition>> getBuiltInSinksAsync() {
         WebTarget path = sink.path("builtinsinks");
-        final CompletableFuture<List<ConnectorDefinition>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(getApiException(response));
-                        } else {
-                            future.complete(response.readEntity(
-                                    new GenericType<List<ConnectorDefinition>>() {}));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new GenericType<List<ConnectorDefinition>>() {});
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
index 4996e8b6e94..ac1243f0b69 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
@@ -25,7 +25,6 @@ import java.io.File;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
@@ -68,24 +67,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
     @Override
     public CompletableFuture<List<String>> listSourcesAsync(String tenant, String namespace) {
         WebTarget path = source.path(tenant).path(namespace);
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(getApiException(response));
-                        } else {
-                            future.complete(response.readEntity(new GenericType<List<String>>() {}));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new GenericType<List<String>>() {});
     }
 
     @Override
@@ -96,24 +78,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
     @Override
     public CompletableFuture<SourceConfig> getSourceAsync(String tenant, String namespace, String sourceName) {
         WebTarget path = source.path(tenant).path(namespace).path(sourceName);
-        final CompletableFuture<SourceConfig> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(getApiException(response));
-                        } else {
-                            future.complete(response.readEntity(SourceConfig.class));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, SourceConfig.class);
     }
 
     @Override
@@ -125,24 +90,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
     @Override
     public CompletableFuture<SourceStatus> getSourceStatusAsync(String tenant, String namespace, String sourceName) {
         WebTarget path = source.path(tenant).path(namespace).path(sourceName).path("status");
-        final CompletableFuture<SourceStatus> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(getApiException(response));
-                        } else {
-                            future.complete(response.readEntity(SourceStatus.class));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, SourceStatus.class);
     }
 
     @Override
@@ -155,26 +103,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
     public CompletableFuture<SourceStatus.SourceInstanceStatus.SourceInstanceStatusData> getSourceStatusAsync(
             String tenant, String namespace, String sourceName, int id) {
         WebTarget path = source.path(tenant).path(namespace).path(sourceName).path(Integer.toString(id)).path("status");
-        final CompletableFuture<SourceStatus.SourceInstanceStatus.SourceInstanceStatusData> future =
-                new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(getApiException(response));
-                        } else {
-                            future.complete(response.readEntity(
-                                    SourceStatus.SourceInstanceStatus.SourceInstanceStatusData.class));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, SourceStatus.SourceInstanceStatus.SourceInstanceStatusData.class);
     }
 
     @Override
@@ -432,24 +361,6 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
     public CompletableFuture<List<ConnectorDefinition>> getBuiltInSourcesAsync() {
         WebTarget path = source.path("builtinsources");
-        final CompletableFuture<List<ConnectorDefinition>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(getApiException(response));
-                        } else {
-                            future.complete(response.readEntity(
-                                    new GenericType<List<ConnectorDefinition>>() {}));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new GenericType<List<ConnectorDefinition>>() {});
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 87be258dbec..e3b51accdfd 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -195,40 +195,18 @@ public class TopicsImpl extends BaseResource implements Topics {
         nonPersistentPath = nonPersistentPath
                 .queryParam("bundle", options.getBundle())
                 .queryParam("includeSystemTopic", options.isIncludeSystemTopic());
-        final CompletableFuture<List<String>> persistentList = new CompletableFuture<>();
-        final CompletableFuture<List<String>> nonPersistentList = new CompletableFuture<>();
+        final CompletableFuture<List<String>> persistentList;
+        final CompletableFuture<List<String>> nonPersistentList;
         if (topicDomain == null || TopicDomain.persistent.equals(topicDomain)) {
-            asyncGetRequest(persistentPath,
-                    new InvocationCallback<List<String>>() {
-                        @Override
-                        public void completed(List<String> topics) {
-                            persistentList.complete(topics);
-                        }
-
-                        @Override
-                        public void failed(Throwable throwable) {
-                            persistentList.completeExceptionally(getApiException(throwable.getCause()));
-                        }
-                    });
+            persistentList = asyncGetRequest(persistentPath, new FutureCallback<List<String>>() {});
         } else {
-            persistentList.complete(Collections.emptyList());
+            persistentList = CompletableFuture.completedFuture(Collections.emptyList());
         }
 
         if (topicDomain == null || TopicDomain.non_persistent.equals(topicDomain)) {
-            asyncGetRequest(nonPersistentPath,
-                    new InvocationCallback<List<String>>() {
-                        @Override
-                        public void completed(List<String> a) {
-                            nonPersistentList.complete(a);
-                        }
-
-                        @Override
-                        public void failed(Throwable throwable) {
-                            nonPersistentList.completeExceptionally(getApiException(throwable.getCause()));
-                        }
-                    });
+            nonPersistentList = asyncGetRequest(nonPersistentPath, new FutureCallback<List<String>>() {});
         } else {
-            nonPersistentList.complete(Collections.emptyList());
+            nonPersistentList = CompletableFuture.completedFuture(Collections.emptyList());
         }
 
         return persistentList.thenCombine(nonPersistentList,
@@ -258,32 +236,10 @@ public class TopicsImpl extends BaseResource implements Topics {
         WebTarget nonPersistentPath = namespacePath("non-persistent", ns, "partitioned");
         persistentPath = persistentPath.queryParam("includeSystemTopic", options.isIncludeSystemTopic());
         nonPersistentPath = nonPersistentPath.queryParam("includeSystemTopic", options.isIncludeSystemTopic());
-        final CompletableFuture<List<String>> persistentList = new CompletableFuture<>();
-        final CompletableFuture<List<String>> nonPersistentList = new CompletableFuture<>();
-        asyncGetRequest(persistentPath,
-                new InvocationCallback<List<String>>() {
-                    @Override
-                    public void completed(List<String> topics) {
-                        persistentList.complete(topics);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        persistentList.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        asyncGetRequest(nonPersistentPath,
-                new InvocationCallback<List<String>>() {
-                    @Override
-                    public void completed(List<String> topics) {
-                        nonPersistentList.complete(topics);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        nonPersistentList.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
+        final CompletableFuture<List<String>> persistentList =
+                asyncGetRequest(persistentPath, new FutureCallback<List<String>>() {});
+        final CompletableFuture<List<String>> nonPersistentList =
+                asyncGetRequest(nonPersistentPath, new FutureCallback<List<String>>() {});
 
         return persistentList.thenCombine(nonPersistentList,
                 (l1, l2) -> new ArrayList<>(Stream.concat(l1.stream(), l2.stream()).collect(Collectors.toSet())));
@@ -1491,25 +1447,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<MessageId> getLastMessageIdAsync(String topic) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "lastMessageId");
-        final CompletableFuture<MessageId> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<BatchMessageIdImpl>() {
-
-                    @Override
-                    public void completed(BatchMessageIdImpl response) {
-                        if (response.getBatchIndex() == -1) {
-                            future.complete(new MessageIdImpl(response.getLedgerId(),
-                                    response.getEntryId(), response.getPartitionIndex()));
-                        }
-                        future.complete(response);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
+        return asyncGetRequest(path, new FutureCallback<BatchMessageIdImpl>() {})
+                .thenApply(response -> {
+                    if (response.getBatchIndex() == -1) {
+                        return new MessageIdImpl(response.getLedgerId(),
+                                response.getEntryId(), response.getPartitionIndex());
                     }
+                    return response;
                 });
-        return future;
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
index 551ffa29b4d..2e57ee9710a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
@@ -22,13 +22,10 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Worker;
@@ -57,26 +54,7 @@ public class WorkerImpl extends BaseResource implements Worker {
     @Override
     public CompletableFuture<List<WorkerFunctionInstanceStats>> getFunctionsStatsAsync() {
         WebTarget path = workerStats.path("functionsmetrics");
-        final CompletableFuture<List<WorkerFunctionInstanceStats>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(new ClientErrorException(response));
-                        } else {
-                            List<WorkerFunctionInstanceStats> metricsList =
-                                    response.readEntity(new GenericType<List<WorkerFunctionInstanceStats>>() {});
-                            future.complete(metricsList);
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new GenericType<List<WorkerFunctionInstanceStats>>() {});
     }
 
     @Override
@@ -87,25 +65,8 @@ public class WorkerImpl extends BaseResource implements Worker {
     @Override
     public CompletableFuture<Collection<Metrics>> getMetricsAsync() {
         WebTarget path = workerStats.path("metrics");
-        final CompletableFuture<Collection<Metrics>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(new ClientErrorException(response));
-                        } else {
-                            future.complete(response.readEntity(
-                                    new GenericType<List<Metrics>>() {}));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new GenericType<List<Metrics>>() {})
+                .thenApply(list -> list);
     }
 
     @Override
@@ -116,25 +77,7 @@ public class WorkerImpl extends BaseResource implements Worker {
     @Override
     public CompletableFuture<List<WorkerInfo>> getClusterAsync() {
         WebTarget path = worker.path("cluster");
-        final CompletableFuture<List<WorkerInfo>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(new ClientErrorException(response));
-                        } else {
-                            future.complete(response.readEntity(new GenericType<List<WorkerInfo>>() {}));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new GenericType<List<WorkerInfo>>() {});
     }
 
     @Override
@@ -145,24 +88,7 @@ public class WorkerImpl extends BaseResource implements Worker {
     @Override
     public CompletableFuture<WorkerInfo> getClusterLeaderAsync() {
         WebTarget path = worker.path("cluster").path("leader");
-        final CompletableFuture<WorkerInfo> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(new ClientErrorException(response));
-                        } else {
-                            future.complete(response.readEntity(new GenericType<WorkerInfo>(){}));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new GenericType<WorkerInfo>(){});
     }
 
     @Override
@@ -173,25 +99,7 @@ public class WorkerImpl extends BaseResource implements Worker {
     @Override
     public CompletableFuture<Map<String, Collection<String>>> getAssignmentsAsync() {
         WebTarget path = worker.path("assignments");
-        final CompletableFuture<Map<String, Collection<String>>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Response>() {
-                    @Override
-                    public void completed(Response response) {
-                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(new ClientErrorException(response));
-                        } else {
-                            future.complete(response.readEntity(
-                                    new GenericType<Map<String, Collection<String>>>() {}));
-                        }
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new GenericType<Map<String, Collection<String>>>() {});
     }
 
     @Override