You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by re...@apache.org on 2019/05/17 17:38:49 UTC

[bookkeeper] branch master updated: Update and flush lastLogMark when replaying journal

This is an automated email from the ASF dual-hosted git repository.

reddycharan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 36be836  Update and flush lastLogMark when replaying journal
36be836 is described below

commit 36be8362399341022c8de64f9319270726df2cb3
Author: karanmehta93 <k....@salesforce.com>
AuthorDate: Fri May 17 10:38:43 2019 -0700

    Update and flush lastLogMark when replaying journal
    
    Descriptions of the changes in this PR:
    
    Update `lastLogMark` in memory after replaying each journal
    Check for writable ledger dirs with `minUsableSizeForEntryLogCreation` to flush the `lastMark` file for bookies in ReadOnlyMode
    Log line changes
    
    ### Motivation
    
    Master Issue: #2087
    
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Charan Reddy Guttapalem <re...@gmail.com>, Sijie Guo <si...@apache.org>, Venkateswararao Jujjuri (JV) <None>
    
    This closes #2090 from karanmehta93/master and squashes the following commits:
    
    f802cf6da [Karan Mehta] Fixed small issue in constructor chaining in BookKeeperClusterTestCase
    407e9f1ff [Karan Mehta] Moved lastLogMark update logic to Journal.java
    585313196 [Karan Mehta] Fix checkstyle errors
    c3d47014a [Karan Mehta] Fix testJournalLogAddEntryCalledCorrectly test
    c72b7b55e [Karan Mehta] Addressed nit
    5d238f2b7 [Karan Mehta] Refactored code to update lastLogMark only when replaying journal and addressed nits
    b5515697f [Karan Mehta] Issue #2087 Update and flush lastLogMark when replaying journal
    d35aa22ad [Charan Reddy Guttapalem] Move common placementpolicy components to TopologyAwareEnsemblePlacementPolicy.
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |  41 ++++++-
 .../java/org/apache/bookkeeper/bookie/Journal.java |  56 +++-------
 .../bookie/BookieInitializationTest.java           | 122 +++++++++++++++++++++
 .../bookkeeper/bookie/BookieJournalTest.java       |  27 +++--
 .../bookie/BookieWriteToJournalTest.java           |  16 ++-
 .../bookkeeper/test/BookKeeperClusterTestCase.java |   2 +-
 .../java/org/apache/bookkeeper/util/TestUtils.java |  12 ++
 7 files changed, 220 insertions(+), 56 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 32adb5d..467cb03 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -883,12 +883,51 @@ public class Bookie extends BookieCriticalThread {
         };
 
         for (Journal journal : journals) {
-            journal.replay(scanner);
+            replay(journal, scanner);
         }
         long elapsedTs = System.currentTimeMillis() - startTs;
         LOG.info("Finished replaying journal in {} ms.", elapsedTs);
     }
 
