You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/06 03:34:28 UTC

[pulsar] branch branch-2.7 updated: [Branch-2.7][Cherry-pick] Fix the reader skips compacted data which original ledger been removed. (#16407)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new f78c63b6d06 [Branch-2.7][Cherry-pick] Fix the reader skips compacted data which original ledger been removed. (#16407)
f78c63b6d06 is described below

commit f78c63b6d06c1efb759f3f2171186cddca6bc6eb
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Jul 6 11:34:20 2022 +0800

    [Branch-2.7][Cherry-pick] Fix the reader skips compacted data which original ledger been removed. (#16407)
---
 .../service/persistent/CompactorSubscription.java    | 20 +++++++++-----------
 .../org/apache/pulsar/compaction/CompactedTopic.java |  3 ++-
 .../apache/pulsar/compaction/CompactedTopicImpl.java | 16 +++++++++-------
 .../pulsar/broker/service/PersistentTopicTest.java   |  4 ++++
 .../apache/pulsar/compaction/CompactedTopicTest.java | 14 +++++++++++++-
 5 files changed, 37 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
index 94af8063f5a..ea249ca9471 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -36,7 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CompactorSubscription extends PersistentSubscription {
-    private CompactedTopic compactedTopic;
+    private final CompactedTopic compactedTopic;
 
     public CompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopic,
                                  String subscriptionName, ManagedCursor cursor) {
@@ -67,15 +66,17 @@ public class CompactorSubscription extends PersistentSubscription {
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Cumulative ack on compactor subscription {}", topicName, subName, position);
         }
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        cursor.asyncMarkDelete(position, properties, new MarkDeleteCallback() {
+        compactedTopic.newCompactedLedger(position, compactedLedgerId).thenAccept(previousContext -> {
+            cursor.asyncMarkDelete(position, properties, new MarkDeleteCallback() {
                 @Override
                 public void markDeleteComplete(Object ctx) {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}][{}] Mark deleted messages until position on compactor subscription {}",
-                                  topicName, subName, position);
+                                topicName, subName, position);
+                    }
+                    if (previousContext != null) {
+                        compactedTopic.deleteCompactedLedger(previousContext.getLedger().getId());
                     }
-                    future.complete(null);
                 }
 
                 @Override
@@ -83,19 +84,16 @@ public class CompactorSubscription extends PersistentSubscription {
                     // TODO: cut consumer connection on markDeleteFailed
                     if (log.isDebugEnabled()) {
                         log.debug("[{}][{}] Failed to mark delete for position on compactor subscription {}",
-                                  topicName, subName, ctx, exception);
+                                topicName, subName, ctx, exception);
                     }
                 }
             }, null);
+        });
 
         if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
             // Notify all consumer that the end of topic was reached
             dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
         }
-
-        // Once properties have been persisted, we can notify the compacted topic to use
-        // the new ledger
-        future.thenAccept((v) -> compactedTopic.newCompactedLedger(position, compactedLedgerId));
     }
 
     private static final Logger log = LoggerFactory.getLogger(CompactorSubscription.class);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index c91931d071a..17e55eeae76 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -27,7 +27,8 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.service.Consumer;
 
 public interface CompactedTopic {
-    CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId);
+    CompletableFuture<CompactedTopicImpl.CompactedTopicContext> newCompactedLedger(Position p, long compactedLedgerId);
+    CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId);
     void asyncReadEntriesOrWait(ManagedCursor cursor,
                                 int numberOfEntriesToRead,
                                 boolean isFirstRead,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index c3af219f21b..83c9fd1d684 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -65,7 +65,7 @@ public class CompactedTopicImpl implements CompactedTopic {
     }
 
     @Override
-    public CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId) {
+    public CompletableFuture<CompactedTopicContext> newCompactedLedger(Position p, long compactedLedgerId) {
         synchronized (this) {
             compactionHorizon = (PositionImpl)p;
 
@@ -73,15 +73,16 @@ public class CompactedTopicImpl implements CompactedTopic {
             compactedTopicContext = openCompactedLedger(bk, compactedLedgerId);
 
             // delete the ledger from the old context once the new one is open
-            if (previousContext != null) {
-                return compactedTopicContext.thenCompose((res) -> previousContext)
-                    .thenCompose((res) -> tryDeleteCompactedLedger(bk, res.ledger.getId()));
-            } else {
-                return compactedTopicContext;
-            }
+            return compactedTopicContext.thenCompose(__ ->
+                    previousContext != null ? previousContext : CompletableFuture.completedFuture(null));
         }
     }
 
+    @Override
+    public CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId) {
+        return tryDeleteCompactedLedger(bk, compactedLedgerId);
+    }
+
     @Override
     public void asyncReadEntriesOrWait(ManagedCursor cursor,
                                        int numberOfEntriesToRead,
@@ -323,6 +324,7 @@ public class CompactedTopicImpl implements CompactedTopic {
     public synchronized Optional<Position> getCompactionHorizon() {
         return Optional.ofNullable(this.compactionHorizon);
     }
+
     private static final Logger log = LoggerFactory.getLogger(CompactedTopicImpl.class);
 }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 17093ab41d9..1d265c8a368 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -22,6 +22,7 @@ import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMo
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.matches;
@@ -117,6 +118,7 @@ import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
 import org.apache.pulsar.compaction.CompactedTopic;
+import org.apache.pulsar.compaction.CompactedTopicImpl;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
@@ -1509,6 +1511,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
     public void testCompactorSubscription() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
         CompactedTopic compactedTopic = mock(CompactedTopic.class);
+        when(compactedTopic.newCompactedLedger(any(Position.class), anyLong()))
+                .thenReturn(CompletableFuture.completedFuture(mock(CompactedTopicImpl.CompactedTopicContext.class)));
         PersistentSubscription sub = new CompactorSubscription(topic, compactedTopic,
                                                                Compactor.COMPACTION_SUBSCRIPTION,
                                                                cursorMock);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 58be6d8936f..2252ed9a214 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -237,7 +237,19 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
                       Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
 
         // update the compacted topic ledger
-        compactedTopic.newCompactedLedger(new PositionImpl(1,2), newCompactedLedger.getId()).get();
+        PositionImpl newHorizon = new PositionImpl(1,3);
+        compactedTopic.newCompactedLedger(newHorizon, newCompactedLedger.getId()).get();
+
+        // Make sure the old compacted ledger still exists after the new compacted ledger is created.
+        bk.openLedger(oldCompactedLedger.getId(),
+                Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
+
+        Assert.assertTrue(compactedTopic.getCompactedTopicContext().isPresent());
+        Assert.assertEquals(compactedTopic.getCompactedTopicContext().get().getLedger().getId(),
+                newCompactedLedger.getId());
+        Assert.assertEquals(compactedTopic.getCompactionHorizon().get(), newHorizon);
+        compactedTopic.deleteCompactedLedger(oldCompactedLedger.getId()).join();
 
         // old ledger should be deleted, new still there
         try {