You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/28 17:09:58 UTC

[incubator-pulsar] branch master updated: Cleanup old compacted topic ledgers when a new one is available (#1302)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a6b0fb  Cleanup old compacted topic ledgers when a new one is available (#1302)
3a6b0fb is described below

commit 3a6b0fbdcc6c0b2fe44a7d5b3899a1c925791006
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed Feb 28 18:09:56 2018 +0100

    Cleanup old compacted topic ledgers when a new one is available (#1302)
    
    When a broker is notified of a new compacted ledger for a topic, and
    after it has been persisted to the compaction cursor, delete the old
    compacted topic ledger. The deletion is best effort. If it fails it
    doesn't affect reading from the newer ledger.
    
    This isn't foolproof by any means. There's still plenty of
    opportunities for ledgers to be orphaned. We'll need some sort of GC
    based on ledger metadata to handle all cases, but that needs a newer
    bookkeeper (for the metadata properties changes).
---
 .../apache/pulsar/compaction/CompactedTopic.java   |  3 +-
 .../pulsar/compaction/CompactedTopicImpl.java      | 27 +++++++++-
 .../pulsar/compaction/CompactedTopicTest.java      | 63 +++++++++++++++-------
 3 files changed, 72 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 65e78a9..d95f0c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -18,12 +18,13 @@
  */
 package org.apache.pulsar.compaction;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 
 public interface CompactedTopic {
-    void newCompactedLedger(Position p, long compactedLedgerId);
+    CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId);
     void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead,
                                 ReadEntriesCallback callback, Object ctx);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index f36e11f..e70381e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -61,10 +61,20 @@ public class CompactedTopicImpl implements CompactedTopic {
     }
 
     @Override
-    public void newCompactedLedger(Position p, long compactedLedgerId) {
+    public CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId) {
         synchronized (this) {
             compactionHorizon = (PositionImpl)p;
+
+            CompletableFuture<CompactedTopicContext> previousContext = compactedTopicContext;
             compactedTopicContext = openCompactedLedger(bk, compactedLedgerId);
+
+            // delete the ledger from the old context once the new one is open
+            if (previousContext != null) {
+                return compactedTopicContext.thenCompose((res) -> previousContext)
+                    .thenCompose((res) -> tryDeleteCompactedLedger(bk, res.ledger.getId()));
+            } else {
+                return compactedTopicContext;
+            }
         }
     }
 
@@ -182,6 +192,21 @@ public class CompactedTopicImpl implements CompactedTopic {
                                          ledger, createCache(ledger, DEFAULT_STARTPOINT_CACHE_SIZE)));
     }
 
+    private static CompletableFuture<Void> tryDeleteCompactedLedger(BookKeeper bk, long id) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        bk.asyncDeleteLedger(id,
+                             (rc, ctx) -> {
+                                 if (rc != BKException.Code.OK) {
+                                     log.warn("Error deleting compacted topic ledger {}",
+                                              id, BKException.create(rc));
+                                 } else {
+                                     log.debug("Compacted topic ledger deleted successfully");
+                                 }
+                                 promise.complete(null); // don't propagate any error
+                             }, null);
+        return promise;
+    }
+
     private static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh, long from, long to) {
         CompletableFuture<Enumeration<LedgerEntry>> promise = new CompletableFuture<>();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 04d4bbc..599889a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -18,49 +18,31 @@
  */
 package org.apache.pulsar.compaction;
 
-import static org.apache.pulsar.client.impl.RawReaderTest.extractKey;
-
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
-
 import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.impl.RawMessageImpl;
 
@@ -73,6 +55,7 @@ import org.testng.annotations.Test;
 
 public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
     private static final Logger log = LoggerFactory.getLogger(CompactedTopicTest.class);
+    private static final ByteBuf emptyBuffer = Unpooled.buffer(0);
 
     @BeforeMethod
     @Override
@@ -105,7 +88,6 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
                                           Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
         List<Pair<MessageIdData,Long>> positions = new ArrayList<>();
         List<Pair<MessageIdData,Long>> idsInGaps = new ArrayList<>();
-        ByteBuf emptyBuffer = Unpooled.buffer(0);
 
         AtomicLong ledgerIds = new AtomicLong(10L);
         AtomicLong entryIds = new AtomicLong(0L);
@@ -211,4 +193,47 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
                                 Long.valueOf(gap.getRight()));
         }
     }
+
+    @Test
+    public void testCleanupOldCompactedTopicLedger() throws Exception {
+        BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
+                this.conf, null);
+
+        LedgerHandle oldCompactedLedger = bk.createLedger(1, 1,
+                Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
+        oldCompactedLedger.close();
+        LedgerHandle newCompactedLedger = bk.createLedger(1, 1,
+                Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
+        newCompactedLedger.close();
+
+        // set the compacted topic ledger
+        CompactedTopicImpl compactedTopic = new CompactedTopicImpl(bk);
+        compactedTopic.newCompactedLedger(new PositionImpl(1,2), oldCompactedLedger.getId()).get();
+
+        // ensure both ledgers still exist, can be opened
+        bk.openLedger(oldCompactedLedger.getId(),
+                      Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                      Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
+        bk.openLedger(newCompactedLedger.getId(),
+                      Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                      Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
+
+        // update the compacted topic ledger
+        compactedTopic.newCompactedLedger(new PositionImpl(1,2), newCompactedLedger.getId()).get();
+
+        // old ledger should be deleted, new still there
+        try {
+            bk.openLedger(oldCompactedLedger.getId(),
+                          Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                          Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
+            Assert.fail("Should have failed to open old ledger");
+        } catch (BKException.BKNoSuchLedgerExistsException e) {
+            // correct, expected behaviour
+        }
+        bk.openLedger(newCompactedLedger.getId(),
+                      Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                      Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.