You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/09 09:31:39 UTC
[pulsar] branch branch-2.7 updated: [Cherry-pick] Add backoff for
setting for getting topic policies to branch-2.7 (#11574)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 9bd54a9 [Cherry-pick] Add backoff for setting for getting topic policies to branch-2.7 (#11574)
9bd54a9 is described below
commit 9bd54a9397e95543defbcc435f5246c3a682f7ee
Author: GuoJiwei <te...@apache.org>
AuthorDate: Mon Aug 9 17:30:56 2021 +0800
[Cherry-pick] Add backoff for setting for getting topic policies to branch-2.7 (#11574)
---
.../apache/pulsar/broker/admin/AdminResource.java | 49 +-
.../broker/admin/impl/PersistentTopicsBase.java | 568 +++++++++++----------
.../pulsar/broker/admin/v2/PersistentTopics.java | 341 +++++++------
.../apache/pulsar/client/impl/BackoffBuilder.java | 3 +-
4 files changed, 503 insertions(+), 458 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 3e26aa7..665043b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -33,7 +33,10 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.servlet.ServletContext;
import javax.ws.rs.WebApplicationException;
@@ -51,6 +54,8 @@ import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
+import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.Constants;
@@ -95,6 +100,7 @@ public abstract class AdminResource extends PulsarWebResource {
private static final Logger log = LoggerFactory.getLogger(AdminResource.class);
private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";
+ private static final long DEFAULT_GET_TOPIC_POLICY_TIMEOUT = 30_000;
protected ZooKeeper globalZk() {
return pulsar().getGlobalZkCache().getZooKeeper();
@@ -541,19 +547,48 @@ public abstract class AdminResource extends PulsarWebResource {
return pulsar().getBrokerService().getBacklogQuotaManager().getBacklogQuota(namespace, namespacePath);
}
- protected Optional<TopicPolicies> getTopicPolicies(TopicName topicName) {
+ protected CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetry(TopicName topicName) {
+ return internalGetTopicPoliciesAsyncWithRetry(topicName,
+ new AtomicLong(DEFAULT_GET_TOPIC_POLICY_TIMEOUT), null, null);
+ }
+
+ protected CompletableFuture<Optional<TopicPolicies>> internalGetTopicPoliciesAsyncWithRetry(TopicName topicName,
+ final AtomicLong remainingTime,
+ final Backoff backoff,
+ CompletableFuture<Optional<TopicPolicies>> future) {
+ CompletableFuture<Optional<TopicPolicies>> response = future == null ? new CompletableFuture<>() : future;
try {
checkTopicLevelPolicyEnable();
- return Optional.ofNullable(pulsar().getTopicPoliciesService().getTopicPolicies(topicName));
+ response.complete(Optional.ofNullable(pulsar()
+ .getTopicPoliciesService().getTopicPolicies(topicName)));
} catch (RestException re) {
- throw re;
- } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e){
- log.error("Topic {} policies cache have not init.", topicName);
- throw new RestException(e);
+ response.completeExceptionally(re);
+ } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+ Backoff usedBackoff = backoff == null ? new BackoffBuilder()
+ .setInitialTime(500, TimeUnit.MILLISECONDS)
+ .setMandatoryStop(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS)
+ .setMax(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS)
+ .create() : backoff;
+ long nextDelay = Math.min(usedBackoff.next(), remainingTime.get()); // never wait longer than the remaining budget
+ if (nextDelay <= 0) { // remaining time budget is used up: fail instead of scheduling another retry
+ response.completeExceptionally(new TimeoutException(
+ String.format("Failed to get topic policy within configured timeout %s ms",
+ DEFAULT_GET_TOPIC_POLICY_TIMEOUT)));
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Topic {} policies have not been initialized yet, retry after {}ms",
+ topicName, nextDelay);
+ }
+ pulsar().getExecutor().schedule(() -> {
+ remainingTime.addAndGet(-nextDelay); // charge the delay just waited against the budget
+ internalGetTopicPoliciesAsyncWithRetry(topicName, remainingTime, usedBackoff, response);
+ }, nextDelay, TimeUnit.MILLISECONDS);
+ }
} catch (Exception e) {
log.error("[{}] Failed to get topic policies {}", clientAppId(), topicName, e);
- throw new RestException(e);
+ response.completeExceptionally(e);
}
+ return response;
}
protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retention) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d02c9b3..47b42cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -74,7 +74,6 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
-import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -588,16 +587,14 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalSetDelayedDeliveryPolicies(DelayedDeliveryPolicies deliveryPolicies) {
- TopicPolicies topicPolicies;
- try {
- topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
- topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
- topicPolicies.setDelayedDeliveryTickTimeMillis(
- deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
- } catch (Exception e) {
- return FutureUtil.failedFuture(e);
- }
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
+ topicPolicies.setDelayedDeliveryTickTimeMillis(
+ deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
@@ -809,20 +806,12 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPolicies offloadPolicies) {
- TopicPolicies topicPolicies = null;
- try {
- topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
- } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.error("Topic {} policies cache have not init.", topicName);
- throw new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init");
- }
- if (topicPolicies == null) {
- topicPolicies = new TopicPolicies();
- }
- topicPolicies.setOffloadPolicies(offloadPolicies);
- CompletableFuture<Void> completableFuture = new CompletableFuture<>();
- pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
- .thenCompose((res) -> {
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setOffloadPolicies(offloadPolicies);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ }).thenCompose(__ -> {
//The policy update is asynchronous. Cache at this step may not be updated yet.
//So we need to set the loader by the incoming offloadPolicies instead of topic policies cache.
PartitionedTopicMetadata metadata = fetchPartitionedTopicMetadata(pulsar(), topicName);
@@ -835,30 +824,16 @@ public class PersistentTopicsBase extends AdminResource {
} else {
return internalUpdateOffloadPolicies(offloadPolicies, topicName);
}
- })
- .whenComplete((result, e) -> {
- if (e != null) {
- completableFuture.completeExceptionally(e);
- } else {
- completableFuture.complete(null);
- }
- });
- return completableFuture;
+ });
}
protected CompletableFuture<Void> internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies) {
- TopicPolicies topicPolicies = null;
- try {
- topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
- } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.error("Topic {} policies cache have not init.", topicName);
- return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init"));
- }
- if (topicPolicies == null) {
- topicPolicies = new TopicPolicies();
- }
- topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
private CompletableFuture<Void> internalUpdateOffloadPolicies(OffloadPolicies offloadPolicies, TopicName topicName) {
@@ -892,19 +867,12 @@ public class PersistentTopicsBase extends AdminResource {
throw new RestException(Status.PRECONDITION_FAILED,
"maxUnackedNum must be 0 or more");
}
-
- TopicPolicies topicPolicies = null;
- try {
- topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
- } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.error("Topic {} policies cache have not init.", topicName);
- throw new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init");
- }
- if (topicPolicies == null) {
- topicPolicies = new TopicPolicies();
- }
- topicPolicies.setMaxUnackedMessagesOnSubscription(maxUnackedNum);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setMaxUnackedMessagesOnSubscription(maxUnackedNum);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNum) {
@@ -912,29 +880,24 @@ public class PersistentTopicsBase extends AdminResource {
throw new RestException(Status.PRECONDITION_FAILED,
"maxUnackedNum must be 0 or more");
}
-
- TopicPolicies topicPolicies = null;
- try {
- topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
- } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.error("Topic {} policies cache have not init.", topicName);
- return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init"));
- }
- if (topicPolicies == null) {
- topicPolicies = new TopicPolicies();
- }
- topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
protected CompletableFuture<Void> internalSetDeduplicationSnapshotInterval(Integer interval) {
if (interval != null && interval < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "interval must be 0 or more");
}
- TopicPolicies policies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
- policies.setDeduplicationSnapshotIntervalSeconds(interval);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
-
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies policies = op.orElseGet(TopicPolicies::new);
+ policies.setDeduplicationSnapshotIntervalSeconds(interval);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
+ });
}
private void internalUnloadNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
@@ -2431,56 +2394,48 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Void> internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType,
BacklogQuota backlogQuota) {
validatePoliciesReadOnlyAccess();
- TopicPolicies topicPolicies;
- if (backlogQuotaType == null) {
- backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage;
- }
- try {
- topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
- } catch (Exception e) {
- return FutureUtil.failedFuture(e);
- }
+ BacklogQuota.BacklogQuotaType finalBacklogQuotaType = backlogQuotaType == null
+ ? BacklogQuota.BacklogQuotaType.destination_storage : backlogQuotaType;
- RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, topicPolicies);
- if (!checkBacklogQuota(backlogQuota, retentionPolicies)) {
- log.warn(
- "[{}] Failed to update backlog configuration for topic {}: conflicts with retention quota",
- clientAppId(), topicName);
- return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
- "Backlog Quota exceeds configured retention quota for topic. "
- + "Please increase retention quota and retry"));
- }
+ return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
- if (backlogQuota != null) {
- topicPolicies.getBackLogQuotaMap().put(backlogQuotaType.name(), backlogQuota);
- } else {
- topicPolicies.getBackLogQuotaMap().remove(backlogQuotaType.name());
- }
- Map<String, BacklogQuota> backLogQuotaMap = topicPolicies.getBackLogQuotaMap();
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies).thenRun(() -> {
- try {
- log.info("[{}] Successfully updated backlog quota map: namespace={}, topic={}, map={}",
- clientAppId(),
- namespaceName,
- topicName.getLocalName(),
- jsonMapper().writeValueAsString(backLogQuotaMap));
- } catch (JsonProcessingException ignore) { }
+
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, topicPolicies);
+ if (!checkBacklogQuota(backlogQuota, retentionPolicies)) {
+ log.warn(
+ "[{}] Failed to update backlog configuration for topic {}: conflicts with retention quota",
+ clientAppId(), topicName);
+ return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+ "Backlog Quota exceeds configured retention quota for topic. "
+ + "Please increase retention quota and retry"));
+ }
+
+ if (backlogQuota != null) {
+ topicPolicies.getBackLogQuotaMap().put(finalBacklogQuotaType.name(), backlogQuota);
+ } else {
+ topicPolicies.getBackLogQuotaMap().remove(finalBacklogQuotaType.name());
+ }
+ Map<String, BacklogQuota> backLogQuotaMap = topicPolicies.getBackLogQuotaMap();
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies).thenRun(() -> {
+ try {
+ log.info("[{}] Successfully updated backlog quota map: namespace={}, topic={}, map={}",
+ clientAppId(),
+ namespaceName,
+ topicName.getLocalName(),
+ jsonMapper().writeValueAsString(backLogQuotaMap));
+ } catch (JsonProcessingException ignore) { }
+ });
});
}
protected CompletableFuture<Void> internalSetDeduplicationEnabled(Boolean enabled) {
- TopicPolicies topicPolicies = null;
- try {
- topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
- } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.error("Topic {} policies cache have not init.", topicName);
- throw new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init");
- }
- if (topicPolicies == null) {
- topicPolicies = new TopicPolicies();
- }
- topicPolicies.setDeduplicationEnabled(enabled);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setDeduplicationEnabled(enabled);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
protected CompletableFuture<Void> internalSetMessageTTL(Integer ttlInSecond) {
@@ -2489,20 +2444,15 @@ public class PersistentTopicsBase extends AdminResource {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Invalid value for message TTL"));
}
- TopicPolicies topicPolicies;
- try {
- topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
- } catch (Exception e) {
- return FutureUtil.failedFuture(e);
- }
- topicPolicies.setMessageTTLInSeconds(ttlInSecond);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies).thenRun(() -> {
- log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}",
- clientAppId(),
- namespaceName,
- topicName.getLocalName(),
- ttlInSecond);
- });
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setMessageTTLInSeconds(ttlInSecond);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
+ .thenRun(() ->
+ log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}",
+ clientAppId(), namespaceName, topicName.getLocalName(), ttlInSecond));
+ });
}
@@ -2520,63 +2470,73 @@ public class PersistentTopicsBase extends AdminResource {
return retentionPolicies;
}
- protected RetentionPolicies internalGetRetention(){
- return getTopicPolicies(topicName)
- .map(TopicPolicies::getRetentionPolicies).orElse(new RetentionPolicies());
+ protected CompletableFuture<RetentionPolicies> internalGetRetention(){
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenApply(op -> op.map(TopicPolicies::getRetentionPolicies).orElseGet(RetentionPolicies::new));
}
protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retention) {
if (retention == null) {
return CompletableFuture.completedFuture(null);
}
- TopicPolicies topicPolicies = getTopicPolicies(topicName)
- .orElseGet(TopicPolicies::new);
- BacklogQuota backlogQuota =
- topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage.name());
- if (backlogQuota == null){
- Policies policies = getNamespacePolicies(topicName.getNamespaceObject());
- backlogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
- }
- if(!checkBacklogQuota(backlogQuota, retention)){
- log.warn(
- "[{}] Failed to update retention quota configuration for topic {}: conflicts with retention quota",
- clientAppId(), topicName);
- throw new RestException(Status.PRECONDITION_FAILED,
- "Retention Quota must exceed configured backlog quota for topic. " +
- "Please increase retention quota and retry");
- }
- topicPolicies.setRetentionPolicies(retention);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ BacklogQuota backlogQuota =
+ topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage.name());
+ if (backlogQuota == null){
+ Policies policies = getNamespacePolicies(topicName.getNamespaceObject());
+ backlogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
+ }
+ if(!checkBacklogQuota(backlogQuota, retention)){
+ log.warn(
+ "[{}] Failed to update retention quota configuration for topic {}: conflicts with retention quota",
+ clientAppId(), topicName);
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Retention Quota must exceed configured backlog quota for topic. " +
+ "Please increase retention quota and retry");
+ }
+ topicPolicies.setRetentionPolicies(retention);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
protected CompletableFuture<Void> internalRemoveRetention() {
- Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
- if (!topicPolicies.isPresent()) {
- return CompletableFuture.completedFuture(null);
- }
- topicPolicies.get().setRetentionPolicies(null);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ if (!op.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ op.get().setRetentionPolicies(null);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+ });
}
- protected Optional<PersistencePolicies> internalGetPersistence(){
- return getTopicPolicies(topicName).map(TopicPolicies::getPersistence);
+ protected CompletableFuture<Optional<PersistencePolicies>> internalGetPersistence(){
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenApply(op -> op.map(TopicPolicies::getPersistence));
}
protected CompletableFuture<Void> internalSetPersistence(PersistencePolicies persistencePolicies) {
validatePersistencePolicies(persistencePolicies);
- TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
- topicPolicies.setPersistence(persistencePolicies);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setPersistence(persistencePolicies);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
protected CompletableFuture<Void> internalRemovePersistence() {
- Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
- if (!topicPolicies.isPresent()) {
- return CompletableFuture.completedFuture(null);
- }
- topicPolicies.get().setPersistence(null);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ if (!op.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ op.get().setPersistence(null);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+ });
}
protected CompletableFuture<Void> internalSetMaxMessageSize(Integer maxMessageSize) {
@@ -2585,18 +2545,22 @@ public class PersistentTopicsBase extends AdminResource {
, "topic-level maxMessageSize must be greater than or equal to 0 " +
"and must be smaller than that in the broker-level");
}
-
- TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
- topicPolicies.setMaxMessageSize(maxMessageSize);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setMaxMessageSize(maxMessageSize);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
- protected Optional<Integer> internalGetMaxMessageSize() {
- return getTopicPolicies(topicName).map(TopicPolicies::getMaxMessageSize);
+ protected CompletableFuture<Optional<Integer>> internalGetMaxMessageSize() {
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenApply(op -> op.map(TopicPolicies::getMaxMessageSize));
}
- protected Optional<Integer> internalGetMaxProducers() {
- return getTopicPolicies(topicName).map(TopicPolicies::getMaxProducerPerTopic);
+ protected CompletableFuture<Optional<Integer>> internalGetMaxProducers() {
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenApply(op -> op.map(TopicPolicies::getMaxProducerPerTopic));
}
protected CompletableFuture<Void> internalSetMaxProducers(Integer maxProducers) {
@@ -2604,13 +2568,17 @@ public class PersistentTopicsBase extends AdminResource {
throw new RestException(Status.PRECONDITION_FAILED,
"maxProducers must be 0 or more");
}
- TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
- topicPolicies.setMaxProducerPerTopic(maxProducers);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setMaxProducerPerTopic(maxProducers);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
- protected Optional<Integer> internalGetMaxSubscriptionsPerTopic() {
- return getTopicPolicies(topicName).map(TopicPolicies::getMaxSubscriptionsPerTopic);
+ protected CompletableFuture<Optional<Integer>> internalGetMaxSubscriptionsPerTopic() {
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenApply(op -> op.map(TopicPolicies::getMaxSubscriptionsPerTopic));
}
protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic) {
@@ -2618,10 +2586,12 @@ public class PersistentTopicsBase extends AdminResource {
throw new RestException(Status.PRECONDITION_FAILED,
"maxSubscriptionsPerTopic must be 0 or more");
}
-
- TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
- topicPolicies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
protected CompletableFuture<Void> preValidation(boolean authoritative) {
@@ -2651,16 +2621,19 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalRemoveMaxProducers() {
- Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
- if (!topicPolicies.isPresent()) {
- return CompletableFuture.completedFuture(null);
- }
- topicPolicies.get().setMaxProducerPerTopic(null);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ if (!op.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ op.get().setMaxProducerPerTopic(null);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+ });
}
- protected Optional<Integer> internalGetMaxConsumers() {
- return getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumerPerTopic);
+ protected CompletableFuture<Optional<Integer>> internalGetMaxConsumers() {
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenApply(op -> op.map(TopicPolicies::getMaxConsumerPerTopic));
}
protected CompletableFuture<Void> internalSetMaxConsumers(Integer maxConsumers) {
@@ -2668,19 +2641,23 @@ public class PersistentTopicsBase extends AdminResource {
throw new RestException(Status.PRECONDITION_FAILED,
"maxConsumers must be 0 or more");
}
-
- TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
- topicPolicies.setMaxConsumerPerTopic(maxConsumers);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setMaxConsumerPerTopic(maxConsumers);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
protected CompletableFuture<Void> internalRemoveMaxConsumers() {
- Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
- if (!topicPolicies.isPresent()) {
- return CompletableFuture.completedFuture(null);
- }
- topicPolicies.get().setMaxConsumerPerTopic(null);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ if (!op.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ op.get().setMaxConsumerPerTopic(null);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+ });
}
protected MessageId internalTerminate(boolean authoritative) {
@@ -3478,104 +3455,122 @@ public class PersistentTopicsBase extends AdminResource {
}
- protected Optional<DispatchRate> internalGetDispatchRate() {
- return getTopicPolicies(topicName).map(TopicPolicies::getDispatchRate);
+ protected CompletableFuture<Optional<DispatchRate>> internalGetDispatchRate() {
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenApply(op -> op.map(TopicPolicies::getDispatchRate));
}
protected CompletableFuture<Void> internalSetDispatchRate(DispatchRate dispatchRate) {
if (dispatchRate == null) {
return CompletableFuture.completedFuture(null);
}
- TopicPolicies topicPolicies = getTopicPolicies(topicName)
- .orElseGet(TopicPolicies::new);
- topicPolicies.setDispatchRate(dispatchRate);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setDispatchRate(dispatchRate);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
protected CompletableFuture<Void> internalRemoveDispatchRate() {
- Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
- if (!topicPolicies.isPresent()) {
- return CompletableFuture.completedFuture(null);
- }
- topicPolicies.get().setDispatchRate(null);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
-
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ if (!op.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ op.get().setDispatchRate(null);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+ });
}
- protected Optional<DispatchRate> internalGetSubscriptionDispatchRate() {
- return getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionDispatchRate);
+ protected CompletableFuture<Optional<DispatchRate>> internalGetSubscriptionDispatchRate() {
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenApply(op -> op.map(TopicPolicies::getSubscriptionDispatchRate));
}
protected CompletableFuture<Void> internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
if (dispatchRate == null) {
return CompletableFuture.completedFuture(null);
}
- TopicPolicies topicPolicies = getTopicPolicies(topicName)
- .orElseGet(TopicPolicies::new);
- topicPolicies.setSubscriptionDispatchRate(dispatchRate);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setSubscriptionDispatchRate(dispatchRate);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate() {
- Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
- if (!topicPolicies.isPresent()) {
- return CompletableFuture.completedFuture(null);
- }
- topicPolicies.get().setSubscriptionDispatchRate(null);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ if (!op.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ op.get().setSubscriptionDispatchRate(null);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+ });
}
- protected Optional<Integer> internalGetMaxConsumersPerSubscription() {
- return getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumersPerSubscription);
+ protected CompletableFuture<Optional<Integer>> internalGetMaxConsumersPerSubscription() {
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenApply(op -> op.map(TopicPolicies::getMaxConsumersPerSubscription));
}
protected CompletableFuture<Void> internalSetMaxConsumersPerSubscription(Integer maxConsumersPerSubscription) {
if (maxConsumersPerSubscription != null && maxConsumersPerSubscription < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for maxConsumersPerSubscription");
}
-
- TopicPolicies topicPolicies = getTopicPolicies(topicName)
- .orElseGet(TopicPolicies::new);
- topicPolicies.setMaxConsumersPerSubscription(maxConsumersPerSubscription);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setMaxConsumersPerSubscription(maxConsumersPerSubscription);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
protected CompletableFuture<Void> internalRemoveMaxConsumersPerSubscription() {
- Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
- if (!topicPolicies.isPresent()) {
- return CompletableFuture.completedFuture(null);
- }
- topicPolicies.get().setMaxConsumersPerSubscription(null);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ if (!op.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ op.get().setMaxConsumersPerSubscription(null);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+ });
}
- protected Optional<Long> internalGetCompactionThreshold() {
- return getTopicPolicies(topicName).map(TopicPolicies::getCompactionThreshold);
+ protected CompletableFuture<Optional<Long>> internalGetCompactionThreshold() {
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenApply(op -> op.map(TopicPolicies::getCompactionThreshold));
}
protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold) {
if (compactionThreshold != null && compactionThreshold < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for compactionThreshold");
}
-
- TopicPolicies topicPolicies = getTopicPolicies(topicName)
- .orElseGet(TopicPolicies::new);
- topicPolicies.setCompactionThreshold(compactionThreshold);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setCompactionThreshold(compactionThreshold);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
protected CompletableFuture<Void> internalRemoveCompactionThreshold() {
- Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
- if (!topicPolicies.isPresent()) {
- return CompletableFuture.completedFuture(null);
- }
- topicPolicies.get().setCompactionThreshold(null);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ if (!op.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ op.get().setCompactionThreshold(null);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+ });
}
- protected Optional<PublishRate> internalGetPublishRate() {
- return getTopicPolicies(topicName).map(TopicPolicies::getPublishRate);
+ protected CompletableFuture<Optional<PublishRate>> internalGetPublishRate() {
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenApply(op -> op.map(TopicPolicies::getPublishRate));
}
@@ -3583,42 +3578,51 @@ public class PersistentTopicsBase extends AdminResource {
if (publishRate == null) {
return CompletableFuture.completedFuture(null);
}
- TopicPolicies topicPolicies = getTopicPolicies(topicName)
- .orElseGet(TopicPolicies::new);
- topicPolicies.setPublishRate(publishRate);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setPublishRate(publishRate);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
protected CompletableFuture<Void> internalRemovePublishRate() {
- Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
- if (!topicPolicies.isPresent()) {
- return CompletableFuture.completedFuture(null);
- }
- topicPolicies.get().setPublishRate(null);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ if (!op.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ op.get().setPublishRate(null);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+ });
}
- protected Optional<SubscribeRate> internalGetSubscribeRate() {
- return getTopicPolicies(topicName).map(TopicPolicies::getSubscribeRate);
+ protected CompletableFuture<Optional<SubscribeRate>> internalGetSubscribeRate() {
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenApply(op -> op.map(TopicPolicies::getSubscribeRate));
}
protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate subscribeRate) {
if (subscribeRate == null) {
return CompletableFuture.completedFuture(null);
}
- TopicPolicies topicPolicies = getTopicPolicies(topicName)
- .orElseGet(TopicPolicies::new);
- topicPolicies.setSubscribeRate(subscribeRate);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setSubscribeRate(subscribeRate);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
protected CompletableFuture<Void> internalRemoveSubscribeRate() {
- Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
- if (!topicPolicies.isPresent()) {
- return CompletableFuture.completedFuture(null);
- }
- topicPolicies.get().setSubscribeRate(null);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ if (!op.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ op.get().setSubscribeRate(null);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
+ });
}
protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9267c28..a108adb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -32,7 +32,6 @@ import io.swagger.annotations.ApiResponses;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
@@ -269,14 +268,15 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
- if (topicPolicies.isOffloadPoliciesSet()) {
- asyncResponse.resume(topicPolicies.getOffloadPolicies());
- } else {
- asyncResponse.resume(Response.noContent().build());
- }
- })
+ .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+ .thenApply(op -> op.orElseGet(TopicPolicies::new))
+ .thenAccept(topicPolicies -> {
+ if (topicPolicies.isOffloadPoliciesSet()) {
+ asyncResponse.resume(topicPolicies.getOffloadPolicies());
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ }
+ })
.exceptionally(ex -> {
handleTopicPolicyException("getOffloadPolicies", ex, asyncResponse);
return null;
@@ -342,14 +342,15 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
- if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
- asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
- } else {
- asyncResponse.resume(Response.noContent().build());
- }
- })
+ .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+ .thenApply(op -> op.orElseGet(TopicPolicies::new))
+ .thenAccept(topicPolicies -> {
+ if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
+ asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ }
+ })
.exceptionally(ex -> {
handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", ex, asyncResponse);
return null;
@@ -414,14 +415,15 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
- if (topicPolicies.isDeduplicationSnapshotIntervalSecondsSet()) {
- asyncResponse.resume(topicPolicies.getDeduplicationSnapshotIntervalSeconds());
- } else {
- asyncResponse.resume(Response.noContent().build());
- }
- })
+ .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+ .thenApply(op -> op.orElseGet(TopicPolicies::new))
+ .thenAccept(topicPolicies -> {
+ if (topicPolicies.isDeduplicationSnapshotIntervalSecondsSet()) {
+ asyncResponse.resume(topicPolicies.getDeduplicationSnapshotIntervalSeconds());
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ }
+ })
.exceptionally(ex -> {
handleTopicPolicyException("getDeduplicationSnapshotInterval", ex, asyncResponse);
return null;
@@ -486,14 +488,15 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
- if (topicPolicies.isInactiveTopicPoliciesSet()) {
- asyncResponse.resume(topicPolicies.getInactiveTopicPolicies());
- } else {
- asyncResponse.resume(Response.noContent().build());
- }
- })
+ .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+ .thenApply(op -> op.orElseGet(TopicPolicies::new))
+ .thenAccept(topicPolicies -> {
+ if (topicPolicies.isInactiveTopicPoliciesSet()) {
+ asyncResponse.resume(topicPolicies.getInactiveTopicPolicies());
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ }
+ })
.exceptionally(ex -> {
handleTopicPolicyException("getInactiveTopicPolicies", ex, asyncResponse);
return null;
@@ -558,14 +561,15 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
- if (topicPolicies.isMaxUnackedMessagesOnSubscriptionSet()) {
- asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnSubscription());
- } else {
- asyncResponse.resume(Response.noContent().build());
- }
- })
+ .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+ .thenApply(op -> op.orElseGet(TopicPolicies::new))
+ .thenAccept(topicPolicies -> {
+ if (topicPolicies.isMaxUnackedMessagesOnSubscriptionSet()) {
+ asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnSubscription());
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ }
+ })
.exceptionally(ex -> {
handleTopicPolicyException("getInactiveTopicPolicies", ex, asyncResponse);
return null;
@@ -633,15 +637,16 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
- if (topicPolicies.isDelayedDeliveryEnabledSet() && topicPolicies.isDelayedDeliveryTickTimeMillisSet()) {
- asyncResponse.resume(new DelayedDeliveryPolicies(topicPolicies.getDelayedDeliveryTickTimeMillis()
- , topicPolicies.getDelayedDeliveryEnabled()));
- } else {
- asyncResponse.resume(Response.noContent().build());
- }
- })
+ .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+ .thenApply(op -> op.orElseGet(TopicPolicies::new))
+ .thenAccept(topicPolicies -> {
+ if (topicPolicies.isDelayedDeliveryEnabledSet() && topicPolicies.isDelayedDeliveryTickTimeMillisSet()) {
+ asyncResponse.resume(new DelayedDeliveryPolicies(topicPolicies.getDelayedDeliveryTickTimeMillis()
+ , topicPolicies.getDelayedDeliveryEnabled()));
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ }
+ })
.exceptionally(ex -> {
handleTopicPolicyException("getDelayedDeliveryPolicies", ex, asyncResponse);
return null;
@@ -1539,14 +1544,14 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> asyncResponse.resume(getTopicPolicies(topicName)
- .map(TopicPolicies::getBackLogQuotaMap)
+ .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+ .thenApply(op -> op.map(TopicPolicies::getBackLogQuotaMap)
.map(map -> {
HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> hashMap = Maps.newHashMap();
map.forEach((key,value) -> hashMap.put(BacklogQuota.BacklogQuotaType.valueOf(key),value));
return hashMap;
- })
- .orElse(Maps.newHashMap())))
+ }).orElse(Maps.newHashMap()))
+ .thenAccept(r -> asyncResponse.resume(r))
.exceptionally(ex -> {
handleTopicPolicyException("getBacklogQuotaMap", ex, asyncResponse);
return null;
@@ -1616,9 +1621,9 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> asyncResponse.resume(getTopicPolicies(topicName)
- .map(TopicPolicies::getMessageTTLInSeconds)
- .orElse(0)))
+ .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+ .thenApply(op -> op.map(TopicPolicies::getMessageTTLInSeconds).orElse(0))
+ .thenAccept(r -> asyncResponse.resume(r))
.exceptionally(ex -> {
handleTopicPolicyException("getMessageTTL", ex, asyncResponse);
return null;
@@ -1687,14 +1692,15 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
- if (topicPolicies.isDeduplicationSet()) {
- asyncResponse.resume(topicPolicies.getDeduplicationEnabled());
- } else {
- asyncResponse.resume(Response.noContent().build());
- }
- })
+ .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+ .thenApply(op -> op.orElseGet(TopicPolicies::new))
+ .thenAccept(topicPolicies -> {
+ if (topicPolicies.isDeduplicationSet()) {
+ asyncResponse.resume(topicPolicies.getDeduplicationEnabled());
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ }
+ })
.exceptionally(ex -> {
handleTopicPolicyException("getDeduplicationEnabled", ex, asyncResponse);
return null;
@@ -1762,17 +1768,18 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
- if (topicPolicies.isRetentionSet()) {
- asyncResponse.resume(topicPolicies.getRetentionPolicies());
- } else {
- asyncResponse.resume(Response.noContent().build());
- }
- })
- .exceptionally(ex -> {
- handleTopicPolicyException("getRetention", ex, asyncResponse);
- return null;
+ .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+ .thenApply(op -> op.orElseGet(TopicPolicies::new))
+ .thenAccept(topicPolicies -> {
+ if (topicPolicies.isRetentionSet()) {
+ asyncResponse.resume(topicPolicies.getRetentionPolicies());
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ }
+ })
+ .exceptionally(ex -> {
+ handleTopicPolicyException("getRetention", ex, asyncResponse);
+ return null;
});
}
@@ -1856,17 +1863,17 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- Optional<PersistencePolicies> persistencePolicies = internalGetPersistence();
- if (!persistencePolicies.isPresent()) {
- asyncResponse.resume(Response.noContent().build());
- } else {
- asyncResponse.resume(persistencePolicies.get());
- }
- })
- .exceptionally(ex -> {
- handleTopicPolicyException("getPersistence", ex, asyncResponse);
- return null;
+ .thenCompose(__ -> internalGetPersistence())
+ .thenAccept(persistencePolicies -> {
+ if (!persistencePolicies.isPresent()) {
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ asyncResponse.resume(persistencePolicies.get());
+ }
+ })
+ .exceptionally(ex -> {
+ handleTopicPolicyException("getPersistence", ex, asyncResponse);
+ return null;
});
}
@@ -1951,14 +1958,14 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- Optional<Integer> maxSubscriptionsPerTopic = internalGetMaxSubscriptionsPerTopic();
- if (!maxSubscriptionsPerTopic.isPresent()) {
- asyncResponse.resume(Response.noContent().build());
- } else {
- asyncResponse.resume(maxSubscriptionsPerTopic.get());
- }
- })
+ .thenCompose(__ -> internalGetMaxSubscriptionsPerTopic())
+ .thenAccept(maxSubscriptionsPerTopic -> {
+ if (!maxSubscriptionsPerTopic.isPresent()) {
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ asyncResponse.resume(maxSubscriptionsPerTopic.get());
+ }
+ })
.exceptionally(ex -> {
handleTopicPolicyException("getMaxSubscriptionsPerTopic", ex, asyncResponse);
return null;
@@ -2039,14 +2046,14 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- Optional<Integer> maxProducers = internalGetMaxProducers();
- if (!maxProducers.isPresent()) {
- asyncResponse.resume(Response.noContent().build());
- } else {
- asyncResponse.resume(maxProducers.get());
- }
- })
+ .thenCompose(__ -> internalGetMaxProducers())
+ .thenAccept(maxProducers -> {
+ if (!maxProducers.isPresent()) {
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ asyncResponse.resume(maxProducers.get());
+ }
+ })
.exceptionally(ex -> {
handleTopicPolicyException("getMaxProducers", ex, asyncResponse);
return null;
@@ -2129,14 +2136,14 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- Optional<Integer> maxConsumers = internalGetMaxConsumers();
- if (!maxConsumers.isPresent()) {
- asyncResponse.resume(Response.noContent().build());
- } else {
- asyncResponse.resume(maxConsumers.get());
- }
- })
+ .thenCompose(__ -> internalGetMaxConsumers())
+ .thenAccept(maxConsumers -> {
+ if (!maxConsumers.isPresent()) {
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ asyncResponse.resume(maxConsumers.get());
+ }
+ })
.exceptionally(ex -> {
handleTopicPolicyException("getMaxConsumers", ex, asyncResponse);
return null;
@@ -2219,14 +2226,14 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- Optional<Integer> policies = internalGetMaxMessageSize();
- if (policies.isPresent()) {
- asyncResponse.resume(policies.get());
- } else {
- asyncResponse.resume(Response.noContent().build());
- }
- })
+ .thenCompose(__ -> internalGetMaxMessageSize())
+ .thenAccept(policies -> {
+ if (policies.isPresent()) {
+ asyncResponse.resume(policies.get());
+ } else {
+ asyncResponse.resume(Response.noContent().build());
+ }
+ })
.exceptionally(ex -> {
handleTopicPolicyException("getMaxMessageSize", ex, asyncResponse);
return null;
@@ -2512,14 +2519,14 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- Optional<DispatchRate> dispatchRate = internalGetDispatchRate();
- if (!dispatchRate.isPresent()) {
- asyncResponse.resume(Response.noContent().build());
- } else {
- asyncResponse.resume(dispatchRate.get());
- }
- })
+ .thenCompose(__ -> internalGetDispatchRate())
+ .thenAccept(dispatchRate -> {
+ if (!dispatchRate.isPresent()) {
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ asyncResponse.resume(dispatchRate.get());
+ }
+ })
.exceptionally(ex -> {
handleTopicPolicyException("getDispatchRate", ex, asyncResponse);
return null;
@@ -2606,14 +2613,14 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- Optional<DispatchRate> dispatchRate = internalGetSubscriptionDispatchRate();
- if (!dispatchRate.isPresent()) {
- asyncResponse.resume(Response.noContent().build());
- } else {
- asyncResponse.resume(dispatchRate.get());
- }
- })
+ .thenCompose(__ -> internalGetSubscriptionDispatchRate())
+ .thenAccept(dispatchRate -> {
+ if (!dispatchRate.isPresent()) {
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ asyncResponse.resume(dispatchRate.get());
+ }
+ })
.exceptionally(ex -> {
handleTopicPolicyException("getSubscriptionDispatchRate", ex, asyncResponse);
return null;
@@ -2700,14 +2707,14 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- Optional<Long> compactionThreshold = internalGetCompactionThreshold();
- if (!compactionThreshold.isPresent()) {
- asyncResponse.resume(Response.noContent().build());
- } else {
- asyncResponse.resume(compactionThreshold.get());
- }
- })
+ .thenCompose(__ -> internalGetCompactionThreshold())
+ .thenAccept(compactionThreshold -> {
+ if (!compactionThreshold.isPresent()) {
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ asyncResponse.resume(compactionThreshold.get());
+ }
+ })
.exceptionally(ex -> {
handleTopicPolicyException("getCompactionThreshold", ex, asyncResponse);
return null;
@@ -2794,14 +2801,14 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- Optional<Integer> maxConsumersPerSubscription = internalGetMaxConsumersPerSubscription();
- if (!maxConsumersPerSubscription.isPresent()) {
- asyncResponse.resume(Response.noContent().build());
- } else {
- asyncResponse.resume(maxConsumersPerSubscription.get());
- }
- })
+ .thenCompose(__ -> internalGetMaxConsumersPerSubscription())
+ .thenAccept(maxConsumersPerSubscription -> {
+ if (!maxConsumersPerSubscription.isPresent()) {
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ asyncResponse.resume(maxConsumersPerSubscription.get());
+ }
+ })
.exceptionally(ex -> {
handleTopicPolicyException("getMaxConsumersPerSubscription", ex, asyncResponse);
return null;
@@ -2889,14 +2896,14 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- Optional<PublishRate> publishRate = internalGetPublishRate();
- if (!publishRate.isPresent()) {
- asyncResponse.resume(Response.noContent().build());
- } else {
- asyncResponse.resume(publishRate.get());
- }
- })
+ .thenCompose(__ -> internalGetPublishRate())
+ .thenAccept(publishRate -> {
+ if (!publishRate.isPresent()) {
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ asyncResponse.resume(publishRate.get());
+ }
+ })
.exceptionally(ex -> {
handleTopicPolicyException("getPublishRate", ex, asyncResponse);
return null;
@@ -2983,14 +2990,14 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenRun(() -> {
- Optional<SubscribeRate> subscribeRate = internalGetSubscribeRate();
- if (!subscribeRate.isPresent()) {
- asyncResponse.resume(Response.noContent().build());
- } else {
- asyncResponse.resume(subscribeRate.get());
- }
- })
+ .thenCompose(__ -> internalGetSubscribeRate())
+ .thenAccept(subscribeRate -> {
+ if (!subscribeRate.isPresent()) {
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ asyncResponse.resume(subscribeRate.get());
+ }
+ })
.exceptionally(ex -> {
handleTopicPolicyException("getSubscribeRate", ex, asyncResponse);
return null;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
index a1c7614..e6b21cc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java
@@ -34,8 +34,7 @@ public class BackoffBuilder {
private long mandatoryStop;
private TimeUnit unitMandatoryStop;
- @VisibleForTesting
- BackoffBuilder() {
+ public BackoffBuilder() {
this.initial = 0;
this.max = 0;
this.mandatoryStop = 0;