You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/03/15 10:58:58 UTC

[pulsar] branch branch-2.7 updated: [Branch-2.7] Fixed wrong behaviour caused by not cleaning up topic policy service state (#14644)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new f306256  [Branch-2.7] Fixed wrong behaviour caused by not cleaning up topic policy service state (#14644)
f306256 is described below

commit f306256a536f688dd693bb3493be6051c5ff40d2
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Mar 15 18:56:20 2022 +0800

    [Branch-2.7] Fixed wrong behaviour caused by not cleaning up topic policy service state (#14644)
    
    Co-authored-by: Qiang Zhao <74...@users.noreply.github.com>
---
 .../SystemTopicBasedTopicPoliciesService.java      | 37 +++++++++++-----------
 .../integration/compaction/TestCompaction.java     |  2 +-
 2 files changed, 20 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 0541c7a..7ba7aa2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -164,8 +165,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 if (ex != null) {
                     log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
                     result.completeExceptionally(ex);
-                    readerCaches.remove(namespace);
-                    reader.closeAsync();
+                    cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
                 } else {
                     initPolicesCache(reader, result);
                     result.thenRun(() -> readMorePolicies(reader));
@@ -218,13 +218,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         }
         AtomicInteger bundlesCount = ownedBundlesCountPerNamespace.get(namespace);
         if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) {
-            CompletableFuture<SystemTopicClient.Reader> readerCompletableFuture = readerCaches.remove(namespace);
-            if (readerCompletableFuture != null) {
-                readerCompletableFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
-                ownedBundlesCountPerNamespace.remove(namespace);
-                policyCacheInitMap.remove(namespace);
-                policiesCache.entrySet().removeIf(entry -> entry.getKey().getNamespaceObject().equals(namespace));
-            }
+            cleanCacheAndCloseReader(namespace, true);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -258,9 +252,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 log.error("[{}] Failed to check the move events for the system topic",
                         reader.getSystemTopic().getTopicName(), ex);
                 future.completeExceptionally(ex);
-                readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
-                policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
-                reader.closeAsync();
+                cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
                 return;
             }
             if (hasMore) {
@@ -269,9 +261,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                         log.error("[{}] Failed to read event from the system topic.",
                                 reader.getSystemTopic().getTopicName(), e);
                         future.completeExceptionally(e);
-                        readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
-                        policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
-                        reader.closeAsync();
+                        cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
                         return;
                     }
                     refreshTopicPoliciesCache(msg);
@@ -301,6 +291,18 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         });
     }
 
+    private void cleanCacheAndCloseReader(NamespaceName namespace, boolean cleanOwnedBundlesCount) {
+        CompletableFuture<SystemTopicClient.Reader> readerFuture = readerCaches.remove(namespace);
+        policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace));
+        if (cleanOwnedBundlesCount) {
+            ownedBundlesCountPerNamespace.remove(namespace);
+        }
+        if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
+            readerFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
+        }
+        policyCacheInitMap.remove(namespace);
+    }
+
     private void readMorePolicies(SystemTopicClient.Reader reader) {
         reader.readNextAsync().whenComplete((msg, ex) -> {
             if (ex == null) {
@@ -310,10 +312,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
             } else {
                 if (ex instanceof PulsarClientException.AlreadyClosedException) {
                     log.error("Read more topic policies exception, close the read now!", ex);
-                    NamespaceName namespace = reader.getSystemTopic().getTopicName().getNamespaceObject();
-                    ownedBundlesCountPerNamespace.remove(namespace);
-                    readerCaches.remove(namespace);
+                    cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
                 } else {
+                    log.warn("Read more topic polices exception, read again.", ex);
                     readMorePolicies(reader);
                 }
             }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
index 52846da..5bd02ef 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
@@ -85,7 +85,7 @@ public class TestCompaction extends PulsarTestSuite {
                 assertEquals(m.getValue(), "content1");
             }
 
-            pulsarCluster.runPulsarBaseCommandOnAnyBroker("compact-topic", "-t", topic);
+            pulsarCluster.runAdminCommandOnAnyBroker("topics", "compact", topic);
 
             try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
                 .topic(topic)