You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/06 01:45:39 UTC
[pulsar] 02/04: Fix some topic policy operation without backoff (#11560)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3ce4c59a1b1f871af696a4abea23cbf1e02658e7
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Aug 5 18:54:36 2021 +0800
Fix some topic policy operation without backoff (#11560)
* Fix some topic policy operation without backoff
Related to https://github.com/apache/pulsar/pull/11487
(cherry picked from commit 92a3ac7f2d9ba94da43eea28ad3d88ceffb6015d)
---
.../broker/admin/impl/PersistentTopicsBase.java | 127 +++++++--------------
1 file changed, 42 insertions(+), 85 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 3d78116..0c1a410 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -67,7 +67,6 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
-import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -780,41 +779,25 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
- TopicPolicies topicPolicies = null;
- try {
- topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
- } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.error("Topic {} policies have not been initialized yet.", topicName);
- throw new RestException(Status.PRECONDITION_FAILED, "Policies have not been initialized yet");
- }
- if (topicPolicies == null) {
- topicPolicies = new TopicPolicies();
- }
- topicPolicies.setOffloadPolicies(offloadPolicies);
- CompletableFuture<Void> completableFuture = new CompletableFuture<>();
- pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
- .thenCompose((res) -> {
- //The policy update is asynchronous. Cache at this step may not be updated yet.
- //So we need to set the loader by the incoming offloadPolicies instead of topic policies cache.
- PartitionedTopicMetadata metadata = fetchPartitionedTopicMetadata(pulsar(), topicName);
- if (metadata.partitions > 0) {
- List<CompletableFuture<Void>> futures = new ArrayList<>(metadata.partitions);
- for (int i = 0; i < metadata.partitions; i++) {
- futures.add(internalUpdateOffloadPolicies(offloadPolicies, topicName.getPartition(i)));
- }
- return FutureUtil.waitForAll(futures);
- } else {
- return internalUpdateOffloadPolicies(offloadPolicies, topicName);
- }
- })
- .whenComplete((result, e) -> {
- if (e != null) {
- completableFuture.completeExceptionally(e);
- } else {
- completableFuture.complete(null);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setOffloadPolicies(offloadPolicies);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ }).thenCompose(__ -> {
+ //The policy update is asynchronous. Cache at this step may not be updated yet.
+ //So we need to set the loader by the incoming offloadPolicies instead of topic policies cache.
+ PartitionedTopicMetadata metadata = fetchPartitionedTopicMetadata(pulsar(), topicName);
+ if (metadata.partitions > 0) {
+ List<CompletableFuture<Void>> futures = new ArrayList<>(metadata.partitions);
+ for (int i = 0; i < metadata.partitions; i++) {
+ futures.add(internalUpdateOffloadPolicies(offloadPolicies, topicName.getPartition(i)));
}
- });
- return completableFuture;
+ return FutureUtil.waitForAll(futures);
+ } else {
+ return internalUpdateOffloadPolicies(offloadPolicies, topicName);
+ }
+ });
}
protected CompletableFuture<InactiveTopicPolicies> internalGetInactiveTopicPolicies(boolean applied) {
@@ -833,19 +816,12 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies) {
- TopicPolicies topicPolicies = null;
- try {
- topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
- } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.error("Topic {} policies have not been initialized yet.", topicName);
- return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
- "Policies have not been initialized yet"));
- }
- if (topicPolicies == null) {
- topicPolicies = new TopicPolicies();
- }
- topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
private CompletableFuture<Void> internalUpdateOffloadPolicies(OffloadPoliciesImpl offloadPolicies,
@@ -896,18 +872,12 @@ public class PersistentTopicsBase extends AdminResource {
"maxUnackedNum must be 0 or more");
}
- TopicPolicies topicPolicies = null;
- try {
- topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
- } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.error("Topic {} policies have not been initialized yet.", topicName);
- throw new RestException(Status.PRECONDITION_FAILED, "Policies have not been initialized yet");
- }
- if (topicPolicies == null) {
- topicPolicies = new TopicPolicies();
- }
- topicPolicies.setMaxUnackedMessagesOnSubscription(maxUnackedNum);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setMaxUnackedMessagesOnSubscription(maxUnackedNum);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnConsumer(boolean applied) {
@@ -928,19 +898,12 @@ public class PersistentTopicsBase extends AdminResource {
"maxUnackedNum must be 0 or more");
}
- TopicPolicies topicPolicies = null;
- try {
- topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
- } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.error("Topic {} policies have not been initialized yet.", topicName);
- return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
- "Policies have not been initialized yet"));
- }
- if (topicPolicies == null) {
- topicPolicies = new TopicPolicies();
- }
- topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
protected CompletableFuture<Void> internalSetDeduplicationSnapshotInterval(Integer interval) {
@@ -2724,18 +2687,12 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalSetDeduplication(Boolean enabled) {
- TopicPolicies topicPolicies = null;
- try {
- topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
- } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.error("Topic {} policies have not been initialized yet.", topicName);
- throw new RestException(Status.PRECONDITION_FAILED, "Policies have not been initialized yet");
- }
- if (topicPolicies == null) {
- topicPolicies = new TopicPolicies();
- }
- topicPolicies.setDeduplicationEnabled(enabled);
- return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setDeduplicationEnabled(enabled);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+ });
}
protected CompletableFuture<Void> internalSetMessageTTL(Integer ttlInSecond) {