You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/24 09:46:03 UTC

[pulsar] branch master updated: Remove `internalUpdateOffloadPolicies` to keep the same behavior update topic policy (#17236)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e4723a75c29 Remove `internalUpdateOffloadPolicies` to keep the same behavior update topic policy (#17236)
e4723a75c29 is described below

commit e4723a75c295e7fcddc4e2334c6e486abd940b57
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Aug 24 17:45:54 2022 +0800

    Remove `internalUpdateOffloadPolicies` to keep the same behavior update topic policy (#17236)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 44 ----------------------
 .../pulsar/broker/admin/AdminApiOffloadTest.java   |  6 ---
 2 files changed, 50 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 231168e35e9..26b208e2193 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -53,9 +53,7 @@ import javax.ws.rs.core.StreamingOutput;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
 import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedger;
-import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.Position;
@@ -855,19 +853,6 @@ public class PersistentTopicsBase extends AdminResource {
                 topicPolicies.setOffloadPolicies(offloadPolicies);
                 topicPolicies.setIsGlobal(isGlobal);
                 return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
-            }).thenCompose(__ -> {
-                //The policy update is asynchronous. Cache at this step may not be updated yet.
-                //So we need to set the loader by the incoming offloadPolicies instead of topic policies cache.
-                PartitionedTopicMetadata metadata = fetchPartitionedTopicMetadata(pulsar(), topicName);
-                if (metadata.partitions > 0) {
-                    List<CompletableFuture<Void>> futures = new ArrayList<>(metadata.partitions);
-                    for (int i = 0; i < metadata.partitions; i++) {
-                        futures.add(internalUpdateOffloadPolicies(offloadPolicies, topicName.getPartition(i)));
-                    }
-                    return FutureUtil.waitForAll(futures);
-                } else {
-                    return internalUpdateOffloadPolicies(offloadPolicies, topicName);
-                }
             });
     }
 
@@ -898,35 +883,6 @@ public class PersistentTopicsBase extends AdminResource {
             });
     }
 
-    private CompletableFuture<Void> internalUpdateOffloadPolicies(OffloadPoliciesImpl offloadPolicies,
-                                                                  TopicName topicName) {
-        return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
-                .thenAccept(optionalTopic -> {
-                    try {
-                        if (!optionalTopic.isPresent() || !topicName.isPersistent()) {
-                            return;
-                        }
-                        PersistentTopic persistentTopic = (PersistentTopic) optionalTopic.get();
-                        ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
-                        if (offloadPolicies == null) {
-                            LedgerOffloader namespaceOffloader =
-                                    pulsar().getLedgerOffloaderMap().get(topicName.getNamespaceObject());
-                            LedgerOffloader topicOffloader = managedLedgerConfig.getLedgerOffloader();
-                            if (topicOffloader != null && topicOffloader != namespaceOffloader) {
-                                topicOffloader.close();
-                            }
-                            managedLedgerConfig.setLedgerOffloader(namespaceOffloader);
-                        } else {
-                            managedLedgerConfig.setLedgerOffloader(
-                                    pulsar().createManagedLedgerOffloader(offloadPolicies));
-                        }
-                        persistentTopic.getManagedLedger().setConfig(managedLedgerConfig);
-                    } catch (PulsarServerException e) {
-                        throw new RestException(e);
-                    }
-                });
-    }
-
     protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnSubscription(boolean applied,
                                                                                      boolean isGlobal) {
         return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 814cef3579d..38074112828 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -328,12 +328,6 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
         admin.topics().removeOffloadPolicies(topicName);
         Awaitility.await().untilAsserted(()
                 -> assertNull(admin.topics().getOffloadPolicies(topicName)));
-        // topic level offloader should be closed
-        if (isPartitioned) {
-            verify(topicOffloader, times(partitionNum)).close();
-        } else {
-            verify(topicOffloader).close();
-        }
         if (isPartitioned) {
             for (int i = 0; i < partitionNum; i++) {
                 PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()