You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by nk...@apache.org on 2019/06/13 09:17:56 UTC

[pulsar] branch master updated: [pulsar-admin] add read-timeout to blocked admin-request (#4484)

This is an automated email from the ASF dual-hosted git repository.

nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 46a068b  [pulsar-admin] add read-timeout to blocked admin-request (#4484)
46a068b is described below

commit 46a068be5de3f062b2344a393846a761e4777f1a
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Thu Jun 13 02:17:50 2019 -0700

    [pulsar-admin] add read-timeout to blocked admin-request (#4484)
    
    * [pulsar-admin] add read-timeout to blocked admin-request
    
    * fix: pass readTimeout from broker + consistent naming
---
 .../org/apache/pulsar/broker/PulsarService.java    |  6 ++-
 .../apache/pulsar/client/admin/PulsarAdmin.java    | 33 +++++++-------
 .../pulsar/client/admin/PulsarAdminException.java  |  5 +++
 .../pulsar/client/admin/internal/BaseResource.java |  4 +-
 .../pulsar/client/admin/internal/BookiesImpl.java  |  4 +-
 .../client/admin/internal/BrokerStatsImpl.java     |  4 +-
 .../pulsar/client/admin/internal/BrokersImpl.java  |  4 +-
 .../pulsar/client/admin/internal/ClustersImpl.java |  4 +-
 .../client/admin/internal/ComponentResource.java   |  4 +-
 .../client/admin/internal/FunctionsImpl.java       |  4 +-
 .../pulsar/client/admin/internal/LookupImpl.java   |  4 +-
 .../client/admin/internal/NamespacesImpl.java      |  4 +-
 .../admin/internal/NonPersistentTopicsImpl.java    |  4 +-
 .../client/admin/internal/ResourceQuotasImpl.java  |  4 +-
 .../pulsar/client/admin/internal/SchemasImpl.java  |  4 +-
 .../pulsar/client/admin/internal/SinksImpl.java    |  4 +-
 .../pulsar/client/admin/internal/SourcesImpl.java  |  4 +-
 .../pulsar/client/admin/internal/TenantsImpl.java  |  4 +-
 .../pulsar/client/admin/internal/TopicsImpl.java   | 50 ++++++++++++++++------
 .../pulsar/client/admin/internal/WorkerImpl.java   |  4 +-
 20 files changed, 97 insertions(+), 61 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 098d52d..f4c258a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -837,9 +837,13 @@ public class PulsarService implements AutoCloseable {
                     builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
                     builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
                 }
+                
+                // most of the admin request requires to make zk-call so, keep the max read-timeout based on
+                // zk-operation timeout
+                builder.readTimeout(conf.getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
 
                 this.adminClient = builder.build();
-                LOG.info("Admin api url: " + adminApiUrl);
+                LOG.info("created admin with url {} ", adminApiUrl);
             } catch (Exception e) {
                 throw new PulsarServerException(e);
             }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index ce86f89..1d96744 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -187,22 +187,23 @@ public class PulsarAdmin implements Closeable {
                 Math.toIntExact(readTimeoutUnit.toMillis(this.readTimeout)),
                 Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout))).getHttpClient();
 
-        this.clusters = new ClustersImpl(root, auth);
-        this.brokers = new BrokersImpl(root, auth);
-        this.brokerStats = new BrokerStatsImpl(root, auth);
-        this.tenants = new TenantsImpl(root, auth);
-        this.properties = new TenantsImpl(root, auth);;
-        this.namespaces = new NamespacesImpl(root, auth);
-        this.topics = new TopicsImpl(root, auth);
-        this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth);
-        this.resourceQuotas = new ResourceQuotasImpl(root, auth);
-        this.lookups = new LookupImpl(root, auth, useTls);
-        this.functions = new FunctionsImpl(root, auth, httpAsyncClient);
-        this.sources = new SourcesImpl(root, auth, httpAsyncClient);
-        this.sinks = new SinksImpl(root, auth, httpAsyncClient);
-        this.worker = new WorkerImpl(root, auth);
-        this.schemas = new SchemasImpl(root, auth);
-        this.bookies = new BookiesImpl(root, auth);
+        long readTimeoutMs = readTimeoutUnit.toMillis(this.readTimeout);
+        this.clusters = new ClustersImpl(root, auth, readTimeoutMs);
+        this.brokers = new BrokersImpl(root, auth, readTimeoutMs);
+        this.brokerStats = new BrokerStatsImpl(root, auth, readTimeoutMs);
+        this.tenants = new TenantsImpl(root, auth, readTimeoutMs);
+        this.properties = new TenantsImpl(root, auth, readTimeoutMs);
+        this.namespaces = new NamespacesImpl(root, auth, readTimeoutMs);
+        this.topics = new TopicsImpl(root, auth, readTimeoutMs);
+        this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth, readTimeoutMs);
+        this.resourceQuotas = new ResourceQuotasImpl(root, auth, readTimeoutMs);
+        this.lookups = new LookupImpl(root, auth, useTls, readTimeoutMs);
+        this.functions = new FunctionsImpl(root, auth, httpAsyncClient, readTimeoutMs);
+        this.sources = new SourcesImpl(root, auth, httpAsyncClient, readTimeoutMs);
+        this.sinks = new SinksImpl(root, auth, httpAsyncClient, readTimeoutMs);
+        this.worker = new WorkerImpl(root, auth, readTimeoutMs);
+        this.schemas = new SchemasImpl(root, auth, readTimeoutMs);
+        this.bookies = new BookiesImpl(root, auth, readTimeoutMs);
     }
 
     /**
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
index 68f7c8d..cefbbdf 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
@@ -136,6 +136,11 @@ public class PulsarAdminException extends Exception {
             super(e);
         }
     }
+    public static class TimeoutException extends PulsarAdminException {
+        public TimeoutException(Throwable t) {
+            super(t);
+        }
+    }
 
     public static class ServerSideErrorException extends PulsarAdminException {
         public ServerSideErrorException(ServerErrorException e, String msg) {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index 2c13985..c654717 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -56,9 +56,11 @@ public abstract class BaseResource {
     private static final Logger log = LoggerFactory.getLogger(BaseResource.class);
 
     protected final Authentication auth;
+    protected final long readTimeoutMs;
 
-    protected BaseResource(Authentication auth) {
+    protected BaseResource(Authentication auth, long readTimeoutMs) {
         this.auth = auth;
+        this.readTimeoutMs = readTimeoutMs;
     }
 
     public Builder request(final WebTarget target) throws PulsarAdminException {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
index 7680ce9..d66b489 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
@@ -32,8 +32,8 @@ import org.apache.pulsar.common.policies.data.ErrorData;
 public class BookiesImpl extends BaseResource implements Bookies {
     private final WebTarget adminBookies;
 
-    public BookiesImpl(WebTarget web, Authentication auth) {
-        super(auth);
+    public BookiesImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+        super(auth, readTimeoutMs);
         adminBookies = web.path("/admin/v2/bookies");
     }
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java
index 257adb4..2940c34 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java
@@ -42,8 +42,8 @@ public class BrokerStatsImpl extends BaseResource implements BrokerStats {
     private final WebTarget adminBrokerStats;
     private final WebTarget adminV2BrokerStats;
 
-    public BrokerStatsImpl(WebTarget target, Authentication auth) {
-        super(auth);
+    public BrokerStatsImpl(WebTarget target, Authentication auth, long readTimeoutMs) {
+        super(auth, readTimeoutMs);
         adminBrokerStats = target.path("/admin/broker-stats");
         adminV2BrokerStats = target.path("/admin/v2/broker-stats");
     }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
index 88077b2..4fe345f 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
@@ -35,8 +35,8 @@ import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
 public class BrokersImpl extends BaseResource implements Brokers {
     private final WebTarget adminBrokers;
 
-    public BrokersImpl(WebTarget web, Authentication auth) {
-        super(auth);
+    public BrokersImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+        super(auth, readTimeoutMs);
         adminBrokers = web.path("/admin/v2/brokers");
     }
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index e3041ba..fcc598e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -41,8 +41,8 @@ public class ClustersImpl extends BaseResource implements Clusters {
 
     private final WebTarget adminClusters;
 
-    public ClustersImpl(WebTarget web, Authentication auth) {
-        super(auth);
+    public ClustersImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+        super(auth, readTimeoutMs);
         adminClusters = web.path("/admin/v2/clusters");
     }
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
index 5d91f81..43e0882 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
@@ -33,8 +33,8 @@ import javax.ws.rs.client.WebTarget;
 
 public class ComponentResource extends BaseResource {
 
-    protected ComponentResource(Authentication auth) {
-        super(auth);
+    protected ComponentResource(Authentication auth, long readTimeoutMs) {
+        super(auth, readTimeoutMs);
     }
 
     public RequestBuilder addAuthHeaders(WebTarget target, RequestBuilder requestBuilder) throws PulsarAdminException {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 28cfabb..ba7d7f2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -69,8 +69,8 @@ public class FunctionsImpl extends ComponentResource implements Functions {
     private final WebTarget functions;
     private final AsyncHttpClient asyncHttpClient;
 
-    public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) {
-        super(auth);
+    public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long readTimeoutMs) {
+        super(auth, readTimeoutMs);
         this.functions = web.path("/admin/v3/functions");
         this.asyncHttpClient = asyncHttpClient;
     }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
index b1588f1..8d665a2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
@@ -31,8 +31,8 @@ public class LookupImpl extends BaseResource implements Lookup {
     private final WebTarget v2lookup;
     private final boolean useTls;
 
-    public LookupImpl(WebTarget web, Authentication auth, boolean useTls) {
-        super(auth);
+    public LookupImpl(WebTarget web, Authentication auth, boolean useTls, long readTimeoutMs) {
+        super(auth, readTimeoutMs);
         this.useTls = useTls;
         v2lookup = web.path("/lookup/v2");
     }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 19368d9..c6540f6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -53,8 +53,8 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     private final WebTarget adminNamespaces;
     private final WebTarget adminV2Namespaces;
 
-    public NamespacesImpl(WebTarget web, Authentication auth) {
-        super(auth);
+    public NamespacesImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+        super(auth, readTimeoutMs);
         adminNamespaces = web.path("/admin/namespaces");
         adminV2Namespaces = web.path("/admin/v2/namespaces");
     }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
index 78f7ac3..8e81473 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
@@ -43,8 +43,8 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
     private final WebTarget adminNonPersistentTopics;
     private final WebTarget adminV2NonPersistentTopics;
 
-    public NonPersistentTopicsImpl(WebTarget web, Authentication auth) {
-        super(auth);
+    public NonPersistentTopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+        super(auth, readTimeoutMs);
         adminNonPersistentTopics = web.path("/admin");
         adminV2NonPersistentTopics = web.path("/admin/v2");
     }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
index d96b643..54d89d3 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
@@ -34,8 +34,8 @@ public class ResourceQuotasImpl extends BaseResource implements ResourceQuotas {
     private final WebTarget adminQuotas;
     private final WebTarget adminV2Quotas;
 
-    public ResourceQuotasImpl(WebTarget web, Authentication auth) {
-        super(auth);
+    public ResourceQuotasImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+        super(auth, readTimeoutMs);
         adminQuotas = web.path("/admin/resource-quotas");
         adminV2Quotas = web.path("/admin/v2/resource-quotas");
     }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index df2e62e..9d06676 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -34,8 +34,8 @@ public class SchemasImpl extends BaseResource implements Schemas {
 
     private final WebTarget target;
 
-    public SchemasImpl(WebTarget web, Authentication auth) {
-        super(auth);
+    public SchemasImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+        super(auth, readTimeoutMs);
         this.target = web.path("/admin/v2/schemas");
     }
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
index 77a25cc..d62034d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
@@ -54,8 +54,8 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
     private final WebTarget sink;
     private final AsyncHttpClient asyncHttpClient;
 
-    public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) {
-        super(auth);
+    public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long readTimeoutMs) {
+        super(auth, readTimeoutMs);
         this.sink = web.path("/admin/v3/sink");
         this.asyncHttpClient = asyncHttpClient;
     }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
index d8f57a3..4dfcf44 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
@@ -54,8 +54,8 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
     private final WebTarget source;
     private final AsyncHttpClient asyncHttpClient;
 
-    public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) {
-        super(auth);
+    public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long readTimeoutMs) {
+        super(auth, readTimeoutMs);
         this.source = web.path("/admin/v3/source");
         this.asyncHttpClient = asyncHttpClient;
     }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
index b4aac1f..6a0d1aa 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
@@ -36,8 +36,8 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
 public class TenantsImpl extends BaseResource implements Tenants, Properties {
     private final WebTarget adminTenants;
 
-    public TenantsImpl(WebTarget web, Authentication auth) {
-        super(auth);
+    public TenantsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+        super(auth, readTimeoutMs);
         adminTenants = web.path("/admin/v2/tenants");
     }
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index dfd90bd..26c6218 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -37,6 +37,8 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -81,8 +83,8 @@ public class TopicsImpl extends BaseResource implements Topics {
     private final WebTarget adminTopics;
     private final WebTarget adminV2Topics;
     private final String BATCH_HEADER = "X-Pulsar-num-batch-message";
-    public TopicsImpl(WebTarget web, Authentication auth) {
-        super(auth);
+    public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+        super(auth, readTimeoutMs);
         adminTopics = web.path("/admin");
         adminV2Topics = web.path("/admin/v2");
     }
@@ -389,12 +391,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public TopicStats getStats(String topic) throws PulsarAdminException {
         try {
-            return getStatsAsync(topic).get();
+            return getStatsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -422,12 +426,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public PersistentTopicInternalStats getInternalStats(String topic) throws PulsarAdminException {
         try {
-            return getInternalStatsAsync(topic).get();
+            return getInternalStatsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -455,12 +461,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public JsonObject getInternalInfo(String topic) throws PulsarAdminException {
         try {
-            return getInternalInfoAsync(topic).get();
+            return getInternalInfoAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -489,12 +497,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     public PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition)
             throws PulsarAdminException {
         try {
-            return getPartitionedStatsAsync(topic, perPartition).get();
+            return getPartitionedStatsAsync(topic, perPartition).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -527,12 +537,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     public PartitionedTopicInternalStats getPartitionedInternalStats(String topic)
             throws PulsarAdminException {
         try {
-            return getPartitionedInternalStatsAsync(topic).get();
+            return getPartitionedInternalStatsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -560,12 +572,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public void deleteSubscription(String topic, String subName) throws PulsarAdminException {
         try {
-            deleteSubscriptionAsync(topic, subName).get();
+            deleteSubscriptionAsync(topic, subName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -600,12 +614,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public void skipMessages(String topic, String subName, long numMessages) throws PulsarAdminException {
         try {
-            skipMessagesAsync(topic, subName, numMessages).get();
+            skipMessagesAsync(topic, subName, numMessages).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -620,12 +636,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public void expireMessages(String topic, String subName, long expireTimeInSeconds) throws PulsarAdminException {
         try {
-            expireMessagesAsync(topic, subName, expireTimeInSeconds).get();
+            expireMessagesAsync(topic, subName, expireTimeInSeconds).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -641,12 +659,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) throws PulsarAdminException {
         try {
-            expireMessagesForAllSubscriptionsAsync(topic, expireTimeInSeconds).get();
+            expireMessagesForAllSubscriptionsAsync(topic, expireTimeInSeconds).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -688,12 +708,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     public List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages)
             throws PulsarAdminException {
         try {
-            return peekMessagesAsync(topic, subName, numMessages).get();
+            return peekMessagesAsync(topic, subName, numMessages).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -964,12 +986,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public MessageId getLastMessageId(String topic) throws PulsarAdminException {
         try {
-            return getLastMessageIdAsync(topic).get();
+            return getLastMessageIdAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
index 3df818a..3dbd66a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
@@ -39,8 +39,8 @@ public class WorkerImpl extends BaseResource implements Worker {
     private final WebTarget workerStats;
     private final WebTarget worker;
 
-    public WorkerImpl(WebTarget web, Authentication auth) {
-        super(auth);
+    public WorkerImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+        super(auth, readTimeoutMs);
         this.worker = web.path("/admin/v2/worker");
         this.workerStats = web.path("/admin/v2/worker-stats");
     }