+    /**
+     * Replays the journal files of a journal and updates its in-memory lastLogMark after each file.
+     *
+     * @param journal Journal object corresponding to a journalDir
+     * @param scanner Scanner to process replayed entries.
+     * @throws IOException if the journal file recorded in lastLogMark is missing
+     */
+    private void replay(Journal journal, JournalScanner scanner) throws IOException {
+        final LogMark markedLog = journal.getLastLogMark().getCurMark();
+        List<Long> logs = Journal.listJournalIds(journal.getJournalDirectory(), journalId -> {
+            if (journalId < markedLog.getLogFileId()) {
+                return false;
+            }
+            return true;
+        });
+        // The last log mark may be missing if it was never synced to disk, so
+        // validate the filtered log ids only when a marked log id is present.
+        if (markedLog.getLogFileId() > 0) {
+            if (logs.size() == 0 || logs.get(0) != markedLog.getLogFileId()) {
+                throw new IOException("Recovery log " + markedLog.getLogFileId() + " is missing");
+            }
+        }
+
+        // TODO: When reading in the journal logs that need to be synced, we
+        // should use BufferedChannels instead to minimize the amount of
+        // system calls done.
+        for (Long id : logs) {
+            long logPosition = 0L;
+            if (id == markedLog.getLogFileId()) {
+                logPosition = markedLog.getLogFileOffset();
+            }
+            LOG.info("Replaying journal {} from position {}", id, logPosition);
+            journal.scanJournal(id, logPosition, scanner);
+            // Move the in-memory LastLogMark to position Long.MAX_VALUE of the journal just replayed.
+            // After LedgerStorage flush, SyncThread should persist this to disk
+            journal.setLastLogMarkToEof(id);
+        }
+    }
+
     @Override
     public synchronized void start() {
         setDaemon(true);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 0a78fca..8941b68 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -78,7 +78,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
     /**
      * Filter to pickup journals.
      */
-    private interface JournalIdFilter {
+    public interface JournalIdFilter {
         boolean accept(long journalId);
     }
 
@@ -196,11 +196,13 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
             // which is safe since records before lastMark have been
             // persisted to disk (both index & entry logger)
             lastMark.getCurMark().writeLogMark(bb);
+
             if (LOG.isDebugEnabled()) {
                 LOG.debug("RollLog to persist last marked log : {}", lastMark.getCurMark());
             }
+
             List<File> writableLedgerDirs = ledgerDirsManager
-                    .getWritableLedgerDirs();
+                    .getWritableLedgerDirsForNewLog();
             for (File dir : writableLedgerDirs) {
                 File file = new File(dir, lastMarkFileName);
                 FileOutputStream fos = null;
@@ -708,6 +710,15 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
     }
 
     /**
+     * Update the in-memory lastLogMark of the journal to (id, Long.MAX_VALUE),
+     * indicating that the journal file has been fully processed.
+     * @param id id of the journal file to mark as processed
+     */
+    void setLastLogMarkToEof(Long id) {
+        lastLogMark.getCurMark().setLogMark(id, Long.MAX_VALUE);
+    }
+
+    /**
      * Application tried to schedule a checkpoint. After all the txns added
      * before checkpoint are persisted, a <i>checkpoint</i> will be returned
      * to application. Application could use <i>checkpoint</i> to do its logic.
@@ -826,47 +837,6 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
         }
     }
 
-    /**
-     * Replay journal files.
-     *
-     * @param scanner Scanner to process replayed entries.
-     * @throws IOException
-     */
-    public void replay(JournalScanner scanner) throws IOException {
-        final LogMark markedLog = lastLogMark.getCurMark();
-        List<Long> logs = listJournalIds(journalDirectory, new JournalIdFilter() {
-            @Override
-            public boolean accept(long journalId) {
-                if (journalId < markedLog.getLogFileId()) {
-                    return false;
-                }
-                return true;
-            }
-        });
-        // last log mark may be missed due to no sync up before
-        // validate filtered log ids only when we have markedLogId
-        if (markedLog.getLogFileId() > 0) {
-            if (logs.size() == 0 || logs.get(0) != markedLog.getLogFileId()) {
-                throw new IOException("Recovery log " + markedLog.getLogFileId() + " is missing");
-            }
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Try to relay journal logs : {}", logs);
-        }
-        // TODO: When reading in the journal logs that need to be synced, we
-        // should use BufferedChannels instead to minimize the amount of
-        // system calls done.
-        for (Long id: logs) {
-            long logPosition = 0L;
-            if (id == markedLog.getLogFileId()) {
-                logPosition = markedLog.getLogFileOffset();
-            }
-            LOG.info("Replaying journal {} from position {}", id, logPosition);
-            scanJournal(id, logPosition, scanner);
-        }
-    }
-
     public void logAddEntry(ByteBuffer entry, boolean ackBeforeSync, WriteCallback cb, Object ctx)
             throws InterruptedException {
         logAddEntry(Unpooled.wrappedBuffer(entry), ackBeforeSync, cb, ctx);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index c4ed031..dbaef35 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.bookie;
 import static com.google.common.base.Charsets.UTF_8;
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.BOOKIE_STATUS_FILENAME;
+import static org.apache.bookkeeper.util.TestUtils.countNumOfFiles;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -59,6 +60,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
 import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException;
+import org.apache.bookkeeper.bookie.Journal.LastLogMark;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -136,6 +138,126 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
         super.tearDown();
     }
 
+    @Test
+    public void testOneJournalReplayForBookieRestartInReadOnlyMode() throws Exception {
+        testJournalReplayForBookieRestartInReadOnlyMode(1);
+    }
+
+    @Test
+    public void testMultipleJournalReplayForBookieRestartInReadOnlyMode() throws Exception {
+        testJournalReplayForBookieRestartInReadOnlyMode(4);
+    }
+
+    /**
+     * Tests that journal replay works correctly when bookie crashes and starts up in RO mode.
+     */
+    private void testJournalReplayForBookieRestartInReadOnlyMode(int numOfJournalDirs) throws Exception {
+        File tmpLedgerDir = createTempDir("DiskCheck", "test");
+        File tmpJournalDir = createTempDir("DiskCheck", "test");
+
+        String[] journalDirs = new String[numOfJournalDirs];
+        for (int i = 0; i < numOfJournalDirs; i++) {
+            journalDirs[i] = tmpJournalDir.getAbsolutePath() + "/journal-" + i;
+        }
+
+        final ServerConfiguration conf = newServerConfiguration()
+                .setJournalDirsName(journalDirs)
+                .setLedgerDirNames(new String[] { tmpLedgerDir.getPath() })
+                .setDiskCheckInterval(1000)
+                .setLedgerStorageClass(SortedLedgerStorage.class.getName())
+                .setAutoRecoveryDaemonEnabled(false)
+                .setZkTimeout(5000);
+
+        BookieServer server = new MockBookieServer(conf);
+        server.start();
+
+        List<LastLogMark> lastLogMarkList = new ArrayList<>(journalDirs.length);
+
+        for (int i = 0; i < journalDirs.length; i++) {
+            Journal journal = server.getBookie().journals.get(i);
+            // LastLogMark should be (0, 0) at the bookie clean start
+            journal.getLastLogMark().readLog();
+            lastLogMarkList.add(journal.getLastLogMark().markLog());
+            assertEquals(0, lastLogMarkList.get(i).getCurMark().compare(new LogMark(0, 0)));
+        }
+
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setMetadataServiceUri(metadataServiceUri);
+        BookKeeper bkClient = new BookKeeper(clientConf);
+
+        // Create multiple ledgers for adding entries to multiple journals
+        for (int i = 0; i < journalDirs.length; i++) {
+            LedgerHandle lh = bkClient.createLedger(1, 1, 1, DigestType.CRC32, "passwd".getBytes());
+            long entryId = -1;
+            // Ensure that we have non-zero number of entries
+            long numOfEntries = new Random().nextInt(10) + 3;
+            for (int j = 0; j < numOfEntries; j++) {
+                entryId = lh.addEntry("data".getBytes());
+            }
+            assertEquals(entryId, (numOfEntries - 1));
+            lh.close();
+        }
+
+        for (int i = 0; i < journalDirs.length; i++) {
+            Journal journal = server.getBookie().journals.get(i);
+            // In-memory LastLogMark should be updated with every write to journal
+            assertTrue(journal.getLastLogMark().getCurMark().compare(lastLogMarkList.get(i).getCurMark()) > 0);
+            lastLogMarkList.set(i, journal.getLastLogMark().markLog());
+        }
+
+        // Kill Bookie abruptly before entries are flushed to disk
+        server.shutdown();
+
+        conf.setDiskUsageThreshold(0.001f)
+                .setDiskUsageWarnThreshold(0.0f).setReadOnlyModeEnabled(true).setIsForceGCAllowWhenNoSpace(true)
+                .setMinUsableSizeForIndexFileCreation(5 * 1024);
+
+        server = new BookieServer(conf);
+
+        for (int i = 0; i < journalDirs.length; i++) {
+            Journal journal = server.getBookie().journals.get(i);
+            // LastLogMark should be (0, 0) before bookie restart since bookie crashed before persisting lastMark
+            assertEquals(0, journal.getLastLogMark().getCurMark().compare(new LogMark(0, 0)));
+        }
+
+        int numOfRestarts = 3;
+        // Restart server multiple times to ensure that logs are never replayed and new files are not generated
+        for (int i = 0; i < numOfRestarts; i++) {
+
+            int txnBefore = countNumOfFiles(conf.getJournalDirs(), "txn");
+            int logBefore = countNumOfFiles(conf.getLedgerDirs(), "log");
+            int idxBefore = countNumOfFiles(conf.getLedgerDirs(), "idx");
+
+            server.start();
+
+            for (int j = 0; j < journalDirs.length; j++) {
+                Journal journal = server.getBookie().journals.get(j);
+                assertTrue(journal.getLastLogMark().getCurMark().compare(lastLogMarkList.get(j).getCurMark()) > 0);
+                lastLogMarkList.set(j, journal.getLastLogMark().markLog());
+            }
+
+            server.shutdown();
+
+            // Every bookie restart initiates a new journal file
+            // Journals should not be replayed every time since lastMark gets updated every time
+            // New EntryLog files should not be generated.
+            assertEquals(journalDirs.length, (countNumOfFiles(conf.getJournalDirs(), "txn") - txnBefore));
+
+            // First restart should replay journal and generate new log/index files
+            // Subsequent runs should not generate new files (but can delete older ones)
+            if (i == 0) {
+                assertTrue((countNumOfFiles(conf.getLedgerDirs(), "log") - logBefore) > 0);
+                assertTrue((countNumOfFiles(conf.getLedgerDirs(), "idx") - idxBefore) > 0);
+            } else {
+                assertTrue((countNumOfFiles(conf.getLedgerDirs(), "log") - logBefore) <= 0);
+                assertTrue((countNumOfFiles(conf.getLedgerDirs(), "idx") - idxBefore) <= 0);
+            }
+
+            server = new BookieServer(conf);
+        }
+        bkClient.close();
+    }
+
     /**
      * Verify the bookie server exit code. On ZooKeeper exception, should return
      * exit code ZK_REG_FAIL = 4
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index 080ebfe..0f1b327 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -38,6 +38,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.bookkeeper.bookie.Journal.LastLogMark;
 import org.apache.bookkeeper.client.ClientUtil;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -350,8 +351,7 @@ public class BookieJournalTest {
             .setLedgerDirNames(new String[] { ledgerDir.getPath() })
             .setMetadataServiceUri(null);
 
-        Bookie b = new Bookie(conf);
-        b.readJournal();
+        Bookie b = createBookieAndReadJournal(conf);
 
         b.readEntry(1, 100);
         try {
@@ -379,8 +379,7 @@ public class BookieJournalTest {
             .setLedgerDirNames(new String[] { ledgerDir.getPath() })
             .setMetadataServiceUri(null);
 
-        Bookie b = new Bookie(conf);
-        b.readJournal();
+        Bookie b = createBookieAndReadJournal(conf);
 
         b.readEntry(1, 100);
         try {
@@ -410,8 +409,7 @@ public class BookieJournalTest {
             .setLedgerDirNames(new String[] { ledgerDir.getPath() })
             .setMetadataServiceUri(null);
 
-        Bookie b = new Bookie(conf);
-        b.readJournal();
+        Bookie b = createBookieAndReadJournal(conf);
 
         for (int i = 1; i <= 2 * JournalChannel.SECTOR_SIZE; i++) {
             b.readEntry(1, i);
@@ -570,8 +568,7 @@ public class BookieJournalTest {
             .setLedgerDirNames(new String[] { ledgerDir.getPath() })
             .setMetadataServiceUri(null);
 
-        Bookie b = new Bookie(conf);
-        b.readJournal();
+        Bookie b = createBookieAndReadJournal(conf);
 
         b.readEntry(1, 99);
 
@@ -614,8 +611,8 @@ public class BookieJournalTest {
             .setLedgerDirNames(new String[] { ledgerDir.getPath() })
             .setMetadataServiceUri(null);
 
-        Bookie b = new Bookie(conf);
-        b.readJournal();
+        Bookie b = createBookieAndReadJournal(conf);
+
         b.readEntry(1, 99);
 
         // still able to read last entry, but it's junk
@@ -640,6 +637,16 @@ public class BookieJournalTest {
         }
     }
 
+    private Bookie createBookieAndReadJournal(ServerConfiguration conf) throws Exception {
+        Bookie b = new Bookie(conf);
+        for (Journal journal : b.journals) {
+            LastLogMark lastLogMark = journal.getLastLogMark().markLog();
+            b.readJournal(); // NOTE(review): seems to replay all journals on every iteration - confirm
+            assertTrue(journal.getLastLogMark().getCurMark().compare(lastLogMark.getCurMark()) > 0);
+        }
+        return b;
+    }
+
     /**
      * Test journal replay with SortedLedgerStorage and a very small max
      * arena size.
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
index 2d197d3..27a5d28 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
@@ -33,6 +33,7 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.File;
+import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -63,6 +64,19 @@ public class BookieWriteToJournalTest {
     @Rule
     public TemporaryFolder tempDir = new TemporaryFolder();
 
+    class NoOpJournalReplayBookie extends Bookie {
+
+        public NoOpJournalReplayBookie(ServerConfiguration conf)
+                throws IOException, InterruptedException, BookieException {
+            super(conf);
+        }
+
+        @Override
+        void readJournal() throws IOException, BookieException {
+            // Should be no-op since journal objects are mocked
+        }
+    }
+
     /**
      * test that Bookie calls correctly Journal.logAddEntry about "ackBeforeSync" parameter.
      */
@@ -102,7 +116,7 @@ public class BookieWriteToJournalTest {
 
         whenNew(Journal.class).withAnyArguments().thenReturn(journal);
 
-        Bookie b = new Bookie(conf);
+        Bookie b = new NoOpJournalReplayBookie(conf);
         b.start();
 
         long ledgerId = 1;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 97b7488..b8e2426 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -123,7 +123,7 @@ public abstract class BookKeeperClusterTestCase {
     }
 
     public BookKeeperClusterTestCase(int numBookies, int testTimeoutSecs) {
-        this(numBookies, 1, 120);
+        this(numBookies, 1, testTimeoutSecs);
     }
 
     public BookKeeperClusterTestCase(int numBookies, int numOfZKNodes, int testTimeoutSecs) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
index 3525607..1dc30d4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
@@ -22,6 +22,7 @@
 package org.apache.bookkeeper.util;
 
 import java.io.File;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -32,6 +33,7 @@ import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.api.ReadHandle;
 
+import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 
 /**
@@ -92,4 +94,14 @@ public final class TestUtils {
         }
         Assert.assertTrue(description, predicate.getAsBoolean());
     }
+
+    public static int countNumOfFiles(File[] folderNames, String... extensions) {
+        int count = 0;
+        for (int i = 0; i < folderNames.length; i++) {
+            Collection<File> filesCollection = FileUtils.listFiles(folderNames[i], extensions, true); // recursive
+            count += filesCollection.size();
+        }
+        return count;
+    }
+
 }