You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/24 09:46:03 UTC
[pulsar] branch master updated: Remove `internalUpdateOffloadPolicies` to keep the same behavior update topic policy (#17236)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e4723a75c29 Remove `internalUpdateOffloadPolicies` to keep the same behavior update topic policy (#17236)
e4723a75c29 is described below
commit e4723a75c295e7fcddc4e2334c6e486abd940b57
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Aug 24 17:45:54 2022 +0800
Remove `internalUpdateOffloadPolicies` to keep the same behavior update topic policy (#17236)
---
.../broker/admin/impl/PersistentTopicsBase.java | 44 ----------------------
.../pulsar/broker/admin/AdminApiOffloadTest.java | 6 ---
2 files changed, 50 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 231168e35e9..26b208e2193 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -53,9 +53,7 @@ import javax.ws.rs.core.StreamingOutput;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedger;
-import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
@@ -855,19 +853,6 @@ public class PersistentTopicsBase extends AdminResource {
topicPolicies.setOffloadPolicies(offloadPolicies);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
- }).thenCompose(__ -> {
- //The policy update is asynchronous. Cache at this step may not be updated yet.
- //So we need to set the loader by the incoming offloadPolicies instead of topic policies cache.
- PartitionedTopicMetadata metadata = fetchPartitionedTopicMetadata(pulsar(), topicName);
- if (metadata.partitions > 0) {
- List<CompletableFuture<Void>> futures = new ArrayList<>(metadata.partitions);
- for (int i = 0; i < metadata.partitions; i++) {
- futures.add(internalUpdateOffloadPolicies(offloadPolicies, topicName.getPartition(i)));
- }
- return FutureUtil.waitForAll(futures);
- } else {
- return internalUpdateOffloadPolicies(offloadPolicies, topicName);
- }
});
}
@@ -898,35 +883,6 @@ public class PersistentTopicsBase extends AdminResource {
});
}
- private CompletableFuture<Void> internalUpdateOffloadPolicies(OffloadPoliciesImpl offloadPolicies,
- TopicName topicName) {
- return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
- .thenAccept(optionalTopic -> {
- try {
- if (!optionalTopic.isPresent() || !topicName.isPersistent()) {
- return;
- }
- PersistentTopic persistentTopic = (PersistentTopic) optionalTopic.get();
- ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
- if (offloadPolicies == null) {
- LedgerOffloader namespaceOffloader =
- pulsar().getLedgerOffloaderMap().get(topicName.getNamespaceObject());
- LedgerOffloader topicOffloader = managedLedgerConfig.getLedgerOffloader();
- if (topicOffloader != null && topicOffloader != namespaceOffloader) {
- topicOffloader.close();
- }
- managedLedgerConfig.setLedgerOffloader(namespaceOffloader);
- } else {
- managedLedgerConfig.setLedgerOffloader(
- pulsar().createManagedLedgerOffloader(offloadPolicies));
- }
- persistentTopic.getManagedLedger().setConfig(managedLedgerConfig);
- } catch (PulsarServerException e) {
- throw new RestException(e);
- }
- });
- }
-
protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnSubscription(boolean applied,
boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 814cef3579d..38074112828 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -328,12 +328,6 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
admin.topics().removeOffloadPolicies(topicName);
Awaitility.await().untilAsserted(()
-> assertNull(admin.topics().getOffloadPolicies(topicName)));
- // topic level offloader should be closed
- if (isPartitioned) {
- verify(topicOffloader, times(partitionNum)).close();
- } else {
- verify(topicOffloader).close();
- }
if (isPartitioned) {
for (int i = 0; i < partitionNum; i++) {
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()