You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/28 17:09:58 UTC

[GitHub] merlimat closed pull request #1302: Cleanup old compacted topic ledgers when a new one is available

merlimat closed pull request #1302: Cleanup old compacted topic ledgers when a new one is available
URL: https://github.com/apache/incubator-pulsar/pull/1302
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 65e78a923..d95f0c9e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -18,12 +18,13 @@
  */
 package org.apache.pulsar.compaction;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 
 public interface CompactedTopic {
-    void newCompactedLedger(Position p, long compactedLedgerId);
+    CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId);
     void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead,
                                 ReadEntriesCallback callback, Object ctx);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index f36e11f33..e70381edb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -61,10 +61,20 @@ public CompactedTopicImpl(BookKeeper bk) {
     }
 
     @Override
-    public void newCompactedLedger(Position p, long compactedLedgerId) {
+    public CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId) {
         synchronized (this) {
             compactionHorizon = (PositionImpl)p;
+
+            CompletableFuture<CompactedTopicContext> previousContext = compactedTopicContext;
             compactedTopicContext = openCompactedLedger(bk, compactedLedgerId);
+
+            // delete the ledger from the old context once the new one is open
+            if (previousContext != null) {
+                return compactedTopicContext.thenCompose((res) -> previousContext)
+                    .thenCompose((res) -> tryDeleteCompactedLedger(bk, res.ledger.getId()));
+            } else {
+                return compactedTopicContext;
+            }
         }
     }
 
@@ -182,6 +192,21 @@ private static void findStartPointLoop(PositionImpl p, long start, long end,
                                          ledger, createCache(ledger, DEFAULT_STARTPOINT_CACHE_SIZE)));
     }
 
+    private static CompletableFuture<Void> tryDeleteCompactedLedger(BookKeeper bk, long id) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        bk.asyncDeleteLedger(id,
+                             (rc, ctx) -> {
+                                 if (rc != BKException.Code.OK) {
+                                     log.warn("Error deleting compacted topic ledger {}",
+                                              id, BKException.create(rc));
+                                 } else {
+                                     log.debug("Compacted topic ledger deleted successfully");
+                                 }
+                                 promise.complete(null); // don't propagate any error
+                             }, null);
+        return promise;
+    }
+
     private static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh, long from, long to) {
         CompletableFuture<Enumeration<LedgerEntry>> promise = new CompletableFuture<>();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 04d4bbc7c..599889ace 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -18,49 +18,31 @@
  */
 package org.apache.pulsar.compaction;
 
-import static org.apache.pulsar.client.impl.RawReaderTest.extractKey;
-
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
-
 import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.impl.RawMessageImpl;
 
@@ -73,6 +55,7 @@
 
 public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
     private static final Logger log = LoggerFactory.getLogger(CompactedTopicTest.class);
+    private static final ByteBuf emptyBuffer = Unpooled.buffer(0);
 
     @BeforeMethod
     @Override
@@ -105,7 +88,6 @@ public void cleanup() throws Exception {
                                           Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
         List<Pair<MessageIdData,Long>> positions = new ArrayList<>();
         List<Pair<MessageIdData,Long>> idsInGaps = new ArrayList<>();
-        ByteBuf emptyBuffer = Unpooled.buffer(0);
 
         AtomicLong ledgerIds = new AtomicLong(10L);
         AtomicLong entryIds = new AtomicLong(0L);
@@ -211,4 +193,47 @@ public void testEntryLookup() throws Exception {
                                 Long.valueOf(gap.getRight()));
         }
     }
+
+    @Test
+    public void testCleanupOldCompactedTopicLedger() throws Exception {
+        BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
+                this.conf, null);
+
+        LedgerHandle oldCompactedLedger = bk.createLedger(1, 1,
+                Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
+        oldCompactedLedger.close();
+        LedgerHandle newCompactedLedger = bk.createLedger(1, 1,
+                Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
+        newCompactedLedger.close();
+
+        // set the compacted topic ledger
+        CompactedTopicImpl compactedTopic = new CompactedTopicImpl(bk);
+        compactedTopic.newCompactedLedger(new PositionImpl(1,2), oldCompactedLedger.getId()).get();
+
+        // ensure both ledgers still exist, can be opened
+        bk.openLedger(oldCompactedLedger.getId(),
+                      Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                      Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
+        bk.openLedger(newCompactedLedger.getId(),
+                      Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                      Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
+
+        // update the compacted topic ledger
+        compactedTopic.newCompactedLedger(new PositionImpl(1,2), newCompactedLedger.getId()).get();
+
+        // old ledger should be deleted, new still there
+        try {
+            bk.openLedger(oldCompactedLedger.getId(),
+                          Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                          Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
+            Assert.fail("Should have failed to open old ledger");
+        } catch (BKException.BKNoSuchLedgerExistsException e) {
+            // correct, expected behaviour
+        }
+        bk.openLedger(newCompactedLedger.getId(),
+                      Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                      Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services