You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/10 11:36:26 UTC

[pulsar] branch branch-2.11 updated (80d2cc9f3f8 -> c6da6461cf8)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 80d2cc9f3f8 [cleanup][broker] Follow up on #16968 to restore some behavior in PersistentDispatcherMultipleConsumers class (#17018)
     new a553165c5f7 [fix][flaky-test] ElasticSearchClientTests.testBulkBlocking (#16920)
     new a5e0c58bb46 [client][python] getLastMessageIdAsync C binding (#16255)
     new 643963a60fa [improve][broker]Remove unnecessary lock on the stats thread  (#16983)
     new 433e3aff9b2 [Fix][Flaky-test] Fix testConsumeTxnMessage (#16981)
     new c6da6461cf8 [refactor][java-client] Reduce code duplication in admin client (#16377)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/pulsar/broker/PulsarService.java    |  13 +-
 .../pulsar/broker/service/BrokerService.java       |   9 +-
 .../broker/service/persistent/PersistentTopic.java |   7 +-
 .../stats/prometheus/NamespaceStatsAggregator.java |   8 +-
 .../pulsar/broker/stats/PrometheusMetricsTest.java |   2 +-
 .../pulsar/client/admin/internal/BaseResource.java |  31 +-
 .../pulsar/client/admin/internal/BookiesImpl.java  |  46 +-
 .../client/admin/internal/BrokerStatsImpl.java     |  91 +--
 .../pulsar/client/admin/internal/BrokersImpl.java  | 119 +--
 .../pulsar/client/admin/internal/ClustersImpl.java |  95 +--
 .../pulsar/client/admin/internal/LookupImpl.java   |  15 +-
 .../client/admin/internal/NamespacesImpl.java      | 855 ++-------------------
 .../admin/internal/NonPersistentTopicsImpl.java    |  78 +-
 .../pulsar/client/admin/internal/PackagesImpl.java |  43 +-
 .../client/admin/internal/ResourceGroupsImpl.java  |  31 +-
 .../client/admin/internal/ResourceQuotasImpl.java  |  31 +-
 .../pulsar/client/admin/internal/SchemasImpl.java  |  48 +-
 .../pulsar/client/admin/internal/TenantsImpl.java  |  32 +-
 .../client/admin/internal/TopicPoliciesImpl.java   | 342 +--------
 .../pulsar/client/admin/internal/TopicsImpl.java   | 556 +-------------
 .../client/admin/internal/TransactionsImpl.java    | 166 +---
 pulsar-client-cpp/README.md                        |   2 +-
 pulsar-client-cpp/include/pulsar/c/consumer.h      |   3 +
 pulsar-client-cpp/lib/c/c_Consumer.cc              |   5 +
 pulsar-client-cpp/python/pulsar/__init__.py        |   7 +-
 pulsar-client-cpp/python/pulsar_test.py            |  12 +
 pulsar-client-cpp/python/src/consumer.cc           |  13 +-
 .../io/elasticsearch/ElasticSearchClientTests.java |  93 +--
 .../testclient/PerformanceTransactionTest.java     |  12 +-
 29 files changed, 336 insertions(+), 2429 deletions(-)


[pulsar] 02/05: [client][python] getLastMessageIdAsync C binding (#16255)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a5e0c58bb4684c51143338f16ed67f255a6fc6e9
Author: komalatammal <10...@users.noreply.github.com>
AuthorDate: Mon Aug 8 23:19:42 2022 -0400

    [client][python] getLastMessageIdAsync C binding (#16255)
    
    * python cc binding for getLastMessageId
    
    * add python Consumer class method and doc
    
    * fix linter issues based on clang-format
    
    * ubuntu linter fix
    
    * try run unit test in ci
    
    * fix doc comment
    
    * test the test case can be ran
    
    ### Motivation
    
    Python function getLastMessageId
    
    It is a C binding for https://github.com/apache/pulsar/pull/16182 to implement get_last_message_id() in Python client.
    
    ### Modifications
    
    Add Python/C binding code for get_last_message_id()
    
    ### Verifying this change
    
    It compiles.
    - [x] Make sure that the change passes the CI checks.
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API: (yes)
      - The schema: (no)
      - The default values of configurations: (no)
      - The wire protocol: (no)
      - The rest endpoints: (no)
      - The admin cli options: (no)
      - Anything that affects deployment: (no)
    
    ### Documentation
    
    Check the box below or label this PR directly.
    
    Need to update docs?
    
    - [ ] `doc-required`
    (Your PR needs to update docs and you will update later)
    
    - [ ] `doc-not-needed`
    
    
    - [x] `doc`
    Python Doc is updated in __init__.py
    
    - [ ] `doc-complete`
    (Docs have been already added)
---
 pulsar-client-cpp/README.md                   |  2 +-
 pulsar-client-cpp/include/pulsar/c/consumer.h |  3 +++
 pulsar-client-cpp/lib/c/c_Consumer.cc         |  5 +++++
 pulsar-client-cpp/python/pulsar/__init__.py   |  7 ++++++-
 pulsar-client-cpp/python/pulsar_test.py       | 12 ++++++++++++
 pulsar-client-cpp/python/src/consumer.cc      | 13 ++++++++++++-
 6 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/README.md b/pulsar-client-cpp/README.md
index 3dfa169c923..155e6c6a907 100644
--- a/pulsar-client-cpp/README.md
+++ b/pulsar-client-cpp/README.md
@@ -281,7 +281,7 @@ ${PULSAR_PATH}/pulsar-test-service-stop.sh
 
 ## Requirements for Contributors
 
-It's recommended to install [LLVM](https://llvm.org/builds/) for `clang-tidy` and `clang-format`. Pulsar C++ client use `clang-format` 6.0+ to format files. 
+It's required to install [LLVM](https://llvm.org/builds/) for `clang-tidy` and `clang-format`. Pulsar C++ client use `clang-format` 6.0+ to format files.  `make format` automatically formats the files.
 
 Use `pulsar-client-cpp/docker-format.sh` to ensure the C++ sources are correctly formatted.
 
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer.h b/pulsar-client-cpp/include/pulsar/c/consumer.h
index 03f80f32394..37fd2acf5b8 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer.h
@@ -236,6 +236,9 @@ PULSAR_PUBLIC pulsar_result pulsar_consumer_seek(pulsar_consumer_t *consumer, pu
 
 PULSAR_PUBLIC int pulsar_consumer_is_connected(pulsar_consumer_t *consumer);
 
+PULSAR_PUBLIC pulsar_result pulsar_consumer_get_last_message_id(pulsar_consumer_t *consumer,
+                                                                pulsar_message_id_t *messageId);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/pulsar-client-cpp/lib/c/c_Consumer.cc b/pulsar-client-cpp/lib/c/c_Consumer.cc
index 9917e8cfad6..00d8311f132 100644
--- a/pulsar-client-cpp/lib/c/c_Consumer.cc
+++ b/pulsar-client-cpp/lib/c/c_Consumer.cc
@@ -143,3 +143,8 @@ pulsar_result pulsar_consumer_seek(pulsar_consumer_t *consumer, pulsar_message_i
 }
 
 int pulsar_consumer_is_connected(pulsar_consumer_t *consumer) { return consumer->consumer.isConnected(); }
+
+pulsar_result pulsar_consumer_get_last_message_id(pulsar_consumer_t *consumer,
+                                                  pulsar_message_id_t *messageId) {
+    return (pulsar_result)consumer->consumer.getLastMessageId(messageId->messageId);
+}
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index e79955b57dd..3832c3e69e2 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -1253,7 +1253,12 @@ class Consumer:
         Check if the consumer is connected or not.
         """
         return self._consumer.is_connected()
-
+    
+    def get_last_message_id(self):
+        """
+        Get the last message id.
+        """
+        return self._consumer.get_last_message_id()
 
 
 class Reader:
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index dbdd6be59c7..127ecc4247c 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -753,6 +753,18 @@ class PulsarTest(TestCase):
         self._check_value_error(lambda: client.create_reader(topic, MessageId.earliest, reader_name=5))
         client.close()
 
+    def test_get_last_message_id(self):
+        client = Client(self.serviceUrl)
+        consumer = client.subscribe(
+            "persistent://public/default/topic_name_test", "topic_name_test_sub", consumer_type=ConsumerType.Shared
+        )
+        producer = client.create_producer("persistent://public/default/topic_name_test")
+        msg_id = producer.send(b"hello")
+
+        msg = consumer.receive(TM)
+        self.assertEqual(msg.message_id(), msg_id)
+        client.close()
+
     def test_publish_compact_and_consume(self):
         client = Client(self.serviceUrl)
         topic = "compaction_%s" % (uuid.uuid4())
diff --git a/pulsar-client-cpp/python/src/consumer.cc b/pulsar-client-cpp/python/src/consumer.cc
index 10ffd07496f..811ceb3ddf5 100644
--- a/pulsar-client-cpp/python/src/consumer.cc
+++ b/pulsar-client-cpp/python/src/consumer.cc
@@ -83,6 +83,16 @@ void Consumer_seek_timestamp(Consumer& consumer, uint64_t timestamp) {
 
 bool Consumer_is_connected(Consumer& consumer) { return consumer.isConnected(); }
 
+MessageId Consumer_get_last_message_id(Consumer& consumer) {
+    MessageId msgId;
+    Result res;
+    Py_BEGIN_ALLOW_THREADS res = consumer.getLastMessageId(msgId);
+    Py_END_ALLOW_THREADS
+
+        CHECK_RESULT(res);
+    return msgId;
+}
+
 void export_consumer() {
     using namespace boost::python;
 
@@ -105,5 +115,6 @@ void export_consumer() {
         .def("redeliver_unacknowledged_messages", &Consumer::redeliverUnacknowledgedMessages)
         .def("seek", &Consumer_seek)
         .def("seek", &Consumer_seek_timestamp)
-        .def("is_connected", &Consumer_is_connected);
+        .def("is_connected", &Consumer_is_connected)
+        .def("get_last_message_id", &Consumer_get_last_message_id);
 }


[pulsar] 01/05: [fix][flaky-test] ElasticSearchClientTests.testBulkBlocking (#16920)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a553165c5f7af24172fad4422419b50c7fd73545
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Tue Aug 9 14:07:31 2022 +0800

    [fix][flaky-test] ElasticSearchClientTests.testBulkBlocking (#16920)
---
 .../io/elasticsearch/ElasticSearchClientTests.java | 93 ++++++++++++----------
 1 file changed, 50 insertions(+), 43 deletions(-)

diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index 7ade25046f1..8a453dfe451 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -18,7 +18,20 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
 import eu.rekawek.toxiproxy.model.ToxicDirection;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.atomic.LongAdder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.schema.GenericObject;
@@ -34,20 +47,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.io.IOException;
-import java.util.Optional;
-import java.util.UUID;
-
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertThrows;
-import static org.testng.Assert.assertTrue;
-
 @Slf4j
 public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
     public final static String INDEX = "myindex";
@@ -78,8 +77,16 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
     }
 
     static class MockRecord<T> implements Record<T> {
-        int acked = 0;
-        int failed = 0;
+        LongAdder acked = new LongAdder();
+        LongAdder failed = new LongAdder();
+
+        public int getAcked() {
+            return acked.intValue();
+        }
+
+        public int getFailed() {
+            return failed.intValue();
+        }
 
         @Override
         public T getValue() {
@@ -88,12 +95,12 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
 
         @Override
         public void ack() {
-            acked++;
+            acked.increment();
         }
 
         @Override
         public void fail() {
-            failed++;
+            failed.increment();
         }
     }
 
@@ -152,13 +159,13 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
             try {
                 MockRecord<GenericObject> mockRecord = new MockRecord<>();
                 client.indexDocument(mockRecord, Pair.of("1", "{ \"a\":1}"));
-                assertEquals(mockRecord.acked, 1);
-                assertEquals(mockRecord.failed, 0);
+                assertEquals(mockRecord.getAcked(), 1);
+                assertEquals(mockRecord.getFailed(), 0);
                 assertEquals(client.getRestClient().totalHits(index), 1);
 
                 client.deleteDocument(mockRecord, "1");
-                assertEquals(mockRecord.acked, 2);
-                assertEquals(mockRecord.failed, 0);
+                assertEquals(mockRecord.getAcked(), 2);
+                assertEquals(mockRecord.getFailed(), 0);
                 assertEquals(client.getRestClient().totalHits(index), 0);
             } finally {
                 client.getRestClient().deleteIndex(index);
@@ -216,11 +223,11 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
             client.flush();
             assertNotNull(client.irrecoverableError.get());
             assertTrue(client.irrecoverableError.get().getMessage().contains("mapper_parsing_exception"));
-            assertEquals(mockRecord.acked, 1);
-            assertEquals(mockRecord.failed, 1);
+            assertEquals(mockRecord.getAcked(), 1);
+            assertEquals(mockRecord.getFailed(), 1);
             assertThrows(Exception.class, () -> client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}")));
-            assertEquals(mockRecord.acked, 1);
-            assertEquals(mockRecord.failed, 2);
+            assertEquals(mockRecord.getAcked(), 1);
+            assertEquals(mockRecord.getFailed(), 2);
         }
     }
 
@@ -239,8 +246,8 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
             client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":\"toto\"}"));
             client.flush();
             assertNull(client.irrecoverableError.get());
-            assertEquals(mockRecord.acked, 1);
-            assertEquals(mockRecord.failed, 1);
+            assertEquals(mockRecord.getAcked(), 1);
+            assertEquals(mockRecord.getFailed(), 1);
         }
     }
 
@@ -266,8 +273,8 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
                     MockRecord<GenericObject> mockRecord = new MockRecord<>();
                     client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
                     client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
-                    assertEquals(mockRecord.acked, 2);
-                    assertEquals(mockRecord.failed, 0);
+                    assertEquals(mockRecord.getAcked(), 2);
+                    assertEquals(mockRecord.getFailed(), 0);
                     assertEquals(client.getRestClient().totalHits(index), 2);
 
                     log.info("starting the toxic");
@@ -276,13 +283,13 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
                     toxiproxy.removeToxicAfterDelay("elasticpause", 15000);
 
                     client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
-                    assertEquals(mockRecord.acked, 2);
-                    assertEquals(mockRecord.failed, 0);
+                    assertEquals(mockRecord.getAcked(), 2);
+                    assertEquals(mockRecord.getFailed(), 0);
                     assertEquals(client.getRestClient().totalHits(index), 2);
 
                     client.flush();
-                    assertEquals(mockRecord.acked, 3);
-                    assertEquals(mockRecord.failed, 0);
+                    assertEquals(mockRecord.getAcked(), 3);
+                    assertEquals(mockRecord.getFailed(), 0);
                     assertEquals(client.getRestClient().totalHits(index), 3);
                 } finally {
                     client.getRestClient().deleteIndex(index);
@@ -316,14 +323,14 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
                     }
 
                     Awaitility.await().untilAsserted(() -> {
-                        assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4));
-                        assertEquals(mockRecord.failed, 0);
+                        assertThat("acked record", mockRecord.getAcked(), greaterThanOrEqualTo(4));
+                        assertEquals(mockRecord.getFailed(), 0);
                         assertThat("totalHits", client.getRestClient().totalHits(index), greaterThanOrEqualTo(4L));
                     });
                     client.flush();
                     Awaitility.await().untilAsserted(() -> {
-                        assertEquals(mockRecord.failed, 0);
-                        assertEquals(mockRecord.acked, 5);
+                        assertEquals(mockRecord.getFailed(), 0);
+                        assertEquals(mockRecord.getAcked(), 5);
                         assertEquals(client.getRestClient().totalHits(index), 5);
                     });
 
@@ -344,8 +351,8 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
                     assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy
 
                     Awaitility.await().untilAsserted(() -> {
-                        assertEquals(mockRecord.acked, 15);
-                        assertEquals(mockRecord.failed, 0);
+                        assertEquals(mockRecord.getAcked(), 15);
+                        assertEquals(mockRecord.getFailed(), 0);
                     });
 
                 } finally {
@@ -372,13 +379,13 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
                 client.bulkIndex(mockRecord, Pair.of("key" + i, "{\"a\":" + i + "}"));
                 client.bulkDelete(mockRecord, "key" + i);
             }
-            assertEquals(mockRecord.acked, 10);
-            assertEquals(mockRecord.failed, 0);
+            assertEquals(mockRecord.getAcked(), 10);
+            assertEquals(mockRecord.getFailed(), 0);
             assertEquals(client.getRestClient().totalHits(index), 0);
             // no effect
             client.flush();
 
-            assertEquals(mockRecord.acked, 10);
+            assertEquals(mockRecord.getAcked(), 10);
         }
     }
 


[pulsar] 05/05: [refactor][java-client] Reduce code duplication in admin client (#16377)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c6da6461cf81ef3f45c2dd09e4690a6d47fa7d4e
Author: Andras Beni <an...@streamnative.io>
AuthorDate: Mon Aug 8 14:43:26 2022 +0200

    [refactor][java-client] Reduce code duplication in admin client (#16377)
---
 .../pulsar/client/admin/internal/BaseResource.java |  31 +-
 .../pulsar/client/admin/internal/BookiesImpl.java  |  46 +-
 .../client/admin/internal/BrokerStatsImpl.java     |  91 +--
 .../pulsar/client/admin/internal/BrokersImpl.java  | 119 +--
 .../pulsar/client/admin/internal/ClustersImpl.java |  95 +--
 .../pulsar/client/admin/internal/LookupImpl.java   |  15 +-
 .../client/admin/internal/NamespacesImpl.java      | 855 ++-------------------
 .../admin/internal/NonPersistentTopicsImpl.java    |  78 +-
 .../pulsar/client/admin/internal/PackagesImpl.java |  43 +-
 .../client/admin/internal/ResourceGroupsImpl.java  |  31 +-
 .../client/admin/internal/ResourceQuotasImpl.java  |  31 +-
 .../pulsar/client/admin/internal/SchemasImpl.java  |  48 +-
 .../pulsar/client/admin/internal/TenantsImpl.java  |  32 +-
 .../client/admin/internal/TopicPoliciesImpl.java   | 342 +--------
 .../pulsar/client/admin/internal/TopicsImpl.java   | 556 +-------------
 .../client/admin/internal/TransactionsImpl.java    | 166 +---
 16 files changed, 223 insertions(+), 2356 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index d0c55c01957..8316aa8b4e5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -181,6 +181,11 @@ public abstract class BaseResource {
         }
     }
 
+    public <T> CompletableFuture<T> asyncGetRequest(final WebTarget target, FutureCallback<T> callback) {
+        asyncGetRequest(target, (InvocationCallback<T>) callback);
+        return callback.future();
+    }
+
     public CompletableFuture<Void> asyncDeleteRequest(final WebTarget target) {
         final CompletableFuture<Void> future = new CompletableFuture<>();
         try {
@@ -211,7 +216,7 @@ public abstract class BaseResource {
         }
     }
 
-    public PulsarAdminException getApiException(Throwable e) {
+    public static PulsarAdminException getApiException(Throwable e) {
         if (e instanceof PulsarAdminException) {
             return (PulsarAdminException) e;
         } else if (e instanceof ServiceUnavailableException) {
@@ -313,4 +318,28 @@ public abstract class BaseResource {
             throw PulsarAdminException.wrap(getApiException(e));
         }
     }
+
+    /**
+     * InvocationCallback that creates a CompletableFuture and completes it based on the response.
+     * Must be subclassed to provide runtime type information to the ReST client library.
+     * @param <T> type to which the response body is parsed in case of success
+     */
+    abstract static class FutureCallback<T> implements InvocationCallback<T> {
+        private final CompletableFuture<T> future = new CompletableFuture<>();
+
+        @Override
+        public void completed(T value) {
+            future.complete(value);
+        }
+
+        @Override
+        public void failed(Throwable throwable) {
+            future.completeExceptionally(getApiException(throwable.getCause()));
+        }
+
+        public CompletableFuture<T> future() {
+            return future;
+        }
+
+    }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
index d2b56217b34..f5f9a248b12 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.admin.internal;
 
 import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 import org.apache.pulsar.client.admin.Bookies;
@@ -46,20 +45,7 @@ public class BookiesImpl extends BaseResource implements Bookies {
     @Override
     public CompletableFuture<BookiesClusterInfo> getBookiesAsync() {
         WebTarget path = adminBookies.path("all");
-        final CompletableFuture<BookiesClusterInfo> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<BookiesClusterInfo>() {
-                    @Override
-                    public void completed(BookiesClusterInfo bookies) {
-                        future.complete(bookies);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<BookiesClusterInfo>(){});
     }
 
     @Override
@@ -70,20 +56,7 @@ public class BookiesImpl extends BaseResource implements Bookies {
     @Override
     public CompletableFuture<BookiesRackConfiguration> getBookiesRackInfoAsync() {
         WebTarget path = adminBookies.path("racks-info");
-        final CompletableFuture<BookiesRackConfiguration> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<BookiesRackConfiguration>() {
-                    @Override
-                    public void completed(BookiesRackConfiguration bookiesRackConfiguration) {
-                        future.complete(bookiesRackConfiguration);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<BookiesRackConfiguration>(){});
     }
 
     @Override
@@ -94,20 +67,7 @@ public class BookiesImpl extends BaseResource implements Bookies {
     @Override
     public CompletableFuture<BookieInfo> getBookieRackInfoAsync(String bookieAddress) {
         WebTarget path = adminBookies.path("racks-info").path(bookieAddress);
-        final CompletableFuture<BookieInfo> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<BookieInfo>() {
-                    @Override
-                    public void completed(BookieInfo bookieInfo) {
-                        future.complete(bookieInfo);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<BookieInfo>(){});
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java
index 5afabde820d..86d9af6a8fb 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.client.admin.internal;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import java.util.concurrent.CompletableFuture;
-import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import org.apache.pulsar.client.admin.BrokerStats;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -53,20 +52,7 @@ public class BrokerStatsImpl extends BaseResource implements BrokerStats {
     @Override
     public CompletableFuture<String> getMetricsAsync() {
         WebTarget path = adminV2BrokerStats.path("/metrics");
-        final CompletableFuture<String> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<String>() {
-                    @Override
-                    public void completed(String s) {
-                        future.complete(s);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<String>(){});
     }
 
     @Override
@@ -77,20 +63,7 @@ public class BrokerStatsImpl extends BaseResource implements BrokerStats {
     @Override
     public CompletableFuture<AllocatorStats> getAllocatorStatsAsync(String allocatorName) {
         WebTarget path = adminV2BrokerStats.path("/allocator-stats").path(allocatorName);
-        final CompletableFuture<AllocatorStats> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<AllocatorStats>() {
-                    @Override
-                    public void completed(AllocatorStats allocatorStats) {
-                        future.complete(allocatorStats);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<AllocatorStats>(){});
     }
 
     @Override
@@ -101,20 +74,7 @@ public class BrokerStatsImpl extends BaseResource implements BrokerStats {
     @Override
     public CompletableFuture<String> getMBeansAsync() {
         WebTarget path = adminV2BrokerStats.path("/mbeans");
-        final CompletableFuture<String> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<String>() {
-                    @Override
-                    public void completed(String s) {
-                        future.complete(s);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<String>(){});
     }
 
     @Override
@@ -125,20 +85,7 @@ public class BrokerStatsImpl extends BaseResource implements BrokerStats {
     @Override
     public CompletableFuture<String> getTopicsAsync() {
         WebTarget path = adminV2BrokerStats.path("/topics");
-        final CompletableFuture<String> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<String>() {
-                    @Override
-                    public void completed(String s) {
-                        future.complete(s);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<String>(){});
     }
 
     @Override
@@ -149,20 +96,7 @@ public class BrokerStatsImpl extends BaseResource implements BrokerStats {
     @Override
     public CompletableFuture<LoadManagerReport> getLoadReportAsync() {
         WebTarget path = adminV2BrokerStats.path("/load-report");
-        final CompletableFuture<LoadManagerReport> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<LoadManagerReport>() {
-                    @Override
-                    public void completed(LoadManagerReport loadManagerReport) {
-                        future.complete(loadManagerReport);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<LoadManagerReport>(){});
     }
 
     @Override
@@ -173,20 +107,7 @@ public class BrokerStatsImpl extends BaseResource implements BrokerStats {
     @Override
     public CompletableFuture<String> getPendingBookieOpsStatsAsync() {
         WebTarget path = adminV2BrokerStats.path("/bookieops");
-        final CompletableFuture<String> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<String>() {
-                    @Override
-                    public void completed(String s) {
-                        future.complete(s);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<String>(){});
     }
 
     public JsonObject getBrokerResourceAvailability(String namespace) throws PulsarAdminException {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
index 3257ad2067e..81de4ed6e05 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
@@ -60,20 +60,7 @@ public class BrokersImpl extends BaseResource implements Brokers {
     @Override
     public CompletableFuture<List<String>> getActiveBrokersAsync(String cluster) {
         WebTarget path = cluster == null ? adminBrokers : adminBrokers.path(cluster);
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<List<String>>() {
-                    @Override
-                    public void completed(List<String> brokers) {
-                        future.complete(brokers);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<List<String>>(){});
     }
 
     @Override
@@ -84,20 +71,7 @@ public class BrokersImpl extends BaseResource implements Brokers {
     @Override
     public CompletableFuture<BrokerInfo> getLeaderBrokerAsync() {
         WebTarget path = adminBrokers.path("leaderBroker");
-        final CompletableFuture<BrokerInfo> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<BrokerInfo>() {
-                    @Override
-                    public void completed(BrokerInfo leaderBroker) {
-                        future.complete(leaderBroker);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<BrokerInfo>(){});
     }
 
     @Override
@@ -110,20 +84,7 @@ public class BrokersImpl extends BaseResource implements Brokers {
     public CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNamespacesAsync(
             String cluster, String brokerUrl) {
         WebTarget path = adminBrokers.path(cluster).path(brokerUrl).path("ownedNamespaces");
-        final CompletableFuture<Map<String, NamespaceOwnershipStatus>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Map<String, NamespaceOwnershipStatus>>() {
-                    @Override
-                    public void completed(Map<String, NamespaceOwnershipStatus> ownedNamespaces) {
-                        future.complete(ownedNamespaces);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Map<String, NamespaceOwnershipStatus>>(){});
     }
 
     @Override
@@ -157,20 +118,7 @@ public class BrokersImpl extends BaseResource implements Brokers {
     @Override
     public CompletableFuture<Map<String, String>> getAllDynamicConfigurationsAsync() {
         WebTarget path = adminBrokers.path("configuration").path("values");
-        final CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Map<String, String>>() {
-                    @Override
-                    public void completed(Map<String, String> allConfs) {
-                        future.complete(allConfs);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Map<String, String>>(){});
     }
 
     @Override
@@ -181,20 +129,7 @@ public class BrokersImpl extends BaseResource implements Brokers {
     @Override
     public CompletableFuture<List<String>> getDynamicConfigurationNamesAsync() {
         WebTarget path = adminBrokers.path("configuration");
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<List<String>>() {
-                    @Override
-                    public void completed(List<String> confNames) {
-                        future.complete(confNames);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<List<String>>(){});
     }
 
     @Override
@@ -205,20 +140,7 @@ public class BrokersImpl extends BaseResource implements Brokers {
     @Override
     public CompletableFuture<Map<String, String>> getRuntimeConfigurationsAsync() {
         WebTarget path = adminBrokers.path("configuration").path("runtime");
-        final CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Map<String, String>>() {
-                    @Override
-                    public void completed(Map<String, String> runtimeConfs) {
-                        future.complete(runtimeConfs);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Map<String, String>>(){});
     }
 
     @Override
@@ -229,20 +151,7 @@ public class BrokersImpl extends BaseResource implements Brokers {
     @Override
     public CompletableFuture<InternalConfigurationData> getInternalConfigurationDataAsync() {
         WebTarget path = adminBrokers.path("internal-configuration");
-        final CompletableFuture<InternalConfigurationData> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<InternalConfigurationData>() {
-                    @Override
-                    public void completed(InternalConfigurationData internalConfigurationData) {
-                        future.complete(internalConfigurationData);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<InternalConfigurationData>(){});
     }
 
     @Override
@@ -329,18 +238,6 @@ public class BrokersImpl extends BaseResource implements Brokers {
     public CompletableFuture<String> getVersionAsync() {
         WebTarget path = adminBrokers.path("version");
 
-        final CompletableFuture<String> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<String>() {
-            @Override
-            public void completed(String version) {
-                future.complete(version);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<String>(){});
     }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index 03b02759206..96744ff490a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -57,20 +57,8 @@ public class ClustersImpl extends BaseResource implements Clusters {
 
     @Override
     public CompletableFuture<List<String>> getClustersAsync() {
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(adminClusters,
-                new InvocationCallback<List<String>>() {
-                    @Override
-                    public void completed(List<String> clusters) {
-                        future.complete(clusters);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        WebTarget path = this.adminClusters;
+        return asyncGetRequest(path, new FutureCallback<List<String>>(){});
     }
 
     @Override
@@ -81,20 +69,8 @@ public class ClustersImpl extends BaseResource implements Clusters {
     @Override
     public CompletableFuture<ClusterData> getClusterAsync(String cluster) {
         WebTarget path = adminClusters.path(cluster);
-        final CompletableFuture<ClusterData> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<ClusterDataImpl>() {
-                    @Override
-                    public void completed(ClusterDataImpl clusterData) {
-                        future.complete(clusterData);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<ClusterDataImpl>(){})
+                .thenApply(clusterData -> clusterData);
     }
 
     @Override
@@ -140,20 +116,7 @@ public class ClustersImpl extends BaseResource implements Clusters {
     @Override
     public CompletableFuture<Set<String>> getPeerClusterNamesAsync(String cluster) {
         WebTarget path = adminClusters.path(cluster).path("peers");
-        final CompletableFuture<Set<String>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Set<String>>() {
-                    @Override
-                    public void completed(Set<String> clusterNames) {
-                        future.complete(clusterNames);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Set<String>>(){});
     }
 
     @Override
@@ -233,20 +196,8 @@ public class ClustersImpl extends BaseResource implements Clusters {
     public CompletableFuture<BrokerNamespaceIsolationData> getBrokerWithNamespaceIsolationPolicyAsync(
             String cluster, String broker) {
         WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers").path(broker);
-        final CompletableFuture<BrokerNamespaceIsolationData> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<BrokerNamespaceIsolationDataImpl>() {
-                    @Override
-                    public void completed(BrokerNamespaceIsolationDataImpl brokerNamespaceIsolationData) {
-                        future.complete(brokerNamespaceIsolationData);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<BrokerNamespaceIsolationDataImpl>(){})
+                .thenApply(brokerNamespaceIsolationData -> brokerNamespaceIsolationData);
     }
 
     @Override
@@ -305,20 +256,8 @@ public class ClustersImpl extends BaseResource implements Clusters {
     public CompletableFuture<NamespaceIsolationData> getNamespaceIsolationPolicyAsync(
             String cluster, String policyName) {
         WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName);
-        final CompletableFuture<NamespaceIsolationData> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<NamespaceIsolationDataImpl>() {
-                    @Override
-                    public void completed(NamespaceIsolationDataImpl namespaceIsolationData) {
-                        future.complete(namespaceIsolationData);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<NamespaceIsolationDataImpl>(){})
+                .thenApply(namespaceIsolationData -> namespaceIsolationData);
     }
 
     @Override
@@ -389,20 +328,8 @@ public class ClustersImpl extends BaseResource implements Clusters {
     @Override
     public CompletableFuture<FailureDomain> getFailureDomainAsync(String cluster, String domainName) {
         WebTarget path = adminClusters.path(cluster).path("failureDomains").path(domainName);
-        final CompletableFuture<FailureDomain> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<FailureDomainImpl>() {
-                    @Override
-                    public void completed(FailureDomainImpl failureDomain) {
-                        future.complete(failureDomain);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<FailureDomainImpl>(){})
+                .thenApply(failureDomain -> failureDomain);
     }
 
     private void setDomain(String cluster, String domainName,
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
index 13923ccf213..d5a6471c794 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
@@ -134,20 +134,7 @@ public class LookupImpl extends BaseResource implements Lookup {
         TopicName topicName = TopicName.get(topic);
         String prefix = topicName.isV2() ? "/topic" : "/destination";
         WebTarget path = v2lookup.path(prefix).path(topicName.getLookupName()).path("bundle");
-        final CompletableFuture<String> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<String>() {
-                    @Override
-                    public void completed(String bundleRange) {
-                        future.complete(bundleRange);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<String>(){});
     }
 
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index d9672b9e196..bfb060596b1 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -78,20 +78,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     @Override
     public CompletableFuture<List<String>> getNamespacesAsync(String tenant) {
         WebTarget path = adminV2Namespaces.path(tenant);
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<List<String>>() {
-                    @Override
-                    public void completed(List<String> namespaces) {
-                        future.complete(namespaces);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<List<String>>(){});
     }
 
     @Override
@@ -101,21 +88,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     public CompletableFuture<List<String>> getNamespacesAsync(String tenant, String cluster) {
         WebTarget path = adminNamespaces.path(tenant).path(cluster);
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<List<String>>() {
-
-                    @Override
-                    public void completed(List<String> namespaces) {
-                        future.complete(namespaces);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<List<String>>(){});
     }
 
     @Override
@@ -130,44 +103,13 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public  CompletableFuture<BundlesData> getBundlesAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        String action = "bundles";
-        WebTarget path = namespacePath(ns, action);
-        final CompletableFuture<BundlesData> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<BundlesData>() {
-                    @Override
-                    public void completed(BundlesData bundles) {
-                        future.complete(bundles);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<BundlesData>(){}, namespace, "bundles");
     }
 
     @Override
     public CompletableFuture<List<String>> getTopicsAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        String action = ns.isV2() ? "topics" : "destinations";
-        WebTarget path = namespacePath(ns, action);
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<List<String>>() {
-                    @Override
-                    public void completed(List<String> topics) {
-                        future.complete(topics);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<List<String>>(){}, namespace,
+                NamespaceName.get(namespace).isV2() ? "topics" : "destinations");
     }
 
     @Override
@@ -184,20 +126,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
         path = path
                 .queryParam("mode", options.getMode())
                 .queryParam("includeSystemTopic", options.isIncludeSystemTopic());
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<List<String>>() {
-                    @Override
-                    public void completed(List<String> topics) {
-                        future.complete(topics);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<List<String>>(){});
     }
 
     @Override
@@ -207,22 +136,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Policies> getPoliciesAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns);
-        final CompletableFuture<Policies> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Policies>() {
-                    @Override
-                    public void completed(Policies policies) {
-                        future.complete(policies);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Policies>(){}, namespace);
     }
 
     @Override
@@ -359,22 +273,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Map<String, Set<AuthAction>>> getPermissionsAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "permissions");
-        final CompletableFuture<Map<String, Set<AuthAction>>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Map<String, Set<AuthAction>>>() {
-                    @Override
-                    public void completed(Map<String, Set<AuthAction>> permissions) {
-                        future.complete(permissions);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Map<String, Set<AuthAction>>>(){}, namespace, "permissions");
     }
 
     @Override
@@ -410,22 +309,8 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Map<String, Set<String>>> getPermissionOnSubscriptionAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "permissions", "subscription");
-        final CompletableFuture<Map<String, Set<String>>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Map<String, Set<String>>>() {
-                    @Override
-                    public void completed(Map<String, Set<String>> permissions) {
-                        future.complete(permissions);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(
+                new FutureCallback<Map<String, Set<String>>>(){}, namespace, "permissions", "subscription");
     }
 
     @Override
@@ -463,22 +348,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<List<String>> getNamespaceReplicationClustersAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "replication");
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<List<String>>() {
-                    @Override
-                    public void completed(List<String> clusters) {
-                        future.complete(clusters);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<List<String>>(){}, namespace, "replication");
     }
 
     @Override
@@ -500,22 +370,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Integer> getNamespaceMessageTTLAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "messageTTL");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer ttl) {
-                        future.complete(ttl);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Integer>(){}, namespace, "messageTTL");
     }
 
     @Override
@@ -549,21 +404,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Integer> getSubscriptionExpirationTimeAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "subscriptionExpirationTime");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<Integer>() {
-            @Override
-            public void completed(Integer expirationTime) {
-                future.complete(expirationTime);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Integer>(){}, namespace, "subscriptionExpirationTime");
     }
 
     @Override
@@ -612,22 +453,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<String> getNamespaceAntiAffinityGroupAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "antiAffinity");
-        final CompletableFuture<String> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<String>() {
-                    @Override
-                    public void completed(String s) {
-                        future.complete(s);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<String>(){}, namespace, "antiAffinity");
     }
 
     @Override
@@ -641,20 +467,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
             String tenant, String cluster, String namespaceAntiAffinityGroup) {
         WebTarget path = adminNamespaces.path(cluster)
                 .path("antiAffinity").path(namespaceAntiAffinityGroup).queryParam("property", tenant);
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<List<String>>() {
-                    @Override
-                    public void completed(List<String> antiNamespaces) {
-                        future.complete(antiNamespaces);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<List<String>>(){});
     }
 
     @Override
@@ -688,22 +501,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Boolean> getDeduplicationStatusAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "deduplication");
-        final CompletableFuture<Boolean> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Boolean>() {
-                    @Override
-                    public void completed(Boolean enabled) {
-                        future.complete(enabled);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Boolean>(){}, namespace, "deduplication");
     }
 
     @Override
@@ -739,22 +537,8 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<AutoTopicCreationOverride> getAutoTopicCreationAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "autoTopicCreation");
-        final CompletableFuture<AutoTopicCreationOverride> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<AutoTopicCreationOverride>() {
-                    @Override
-                    public void completed(AutoTopicCreationOverride autoTopicCreationOverride) {
-                        future.complete(autoTopicCreationOverride);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<AutoTopicCreationOverride>(){}, namespace,
+                "autoTopicCreation");
     }
 
     @Override
@@ -790,22 +574,8 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<AutoSubscriptionCreationOverride> getAutoSubscriptionCreationAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "autoSubscriptionCreation");
-        final CompletableFuture<AutoSubscriptionCreationOverride> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<AutoSubscriptionCreationOverride>() {
-                    @Override
-                    public void completed(AutoSubscriptionCreationOverride autoSubscriptionCreation) {
-                        future.complete(autoSubscriptionCreation);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(
+                new FutureCallback<AutoSubscriptionCreationOverride>(){}, namespace, "autoSubscriptionCreation");
     }
 
 
@@ -830,22 +600,8 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Set<SubscriptionType>> getSubscriptionTypesEnabledAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "subscriptionTypesEnabled");
-        final CompletableFuture<Set<SubscriptionType>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Set<SubscriptionType>>() {
-                    @Override
-                    public void completed(Set<SubscriptionType> subscriptionTypesEnabled) {
-                        future.complete(subscriptionTypesEnabled);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(
+                new FutureCallback<Set<SubscriptionType>>(){}, namespace, "subscriptionTypesEnabled");
     }
 
     @Override
@@ -879,22 +635,8 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Map<BacklogQuotaType, BacklogQuota>> getBacklogQuotaMapAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "backlogQuotaMap");
-        final CompletableFuture<Map<BacklogQuotaType, BacklogQuota>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Map<BacklogQuotaType, BacklogQuota>>() {
-                    @Override
-                    public void completed(Map<BacklogQuotaType, BacklogQuota> quotaMap) {
-                        future.complete(quotaMap);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(
+                new FutureCallback<Map<BacklogQuotaType, BacklogQuota>>(){}, namespace, "backlogQuotaMap");
     }
 
     @Override
@@ -996,22 +738,8 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<BookieAffinityGroupData> getBookieAffinityGroupAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "persistence", "bookieAffinity");
-        final CompletableFuture<BookieAffinityGroupData> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<BookieAffinityGroupData>() {
-                    @Override
-                    public void completed(BookieAffinityGroupData bookieAffinityGroupData) {
-                        future.complete(bookieAffinityGroupData);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(
+                new FutureCallback<BookieAffinityGroupData>(){}, namespace, "persistence", "bookieAffinity");
     }
 
     @Override
@@ -1021,22 +749,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<PersistencePolicies> getPersistenceAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "persistence");
-        final CompletableFuture<PersistencePolicies> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<PersistencePolicies>() {
-                    @Override
-                    public void completed(PersistencePolicies persistencePolicies) {
-                        future.complete(persistencePolicies);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<PersistencePolicies>(){}, namespace, "persistence");
     }
 
     @Override
@@ -1070,22 +783,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<RetentionPolicies> getRetentionAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "retention");
-        final CompletableFuture<RetentionPolicies> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<RetentionPolicies>() {
-                    @Override
-                    public void completed(RetentionPolicies retentionPolicies) {
-                        future.complete(retentionPolicies);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<RetentionPolicies>(){}, namespace, "retention");
     }
 
     @Override
@@ -1107,22 +805,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<String> getReplicationConfigVersionAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "configversion");
-        final CompletableFuture<String> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<String>() {
-                    @Override
-                    public void completed(String s) {
-                        future.complete(s);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<String>(){}, namespace, "configversion");
     }
 
     @Override
@@ -1185,20 +868,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
         if (topics != null && topics.size() > 0) {
             path = path.queryParam("topics", topics.stream().map(Codec::encode).toArray());
         }
-        final CompletableFuture<TopicHashPositions> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<TopicHashPositions>() {
-                    @Override
-                    public void completed(TopicHashPositions topicHashPositions) {
-                        future.complete(topicHashPositions);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<TopicHashPositions>(){});
     }
 
     @Override
@@ -1232,22 +902,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<PublishRate> getPublishRateAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "publishRate");
-        final CompletableFuture<PublishRate> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<PublishRate>() {
-                    @Override
-                    public void completed(PublishRate publishRate) {
-                        future.complete(publishRate);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<PublishRate>(){}, namespace, "publishRate");
     }
 
     @Override
@@ -1281,22 +936,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<DispatchRate> getDispatchRateAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "dispatchRate");
-        final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<DispatchRate>() {
-                    @Override
-                    public void completed(DispatchRate dispatchRate) {
-                        future.complete(dispatchRate);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<DispatchRate>(){}, namespace, "dispatchRate");
     }
 
     @Override
@@ -1330,22 +970,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<SubscribeRate> getSubscribeRateAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "subscribeRate");
-        final CompletableFuture<SubscribeRate> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<SubscribeRate>() {
-                    @Override
-                    public void completed(SubscribeRate subscribeRate) {
-                        future.complete(subscribeRate);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<SubscribeRate>(){}, namespace, "subscribeRate");
     }
 
     @Override
@@ -1380,22 +1005,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "subscriptionDispatchRate");
-        final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<DispatchRate>() {
-                    @Override
-                    public void completed(DispatchRate dispatchRate) {
-                        future.complete(dispatchRate);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<DispatchRate>(){}, namespace, "subscriptionDispatchRate");
     }
 
     @Override
@@ -1429,22 +1039,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "replicatorDispatchRate");
-        final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<DispatchRate>() {
-                    @Override
-                    public void completed(DispatchRate dispatchRate) {
-                        future.complete(dispatchRate);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<DispatchRate>(){}, namespace, "replicatorDispatchRate");
     }
 
     @Override
@@ -1545,22 +1140,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<SubscriptionAuthMode> getSubscriptionAuthModeAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "subscriptionAuthMode");
-        final CompletableFuture<SubscriptionAuthMode> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<SubscriptionAuthMode>() {
-                    @Override
-                    public void completed(SubscriptionAuthMode subscriptionAuthMode) {
-                        future.complete(subscriptionAuthMode);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<SubscriptionAuthMode>(){}, namespace, "subscriptionAuthMode");
     }
 
     @Override
@@ -1582,22 +1162,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Boolean> getEncryptionRequiredStatusAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "encryptionRequired");
-        final CompletableFuture<Boolean> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Boolean>() {
-                    @Override
-                    public void completed(Boolean enabled) {
-                        future.complete(enabled);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Boolean>(){}, namespace, "encryptionRequired");
     }
 
     @Override
@@ -1607,22 +1172,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "delayedDelivery");
-        final CompletableFuture<DelayedDeliveryPolicies> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<DelayedDeliveryPolicies>() {
-                    @Override
-                    public void completed(DelayedDeliveryPolicies delayedDeliveryPolicies) {
-                        future.complete(delayedDeliveryPolicies);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<DelayedDeliveryPolicies>(){}, namespace, "delayedDelivery");
     }
 
     @Override
@@ -1658,21 +1208,8 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<InactiveTopicPolicies> getInactiveTopicPoliciesAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "inactiveTopicPolicies");
-        final CompletableFuture<InactiveTopicPolicies> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<InactiveTopicPolicies>() {
-                    @Override
-                    public void completed(InactiveTopicPolicies inactiveTopicPolicies) {
-                        future.complete(inactiveTopicPolicies);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<InactiveTopicPolicies>(){}, namespace,
+                "inactiveTopicPolicies");
     }
 
     @Override
@@ -1696,22 +1233,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Integer> getDeduplicationSnapshotIntervalAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "deduplicationSnapshotInterval");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer interval) {
-                        future.complete(interval);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Integer>(){}, namespace, "deduplicationSnapshotInterval");
     }
 
     @Override
@@ -1743,22 +1265,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Integer> getMaxSubscriptionsPerTopicAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "maxSubscriptionsPerTopic");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer maxSubscriptionsPerTopic) {
-                        future.complete(maxSubscriptionsPerTopic);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Integer>(){}, namespace, "maxSubscriptionsPerTopic");
     }
 
     @Override
@@ -1793,22 +1300,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Integer> getMaxProducersPerTopicAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "maxProducersPerTopic");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer max) {
-                        future.complete(max);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Integer>(){}, namespace, "maxProducersPerTopic");
     }
 
     @Override
@@ -1842,22 +1334,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Integer> getMaxConsumersPerTopicAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "maxConsumersPerTopic");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer max) {
-                        future.complete(max);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Integer>(){}, namespace, "maxConsumersPerTopic");
     }
 
     @Override
@@ -1891,22 +1368,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Integer> getMaxConsumersPerSubscriptionAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "maxConsumersPerSubscription");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer max) {
-                        future.complete(max);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Integer>(){}, namespace, "maxConsumersPerSubscription");
     }
 
     @Override
@@ -1944,22 +1406,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Integer> getMaxUnackedMessagesPerConsumerAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "maxUnackedMessagesPerConsumer");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer max) {
-                        future.complete(max);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Integer>(){}, namespace, "maxUnackedMessagesPerConsumer");
     }
 
     @Override
@@ -1995,22 +1442,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Integer> getMaxUnackedMessagesPerSubscriptionAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "maxUnackedMessagesPerSubscription");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer max) {
-                        future.complete(max);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Integer>(){}, namespace, "maxUnackedMessagesPerSubscription");
     }
 
     @Override
@@ -2048,22 +1480,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Long> getCompactionThresholdAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "compactionThreshold");
-        final CompletableFuture<Long> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Long>() {
-                    @Override
-                    public void completed(Long threshold) {
-                        future.complete(threshold);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Long>(){}, namespace, "compactionThreshold");
     }
 
     @Override
@@ -2097,22 +1514,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Long> getOffloadThresholdAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "offloadThreshold");
-        final CompletableFuture<Long> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Long>() {
-                    @Override
-                    public void completed(Long threshold) {
-                        future.complete(threshold);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Long>(){}, namespace, "offloadThreshold");
     }
 
     @Override
@@ -2134,22 +1536,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Long> getOffloadDeleteLagMsAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
-        final CompletableFuture<Long> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Long>() {
-                    @Override
-                    public void completed(Long lag) {
-                        future.complete(lag);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Long>(){}, namespace, "offloadDeletionLagMs");
     }
 
     @Override
@@ -2224,20 +1611,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
         NamespaceName ns = NamespaceName.get(namespace);
         WebTarget path = namespacePath(ns, "schemaValidationEnforced");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<Boolean> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Boolean>() {
-                    @Override
-                    public void completed(Boolean enforced) {
-                        future.complete(enforced);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Boolean>() {});
     }
 
     @Override
@@ -2261,22 +1635,8 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "schemaCompatibilityStrategy");
-        final CompletableFuture<SchemaCompatibilityStrategy> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<SchemaCompatibilityStrategy>() {
-                    @Override
-                    public void completed(SchemaCompatibilityStrategy schemaCompatibilityStrategy) {
-                        future.complete(schemaCompatibilityStrategy);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(
+                new FutureCallback<SchemaCompatibilityStrategy>(){}, namespace, "schemaCompatibilityStrategy");
     }
 
     @Override
@@ -2300,22 +1660,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Boolean> getIsAllowAutoUpdateSchemaAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "isAllowAutoUpdateSchema");
-        final CompletableFuture<Boolean> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Boolean>() {
-                    @Override
-                    public void completed(Boolean allowAutoUpdate) {
-                        future.complete(allowAutoUpdate);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Boolean>(){}, namespace, "isAllowAutoUpdateSchema");
     }
 
     @Override
@@ -2363,22 +1708,8 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "offloadPolicies");
-        final CompletableFuture<OffloadPolicies> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<OffloadPoliciesImpl>() {
-                    @Override
-                    public void completed(OffloadPoliciesImpl offloadPolicies) {
-                        future.complete(offloadPolicies);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<OffloadPoliciesImpl>(){}, namespace, "offloadPolicies")
+                .thenApply(policies -> policies);
     }
 
     @Override
@@ -2388,22 +1719,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Integer> getMaxTopicsPerNamespaceAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "maxTopicsPerNamespace");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer maxTopicsPerNamespace) {
-                        future.complete(maxTopicsPerNamespace);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Integer>(){}, namespace, "maxTopicsPerNamespace");
     }
 
     @Override
@@ -2461,21 +1777,8 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<String> getPropertyAsync(String namespace, String key) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        final CompletableFuture<String> future = new CompletableFuture<>();
-        WebTarget path = namespacePath(ns, "property", key);
-        asyncGetRequest(path, new InvocationCallback<String>() {
-            @Override
-            public void completed(String value) {
-                future.complete(StringUtils.isEmpty(value) ? null : value);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<String>(){}, namespace, "property", key)
+                .thenApply(value -> StringUtils.isEmpty(value) ? null : value);
     }
 
     @Override
@@ -2485,41 +1788,12 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
 
     @Override
     public CompletableFuture<Map<String, String>> getPropertiesAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        final CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
-        WebTarget path = namespacePath(ns, "properties");
-        asyncGetRequest(path, new InvocationCallback<Map<String, String>>() {
-            @Override
-            public void completed(Map<String, String> value) {
-                future.complete(value);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<Map<String, String>>(){}, namespace, "properties");
     }
 
     @Override
     public CompletableFuture<String> getNamespaceResourceGroupAsync(String namespace) {
-        NamespaceName ns = NamespaceName.get(namespace);
-        WebTarget path = namespacePath(ns, "resourcegroup");
-        final CompletableFuture<String> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<String>() {
-                    @Override
-                    public void completed(String rgName) {
-                        future.complete(rgName);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetNamespaceParts(new FutureCallback<String>(){}, namespace, "resourcegroup");
     }
 
     @Override
@@ -2594,4 +1868,11 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
         namespacePath = WebTargets.addParts(namespacePath, parts);
         return namespacePath;
     }
+
+    private <T> CompletableFuture<T> asyncGetNamespaceParts(FutureCallback<T> callback,
+                                                            String namespace, String... parts) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, parts);
+        return asyncGetRequest(path, callback);
+    }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
index 7f70e7757ae..635df525619 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 import org.apache.pulsar.client.admin.NonPersistentTopics;
@@ -66,22 +65,8 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
     @Override
     public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(String topic) {
         TopicName topicName = validateTopic(topic);
-        final CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>();
         WebTarget path = topicPath(topicName, "partitions");
-        asyncGetRequest(path,
-                new InvocationCallback<PartitionedTopicMetadata>() {
-
-                    @Override
-                    public void completed(PartitionedTopicMetadata response) {
-                        future.complete(response);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<PartitionedTopicMetadata>(){});
     }
 
     @Override
@@ -92,22 +77,8 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
     @Override
     public CompletableFuture<NonPersistentTopicStats> getStatsAsync(String topic) {
         TopicName topicName = validateTopic(topic);
-        final CompletableFuture<NonPersistentTopicStats> future = new CompletableFuture<>();
         WebTarget path = topicPath(topicName, "stats");
-        asyncGetRequest(path,
-                new InvocationCallback<NonPersistentTopicStats>() {
-
-                    @Override
-                    public void completed(NonPersistentTopicStats response) {
-                        future.complete(response);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<NonPersistentTopicStats>() {});
     }
 
     @Override
@@ -118,22 +89,8 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
     @Override
     public CompletableFuture<PersistentTopicInternalStats> getInternalStatsAsync(String topic) {
         TopicName topicName = validateTopic(topic);
-        final CompletableFuture<PersistentTopicInternalStats> future = new CompletableFuture<>();
         WebTarget path = topicPath(topicName, "internalStats");
-        asyncGetRequest(path,
-                new InvocationCallback<PersistentTopicInternalStats>() {
-
-                    @Override
-                    public void completed(PersistentTopicInternalStats response) {
-                        future.complete(response);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<PersistentTopicInternalStats>(){});
     }
 
     @Override
@@ -156,20 +113,8 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
     @Override
     public CompletableFuture<List<String>> getListInBundleAsync(String namespace, String bundleRange) {
         NamespaceName ns = NamespaceName.get(namespace);
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
         WebTarget path = namespacePath("non-persistent", ns, bundleRange);
-        asyncGetRequest(path,
-                new InvocationCallback<List<String>>() {
-                    @Override
-                    public void completed(List<String> response) {
-                        future.complete(response);
-                    }
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<List<String>>() {});
     }
 
     @Override
@@ -180,21 +125,8 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
     @Override
     public CompletableFuture<List<String>> getListAsync(String namespace) {
         NamespaceName ns = NamespaceName.get(namespace);
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
         WebTarget path = namespacePath("non-persistent", ns);
-        asyncGetRequest(path,
-                new InvocationCallback<List<String>>() {
-                    @Override
-                    public void completed(List<String> response) {
-                        future.complete(response);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<List<String>>(){});
     }
 
     /*
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
index d32bd308e4f..1ed3e5da367 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
@@ -67,19 +67,8 @@ public class PackagesImpl extends ComponentResource implements Packages {
     @Override
     public CompletableFuture<PackageMetadata> getMetadataAsync(String packageName) {
         WebTarget path = packages.path(PackageName.get(packageName).toRestPath() + "/metadata");
-        final CompletableFuture<PackageMetadata> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<PackageMetadata>() {
-            @Override
-            public void completed(PackageMetadata metadata) {
-                future.complete(metadata);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;    }
+        return asyncGetRequest(path, new FutureCallback<PackageMetadata>(){});
+    }
 
     @Override
     public void updateMetadata(String packageName, PackageMetadata metadata) throws PulsarAdminException {
@@ -186,19 +175,7 @@ public class PackagesImpl extends ComponentResource implements Packages {
         PackageName name = PackageName.get(packageName);
         WebTarget path = packages.path(String.format("%s/%s/%s/%s",
             name.getPkgType().toString(), name.getTenant(), name.getNamespace(), name.getName()));
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<List<String>>() {
-            @Override
-            public void completed(List<String> strings) {
-                future.complete(strings);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<List<String>>(){});
     }
 
     @Override
@@ -209,18 +186,6 @@ public class PackagesImpl extends ComponentResource implements Packages {
     @Override
     public CompletableFuture<List<String>> listPackagesAsync(String type, String namespace) {
         WebTarget path = packages.path(type + "/" + NamespaceName.get(namespace).toString());
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<List<String>>() {
-            @Override
-            public void completed(List<String> strings) {
-                future.complete(strings);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<List<String>>(){});
     }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java
index 40551387a2c..99d60bcf259 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.client.admin.internal;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -45,20 +44,7 @@ public class ResourceGroupsImpl extends BaseResource implements ResourceGroups {
 
     @Override
     public CompletableFuture<List<String>> getResourceGroupsAsync() {
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(adminResourceGroups,
-                new InvocationCallback<List<String>>() {
-                    @Override
-                    public void completed(List<String> resourcegroups) {
-                        future.complete(resourcegroups);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(this.adminResourceGroups, new FutureCallback<List<String>>(){});
     }
 
     @Override
@@ -69,20 +55,7 @@ public class ResourceGroupsImpl extends BaseResource implements ResourceGroups {
     @Override
     public CompletableFuture<ResourceGroup> getResourceGroupAsync(String name) {
         WebTarget path = adminResourceGroups.path(name);
-        final CompletableFuture<ResourceGroup> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<ResourceGroup>() {
-                    @Override
-                    public void completed(ResourceGroup resourcegroup) {
-                        future.complete(resourcegroup);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<ResourceGroup>(){});
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
index e5d6e49d612..7ab917a8532 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.admin.internal;
 
 import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -47,20 +46,7 @@ public class ResourceQuotasImpl extends BaseResource implements ResourceQuotas {
 
     @Override
     public CompletableFuture<ResourceQuota> getDefaultResourceQuotaAsync() {
-        final CompletableFuture<ResourceQuota> future = new CompletableFuture<>();
-        asyncGetRequest(adminV2Quotas,
-                new InvocationCallback<ResourceQuota>() {
-                    @Override
-                    public void completed(ResourceQuota resourceQuota) {
-                        future.complete(resourceQuota);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(adminV2Quotas, new FutureCallback<ResourceQuota>(){});
     }
 
     @Override
@@ -82,20 +68,7 @@ public class ResourceQuotasImpl extends BaseResource implements ResourceQuotas {
     public CompletableFuture<ResourceQuota> getNamespaceBundleResourceQuotaAsync(String namespace, String bundle) {
         NamespaceName ns = NamespaceName.get(namespace);
         WebTarget path = namespacePath(ns, bundle);
-        final CompletableFuture<ResourceQuota> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<ResourceQuota>() {
-                    @Override
-                    public void completed(ResourceQuota resourceQuota) {
-                        future.complete(resourceQuota);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<ResourceQuota>(){});
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index d24f2d982b3..65417ec9e72 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -60,20 +60,8 @@ public class SchemasImpl extends BaseResource implements Schemas {
     @Override
     public CompletableFuture<SchemaInfo> getSchemaInfoAsync(String topic) {
         TopicName tn = TopicName.get(topic);
-        final CompletableFuture<SchemaInfo> future = new CompletableFuture<>();
-        asyncGetRequest(schemaPath(tn),
-                new InvocationCallback<GetSchemaResponse>() {
-                    @Override
-                    public void completed(GetSchemaResponse response) {
-                        future.complete(convertGetSchemaResponseToSchemaInfo(tn, response));
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(schemaPath(tn), new FutureCallback<GetSchemaResponse>(){})
+                .thenApply(response -> convertGetSchemaResponseToSchemaInfo(tn, response));
     }
 
     @Override
@@ -84,20 +72,8 @@ public class SchemasImpl extends BaseResource implements Schemas {
     @Override
     public CompletableFuture<SchemaInfoWithVersion> getSchemaInfoWithVersionAsync(String topic) {
         TopicName tn = TopicName.get(topic);
-        final CompletableFuture<SchemaInfoWithVersion> future = new CompletableFuture<>();
-        asyncGetRequest(schemaPath(tn),
-                new InvocationCallback<GetSchemaResponse>() {
-                    @Override
-                    public void completed(GetSchemaResponse response) {
-                        future.complete(convertGetSchemaResponseToSchemaInfoWithVersion(tn, response));
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(schemaPath(tn), new FutureCallback<GetSchemaResponse>(){})
+                .thenApply(response -> convertGetSchemaResponseToSchemaInfoWithVersion(tn, response));
     }
 
     @Override
@@ -109,20 +85,8 @@ public class SchemasImpl extends BaseResource implements Schemas {
     public CompletableFuture<SchemaInfo> getSchemaInfoAsync(String topic, long version) {
         TopicName tn = TopicName.get(topic);
         WebTarget path = schemaPath(tn).path(Long.toString(version));
-        final CompletableFuture<SchemaInfo> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<GetSchemaResponse>() {
-                    @Override
-                    public void completed(GetSchemaResponse response) {
-                        future.complete(convertGetSchemaResponseToSchemaInfo(tn, response));
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<GetSchemaResponse>(){})
+                .thenApply(response -> convertGetSchemaResponseToSchemaInfo(tn, response));
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
index 74cecf6eab4..37fd2ba753b 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.client.admin.internal;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 import org.apache.pulsar.client.admin.Properties;
@@ -47,20 +46,7 @@ public class TenantsImpl extends BaseResource implements Tenants, Properties {
 
     @Override
     public CompletableFuture<List<String>> getTenantsAsync() {
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(adminTenants,
-                new InvocationCallback<List<String>>() {
-                    @Override
-                    public void completed(List<String> tenants) {
-                        future.complete(tenants);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(this.adminTenants, new FutureCallback<List<String>>(){});
     }
 
     @Override
@@ -71,20 +57,8 @@ public class TenantsImpl extends BaseResource implements Tenants, Properties {
     @Override
     public CompletableFuture<TenantInfo> getTenantInfoAsync(String tenant) {
         WebTarget path = adminTenants.path(tenant);
-        final CompletableFuture<TenantInfo> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<TenantInfoImpl>() {
-                    @Override
-                    public void completed(TenantInfoImpl tenantInfo) {
-                        future.complete(tenantInfo);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<TenantInfoImpl>(){})
+                .thenApply(tenantInfo -> tenantInfo);
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
index 5cba329f4e0..29226567216 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
@@ -22,7 +22,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
@@ -125,19 +124,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "maxUnackedMessagesOnConsumer");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<Integer>() {
-            @Override
-            public void completed(Integer maxNum) {
-                future.complete(maxNum);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Integer>(){});
     }
 
     @Override
@@ -175,19 +162,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "inactiveTopicPolicies");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<InactiveTopicPolicies> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<InactiveTopicPolicies>() {
-            @Override
-            public void completed(InactiveTopicPolicies inactiveTopicPolicies) {
-                future.complete(inactiveTopicPolicies);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<InactiveTopicPolicies>(){});
     }
 
     @Override
@@ -239,19 +214,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
         WebTarget path = topicPath(topicName, "delayedDelivery");
         path = path.queryParam("applied", applied);
 
-        final CompletableFuture<DelayedDeliveryPolicies> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<DelayedDeliveryPolicies>() {
-            @Override
-            public void completed(DelayedDeliveryPolicies delayedDeliveryPolicies) {
-                future.complete(delayedDeliveryPolicies);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<DelayedDeliveryPolicies>(){});
     }
 
     @Override
@@ -310,19 +273,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "deduplicationEnabled");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<Boolean> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<Boolean>() {
-            @Override
-            public void completed(Boolean enabled) {
-                future.complete(enabled);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Boolean>(){});
     }
 
     private CompletableFuture<Void> enableDeduplicationAsync(String topic, boolean enabled) {
@@ -375,19 +326,8 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "offloadPolicies");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<OffloadPolicies> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<OffloadPoliciesImpl>() {
-            @Override
-            public void completed(OffloadPoliciesImpl offloadPolicies) {
-                future.complete(offloadPolicies);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<OffloadPoliciesImpl>(){})
+                .thenApply(offloadPolicies -> offloadPolicies);
     }
 
     @Override
@@ -434,19 +374,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "maxUnackedMessagesOnSubscription");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<Integer>() {
-            @Override
-            public void completed(Integer maxNum) {
-                future.complete(maxNum);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Integer>(){});
     }
 
     @Override
@@ -545,20 +473,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "retention");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<RetentionPolicies> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<RetentionPolicies>() {
-                    @Override
-                    public void completed(RetentionPolicies retentionPolicies) {
-                        future.complete(retentionPolicies);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<RetentionPolicies>(){});
     }
 
     @Override
@@ -605,20 +520,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "persistence");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<PersistencePolicies> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<PersistencePolicies>() {
-                    @Override
-                    public void completed(PersistencePolicies persistencePolicies) {
-                        future.complete(persistencePolicies);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<PersistencePolicies>(){});
     }
 
     @Override
@@ -643,20 +545,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "dispatchRate");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<DispatchRate>() {
-                    @Override
-                    public void completed(DispatchRate dispatchRate) {
-                        future.complete(dispatchRate);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<DispatchRate>(){});
     }
 
     @Override
@@ -703,20 +592,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "subscriptionDispatchRate");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<DispatchRate>() {
-                    @Override
-                    public void completed(DispatchRate dispatchRate) {
-                        future.complete(dispatchRate);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<DispatchRate>(){});
     }
 
     @Override
@@ -779,20 +655,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, subscriptionName, "dispatchRate");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<DispatchRate>() {
-                    @Override
-                    public void completed(DispatchRate dispatchRate) {
-                        future.complete(dispatchRate);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<DispatchRate>(){});
     }
 
     @Override
@@ -837,20 +700,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "compactionThreshold");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<Long> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Long>() {
-                    @Override
-                    public void completed(Long compactionThreshold) {
-                        future.complete(compactionThreshold);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Long>(){});
     }
 
     @Override
@@ -886,20 +736,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
     public CompletableFuture<PublishRate> getPublishRateAsync(String topic) {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "publishRate");
-        final CompletableFuture<PublishRate> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<PublishRate>() {
-                    @Override
-                    public void completed(PublishRate publishRate) {
-                        future.complete(publishRate);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<PublishRate>(){});
     }
 
     @Override
@@ -935,20 +772,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
     public CompletableFuture<Integer> getMaxConsumersPerSubscriptionAsync(String topic) {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "maxConsumersPerSubscription");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer maxConsumersPerSubscription) {
-                        future.complete(maxConsumersPerSubscription);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Integer>(){});
     }
 
     @Override
@@ -996,20 +820,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "maxProducers");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer maxProducers) {
-                        future.complete(maxProducers);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Integer>(){});
     }
 
     @Override
@@ -1045,20 +856,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
     public CompletableFuture<Integer> getMaxSubscriptionsPerTopicAsync(String topic) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "maxSubscriptionsPerTopic");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer maxSubscriptionsPerTopic) {
-                        future.complete(maxSubscriptionsPerTopic);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Integer>(){});
     }
 
     @Override
@@ -1094,20 +892,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
     public CompletableFuture<Integer> getMaxMessageSizeAsync(String topic) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "maxMessageSize");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer maxMessageSize) {
-                        future.complete(maxMessageSize);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Integer>(){});
     }
 
     @Override
@@ -1154,20 +939,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "maxConsumers");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer maxProducers) {
-                        future.complete(maxProducers);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Integer>(){});
     }
 
     @Override
@@ -1204,20 +976,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
     public CompletableFuture<Integer> getDeduplicationSnapshotIntervalAsync(String topic) {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "deduplicationSnapshotInterval");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer interval) {
-                        future.complete(interval);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Integer>(){});
     }
 
     @Override
@@ -1268,20 +1027,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
     public CompletableFuture<Set<SubscriptionType>> getSubscriptionTypesEnabledAsync(String topic) {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "subscriptionTypesEnabled");
-        final CompletableFuture<Set<SubscriptionType>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Set<SubscriptionType>>() {
-                    @Override
-                    public void completed(Set<SubscriptionType> subscriptionTypesEnabled) {
-                        future.complete(subscriptionTypesEnabled);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Set<SubscriptionType>>(){});
     }
 
     @Override
@@ -1316,20 +1062,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "subscribeRate");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<SubscribeRate> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<SubscribeRate>() {
-                    @Override
-                    public void completed(SubscribeRate subscribeRate) {
-                        future.complete(subscribeRate);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<SubscribeRate>(){});
     }
 
     @Override
@@ -1376,20 +1109,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "replicatorDispatchRate");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<DispatchRate>() {
-                    @Override
-                    public void completed(DispatchRate dispatchRate) {
-                        future.complete(dispatchRate);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<DispatchRate>(){});
     }
 
     @Override
@@ -1428,21 +1148,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "schemaCompatibilityStrategy");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<SchemaCompatibilityStrategy> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<SchemaCompatibilityStrategy>() {
-                    @Override
-                    public void completed(SchemaCompatibilityStrategy schemaCompatibilityStrategy) {
-                        future.complete(schemaCompatibilityStrategy);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-
-        return future;
+        return asyncGetRequest(path, new FutureCallback<SchemaCompatibilityStrategy>(){});
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 0b3f4ea7c7a..01ac1c85a89 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -298,21 +298,8 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public CompletableFuture<List<String>> getListInBundleAsync(String namespace, String bundleRange) {
         NamespaceName ns = NamespaceName.get(namespace);
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
         WebTarget path = namespacePath("non-persistent", ns, bundleRange);
-
-        asyncGetRequest(path,
-                new InvocationCallback<List<String>>() {
-                    @Override
-                    public void completed(List<String> response) {
-                        future.complete(response);
-                    }
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<List<String>>(){});
     }
 
 
@@ -325,20 +312,7 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<Map<String, Set<AuthAction>>> getPermissionsAsync(String topic) {
         TopicName tn = TopicName.get(topic);
         WebTarget path = topicPath(tn, "permissions");
-        final CompletableFuture<Map<String, Set<AuthAction>>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Map<String, Set<AuthAction>>>() {
-                    @Override
-                    public void completed(Map<String, Set<AuthAction>> permissions) {
-                        future.complete(permissions);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Map<String, Set<AuthAction>>>(){});
     }
 
     @Override
@@ -467,21 +441,7 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(String topic) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "partitions");
-        final CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<PartitionedTopicMetadata>() {
-
-                    @Override
-                    public void completed(PartitionedTopicMetadata response) {
-                        future.complete(response);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<PartitionedTopicMetadata>(){});
     }
 
     @Override
@@ -493,21 +453,7 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<Map<String, String>> getPropertiesAsync(String topic) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "properties");
-        final CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Map<String, String>>() {
-
-                    @Override
-                    public void completed(Map<String, String> response) {
-                        future.complete(response);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Map<String, String>>(){});
     }
 
     @Override
@@ -653,21 +599,7 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<List<String>> getSubscriptionsAsync(String topic) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "subscriptions");
-        final CompletableFuture<List<String>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<List<String>>() {
-
-                    @Override
-                    public void completed(List<String> response) {
-                        future.complete(response);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<List<String>>(){});
     }
 
     @Override
@@ -743,21 +675,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "internalStats");
         path = path.queryParam("metadata", metadata);
-        final CompletableFuture<PersistentTopicInternalStats> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<PersistentTopicInternalStats>() {
-
-                    @Override
-                    public void completed(PersistentTopicInternalStats response) {
-                        future.complete(response);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<PersistentTopicInternalStats>(){});
     }
 
     @Override
@@ -769,20 +687,7 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<String> getInternalInfoAsync(String topic) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "internal-info");
-        final CompletableFuture<String> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<String>() {
-                    @Override
-                    public void completed(String response) {
-                        future.complete(response);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<String>(){});
     }
 
     @Override
@@ -856,21 +761,7 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<PartitionedTopicInternalStats> getPartitionedInternalStatsAsync(String topic) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "partitioned-internalStats");
-        final CompletableFuture<PartitionedTopicInternalStats> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<PartitionedTopicInternalStats>() {
-
-                    @Override
-                    public void completed(PartitionedTopicInternalStats response) {
-                        future.complete(response);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<PartitionedTopicInternalStats>(){});
     }
 
     @Override
@@ -1137,20 +1028,8 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<MessageId> getMessageIdByTimestampAsync(String topic, long timestamp) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "messageid", Long.toString(timestamp));
-        final CompletableFuture<MessageId> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<MessageIdImpl>() {
-                    @Override
-                    public void completed(MessageIdImpl response) {
-                        future.complete(response);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<MessageIdImpl>(){})
+                .thenApply(messageIdImpl -> messageIdImpl);
     }
 
 
@@ -1243,21 +1122,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         String encodedSubName = Codec.encode(subName);
         WebTarget path = topicPath(tn, "subscription", encodedSubName,
                 "properties");
-        final CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Map<String, String>>() {
-
-                    @Override
-                    public void completed(Map<String, String> response) {
-                        future.complete(response);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Map<String, String>>(){});
     }
 
     @Override
@@ -1304,20 +1169,7 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<LongRunningProcessStatus> compactionStatusAsync(String topic) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "compaction");
-        final CompletableFuture<LongRunningProcessStatus> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<LongRunningProcessStatus>() {
-                    @Override
-                    public void completed(LongRunningProcessStatus longRunningProcessStatus) {
-                        future.complete(longRunningProcessStatus);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<LongRunningProcessStatus>(){});
     }
 
     @Override
@@ -1359,20 +1211,7 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<OffloadProcessStatus> offloadStatusAsync(String topic) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "offload");
-        final CompletableFuture<OffloadProcessStatus> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<OffloadProcessStatus>() {
-                    @Override
-                    public void completed(OffloadProcessStatus offloadProcessStatus) {
-                        future.complete(offloadProcessStatus);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<OffloadProcessStatus>(){});
     }
 
     private WebTarget namespacePath(String domain, NamespaceName namespace, String... parts) {
@@ -1782,19 +1621,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "maxUnackedMessagesOnConsumer");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<Integer>() {
-            @Override
-            public void completed(Integer maxNum) {
-                future.complete(maxNum);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Integer>(){});
     }
 
     @Override
@@ -1831,19 +1658,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "inactiveTopicPolicies");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<InactiveTopicPolicies> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<InactiveTopicPolicies>() {
-            @Override
-            public void completed(InactiveTopicPolicies inactiveTopicPolicies) {
-                future.complete(inactiveTopicPolicies);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<InactiveTopicPolicies>(){});
     }
 
     @Override
@@ -1894,19 +1709,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "delayedDelivery");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<DelayedDeliveryPolicies> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<DelayedDeliveryPolicies>() {
-            @Override
-            public void completed(DelayedDeliveryPolicies delayedDeliveryPolicies) {
-                future.complete(delayedDeliveryPolicies);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<DelayedDeliveryPolicies>(){});
     }
 
     @Override
@@ -1954,19 +1757,7 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<Boolean> getDeduplicationEnabledAsync(String topic) {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "deduplicationEnabled");
-        final CompletableFuture<Boolean> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<Boolean>() {
-            @Override
-            public void completed(Boolean enabled) {
-                future.complete(enabled);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Boolean>(){});
     }
 
     @Override
@@ -1989,19 +1780,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "deduplicationEnabled");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<Boolean> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<Boolean>() {
-            @Override
-            public void completed(Boolean enabled) {
-                future.complete(enabled);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Boolean>(){});
     }
 
     @Override
@@ -2072,19 +1851,8 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "offloadPolicies");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<OffloadPolicies> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<OffloadPoliciesImpl>() {
-            @Override
-            public void completed(OffloadPoliciesImpl offloadPolicies) {
-                future.complete(offloadPolicies);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<OffloadPoliciesImpl>(){})
+                .thenApply(offloadPolicies -> offloadPolicies);
     }
 
     @Override
@@ -2132,19 +1900,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "maxUnackedMessagesOnSubscription");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path, new InvocationCallback<Integer>() {
-            @Override
-            public void completed(Integer maxNum) {
-                future.complete(maxNum);
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(getApiException(throwable.getCause()));
-            }
-        });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Integer>(){});
     }
 
     @Override
@@ -2243,20 +1999,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "retention");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<RetentionPolicies> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<RetentionPolicies>() {
-                    @Override
-                    public void completed(RetentionPolicies retentionPolicies) {
-                        future.complete(retentionPolicies);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<RetentionPolicies>(){});
     }
 
     @Override
@@ -2303,20 +2046,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "persistence");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<PersistencePolicies> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<PersistencePolicies>() {
-                    @Override
-                    public void completed(PersistencePolicies persistencePolicies) {
-                        future.complete(persistencePolicies);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<PersistencePolicies>(){});
     }
 
     @Override
@@ -2341,20 +2071,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "dispatchRate");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<DispatchRate>() {
-                    @Override
-                    public void completed(DispatchRate dispatchRate) {
-                        future.complete(dispatchRate);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<DispatchRate>(){});
     }
 
     @Override
@@ -2401,20 +2118,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "subscriptionDispatchRate");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<DispatchRate>() {
-                    @Override
-                    public void completed(DispatchRate dispatchRate) {
-                        future.complete(dispatchRate);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<DispatchRate>(){});
     }
 
     @Override
@@ -2471,20 +2175,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "compactionThreshold");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<Long> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-            new InvocationCallback<Long>() {
-                @Override
-                public void completed(Long compactionThreshold) {
-                  future.complete(compactionThreshold);
-                }
-
-                @Override
-                public void failed(Throwable throwable) {
-                  future.completeExceptionally(getApiException(throwable.getCause()));
-                }
-            });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Long>(){});
     }
 
     @Override
@@ -2520,20 +2211,7 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<PublishRate> getPublishRateAsync(String topic) {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "publishRate");
-        final CompletableFuture<PublishRate> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-            new InvocationCallback<PublishRate>() {
-                @Override
-                public void completed(PublishRate publishRate) {
-                    future.complete(publishRate);
-                }
-
-                @Override
-                public void failed(Throwable throwable) {
-                    future.completeExceptionally(getApiException(throwable.getCause()));
-                }
-            });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<PublishRate>(){});
     }
 
     @Override
@@ -2569,20 +2247,7 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<Integer> getMaxConsumersPerSubscriptionAsync(String topic) {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "maxConsumersPerSubscription");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer maxConsumersPerSubscription) {
-                        future.complete(maxConsumersPerSubscription);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Integer>(){});
     }
 
     @Override
@@ -2630,20 +2295,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "maxProducers");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer maxProducers) {
-                        future.complete(maxProducers);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Integer>(){});
     }
 
     @Override
@@ -2679,20 +2331,7 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<Integer> getMaxSubscriptionsPerTopicAsync(String topic) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "maxSubscriptionsPerTopic");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer maxSubscriptionsPerTopic) {
-                        future.complete(maxSubscriptionsPerTopic);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Integer>(){});
     }
 
     @Override
@@ -2728,20 +2367,7 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<Integer> getMaxMessageSizeAsync(String topic) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "maxMessageSize");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer maxMessageSize) {
-                        future.complete(maxMessageSize);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Integer>(){});
     }
 
     @Override
@@ -2788,20 +2414,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "maxConsumers");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer maxProducers) {
-                        future.complete(maxProducers);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Integer>(){});
     }
 
     @Override
@@ -2838,20 +2451,7 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<Integer> getDeduplicationSnapshotIntervalAsync(String topic) {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "deduplicationSnapshotInterval");
-        final CompletableFuture<Integer> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Integer>() {
-                    @Override
-                    public void completed(Integer interval) {
-                        future.complete(interval);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Integer>(){});
     }
 
     @Override
@@ -2902,20 +2502,7 @@ public class TopicsImpl extends BaseResource implements Topics {
     public CompletableFuture<Set<SubscriptionType>> getSubscriptionTypesEnabledAsync(String topic) {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "subscriptionTypesEnabled");
-        final CompletableFuture<Set<SubscriptionType>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Set<SubscriptionType>>() {
-                    @Override
-                    public void completed(Set<SubscriptionType> subscriptionTypesEnabled) {
-                        future.complete(subscriptionTypesEnabled);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Set<SubscriptionType>>(){});
     }
 
     @Override
@@ -2950,20 +2537,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "replicatorDispatchRate");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<DispatchRate>() {
-                    @Override
-                    public void completed(DispatchRate dispatchRate) {
-                        future.complete(dispatchRate);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<DispatchRate>(){});
     }
 
     @Override
@@ -3010,20 +2584,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "subscribeRate");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<SubscribeRate> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<SubscribeRate>() {
-                    @Override
-                    public void completed(SubscribeRate subscribeRate) {
-                        future.complete(subscribeRate);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<SubscribeRate>(){});
     }
 
     @Override
@@ -3073,20 +2634,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName topicName = validateTopic(topic);
         String encodedSubName = Codec.encode(subName);
         WebTarget path = topicPath(topicName, "subscription", encodedSubName, "replicatedSubscriptionStatus");
-        final CompletableFuture<Map<String, Boolean>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Map<String, Boolean>>() {
-                    @Override
-                    public void completed(Map<String, Boolean> subscriptionStatus) {
-                        future.complete(subscriptionStatus);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Map<String, Boolean>>(){});
     }
 
     @Override
@@ -3104,20 +2652,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "schemaValidationEnforced");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<Boolean> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Boolean>() {
-                    @Override
-                    public void completed(Boolean enforced) {
-                        future.complete(enforced);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Boolean>(){});
     }
 
     @Override
@@ -3137,20 +2672,7 @@ public class TopicsImpl extends BaseResource implements Topics {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "replication");
         path = path.queryParam("applied", applied);
-        final CompletableFuture<Set<String>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Set<String>>() {
-                    @Override
-                    public void completed(Set<String> clusterIds) {
-                        future.complete(clusterIds);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Set<String>>(){});
     }
 
     @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index 80f41c3e2d5..6af80344287 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -53,20 +52,7 @@ public class TransactionsImpl extends BaseResource implements Transactions {
     public CompletableFuture<TransactionCoordinatorStats> getCoordinatorStatsByIdAsync(int coordinatorId) {
         WebTarget path = adminV3Transactions.path("coordinatorStats");
         path = path.queryParam("coordinatorId", coordinatorId);
-        final CompletableFuture<TransactionCoordinatorStats> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<TransactionCoordinatorStats>() {
-                    @Override
-                    public void completed(TransactionCoordinatorStats stats) {
-                        future.complete(stats);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<TransactionCoordinatorStats>(){});
     }
 
     @Override
@@ -77,20 +63,7 @@ public class TransactionsImpl extends BaseResource implements Transactions {
     @Override
     public CompletableFuture<Map<Integer, TransactionCoordinatorStats>> getCoordinatorStatsAsync() {
         WebTarget path = adminV3Transactions.path("coordinatorStats");
-        final CompletableFuture<Map<Integer, TransactionCoordinatorStats>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Map<Integer, TransactionCoordinatorStats>>() {
-                    @Override
-                    public void completed(Map<Integer, TransactionCoordinatorStats> stats) {
-                        future.complete(stats);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Map<Integer, TransactionCoordinatorStats>>(){});
     }
 
     @Override
@@ -105,20 +78,7 @@ public class TransactionsImpl extends BaseResource implements Transactions {
         path = path.path(topicName.getRestPath(false));
         path = path.path(txnID.getMostSigBits() + "");
         path = path.path(txnID.getLeastSigBits() + "");
-        final CompletableFuture<TransactionInBufferStats> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<TransactionInBufferStats>() {
-                    @Override
-                    public void completed(TransactionInBufferStats stats) {
-                        future.complete(stats);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<TransactionInBufferStats>(){});
     }
 
     @Override
@@ -135,20 +95,7 @@ public class TransactionsImpl extends BaseResource implements Transactions {
         path = path.path(subName);
         path = path.path(txnID.getMostSigBits() + "");
         path = path.path(txnID.getLeastSigBits() + "");
-        final CompletableFuture<TransactionInPendingAckStats> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<TransactionInPendingAckStats>() {
-                    @Override
-                    public void completed(TransactionInPendingAckStats stats) {
-                        future.complete(stats);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<TransactionInPendingAckStats>(){});
     }
 
     @Override
@@ -162,20 +109,7 @@ public class TransactionsImpl extends BaseResource implements Transactions {
         WebTarget path = adminV3Transactions.path("transactionMetadata");
         path = path.path(txnID.getMostSigBits() + "");
         path = path.path(txnID.getLeastSigBits() + "");
-        final CompletableFuture<TransactionMetadata> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<TransactionMetadata>() {
-                    @Override
-                    public void completed(TransactionMetadata metadata) {
-                        future.complete(metadata);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<TransactionMetadata>(){});
     }
 
     @Override
@@ -189,20 +123,7 @@ public class TransactionsImpl extends BaseResource implements Transactions {
         WebTarget path = adminV3Transactions.path("transactionBufferStats");
         path = path.path(TopicName.get(topic).getRestPath(false));
         path = path.queryParam("lowWaterMarks", lowWaterMarks);
-        final CompletableFuture<TransactionBufferStats> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<TransactionBufferStats>() {
-                    @Override
-                    public void completed(TransactionBufferStats stats) {
-                        future.complete(stats);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<TransactionBufferStats>(){});
     }
 
     @Override
@@ -218,20 +139,7 @@ public class TransactionsImpl extends BaseResource implements Transactions {
         path = path.path(TopicName.get(topic).getRestPath(false));
         path = path.path(subName);
         path = path.queryParam("lowWaterMarks", lowWaterMarks);
-        final CompletableFuture<TransactionPendingAckStats> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<TransactionPendingAckStats>() {
-                    @Override
-                    public void completed(TransactionPendingAckStats stats) {
-                        future.complete(stats);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<TransactionPendingAckStats>(){});
     }
 
     @Override
@@ -248,20 +156,7 @@ public class TransactionsImpl extends BaseResource implements Transactions {
         if (coordinatorId != null) {
             path = path.queryParam("coordinatorId", coordinatorId);
         }
-        final CompletableFuture<Map<String, TransactionMetadata>> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<Map<String, TransactionMetadata>>() {
-                    @Override
-                    public void completed(Map<String, TransactionMetadata> metadataMap) {
-                        future.complete(metadataMap);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<Map<String, TransactionMetadata>>(){});
     }
 
     @Override
@@ -290,20 +185,7 @@ public class TransactionsImpl extends BaseResource implements Transactions {
         WebTarget path = adminV3Transactions.path("coordinatorInternalStats");
         path = path.path(coordinatorId + "");
         path = path.queryParam("metadata", metadata);
-        final CompletableFuture<TransactionCoordinatorInternalStats> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<TransactionCoordinatorInternalStats>() {
-                    @Override
-                    public void completed(TransactionCoordinatorInternalStats stats) {
-                        future.complete(stats);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<TransactionCoordinatorInternalStats>(){});
     }
 
     @Override
@@ -322,20 +204,7 @@ public class TransactionsImpl extends BaseResource implements Transactions {
         path = path.path(tn.getRestPath(false));
         path = path.path(subName);
         path = path.queryParam("metadata", metadata);
-        final CompletableFuture<TransactionPendingAckInternalStats> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<TransactionPendingAckInternalStats>() {
-                    @Override
-                    public void completed(TransactionPendingAckInternalStats stats) {
-                        future.complete(stats);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<TransactionPendingAckInternalStats>(){});
     }
 
     @Override
@@ -371,20 +240,7 @@ public class TransactionsImpl extends BaseResource implements Transactions {
         path = path.path(ledgerId.toString());
         path = path.path(entryId.toString());
         path = path.queryParam("batchIndex", batchIndex);
-        final CompletableFuture<PositionInPendingAckStats> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<PositionInPendingAckStats>() {
-                    @Override
-                    public void completed(PositionInPendingAckStats stats) {
-                        future.complete(stats);
-                    }
-
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
-        return future;
+        return asyncGetRequest(path, new FutureCallback<PositionInPendingAckStats>() {});
     }
 
 


[pulsar] 04/05: [Fix][Flaky-test] Fix testConsumeTxnMessage (#16981)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 433e3aff9b2cbdf690ee9de07dad12e5e738ac50
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Tue Aug 9 10:45:48 2022 +0800

    [Fix][Flaky-test] Fix testConsumeTxnMessage (#16981)
    
    * [Fix][Flaky-test] Fix testConsumeTxnMessage
    Master https://github.com/apache/pulsar/issues/14109
    ## Motivation
    The transaction commit is async, so the consumer can still receive message when the consumer rebuilds.
    ## Modification
    Add  Awaitility.await() for check-ing whether the ongoingTxns = 0.
---
 .../apache/pulsar/testclient/PerformanceTransactionTest.java | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
index b87cf3adda3..883d53540cb 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -196,7 +197,7 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
-    public void testConsumeTxnMessage() throws InterruptedException, PulsarClientException {
+    public void testConsumeTxnMessage() throws Exception {
         String argString = "%s -r 10 -u %s -txn -ss %s -st %s -sp %s -ntxn %d";
         String subName = "sub";
         String topic = testTopic + UUID.randomUUID();
@@ -224,11 +225,18 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
         });
         thread.start();
         thread.join();
+
+        Awaitility.await().untilAsserted(() -> {
+            admin.transactions().getCoordinatorStats().forEach((integer, transactionCoordinatorStats) -> {
+                Assert.assertEquals(transactionCoordinatorStats.ongoingTxnSize, 0);
+            });
+        });
+
         Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subName).topic(topic)
                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscriptionType(SubscriptionType.Exclusive)
                 .enableBatchIndexAcknowledgment(false)
-               .subscribe();
+                .subscribe();
         for (int i = 0; i < 5; i++) {
             Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
             Assert.assertNotNull(message);


[pulsar] 03/05: [improve][broker]Remove unnecessary lock on the stats thread (#16983)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 643963a60fa1019180bf16d74e9ee7e0f4ef4939
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Aug 9 10:57:20 2022 +0800

    [improve][broker]Remove unnecessary lock on the stats thread  (#16983)
    
    ---
    
    *Motivation*
    
    We found there has a block between the pulsar-ordered executor and
    the pulsar-stats-updater executor.
    
    The pulsar-ordered executor is trying to createManagedLedgerOffloader,
    and the pulsar-stats-updater is getting the compactor. Both them want
    to get the lock.
    
    We have an improvement about the `createManagedLedgerOffloader` before.
    https://github.com/apache/pulsar/pull/15883
    
    We are using `getCompactor(false)` for the stats related operations.
    The `getCompactor` is guarded by `synchronized`. Actually, the stats
    just want to get the current compactor without initializing it. We
    don't need to use `synchronized` to guard this operation.
    
    *Modification*
    
    Remove unnecessary `synchronized` on the `getCompactor` method.
---
 .../main/java/org/apache/pulsar/broker/PulsarService.java   | 13 ++++++++-----
 .../org/apache/pulsar/broker/service/BrokerService.java     |  9 +++------
 .../pulsar/broker/service/persistent/PersistentTopic.java   |  7 +------
 .../broker/stats/prometheus/NamespaceStatsAggregator.java   |  8 +-------
 .../apache/pulsar/broker/stats/PrometheusMetricsTest.java   |  2 +-
 5 files changed, 14 insertions(+), 25 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b5baa717093..6e9a9442a6c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1419,16 +1419,19 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     }
 
     public synchronized Compactor getCompactor() throws PulsarServerException {
-        return getCompactor(true);
-    }
-
-    public synchronized Compactor getCompactor(boolean shouldInitialize) throws PulsarServerException {
-        if (this.compactor == null && shouldInitialize) {
+        if (this.compactor == null) {
             this.compactor = newCompactor();
         }
         return this.compactor;
     }
 
+    // This method is used for metrics, which is allowed to as null
+    // Because it's no operation on the compactor, so let's remove the  synchronized on this method
+    // to avoid unnecessary lock competition.
+    public Compactor getNullableCompactor() {
+        return this.compactor;
+    }
+
     protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
         if (this.offloaderScheduler == null) {
             this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0a2e99004a8..e99fce6f68a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1990,12 +1990,9 @@ public class BrokerService implements Closeable {
         }
         topics.remove(topic);
 
-        try {
-            Compactor compactor = pulsar.getCompactor(false);
-            if (compactor != null) {
-                compactor.getStats().removeTopic(topic);
-            }
-        } catch (PulsarServerException ignore) {
+        Compactor compactor = pulsar.getNullableCompactor();
+        if (compactor != null) {
+            compactor.getStats().removeTopic(topic);
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 00e4bc01ec4..6673bedce23 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1945,12 +1945,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     private Optional<CompactorMXBean> getCompactorMXBean() {
-        Compactor compactor = null;
-        try {
-            compactor = brokerService.pulsar().getCompactor(false);
-        } catch (PulsarServerException ex) {
-            log.warn("get compactor error", ex);
-        }
+        Compactor compactor = brokerService.pulsar().getNullableCompactor();
         return Optional.ofNullable(compactor).map(c -> c.getStats());
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index c3e8567de54..f444ad0542e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
-import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -103,12 +102,7 @@ public class NamespaceStatsAggregator {
     }
 
     private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) {
-        Compactor compactor = null;
-        try {
-            compactor = pulsar.getCompactor(false);
-        } catch (PulsarServerException e) {
-            log.error("get compactor error", e);
-        }
+        Compactor compactor = pulsar.getNullableCompactor();
         return Optional.ofNullable(compactor).map(c -> c.getStats());
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index d9d1b25665c..c5ecb8d5bf6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -1417,7 +1417,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
                     .value(data)
                     .send();
         }
-        Compactor compactor = pulsar.getCompactor(true);
+        Compactor compactor = pulsar.getCompactor();
         compactor.compact(topicName).get();
         statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);