You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/02 17:10:28 UTC

[incubator-pulsar] branch master updated: Don't offload empty ledgers (#1687)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b982943  Don't offload empty ledgers (#1687)
b982943 is described below

commit b982943c77972211c937270d305d14f9d12d5f84
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed May 2 19:10:25 2018 +0200

    Don't offload empty ledgers (#1687)
    
    It shouldn't be possible for a ledger in a managed ledger to be
    empty (it should be cleaned up on recovery), but this patch adds
    defensive code so that if one does exist for some reason, it won't be
    offloaded.
    
    Master Issue: #1511
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  3 +-
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 43 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index f1d7100..1a606b7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1966,7 +1966,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             long firstLedgerRetained = current;
             for (LedgerInfo ls : ledgers.headMap(current).values()) {
                 if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) {
-                    if (!ls.getOffloadContext().getComplete()) {
+                    // don't offload if ledger has already been offloaded, or is empty
+                    if (!ls.getOffloadContext().getComplete() && ls.getSize() > 0) {
                         ledgersToOffload.add(ls);
                     }
                 } else {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 278182c..76cbbb6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.impl;
 
 import com.google.common.collect.ImmutableSet;
 
+import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -40,6 +41,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -633,6 +635,47 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
         assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedger));
     }
 
+    @Test
+    public void testDontOffloadEmpty() throws Exception {
+        MockLedgerOffloader offloader = new MockLedgerOffloader();
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        config.setRetentionTime(10, TimeUnit.MINUTES);
+        config.setLedgerOffloader(offloader);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
+
+        int i = 0;
+        for (; i < 35; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 4);
+
+        long firstLedgerId = ledger.getLedgersInfoAsList().get(0).getLedgerId();
+        long secondLedgerId = ledger.getLedgersInfoAsList().get(1).getLedgerId();
+        long thirdLedgerId = ledger.getLedgersInfoAsList().get(2).getLedgerId();
+        long fourthLedgerId = ledger.getLedgersInfoAsList().get(3).getLedgerId();
+
+        // make the second ledger appear empty (zero entries, zero size)
+        Field ledgersField = ledger.getClass().getDeclaredField("ledgers");
+        ledgersField.setAccessible(true);
+        Map<Long, LedgerInfo> ledgers = (Map<Long,LedgerInfo>)ledgersField.get(ledger);
+        ledgers.put(secondLedgerId,
+                    ledgers.get(secondLedgerId).toBuilder().setEntries(0).setSize(0).build());
+
+        PositionImpl firstUnoffloaded = (PositionImpl)ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+        Assert.assertEquals(firstUnoffloaded.getLedgerId(), fourthLedgerId);
+        Assert.assertEquals(firstUnoffloaded.getEntryId(), 0);
+
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 4);
+        Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
+                            .filter(e -> e.getOffloadContext().getComplete())
+                            .map(e -> e.getLedgerId()).collect(Collectors.toSet()),
+                            offloader.offloadedLedgers());
+        Assert.assertEquals(offloader.offloadedLedgers(), ImmutableSet.of(firstLedgerId, thirdLedgerId));
+    }
+
     static void assertEventuallyTrue(BooleanSupplier predicate) throws Exception {
         // wait up to 3 seconds
         for (int i = 0; i < 30 && !predicate.getAsBoolean(); i++) {

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.