Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/07/17 07:36:37 UTC

[bookkeeper] branch master updated: ISSUE #1534: LedgerCache should be flushed

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a2e731  ISSUE #1534: LedgerCache should be flushed
7a2e731 is described below

commit 7a2e73177d3147221a0992455a9b73049d9a5447
Author: cguttapalem <cg...@salesforce.com>
AuthorDate: Tue Jul 17 00:36:30 2018 -0700

    ISSUE #1534: LedgerCache should be flushed
    
    Descriptions of the changes in this PR:
    
    ### Motivation
    
    EntryLogCompactor.CompactionScannerFactory.flush just calls "ledgerStorage.updateEntriesLocations(offsets);" but not "ledgerStorage.flushEntriesLocationsIndex()".
    
    Because of this, the EntryLogCompactor.compact method would remove the compacted entry log without the updated offsets/locations being flushed/persisted/fsynced to the LedgerCache (index and FileInfo files). If the bookie is brought down or killed before those updated locations reach disk, the index still points into the removed entry log after restart, which can lead to data corruption or loss.
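    
    For illustration, the compaction path looks roughly like the following sketch (paraphrased from EntryLogCompactor; error handling, the surrounding class, and the scanner construction are omitted):
    
        boolean compact(EntryLogMetadata metadata) throws Exception {
            // rewrite the surviving entries of the old entry log into the
            // current entry log, collecting their new offsets
            entryLogger.scanEntryLog(metadata.getEntryLogId(), scanner);
            // flush the rewritten entries; before this fix the new entry
            // locations were only updated in the in-memory LedgerCache here
            scannerFactory.flush();
            // the old entry log is removed next -- if the bookie dies before
            // the index pages reach disk, the index still points into the
            // deleted log
            entryLogger.removeEntryLog(metadata.getEntryLogId());
            return true;
        }
    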
    ### Changes
    
    In EntryLogCompactor, before removing the compacted entry log, the LedgerCache (IndexInMemPageMgr and index files) should be flushed, just as the EntryLogger is, as sketched below.
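    
    A minimal sketch of the intended CompactionScannerFactory.flush ordering (simplified; only the flush-related calls are shown):
    
        void flush() throws IOException {
            try {
                // make the compacted entries durable in the new entry log
                entryLogger.flush();
                // update the entry locations in the ledger index
                ledgerStorage.updateEntriesLocations(offsets);
                // persist the updated index pages (the fix), so that removing
                // the old entry log afterwards is safe
                ledgerStorage.flushEntriesLocationsIndex();
            } finally {
                offsets.clear();
            }
        }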
    
    Master Issue: #1534
    
    Author: cguttapalem <cg...@salesforce.com>
    
    Reviewers: Matteo Merli <mm...@apache.org>, Sijie Guo <si...@apache.org>
    
    This closes #1536 from reddycharan/compactfix, closes #1534
---
 .../bookkeeper/bookie/EntryLogCompactor.java       |  1 +
 .../apache/bookkeeper/bookie/CompactionTest.java   | 93 +++++++++++++++++++++-
 2 files changed, 91 insertions(+), 3 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
index 88e22bf..a5e2c3f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
@@ -114,6 +114,7 @@ public class EntryLogCompactor extends AbstractLogCompactor {
             try {
                 entryLogger.flush();
                 ledgerStorage.updateEntriesLocations(offsets);
+                ledgerStorage.flushEntriesLocationsIndex();
             } finally {
                 offsets.clear();
             }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 7c5ac7b..447fcc3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -51,6 +51,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -62,6 +63,7 @@ import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
@@ -88,6 +90,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
 
     private final boolean isThrottleByBytes;
     private final DigestType digestType;
+    private final byte[] passwdBytes;
     private final int numEntries;
     private final int gcWaitTime;
     private final double minorCompactionThreshold;
@@ -101,7 +104,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
 
         this.isThrottleByBytes = isByBytes;
         this.digestType = DigestType.CRC32;
-
+        this.passwdBytes = "".getBytes();
         numEntries = 100;
         gcWaitTime = 1000;
         minorCompactionThreshold = 0.1f;
@@ -156,7 +159,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
 
         LedgerHandle[] lhs = new LedgerHandle[3];
         for (int i = 0; i < 3; ++i) {
-            lhs[i] = bkc.createLedger(NUM_BOOKIES, NUM_BOOKIES, digestType, "".getBytes());
+            lhs[i] = bkc.createLedger(NUM_BOOKIES, NUM_BOOKIES, digestType, passwdBytes);
         }
 
         for (int n = 0; n < numEntryLogs; n++) {
@@ -179,7 +182,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
     }
 
     private void verifyLedger(long lid, long startEntryId, long endEntryId) throws Exception {
-        LedgerHandle lh = bkc.openLedger(lid, digestType, "".getBytes());
+        LedgerHandle lh = bkc.openLedger(lid, digestType, passwdBytes);
         Enumeration<LedgerEntry> entries = lh.readEntries(startEntryId, endEntryId);
         while (entries.hasMoreElements()) {
             LedgerEntry entry = entries.nextElement();
@@ -540,6 +543,90 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
     }
 
     @Test
+    public void testCompactionPersistence() throws Exception {
+        /*
+         * for this test scenario we are assuming that there will be only one
+         * bookie in the cluster
+         */
+        assertEquals("Numbers of Bookies in this cluster", 1, numBookies);
+        /*
+         * this test is for validating EntryLogCompactor, so make sure
+         * TransactionalCompaction is not enabled.
+         */
+        assertFalse("Bookies must be using EntryLogCompactor", baseConf.getUseTransactionalCompaction());
+        // prepare data
+        LedgerHandle[] lhs = prepareData(3, true);
+
+        for (LedgerHandle lh : lhs) {
+            lh.close();
+        }
+
+        // disable minor compaction
+        baseConf.setMinorCompactionThreshold(0.0f);
+        baseConf.setGcWaitTime(60000);
+        baseConf.setMinorCompactionInterval(120000);
+        baseConf.setMajorCompactionInterval(240000);
+
+        // restart bookies
+        restartBookies(baseConf);
+
+        long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
+        long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
+        assertTrue(getGCThread().enableMajorCompaction);
+        assertFalse(getGCThread().enableMinorCompaction);
+
+        // remove ledger1 and ledger3
+        bkc.deleteLedger(lhs[0].getId());
+        bkc.deleteLedger(lhs[2].getId());
+        LOG.info("Finished deleting the ledgers contains most entries.");
+        getGCThread().enableForceGC();
+        getGCThread().triggerGC().get();
+
+        // after garbage collection, major compaction should have run (it also advances the minor compaction time)
+        assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime);
+        assertTrue(getGCThread().lastMajorCompactionTime > lastMajorCompactionTime);
+
+        // entry logs ([0,1,2].log) should be compacted
+        for (File ledgerDirectory : tmpDirs) {
+            assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: "
+                    + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2));
+        }
+
+        // even though the entry log files are removed, we can still access
+        // entries for ledger2, since those entries have been compacted into
+        // a new entry log
+        long ledgerId = lhs[1].getId();
+        long lastAddConfirmed = lhs[1].getLastAddConfirmed();
+        verifyLedger(ledgerId, 0, lastAddConfirmed);
+
+        /*
+         * there is only one bookie in the cluster so we should be able to read
+         * entries from this bookie.
+         */
+        ServerConfiguration bookieServerConfig = bs.get(0).getBookie().conf;
+        ServerConfiguration newBookieConf = new ServerConfiguration(bookieServerConfig);
+        /*
+         * by reusing bookieServerConfig and setting metadataServiceUri to null,
+         * we can create/start a new Bookie instance that uses the same data
+         * (journal/ledger/index) as the existing BookieServer for our testing
+         * purpose.
+         */
+        newBookieConf.setMetadataServiceUri(null);
+        Bookie newbookie = new Bookie(newBookieConf);
+
+        DigestManager digestManager = DigestManager.instantiate(ledgerId, passwdBytes,
+                BookKeeper.DigestType.toProtoDigestType(digestType), baseClientConf.getUseV2WireProtocol());
+
+        for (long entryId = 0; entryId <= lastAddConfirmed; entryId++) {
+            ByteBuf readEntryBufWithChecksum = newbookie.readEntry(ledgerId, entryId);
+            ByteBuf readEntryBuf = digestManager.verifyDigestAndReturnData(entryId, readEntryBufWithChecksum);
+            byte[] readEntryBytes = new byte[readEntryBuf.readableBytes()];
+            readEntryBuf.readBytes(readEntryBytes);
+            assertEquals(msg, new String(readEntryBytes));
+        }
+    }
+
+    @Test
     public void testMajorCompactionAboveThreshold() throws Exception {
         // prepare data
         LedgerHandle[] lhs = prepareData(3, false);