You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/06 01:45:39 UTC

[pulsar] 02/04: Fix some topic policy operation without backoff (#11560)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3ce4c59a1b1f871af696a4abea23cbf1e02658e7
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Aug 5 18:54:36 2021 +0800

    Fix some topic policy operation without backoff (#11560)
    
    * Fix some topic policy operations that failed immediately, without retry/backoff, when the topic policies cache was not yet initialized
    
    Related to https://github.com/apache/pulsar/pull/11487
    
    (cherry picked from commit 92a3ac7f2d9ba94da43eea28ad3d88ceffb6015d)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 127 +++++++--------------
 1 file changed, 42 insertions(+), 85 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 3d78116..0c1a410 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -67,7 +67,6 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.admin.ZkAdminPaths;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
-import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -780,41 +779,25 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
-        TopicPolicies topicPolicies = null;
-        try {
-            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            log.error("Topic {} policies have not been initialized yet.", topicName);
-            throw new RestException(Status.PRECONDITION_FAILED, "Policies have not been initialized yet");
-        }
-        if (topicPolicies == null) {
-            topicPolicies = new TopicPolicies();
-        }
-        topicPolicies.setOffloadPolicies(offloadPolicies);
-        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-        pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
-                .thenCompose((res) -> {
-                    //The policy update is asynchronous. Cache at this step may not be updated yet.
-                    //So we need to set the loader by the incoming offloadPolicies instead of topic policies cache.
-                    PartitionedTopicMetadata metadata = fetchPartitionedTopicMetadata(pulsar(), topicName);
-                    if (metadata.partitions > 0) {
-                        List<CompletableFuture<Void>> futures = new ArrayList<>(metadata.partitions);
-                        for (int i = 0; i < metadata.partitions; i++) {
-                            futures.add(internalUpdateOffloadPolicies(offloadPolicies, topicName.getPartition(i)));
-                        }
-                        return FutureUtil.waitForAll(futures);
-                    } else {
-                        return internalUpdateOffloadPolicies(offloadPolicies, topicName);
-                    }
-                })
-                .whenComplete((result, e) -> {
-                    if (e != null) {
-                        completableFuture.completeExceptionally(e);
-                    } else {
-                        completableFuture.complete(null);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setOffloadPolicies(offloadPolicies);
+                return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+            }).thenCompose(__ -> {
+                //The policy update is asynchronous. Cache at this step may not be updated yet.
+                //So we need to set the loader by the incoming offloadPolicies instead of topic policies cache.
+                PartitionedTopicMetadata metadata = fetchPartitionedTopicMetadata(pulsar(), topicName);
+                if (metadata.partitions > 0) {
+                    List<CompletableFuture<Void>> futures = new ArrayList<>(metadata.partitions);
+                    for (int i = 0; i < metadata.partitions; i++) {
+                        futures.add(internalUpdateOffloadPolicies(offloadPolicies, topicName.getPartition(i)));
                     }
-                });
-        return completableFuture;
+                    return FutureUtil.waitForAll(futures);
+                } else {
+                    return internalUpdateOffloadPolicies(offloadPolicies, topicName);
+                }
+            });
     }
 
     protected CompletableFuture<InactiveTopicPolicies> internalGetInactiveTopicPolicies(boolean applied) {
@@ -833,19 +816,12 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies) {
-        TopicPolicies topicPolicies = null;
-        try {
-            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            log.error("Topic {} policies have not been initialized yet.", topicName);
-            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
-                    "Policies have not been initialized yet"));
-        }
-        if (topicPolicies == null) {
-            topicPolicies = new TopicPolicies();
-        }
-        topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies);
+                return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+            });
     }
 
     private CompletableFuture<Void> internalUpdateOffloadPolicies(OffloadPoliciesImpl offloadPolicies,
@@ -896,18 +872,12 @@ public class PersistentTopicsBase extends AdminResource {
                     "maxUnackedNum must be 0 or more");
         }
 
-        TopicPolicies topicPolicies = null;
-        try {
-            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            log.error("Topic {} policies have not been initialized yet.", topicName);
-            throw new RestException(Status.PRECONDITION_FAILED, "Policies have not been initialized yet");
-        }
-        if (topicPolicies == null) {
-            topicPolicies = new TopicPolicies();
-        }
-        topicPolicies.setMaxUnackedMessagesOnSubscription(maxUnackedNum);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setMaxUnackedMessagesOnSubscription(maxUnackedNum);
+                return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+            });
     }
 
     protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnConsumer(boolean applied) {
@@ -928,19 +898,12 @@ public class PersistentTopicsBase extends AdminResource {
                     "maxUnackedNum must be 0 or more");
         }
 
-        TopicPolicies topicPolicies = null;
-        try {
-            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            log.error("Topic {} policies have not been initialized yet.", topicName);
-            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
-                    "Policies have not been initialized yet"));
-        }
-        if (topicPolicies == null) {
-            topicPolicies = new TopicPolicies();
-        }
-        topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum);
+                return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+            });
     }
 
     protected CompletableFuture<Void> internalSetDeduplicationSnapshotInterval(Integer interval) {
@@ -2724,18 +2687,12 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected CompletableFuture<Void> internalSetDeduplication(Boolean enabled) {
-        TopicPolicies topicPolicies = null;
-        try {
-            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            log.error("Topic {} policies have not been initialized yet.", topicName);
-            throw new RestException(Status.PRECONDITION_FAILED, "Policies have not been initialized yet");
-        }
-        if (topicPolicies == null) {
-            topicPolicies = new TopicPolicies();
-        }
-        topicPolicies.setDeduplicationEnabled(enabled);
-        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+        return getTopicPoliciesAsyncWithRetry(topicName)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setDeduplicationEnabled(enabled);
+                return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+            });
     }
 
     protected CompletableFuture<Void> internalSetMessageTTL(Integer ttlInSecond) {