You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/03/15 10:58:58 UTC
[pulsar] branch branch-2.7 updated: [Branch-2.7] Fixed wrong behaviour caused by not cleaning up topic policy service state (#14644)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new f306256 [Branch-2.7] Fixed wrong behaviour caused by not cleaning up topic policy service state (#14644)
f306256 is described below
commit f306256a536f688dd693bb3493be6051c5ff40d2
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Mar 15 18:56:20 2022 +0800
[Branch-2.7] Fixed wrong behaviour caused by not cleaning up topic policy service state (#14644)
Co-authored-by: Qiang Zhao <74...@users.noreply.github.com>
---
.../SystemTopicBasedTopicPoliciesService.java | 37 +++++++++++-----------
.../integration/compaction/TestCompaction.java | 2 +-
2 files changed, 20 insertions(+), 19 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 0541c7a..7ba7aa2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -164,8 +165,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
if (ex != null) {
log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
result.completeExceptionally(ex);
- readerCaches.remove(namespace);
- reader.closeAsync();
+ cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
} else {
initPolicesCache(reader, result);
result.thenRun(() -> readMorePolicies(reader));
@@ -218,13 +218,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
}
AtomicInteger bundlesCount = ownedBundlesCountPerNamespace.get(namespace);
if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) {
- CompletableFuture<SystemTopicClient.Reader> readerCompletableFuture = readerCaches.remove(namespace);
- if (readerCompletableFuture != null) {
- readerCompletableFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
- ownedBundlesCountPerNamespace.remove(namespace);
- policyCacheInitMap.remove(namespace);
- policiesCache.entrySet().removeIf(entry -> entry.getKey().getNamespaceObject().equals(namespace));
- }
+ cleanCacheAndCloseReader(namespace, true);
}
return CompletableFuture.completedFuture(null);
}
@@ -258,9 +252,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
log.error("[{}] Failed to check the move events for the system topic",
reader.getSystemTopic().getTopicName(), ex);
future.completeExceptionally(ex);
- readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
- policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
- reader.closeAsync();
+ cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
return;
}
if (hasMore) {
@@ -269,9 +261,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
log.error("[{}] Failed to read event from the system topic.",
reader.getSystemTopic().getTopicName(), e);
future.completeExceptionally(e);
- readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
- policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
- reader.closeAsync();
+ cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
return;
}
refreshTopicPoliciesCache(msg);
@@ -301,6 +291,18 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
});
}
+ private void cleanCacheAndCloseReader(NamespaceName namespace, boolean cleanOwnedBundlesCount) {
+ CompletableFuture<SystemTopicClient.Reader> readerFuture = readerCaches.remove(namespace);
+ policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace));
+ if (cleanOwnedBundlesCount) {
+ ownedBundlesCountPerNamespace.remove(namespace);
+ }
+ if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
+ readerFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
+ }
+ policyCacheInitMap.remove(namespace);
+ }
+
private void readMorePolicies(SystemTopicClient.Reader reader) {
reader.readNextAsync().whenComplete((msg, ex) -> {
if (ex == null) {
@@ -310,10 +312,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
} else {
if (ex instanceof PulsarClientException.AlreadyClosedException) {
log.error("Read more topic policies exception, close the read now!", ex);
- NamespaceName namespace = reader.getSystemTopic().getTopicName().getNamespaceObject();
- ownedBundlesCountPerNamespace.remove(namespace);
- readerCaches.remove(namespace);
+ cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
} else {
+ log.warn("Read more topic polices exception, read again.", ex);
readMorePolicies(reader);
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
index 52846da..5bd02ef 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
@@ -85,7 +85,7 @@ public class TestCompaction extends PulsarTestSuite {
assertEquals(m.getValue(), "content1");
}
- pulsarCluster.runPulsarBaseCommandOnAnyBroker("compact-topic", "-t", topic);
+ pulsarCluster.runAdminCommandOnAnyBroker("topics", "compact", topic);
try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topic)