You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/02 17:10:28 UTC
[incubator-pulsar] branch master updated: Don't offload empty
ledgers (#1687)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b982943 Don't offload empty ledgers (#1687)
b982943 is described below
commit b982943c77972211c937270d305d14f9d12d5f84
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed May 2 19:10:25 2018 +0200
Don't offload empty ledgers (#1687)
It shouldn't be possible for a ledger in a managed ledger to be
empty (it should be cleaned up on recovery), but this patch adds
defensive code so that if they do exist for some reason, they won't be
offloaded.
Master Issue: #1511
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 +-
.../bookkeeper/mledger/impl/OffloadPrefixTest.java | 43 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 1 deletion(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index f1d7100..1a606b7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1966,7 +1966,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
long firstLedgerRetained = current;
for (LedgerInfo ls : ledgers.headMap(current).values()) {
if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) {
- if (!ls.getOffloadContext().getComplete()) {
+ // don't offload if ledger has already been offloaded, or is empty
+ if (!ls.getOffloadContext().getComplete() && ls.getSize() > 0) {
ledgersToOffload.add(ls);
}
} else {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 278182c..76cbbb6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.impl;
import com.google.common.collect.ImmutableSet;
+import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -40,6 +41,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.tuple.Pair;
@@ -633,6 +635,47 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedger));
}
+ @Test
+ public void testDontOffloadEmpty() throws Exception {
+ MockLedgerOffloader offloader = new MockLedgerOffloader();
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setLedgerOffloader(offloader);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
+
+ int i = 0;
+ for (; i < 35; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+ Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 4);
+
+ long firstLedgerId = ledger.getLedgersInfoAsList().get(0).getLedgerId();
+ long secondLedgerId = ledger.getLedgersInfoAsList().get(1).getLedgerId();
+ long thirdLedgerId = ledger.getLedgersInfoAsList().get(2).getLedgerId();
+ long fourthLedgerId = ledger.getLedgersInfoAsList().get(3).getLedgerId();
+
+ // make an ledger empty
+ Field ledgersField = ledger.getClass().getDeclaredField("ledgers");
+ ledgersField.setAccessible(true);
+ Map<Long, LedgerInfo> ledgers = (Map<Long,LedgerInfo>)ledgersField.get(ledger);
+ ledgers.put(secondLedgerId,
+ ledgers.get(secondLedgerId).toBuilder().setEntries(0).setSize(0).build());
+
+ PositionImpl firstUnoffloaded = (PositionImpl)ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+ Assert.assertEquals(firstUnoffloaded.getLedgerId(), fourthLedgerId);
+ Assert.assertEquals(firstUnoffloaded.getEntryId(), 0);
+
+ Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 4);
+ Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
+ .filter(e -> e.getOffloadContext().getComplete())
+ .map(e -> e.getLedgerId()).collect(Collectors.toSet()),
+ offloader.offloadedLedgers());
+ Assert.assertEquals(offloader.offloadedLedgers(), ImmutableSet.of(firstLedgerId, thirdLedgerId));
+ }
+
static void assertEventuallyTrue(BooleanSupplier predicate) throws Exception {
// wait up to 3 seconds
for (int i = 0; i < 30 && !predicate.getAsBoolean(); i++) {
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.