You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/12/13 09:30:50 UTC

[pulsar] branch branch-2.9 updated (fd62860 -> 2055eca)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from fd62860  Fix consume message order issue when use listener. (#13023)
     new 66b58dc  Fix the reader skips compacted data which original ledger been removed (#12522)
     new 2055eca  Close old compacted ledger when open new. (#13210)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../service/persistent/CompactorSubscription.java  | 36 ++++++++++++++--------
 .../apache/pulsar/compaction/CompactedTopic.java   |  3 +-
 .../pulsar/compaction/CompactedTopicImpl.java      | 21 ++++++++-----
 .../pulsar/broker/service/PersistentTopicTest.java |  9 ++++--
 .../pulsar/compaction/CompactedTopicTest.java      | 14 ++++++++-
 5 files changed, 58 insertions(+), 25 deletions(-)

[pulsar] 01/02: Fix the reader skips compacted data which original ledger been removed (#12522)

Posted by eo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 66b58dcb8853113a2f7503970299020c302d1fc0
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Nov 2 12:30:30 2021 +0800

    Fix the reader skips compacted data which original ledger been removed (#12522)
    
    * Fix the reader skips compacted data which original ledger been removed
    
    The compactor updates the compaction cursor (mark delete) first and then updates the `compactionHorizon` of the compacted topic.
    While the compaction cursor moves forward, the original ledger will be removed if there are no other durable cursors.
    If, at the same time, a reader is reading data from the original ledger, the reader will skip the data once the original ledger
    has been removed; for details see https://github.com/apache/pulsar/pull/6787. So the reader might skip the compacted data because the
    `compactionHorizon` has not been updated yet.
    
    The approach is:
    
    1. Update the `compactionHorizon` before the compaction cursor moves forward,
       so that the reader will not skip the original data before the `compactionHorizon` is updated.
       If the broker crashes before the new compacted ledger ID has been persisted,
       the compaction can be triggered again after the topic has been loaded and no data will be lost,
       but we will have an orphan ledger that cannot be deleted in the BookKeeper cluster.
    2. Remove the previous compacted ledger after the compaction cursor has moved forward, to make sure the new compacted ledger ID has been persisted.
       Otherwise, we might lose the compacted ledger if the broker crashes.
    
    * Fix checkstyle
    
    * Fix tests.
    
    * Fix test
    
    (cherry picked from commit 74dd9b973f019d6975497f4cbe1bd9925e6137d1)
---
 .../service/persistent/CompactorSubscription.java  | 28 +++++++++++++---------
 .../apache/pulsar/compaction/CompactedTopic.java   |  3 ++-
 .../pulsar/compaction/CompactedTopicImpl.java      | 21 ++++++++++------
 .../pulsar/broker/service/PersistentTopicTest.java |  7 +++---
 .../pulsar/compaction/CompactedTopicTest.java      | 14 ++++++++++-
 5 files changed, 50 insertions(+), 23 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
index f76dd75..76e54b4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.service.persistent;
 import static com.google.common.base.Preconditions.checkArgument;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -34,7 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CompactorSubscription extends PersistentSubscription {
-    private CompactedTopic compactedTopic;
+    private final CompactedTopic compactedTopic;
 
     public CompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopic,
                                  String subscriptionName, ManagedCursor cursor) {
@@ -65,15 +64,25 @@ public class CompactorSubscription extends PersistentSubscription {
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Cumulative ack on compactor subscription {}", topicName, subName, position);
         }
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        cursor.asyncMarkDelete(position, properties, new MarkDeleteCallback() {
+
+        // The newCompactedLedger must be called at the first step because we need to ensure the reader can read
+        // complete data from compacted Ledger, otherwise, if the original ledger been deleted the reader cursor
+        // might move to a subsequent original ledger if `compactionHorizon` have not updated, this will lead to
+        // the reader skips compacted data at that time, after the `compactionHorizon` updated, the reader able
+        // to read the complete compacted data again.
+        // And we can only delete the previous ledger after the mark delete succeed, otherwise we will loss the
+        // compacted data if mark delete failed.
+        compactedTopic.newCompactedLedger(position, compactedLedgerId).thenAccept(previousContext -> {
+            cursor.asyncMarkDelete(position, properties, new MarkDeleteCallback() {
                 @Override
                 public void markDeleteComplete(Object ctx) {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}][{}] Mark deleted messages until position on compactor subscription {}",
-                                  topicName, subName, position);
+                                topicName, subName, position);
+                    }
+                    if (previousContext != null) {
+                        compactedTopic.deleteCompactedLedger(previousContext.getLedger().getId());
                     }
-                    future.complete(null);
                 }
 
                 @Override
@@ -81,19 +90,16 @@ public class CompactorSubscription extends PersistentSubscription {
                     // TODO: cut consumer connection on markDeleteFailed
                     if (log.isDebugEnabled()) {
                         log.debug("[{}][{}] Failed to mark delete for position on compactor subscription {}",
-                                  topicName, subName, ctx, exception);
+                                topicName, subName, ctx, exception);
                     }
                 }
             }, null);
+        });
 
         if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
             // Notify all consumer that the end of topic was reached
             dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
         }
-
-        // Once properties have been persisted, we can notify the compacted topic to use
-        // the new ledger
-        future.thenAccept((v) -> compactedTopic.newCompactedLedger(position, compactedLedgerId));
     }
 
     private static final Logger log = LoggerFactory.getLogger(CompactorSubscription.class);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 7c96937..31955a5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -26,7 +26,8 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.service.Consumer;
 
 public interface CompactedTopic {
-    CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId);
+    CompletableFuture<CompactedTopicContext> newCompactedLedger(Position p, long compactedLedgerId);
+    CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId);
     void asyncReadEntriesOrWait(ManagedCursor cursor,
                                 int numberOfEntriesToRead,
                                 boolean isFirstRead,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 1313413..bca5682 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.compaction;
 
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ComparisonChain;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
@@ -64,7 +65,7 @@ public class CompactedTopicImpl implements CompactedTopic {
     }
 
     @Override
-    public CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId) {
+    public CompletableFuture<CompactedTopicContext> newCompactedLedger(Position p, long compactedLedgerId) {
         synchronized (this) {
             compactionHorizon = (PositionImpl) p;
 
@@ -72,16 +73,17 @@ public class CompactedTopicImpl implements CompactedTopic {
             compactedTopicContext = openCompactedLedger(bk, compactedLedgerId);
 
             // delete the ledger from the old context once the new one is open
-            if (previousContext != null) {
-                return compactedTopicContext.thenCompose((res) -> previousContext)
-                    .thenCompose((res) -> tryDeleteCompactedLedger(bk, res.ledger.getId()));
-            } else {
-                return compactedTopicContext;
-            }
+            return compactedTopicContext.thenCompose(__ ->
+                    previousContext != null ? previousContext : CompletableFuture.completedFuture(null));
         }
     }
 
     @Override
+    public CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId) {
+        return tryDeleteCompactedLedger(bk, compactedLedgerId);
+    }
+
+    @Override
     public void asyncReadEntriesOrWait(ManagedCursor cursor,
                                        int numberOfEntriesToRead,
                                        boolean isFirstRead,
@@ -298,6 +300,11 @@ public class CompactedTopicImpl implements CompactedTopic {
             .compare(p.getLedgerId(), m.getLedgerId())
             .compare(p.getEntryId(), m.getEntryId()).result();
     }
+
+    @VisibleForTesting
+    PositionImpl getCompactionHorizon() {
+        return this.compactionHorizon;
+    }
     private static final Logger log = LoggerFactory.getLogger(CompactedTopicImpl.class);
 }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index cb10d72..6e68a39 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
@@ -124,12 +125,10 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
+import org.apache.pulsar.compaction.CompactedTopicContext;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
-import org.apache.pulsar.zookeeper.ZooKeeperCache;
-import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
-import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.zookeeper.ZooKeeper;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
@@ -1850,6 +1849,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
     public void testCompactorSubscription() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
         CompactedTopic compactedTopic = mock(CompactedTopic.class);
+        when(compactedTopic.newCompactedLedger(any(Position.class), anyLong()))
+                .thenReturn(CompletableFuture.completedFuture(mock(CompactedTopicContext.class)));
         PersistentSubscription sub = new CompactorSubscription(topic, compactedTopic,
                                                                Compactor.COMPACTION_SUBSCRIPTION,
                                                                cursorMock);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 3d41088..0083fc7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -241,7 +241,19 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
                       Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
 
         // update the compacted topic ledger
-        compactedTopic.newCompactedLedger(new PositionImpl(1,2), newCompactedLedger.getId()).get();
+        PositionImpl newHorizon = new PositionImpl(1,3);
+        compactedTopic.newCompactedLedger(newHorizon, newCompactedLedger.getId()).get();
+
+        // Make sure the old compacted ledger still exist after the new compacted ledger created.
+        bk.openLedger(oldCompactedLedger.getId(),
+                Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
+
+        Assert.assertTrue(compactedTopic.getCompactedTopicContext().isPresent());
+        Assert.assertEquals(compactedTopic.getCompactedTopicContext().get().getLedger().getId(),
+                newCompactedLedger.getId());
+        Assert.assertEquals(compactedTopic.getCompactionHorizon(), newHorizon);
+        compactedTopic.deleteCompactedLedger(oldCompactedLedger.getId()).join();
 
         // old ledger should be deleted, new still there
         try {

[pulsar] 02/02: Close old compacted ledger when open new. (#13210)

Posted by eo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2055ecade9a1e250c9cfdfc625a5fa973d59ed59
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Dec 13 17:03:20 2021 +0800

    Close old compacted ledger when open new. (#13210)
    
    (cherry picked from commit 07ef9231db8b844586b9217ee2d59237eb9c54b7)
---
 .../pulsar/broker/service/persistent/CompactorSubscription.java   | 8 ++++++--
 .../org/apache/pulsar/broker/service/PersistentTopicTest.java     | 2 ++
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
index 76e54b4..f7279968 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
@@ -47,8 +47,12 @@ public class CompactorSubscription extends PersistentSubscription {
         Map<String, Long> properties = cursor.getProperties();
         if (properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY)) {
             long compactedLedgerId = properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY);
-            compactedTopic.newCompactedLedger(cursor.getMarkDeletedPosition(),
-                                              compactedLedgerId);
+            compactedTopic.newCompactedLedger(cursor.getMarkDeletedPosition(), compactedLedgerId)
+                    .thenAccept(previousContext -> {
+                        if (previousContext != null) {
+                            compactedTopic.deleteCompactedLedger(previousContext.getLedger().getId());
+                        }
+                    });
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 6e68a39..f1a11ad 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1873,6 +1873,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
 
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
         CompactedTopic compactedTopic = mock(CompactedTopic.class);
+        when(compactedTopic.newCompactedLedger(any(Position.class), anyLong()))
+                .thenReturn(CompletableFuture.completedFuture(null));
         new CompactorSubscription(topic, compactedTopic, Compactor.COMPACTION_SUBSCRIPTION, cursorMock);
         verify(compactedTopic, Mockito.times(1)).newCompactedLedger(position, ledgerId);
     }