You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by nk...@apache.org on 2019/06/13 09:17:56 UTC
[pulsar] branch master updated: [pulsar-admin] add read-timeout to
blocked admin-request (#4484)
This is an automated email from the ASF dual-hosted git repository.
nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 46a068b [pulsar-admin] add read-timeout to blocked admin-request (#4484)
46a068b is described below
commit 46a068be5de3f062b2344a393846a761e4777f1a
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Thu Jun 13 02:17:50 2019 -0700
[pulsar-admin] add read-timeout to blocked admin-request (#4484)
* [pulsar-admin] add read-timeout to blocked admin-request
* fix: pass readTimeout from broker + consistent naming
---
.../org/apache/pulsar/broker/PulsarService.java | 6 ++-
.../apache/pulsar/client/admin/PulsarAdmin.java | 33 +++++++-------
.../pulsar/client/admin/PulsarAdminException.java | 5 +++
.../pulsar/client/admin/internal/BaseResource.java | 4 +-
.../pulsar/client/admin/internal/BookiesImpl.java | 4 +-
.../client/admin/internal/BrokerStatsImpl.java | 4 +-
.../pulsar/client/admin/internal/BrokersImpl.java | 4 +-
.../pulsar/client/admin/internal/ClustersImpl.java | 4 +-
.../client/admin/internal/ComponentResource.java | 4 +-
.../client/admin/internal/FunctionsImpl.java | 4 +-
.../pulsar/client/admin/internal/LookupImpl.java | 4 +-
.../client/admin/internal/NamespacesImpl.java | 4 +-
.../admin/internal/NonPersistentTopicsImpl.java | 4 +-
.../client/admin/internal/ResourceQuotasImpl.java | 4 +-
.../pulsar/client/admin/internal/SchemasImpl.java | 4 +-
.../pulsar/client/admin/internal/SinksImpl.java | 4 +-
.../pulsar/client/admin/internal/SourcesImpl.java | 4 +-
.../pulsar/client/admin/internal/TenantsImpl.java | 4 +-
.../pulsar/client/admin/internal/TopicsImpl.java | 50 ++++++++++++++++------
.../pulsar/client/admin/internal/WorkerImpl.java | 4 +-
20 files changed, 97 insertions(+), 61 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 098d52d..f4c258a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -837,9 +837,13 @@ public class PulsarService implements AutoCloseable {
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
}
+
+ // most of the admin request requires to make zk-call so, keep the max read-timeout based on
+ // zk-operation timeout
+ builder.readTimeout(conf.getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
this.adminClient = builder.build();
- LOG.info("Admin api url: " + adminApiUrl);
+ LOG.info("created admin with url {} ", adminApiUrl);
} catch (Exception e) {
throw new PulsarServerException(e);
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index ce86f89..1d96744 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -187,22 +187,23 @@ public class PulsarAdmin implements Closeable {
Math.toIntExact(readTimeoutUnit.toMillis(this.readTimeout)),
Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout))).getHttpClient();
- this.clusters = new ClustersImpl(root, auth);
- this.brokers = new BrokersImpl(root, auth);
- this.brokerStats = new BrokerStatsImpl(root, auth);
- this.tenants = new TenantsImpl(root, auth);
- this.properties = new TenantsImpl(root, auth);;
- this.namespaces = new NamespacesImpl(root, auth);
- this.topics = new TopicsImpl(root, auth);
- this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth);
- this.resourceQuotas = new ResourceQuotasImpl(root, auth);
- this.lookups = new LookupImpl(root, auth, useTls);
- this.functions = new FunctionsImpl(root, auth, httpAsyncClient);
- this.sources = new SourcesImpl(root, auth, httpAsyncClient);
- this.sinks = new SinksImpl(root, auth, httpAsyncClient);
- this.worker = new WorkerImpl(root, auth);
- this.schemas = new SchemasImpl(root, auth);
- this.bookies = new BookiesImpl(root, auth);
+ long readTimeoutMs = readTimeoutUnit.toMillis(this.readTimeout);
+ this.clusters = new ClustersImpl(root, auth, readTimeoutMs);
+ this.brokers = new BrokersImpl(root, auth, readTimeoutMs);
+ this.brokerStats = new BrokerStatsImpl(root, auth, readTimeoutMs);
+ this.tenants = new TenantsImpl(root, auth, readTimeoutMs);
+ this.properties = new TenantsImpl(root, auth, readTimeoutMs);
+ this.namespaces = new NamespacesImpl(root, auth, readTimeoutMs);
+ this.topics = new TopicsImpl(root, auth, readTimeoutMs);
+ this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth, readTimeoutMs);
+ this.resourceQuotas = new ResourceQuotasImpl(root, auth, readTimeoutMs);
+ this.lookups = new LookupImpl(root, auth, useTls, readTimeoutMs);
+ this.functions = new FunctionsImpl(root, auth, httpAsyncClient, readTimeoutMs);
+ this.sources = new SourcesImpl(root, auth, httpAsyncClient, readTimeoutMs);
+ this.sinks = new SinksImpl(root, auth, httpAsyncClient, readTimeoutMs);
+ this.worker = new WorkerImpl(root, auth, readTimeoutMs);
+ this.schemas = new SchemasImpl(root, auth, readTimeoutMs);
+ this.bookies = new BookiesImpl(root, auth, readTimeoutMs);
}
/**
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
index 68f7c8d..cefbbdf 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
@@ -136,6 +136,11 @@ public class PulsarAdminException extends Exception {
super(e);
}
}
+ public static class TimeoutException extends PulsarAdminException {
+ public TimeoutException(Throwable t) {
+ super(t);
+ }
+ }
public static class ServerSideErrorException extends PulsarAdminException {
public ServerSideErrorException(ServerErrorException e, String msg) {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index 2c13985..c654717 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -56,9 +56,11 @@ public abstract class BaseResource {
private static final Logger log = LoggerFactory.getLogger(BaseResource.class);
protected final Authentication auth;
+ protected final long readTimeoutMs;
- protected BaseResource(Authentication auth) {
+ protected BaseResource(Authentication auth, long readTimeoutMs) {
this.auth = auth;
+ this.readTimeoutMs = readTimeoutMs;
}
public Builder request(final WebTarget target) throws PulsarAdminException {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
index 7680ce9..d66b489 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java
@@ -32,8 +32,8 @@ import org.apache.pulsar.common.policies.data.ErrorData;
public class BookiesImpl extends BaseResource implements Bookies {
private final WebTarget adminBookies;
- public BookiesImpl(WebTarget web, Authentication auth) {
- super(auth);
+ public BookiesImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+ super(auth, readTimeoutMs);
adminBookies = web.path("/admin/v2/bookies");
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java
index 257adb4..2940c34 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java
@@ -42,8 +42,8 @@ public class BrokerStatsImpl extends BaseResource implements BrokerStats {
private final WebTarget adminBrokerStats;
private final WebTarget adminV2BrokerStats;
- public BrokerStatsImpl(WebTarget target, Authentication auth) {
- super(auth);
+ public BrokerStatsImpl(WebTarget target, Authentication auth, long readTimeoutMs) {
+ super(auth, readTimeoutMs);
adminBrokerStats = target.path("/admin/broker-stats");
adminV2BrokerStats = target.path("/admin/v2/broker-stats");
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
index 88077b2..4fe345f 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
@@ -35,8 +35,8 @@ import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
public class BrokersImpl extends BaseResource implements Brokers {
private final WebTarget adminBrokers;
- public BrokersImpl(WebTarget web, Authentication auth) {
- super(auth);
+ public BrokersImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+ super(auth, readTimeoutMs);
adminBrokers = web.path("/admin/v2/brokers");
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index e3041ba..fcc598e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -41,8 +41,8 @@ public class ClustersImpl extends BaseResource implements Clusters {
private final WebTarget adminClusters;
- public ClustersImpl(WebTarget web, Authentication auth) {
- super(auth);
+ public ClustersImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+ super(auth, readTimeoutMs);
adminClusters = web.path("/admin/v2/clusters");
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
index 5d91f81..43e0882 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
@@ -33,8 +33,8 @@ import javax.ws.rs.client.WebTarget;
public class ComponentResource extends BaseResource {
- protected ComponentResource(Authentication auth) {
- super(auth);
+ protected ComponentResource(Authentication auth, long readTimeoutMs) {
+ super(auth, readTimeoutMs);
}
public RequestBuilder addAuthHeaders(WebTarget target, RequestBuilder requestBuilder) throws PulsarAdminException {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 28cfabb..ba7d7f2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -69,8 +69,8 @@ public class FunctionsImpl extends ComponentResource implements Functions {
private final WebTarget functions;
private final AsyncHttpClient asyncHttpClient;
- public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) {
- super(auth);
+ public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long readTimeoutMs) {
+ super(auth, readTimeoutMs);
this.functions = web.path("/admin/v3/functions");
this.asyncHttpClient = asyncHttpClient;
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
index b1588f1..8d665a2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/LookupImpl.java
@@ -31,8 +31,8 @@ public class LookupImpl extends BaseResource implements Lookup {
private final WebTarget v2lookup;
private final boolean useTls;
- public LookupImpl(WebTarget web, Authentication auth, boolean useTls) {
- super(auth);
+ public LookupImpl(WebTarget web, Authentication auth, boolean useTls, long readTimeoutMs) {
+ super(auth, readTimeoutMs);
this.useTls = useTls;
v2lookup = web.path("/lookup/v2");
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 19368d9..c6540f6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -53,8 +53,8 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
private final WebTarget adminNamespaces;
private final WebTarget adminV2Namespaces;
- public NamespacesImpl(WebTarget web, Authentication auth) {
- super(auth);
+ public NamespacesImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+ super(auth, readTimeoutMs);
adminNamespaces = web.path("/admin/namespaces");
adminV2Namespaces = web.path("/admin/v2/namespaces");
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
index 78f7ac3..8e81473 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
@@ -43,8 +43,8 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
private final WebTarget adminNonPersistentTopics;
private final WebTarget adminV2NonPersistentTopics;
- public NonPersistentTopicsImpl(WebTarget web, Authentication auth) {
- super(auth);
+ public NonPersistentTopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+ super(auth, readTimeoutMs);
adminNonPersistentTopics = web.path("/admin");
adminV2NonPersistentTopics = web.path("/admin/v2");
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
index d96b643..54d89d3 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java
@@ -34,8 +34,8 @@ public class ResourceQuotasImpl extends BaseResource implements ResourceQuotas {
private final WebTarget adminQuotas;
private final WebTarget adminV2Quotas;
- public ResourceQuotasImpl(WebTarget web, Authentication auth) {
- super(auth);
+ public ResourceQuotasImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+ super(auth, readTimeoutMs);
adminQuotas = web.path("/admin/resource-quotas");
adminV2Quotas = web.path("/admin/v2/resource-quotas");
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index df2e62e..9d06676 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -34,8 +34,8 @@ public class SchemasImpl extends BaseResource implements Schemas {
private final WebTarget target;
- public SchemasImpl(WebTarget web, Authentication auth) {
- super(auth);
+ public SchemasImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+ super(auth, readTimeoutMs);
this.target = web.path("/admin/v2/schemas");
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
index 77a25cc..d62034d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
@@ -54,8 +54,8 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
private final WebTarget sink;
private final AsyncHttpClient asyncHttpClient;
- public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) {
- super(auth);
+ public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long readTimeoutMs) {
+ super(auth, readTimeoutMs);
this.sink = web.path("/admin/v3/sink");
this.asyncHttpClient = asyncHttpClient;
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
index d8f57a3..4dfcf44 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
@@ -54,8 +54,8 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
private final WebTarget source;
private final AsyncHttpClient asyncHttpClient;
- public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) {
- super(auth);
+ public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long readTimeoutMs) {
+ super(auth, readTimeoutMs);
this.source = web.path("/admin/v3/source");
this.asyncHttpClient = asyncHttpClient;
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
index b4aac1f..6a0d1aa 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
@@ -36,8 +36,8 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
public class TenantsImpl extends BaseResource implements Tenants, Properties {
private final WebTarget adminTenants;
- public TenantsImpl(WebTarget web, Authentication auth) {
- super(auth);
+ public TenantsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+ super(auth, readTimeoutMs);
adminTenants = web.path("/admin/v2/tenants");
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index dfd90bd..26c6218 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -37,6 +37,8 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -81,8 +83,8 @@ public class TopicsImpl extends BaseResource implements Topics {
private final WebTarget adminTopics;
private final WebTarget adminV2Topics;
private final String BATCH_HEADER = "X-Pulsar-num-batch-message";
- public TopicsImpl(WebTarget web, Authentication auth) {
- super(auth);
+ public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+ super(auth, readTimeoutMs);
adminTopics = web.path("/admin");
adminV2Topics = web.path("/admin/v2");
}
@@ -389,12 +391,14 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public TopicStats getStats(String topic) throws PulsarAdminException {
try {
- return getStatsAsync(topic).get();
+ return getStatsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -422,12 +426,14 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public PersistentTopicInternalStats getInternalStats(String topic) throws PulsarAdminException {
try {
- return getInternalStatsAsync(topic).get();
+ return getInternalStatsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -455,12 +461,14 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public JsonObject getInternalInfo(String topic) throws PulsarAdminException {
try {
- return getInternalInfoAsync(topic).get();
+ return getInternalInfoAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -489,12 +497,14 @@ public class TopicsImpl extends BaseResource implements Topics {
public PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition)
throws PulsarAdminException {
try {
- return getPartitionedStatsAsync(topic, perPartition).get();
+ return getPartitionedStatsAsync(topic, perPartition).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -527,12 +537,14 @@ public class TopicsImpl extends BaseResource implements Topics {
public PartitionedTopicInternalStats getPartitionedInternalStats(String topic)
throws PulsarAdminException {
try {
- return getPartitionedInternalStatsAsync(topic).get();
+ return getPartitionedInternalStatsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -560,12 +572,14 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public void deleteSubscription(String topic, String subName) throws PulsarAdminException {
try {
- deleteSubscriptionAsync(topic, subName).get();
+ deleteSubscriptionAsync(topic, subName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -600,12 +614,14 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public void skipMessages(String topic, String subName, long numMessages) throws PulsarAdminException {
try {
- skipMessagesAsync(topic, subName, numMessages).get();
+ skipMessagesAsync(topic, subName, numMessages).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -620,12 +636,14 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public void expireMessages(String topic, String subName, long expireTimeInSeconds) throws PulsarAdminException {
try {
- expireMessagesAsync(topic, subName, expireTimeInSeconds).get();
+ expireMessagesAsync(topic, subName, expireTimeInSeconds).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -641,12 +659,14 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) throws PulsarAdminException {
try {
- expireMessagesForAllSubscriptionsAsync(topic, expireTimeInSeconds).get();
+ expireMessagesForAllSubscriptionsAsync(topic, expireTimeInSeconds).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -688,12 +708,14 @@ public class TopicsImpl extends BaseResource implements Topics {
public List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages)
throws PulsarAdminException {
try {
- return peekMessagesAsync(topic, subName, numMessages).get();
+ return peekMessagesAsync(topic, subName, numMessages).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -964,12 +986,14 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public MessageId getLastMessageId(String topic) throws PulsarAdminException {
try {
- return getLastMessageIdAsync(topic).get();
+ return getLastMessageIdAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
index 3df818a..3dbd66a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
@@ -39,8 +39,8 @@ public class WorkerImpl extends BaseResource implements Worker {
private final WebTarget workerStats;
private final WebTarget worker;
- public WorkerImpl(WebTarget web, Authentication auth) {
- super(auth);
+ public WorkerImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+ super(auth, readTimeoutMs);
this.worker = web.path("/admin/v2/worker");
this.workerStats = web.path("/admin/v2/worker-stats");
}