You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/07/17 07:36:37 UTC
[bookkeeper] branch master updated: ISSUE #1534: LedgerCache should
be flushed
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 7a2e731 ISSUE #1534: LedgerCache should be flushed
7a2e731 is described below
commit 7a2e73177d3147221a0992455a9b73049d9a5447
Author: cguttapalem <cg...@salesforce.com>
AuthorDate: Tue Jul 17 00:36:30 2018 -0700
ISSUE #1534: LedgerCache should be flushed
Descriptions of the changes in this PR:
### Motivation
EntryLogCompactor.CompactionScannerFactory.flush just calls "ledgerStorage.updateEntriesLocations(offsets);" but not "ledgerStorage.flushEntriesLocationsIndex()".
Because of this, the EntryLogCompactor.compact method would remove the compacted entryLog without the updated offsets/locations getting flushed/persisted/fsynced to LedgerCache (Index/FileInfo files). This could lead to data corruption/loss if the Bookie is brought down/killed before those updated offsets/locations are flushed/persisted/fsynced.
### Changes
In EntryLogCompactor, before removing the compacted entryLog, LedgerCache (IndexInMemPageMgr and Index files) should be flushed, just like EntryLogger.
Master Issue: #1534
Author: cguttapalem <cg...@salesforce.com>
Reviewers: Matteo Merli <mm...@apache.org>, Sijie Guo <si...@apache.org>
This closes #1536 from reddycharan/compactfix, closes #1534
---
.../bookkeeper/bookie/EntryLogCompactor.java | 1 +
.../apache/bookkeeper/bookie/CompactionTest.java | 93 +++++++++++++++++++++-
2 files changed, 91 insertions(+), 3 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
index 88e22bf..a5e2c3f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
@@ -114,6 +114,7 @@ public class EntryLogCompactor extends AbstractLogCompactor {
try {
entryLogger.flush();
ledgerStorage.updateEntriesLocations(offsets);
+ ledgerStorage.flushEntriesLocationsIndex();
} finally {
offsets.clear();
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 7c5ac7b..447fcc3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -51,6 +51,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -62,6 +63,7 @@ import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
@@ -88,6 +90,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
private final boolean isThrottleByBytes;
private final DigestType digestType;
+ private final byte[] passwdBytes;
private final int numEntries;
private final int gcWaitTime;
private final double minorCompactionThreshold;
@@ -101,7 +104,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
this.isThrottleByBytes = isByBytes;
this.digestType = DigestType.CRC32;
-
+ this.passwdBytes = "".getBytes();
numEntries = 100;
gcWaitTime = 1000;
minorCompactionThreshold = 0.1f;
@@ -156,7 +159,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
LedgerHandle[] lhs = new LedgerHandle[3];
for (int i = 0; i < 3; ++i) {
- lhs[i] = bkc.createLedger(NUM_BOOKIES, NUM_BOOKIES, digestType, "".getBytes());
+ lhs[i] = bkc.createLedger(NUM_BOOKIES, NUM_BOOKIES, digestType, passwdBytes);
}
for (int n = 0; n < numEntryLogs; n++) {
@@ -179,7 +182,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
}
private void verifyLedger(long lid, long startEntryId, long endEntryId) throws Exception {
- LedgerHandle lh = bkc.openLedger(lid, digestType, "".getBytes());
+ LedgerHandle lh = bkc.openLedger(lid, digestType, passwdBytes);
Enumeration<LedgerEntry> entries = lh.readEntries(startEntryId, endEntryId);
while (entries.hasMoreElements()) {
LedgerEntry entry = entries.nextElement();
@@ -540,6 +543,90 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
}
@Test
+ public void testCompactionPersistence() throws Exception {
+ /*
+ * for this test scenario we are assuming that there will be only one
+ * bookie in the cluster
+ */
+ assertEquals("Numbers of Bookies in this cluster", 1, numBookies);
+ /*
+ * this test is for validating EntryLogCompactor, so make sure
+ * TransactionalCompaction is not enabled.
+ */
+ assertFalse("Bookies must be using EntryLogCompactor", baseConf.getUseTransactionalCompaction());
+ // prepare data
+ LedgerHandle[] lhs = prepareData(3, true);
+
+ for (LedgerHandle lh : lhs) {
+ lh.close();
+ }
+
+ // disable minor compaction
+ baseConf.setMinorCompactionThreshold(0.0f);
+ baseConf.setGcWaitTime(60000);
+ baseConf.setMinorCompactionInterval(120000);
+ baseConf.setMajorCompactionInterval(240000);
+
+ // restart bookies
+ restartBookies(baseConf);
+
+ long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
+ long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
+ assertTrue(getGCThread().enableMajorCompaction);
+ assertFalse(getGCThread().enableMinorCompaction);
+
+ // remove ledger1 and ledger3
+ bkc.deleteLedger(lhs[0].getId());
+ bkc.deleteLedger(lhs[2].getId());
+ LOG.info("Finished deleting the ledgers contains most entries.");
+ getGCThread().enableForceGC();
+ getGCThread().triggerGC().get();
+
+ // after garbage collection, minor compaction should not be executed
+ assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime);
+ assertTrue(getGCThread().lastMajorCompactionTime > lastMajorCompactionTime);
+
+ // entry logs ([0,1,2].log) should be compacted
+ for (File ledgerDirectory : tmpDirs) {
+ assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: "
+ + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2));
+ }
+
+ // even entry log files are removed, we still can access entries for
+ // ledger2
+ // since those entries has been compacted to new entry log
+ long ledgerId = lhs[1].getId();
+ long lastAddConfirmed = lhs[1].getLastAddConfirmed();
+ verifyLedger(ledgerId, 0, lastAddConfirmed);
+
+ /*
+ * there is only one bookie in the cluster so we should be able to read
+ * entries from this bookie.
+ */
+ ServerConfiguration bookieServerConfig = bs.get(0).getBookie().conf;
+ ServerConfiguration newBookieConf = new ServerConfiguration(bookieServerConfig);
+ /*
+ * by reusing bookieServerConfig and setting metadataServiceUri to null
+ * we can create/start new Bookie instance using the same data
+ * (journal/ledger/index) of the existing BookeieServer for our testing
+ * purpose.
+ */
+ newBookieConf.setMetadataServiceUri(null);
+ Bookie newbookie = new Bookie(newBookieConf);
+
+ DigestManager digestManager = DigestManager.instantiate(ledgerId, passwdBytes,
+ BookKeeper.DigestType.toProtoDigestType(digestType), baseClientConf.getUseV2WireProtocol());
+
+ for (long entryId = 0; entryId <= lastAddConfirmed; entryId++) {
+ ByteBuf readEntryBufWithChecksum = newbookie.readEntry(ledgerId, entryId);
+ ByteBuf readEntryBuf = digestManager.verifyDigestAndReturnData(entryId, readEntryBufWithChecksum);
+ byte[] readEntryBytes = new byte[readEntryBuf.readableBytes()];
+ readEntryBuf.readBytes(readEntryBytes);
+ assertEquals(msg, new String(readEntryBytes));
+ }
+ }
+
+ @Test
public void testMajorCompactionAboveThreshold() throws Exception {
// prepare data
LedgerHandle[] lhs = prepareData(3, false);