You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by re...@apache.org on 2019/05/17 17:38:49 UTC
[bookkeeper] branch master updated: Update and flush lastLogMark
when replaying journal
This is an automated email from the ASF dual-hosted git repository.
reddycharan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 36be836 Update and flush lastLogMark when replaying journal
36be836 is described below
commit 36be8362399341022c8de64f9319270726df2cb3
Author: karanmehta93 <k....@salesforce.com>
AuthorDate: Fri May 17 10:38:43 2019 -0700
Update and flush lastLogMark when replaying journal
Descriptions of the changes in this PR:
Update `lastLogMark` in memory after replaying each journal
Check for writable ledger dirs with `minUsableSizeForEntryLogCreation` to flush the `lastMark` file for bookies in ReadOnlyMode
Log line changes
### Motivation
Master Issue: #2087
Reviewers: Enrico Olivelli <eo...@gmail.com>, Charan Reddy Guttapalem <re...@gmail.com>, Sijie Guo <si...@apache.org>, Venkateswararao Jujjuri (JV) <None>
This closes #2090 from karanmehta93/master and squashes the following commits:
f802cf6da [Karan Mehta] Fixed small issue in constructor chaining in BookKeeperClusterTestCase
407e9f1ff [Karan Mehta] Moved lastLogMark updation logic to Journal.java
585313196 [Karan Mehta] Fix checkstyle errors
c3d47014a [Karan Mehta] Fix testJournalLogAddEntryCalledCorrectly test
c72b7b55e [Karan Mehta] Addressed nit
5d238f2b7 [Karan Mehta] Refactored code to update lastLogMark only when replaying journal and addressed nits
b5515697f [Karan Mehta] Issue #2087 Update and flush lastLogMark when replaying journal
d35aa22ad [Charan Reddy Guttapalem] Move common placementpolicy components to TopologyAwareEnsemblePlacementPolicy.
---
.../java/org/apache/bookkeeper/bookie/Bookie.java | 41 ++++++-
.../java/org/apache/bookkeeper/bookie/Journal.java | 56 +++-------
.../bookie/BookieInitializationTest.java | 122 +++++++++++++++++++++
.../bookkeeper/bookie/BookieJournalTest.java | 27 +++--
.../bookie/BookieWriteToJournalTest.java | 16 ++-
.../bookkeeper/test/BookKeeperClusterTestCase.java | 2 +-
.../java/org/apache/bookkeeper/util/TestUtils.java | 12 ++
7 files changed, 220 insertions(+), 56 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 32adb5d..467cb03 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -883,12 +883,51 @@ public class Bookie extends BookieCriticalThread {
};
for (Journal journal : journals) {
- journal.replay(scanner);
+ replay(journal, scanner);
}
long elapsedTs = System.currentTimeMillis() - startTs;
LOG.info("Finished replaying journal in {} ms.", elapsedTs);
}
+ /**
+ * Replays the journal files of a single journal directory and advances that journal's
+ * in-memory lastLogMark past every file it finishes scanning.
+ *
+ * <p>Files whose id is below the file id of the current lastLogMark are skipped. The marked
+ * file is replayed from the marked offset; every later file is replayed from offset 0.
+ *
+ * @param journal Journal object corresponding to a journalDir
+ * @param scanner Scanner to process replayed entries.
+ * @throws IOException if the marked journal file is missing, or if listing or scanning
+ *                     the journal files fails
+ */
+ private void replay(Journal journal, JournalScanner scanner) throws IOException {
+     // Snapshot the starting mark. getCurMark() returns the journal's live mark object,
+     // which setLastLogMarkToEof() mutates inside the loop below; re-reading it there
+     // would compare against the file just replayed instead of the original mark.
+     final LogMark markedLog = journal.getLastLogMark().getCurMark();
+     final long markedLogId = markedLog.getLogFileId();
+     final long markedLogOffset = markedLog.getLogFileOffset();
+
+     List<Long> logs = Journal.listJournalIds(journal.getJournalDirectory(),
+             journalId -> journalId >= markedLogId);
+     // last log mark may be missed due to no sync up before
+     // validate filtered log ids only when we have markedLogId
+     if (markedLogId > 0) {
+         if (logs.isEmpty() || logs.get(0).longValue() != markedLogId) {
+             throw new IOException("Recovery log " + markedLogId + " is missing");
+         }
+     }
+
+     // TODO: When reading in the journal logs that need to be synced, we
+     // should use BufferedChannels instead to minimize the amount of
+     // system calls done.
+     for (long id : logs) {
+         long logPosition = (id == markedLogId) ? markedLogOffset : 0L;
+         LOG.info("Replaying journal {} from position {}", id, logPosition);
+         journal.scanJournal(id, logPosition, scanner);
+         // Mark the whole file as processed (offset Long.MAX_VALUE).
+         // After LedgerStorage flush, SyncThread should persist this to disk.
+         journal.setLastLogMarkToEof(id);
+     }
+ }
+
@Override
public synchronized void start() {
setDaemon(true);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 0a78fca..8941b68 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -78,7 +78,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
/**
* Filter to pickup journals.
*/
- private interface JournalIdFilter {
+ public interface JournalIdFilter {
boolean accept(long journalId);
}
@@ -196,11 +196,13 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
// which is safe since records before lastMark have been
// persisted to disk (both index & entry logger)
lastMark.getCurMark().writeLogMark(bb);
+
if (LOG.isDebugEnabled()) {
LOG.debug("RollLog to persist last marked log : {}", lastMark.getCurMark());
}
+
List<File> writableLedgerDirs = ledgerDirsManager
- .getWritableLedgerDirs();
+ .getWritableLedgerDirsForNewLog();
for (File dir : writableLedgerDirs) {
File file = new File(dir, lastMarkFileName);
FileOutputStream fos = null;
@@ -708,6 +710,15 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
}
/**
+ * Marks journal file {@code id} as completely processed by moving this journal's
+ * in-memory lastLogMark to ({@code id}, {@link Long#MAX_VALUE}).
+ *
+ * <p>Only the in-memory mark is updated here; persisting it is left to the regular
+ * flush path.
+ *
+ * @param id id of the journal file that has been fully replayed
+ */
+ void setLastLogMarkToEof(long id) {
+     lastLogMark.getCurMark().setLogMark(id, Long.MAX_VALUE);
+ }
+
+ /**
* Application tried to schedule a checkpoint. After all the txns added
* before checkpoint are persisted, a <i>checkpoint</i> will be returned
* to application. Application could use <i>checkpoint</i> to do its logic.
@@ -826,47 +837,6 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
}
}
- /**
- * Replay journal files.
- *
- * @param scanner Scanner to process replayed entries.
- * @throws IOException
- */
- public void replay(JournalScanner scanner) throws IOException {
- final LogMark markedLog = lastLogMark.getCurMark();
- List<Long> logs = listJournalIds(journalDirectory, new JournalIdFilter() {
- @Override
- public boolean accept(long journalId) {
- if (journalId < markedLog.getLogFileId()) {
- return false;
- }
- return true;
- }
- });
- // last log mark may be missed due to no sync up before
- // validate filtered log ids only when we have markedLogId
- if (markedLog.getLogFileId() > 0) {
- if (logs.size() == 0 || logs.get(0) != markedLog.getLogFileId()) {
- throw new IOException("Recovery log " + markedLog.getLogFileId() + " is missing");
- }
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Try to relay journal logs : {}", logs);
- }
- // TODO: When reading in the journal logs that need to be synced, we
- // should use BufferedChannels instead to minimize the amount of
- // system calls done.
- for (Long id: logs) {
- long logPosition = 0L;
- if (id == markedLog.getLogFileId()) {
- logPosition = markedLog.getLogFileOffset();
- }
- LOG.info("Replaying journal {} from position {}", id, logPosition);
- scanJournal(id, logPosition, scanner);
- }
- }
-
public void logAddEntry(ByteBuffer entry, boolean ackBeforeSync, WriteCallback cb, Object ctx)
throws InterruptedException {
logAddEntry(Unpooled.wrappedBuffer(entry), ackBeforeSync, cb, ctx);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index c4ed031..dbaef35 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.bookie;
import static com.google.common.base.Charsets.UTF_8;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.BOOKIE_STATUS_FILENAME;
+import static org.apache.bookkeeper.util.TestUtils.countNumOfFiles;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -59,6 +60,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException;
+import org.apache.bookkeeper.bookie.Journal.LastLogMark;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -136,6 +138,126 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
super.tearDown();
}
+ /**
+ * Read-only restart journal replay with a single journal directory.
+ */
+ @Test
+ public void testOneJournalReplayForBookieRestartInReadOnlyMode() throws Exception {
+     final int journalDirCount = 1;
+     testJournalReplayForBookieRestartInReadOnlyMode(journalDirCount);
+ }
+
+ /**
+ * Read-only restart journal replay with several journal directories.
+ */
+ @Test
+ public void testMultipleJournalReplayForBookieRestartInReadOnlyMode() throws Exception {
+     final int journalDirCount = 4;
+     testJournalReplayForBookieRestartInReadOnlyMode(journalDirCount);
+ }
+
+ /**
+ * Tests that journal replay works correctly when a bookie is restarted in read-only mode.
+ *
+ * <p>The bookie first runs normally and writes entries to every journal. It is then restarted
+ * several times with disk thresholds set for read-only mode. The first restart is expected to
+ * replay the journals and create new entry log/index files; later restarts must not replay them
+ * again, because the lastMark written after the first replay must have been flushed.
+ *
+ * @param numOfJournalDirs number of journal directories to configure
+ */
+ private void testJournalReplayForBookieRestartInReadOnlyMode(int numOfJournalDirs) throws Exception {
+     File tmpLedgerDir = createTempDir("DiskCheck", "test");
+     File tmpJournalDir = createTempDir("DiskCheck", "test");
+
+     String[] journalDirs = new String[numOfJournalDirs];
+     for (int i = 0; i < numOfJournalDirs; i++) {
+         journalDirs[i] = tmpJournalDir.getAbsolutePath() + "/journal-" + i;
+     }
+
+     final ServerConfiguration conf = newServerConfiguration()
+             .setJournalDirsName(journalDirs)
+             .setLedgerDirNames(new String[] { tmpLedgerDir.getPath() })
+             .setDiskCheckInterval(1000)
+             .setLedgerStorageClass(SortedLedgerStorage.class.getName())
+             .setAutoRecoveryDaemonEnabled(false)
+             .setZkTimeout(5000);
+
+     BookieServer server = new MockBookieServer(conf);
+     server.start();
+
+     List<LastLogMark> lastLogMarkList = new ArrayList<>(journalDirs.length);
+
+     for (int i = 0; i < journalDirs.length; i++) {
+         Journal journal = server.getBookie().journals.get(i);
+         // LastLogMark should be (0, 0) at the bookie clean start
+         journal.getLastLogMark().readLog();
+         lastLogMarkList.add(journal.getLastLogMark().markLog());
+         assertEquals(0, lastLogMarkList.get(i).getCurMark().compare(new LogMark(0, 0)));
+     }
+
+     ClientConfiguration clientConf = new ClientConfiguration();
+     clientConf.setMetadataServiceUri(metadataServiceUri);
+     BookKeeper bkClient = new BookKeeper(clientConf);
+     try {
+         Random random = new Random();
+         // Create multiple ledgers for adding entries to multiple journals
+         for (int i = 0; i < journalDirs.length; i++) {
+             LedgerHandle lh = bkClient.createLedger(1, 1, 1, DigestType.CRC32, "passwd".getBytes(UTF_8));
+             long entryId = -1;
+             // Ensure that we have non-zero number of entries
+             long numOfEntries = random.nextInt(10) + 3;
+             for (int j = 0; j < numOfEntries; j++) {
+                 entryId = lh.addEntry("data".getBytes(UTF_8));
+             }
+             assertEquals(numOfEntries - 1, entryId);
+             lh.close();
+         }
+
+         for (int i = 0; i < journalDirs.length; i++) {
+             Journal journal = server.getBookie().journals.get(i);
+             // In-memory LastLogMark should be updated with every write to journal
+             assertTrue(journal.getLastLogMark().getCurMark().compare(lastLogMarkList.get(i).getCurMark()) > 0);
+             lastLogMarkList.set(i, journal.getLastLogMark().markLog());
+         }
+
+         // Shut the bookie down; the checks below expect that lastMark was not persisted
+         server.shutdown();
+
+         conf.setDiskUsageThreshold(0.001f)
+                 .setDiskUsageWarnThreshold(0.0f).setReadOnlyModeEnabled(true).setIsForceGCAllowWhenNoSpace(true)
+                 .setMinUsableSizeForIndexFileCreation(5 * 1024);
+
+         server = new BookieServer(conf);
+
+         for (int i = 0; i < journalDirs.length; i++) {
+             Journal journal = server.getBookie().journals.get(i);
+             // LastLogMark should be (0, 0) before bookie restart since bookie crashed before persisting lastMark
+             assertEquals(0, journal.getLastLogMark().getCurMark().compare(new LogMark(0, 0)));
+         }
+
+         int numOfRestarts = 3;
+         // Restart server multiple times to ensure that logs are never replayed and new files are not generated
+         for (int i = 0; i < numOfRestarts; i++) {
+             if (i > 0) {
+                 // The server for the first restart was built above so its initial lastLogMark
+                 // could be checked; later restarts build a fresh one here. No server is built
+                 // that is not also started and shut down.
+                 server = new BookieServer(conf);
+             }
+
+             int txnBefore = countNumOfFiles(conf.getJournalDirs(), "txn");
+             int logBefore = countNumOfFiles(conf.getLedgerDirs(), "log");
+             int idxBefore = countNumOfFiles(conf.getLedgerDirs(), "idx");
+
+             server.start();
+
+             for (int j = 0; j < journalDirs.length; j++) {
+                 Journal journal = server.getBookie().journals.get(j);
+                 assertTrue(journal.getLastLogMark().getCurMark().compare(lastLogMarkList.get(j).getCurMark()) > 0);
+                 lastLogMarkList.set(j, journal.getLastLogMark().markLog());
+             }
+
+             server.shutdown();
+
+             // Every bookie restart starts one new journal file per journal directory.
+             assertEquals(journalDirs.length, (countNumOfFiles(conf.getJournalDirs(), "txn") - txnBefore));
+
+             // The first restart replays the journals and generates new entry log/index files.
+             // Later restarts must not replay them again, since lastMark is updated every time,
+             // so they generate no new files (but may delete older ones).
+             if (i == 0) {
+                 assertTrue((countNumOfFiles(conf.getLedgerDirs(), "log") - logBefore) > 0);
+                 assertTrue((countNumOfFiles(conf.getLedgerDirs(), "idx") - idxBefore) > 0);
+             } else {
+                 assertTrue((countNumOfFiles(conf.getLedgerDirs(), "log") - logBefore) <= 0);
+                 assertTrue((countNumOfFiles(conf.getLedgerDirs(), "idx") - idxBefore) <= 0);
+             }
+         }
+     } finally {
+         bkClient.close();
+     }
+ }
+
/**
* Verify the bookie server exit code. On ZooKeeper exception, should return
* exit code ZK_REG_FAIL = 4
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index 080ebfe..0f1b327 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -38,6 +38,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Random;
+import org.apache.bookkeeper.bookie.Journal.LastLogMark;
import org.apache.bookkeeper.client.ClientUtil;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -350,8 +351,7 @@ public class BookieJournalTest {
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setMetadataServiceUri(null);
- Bookie b = new Bookie(conf);
- b.readJournal();
+ Bookie b = createBookieAndReadJournal(conf);
b.readEntry(1, 100);
try {
@@ -379,8 +379,7 @@ public class BookieJournalTest {
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setMetadataServiceUri(null);
- Bookie b = new Bookie(conf);
- b.readJournal();
+ Bookie b = createBookieAndReadJournal(conf);
b.readEntry(1, 100);
try {
@@ -410,8 +409,7 @@ public class BookieJournalTest {
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setMetadataServiceUri(null);
- Bookie b = new Bookie(conf);
- b.readJournal();
+ Bookie b = createBookieAndReadJournal(conf);
for (int i = 1; i <= 2 * JournalChannel.SECTOR_SIZE; i++) {
b.readEntry(1, i);
@@ -570,8 +568,7 @@ public class BookieJournalTest {
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setMetadataServiceUri(null);
- Bookie b = new Bookie(conf);
- b.readJournal();
+ Bookie b = createBookieAndReadJournal(conf);
b.readEntry(1, 99);
@@ -614,8 +611,8 @@ public class BookieJournalTest {
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
.setMetadataServiceUri(null);
- Bookie b = new Bookie(conf);
- b.readJournal();
+ Bookie b = createBookieAndReadJournal(conf);
+
b.readEntry(1, 99);
// still able to read last entry, but it's junk
@@ -640,6 +637,16 @@ public class BookieJournalTest {
}
}
+ /**
+ * Creates a bookie from {@code conf}, replays its journals once, and checks that the replay
+ * advanced the in-memory lastLogMark of every journal.
+ *
+ * @param conf server configuration for the bookie under test
+ * @return the bookie after its journals have been replayed
+ */
+ private Bookie createBookieAndReadJournal(ServerConfiguration conf) throws Exception {
+     Bookie b = new Bookie(conf);
+     // Snapshot every journal's mark before replaying. readJournal() replays all journals in
+     // one call, so invoking it once per journal would snapshot later journals only after an
+     // earlier call had already advanced their marks.
+     LastLogMark[] marksBeforeReplay = new LastLogMark[b.journals.size()];
+     for (int i = 0; i < marksBeforeReplay.length; i++) {
+         marksBeforeReplay[i] = b.journals.get(i).getLastLogMark().markLog();
+     }
+     b.readJournal();
+     for (int i = 0; i < marksBeforeReplay.length; i++) {
+         assertTrue(b.journals.get(i).getLastLogMark().getCurMark()
+                 .compare(marksBeforeReplay[i].getCurMark()) > 0);
+     }
+     return b;
+ }
+
/**
* Test journal replay with SortedLedgerStorage and a very small max
* arena size.
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
index 2d197d3..27a5d28 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
@@ -33,6 +33,7 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.File;
+import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -63,6 +64,19 @@ public class BookieWriteToJournalTest {
@Rule
public TemporaryFolder tempDir = new TemporaryFolder();
+ /**
+ * Bookie whose journal replay does nothing, because the journals in these tests are mocks.
+ * Declared {@code static} so it does not hold a reference to the enclosing test instance.
+ */
+ static class NoOpJournalReplayBookie extends Bookie {
+
+     public NoOpJournalReplayBookie(ServerConfiguration conf)
+             throws IOException, InterruptedException, BookieException {
+         super(conf);
+     }
+
+     @Override
+     void readJournal() throws IOException, BookieException {
+         // Should be no-op since journal objects are mocked
+     }
+ }
+
/**
* test that Bookie calls correctly Journal.logAddEntry about "ackBeforeSync" parameter.
*/
@@ -102,7 +116,7 @@ public class BookieWriteToJournalTest {
whenNew(Journal.class).withAnyArguments().thenReturn(journal);
- Bookie b = new Bookie(conf);
+ Bookie b = new NoOpJournalReplayBookie(conf);
b.start();
long ledgerId = 1;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 97b7488..b8e2426 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -123,7 +123,7 @@ public abstract class BookKeeperClusterTestCase {
}
public BookKeeperClusterTestCase(int numBookies, int testTimeoutSecs) {
- this(numBookies, 1, 120);
+ this(numBookies, 1, testTimeoutSecs);
}
public BookKeeperClusterTestCase(int numBookies, int numOfZKNodes, int testTimeoutSecs) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
index 3525607..1dc30d4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
@@ -22,6 +22,7 @@
package org.apache.bookkeeper.util;
import java.io.File;
+import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -32,6 +33,7 @@ import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.commons.io.FileUtils;
import org.junit.Assert;
/**
@@ -92,4 +94,14 @@ public final class TestUtils {
}
Assert.assertTrue(description, predicate.getAsBoolean());
}
+
+ /**
+ * Counts the files under the given directories, searched recursively, whose names carry one
+ * of the given extensions (e.g. {@code "txn"}).
+ *
+ * @param folderNames directories to search
+ * @param extensions file extensions to match
+ * @return total number of matching files across all directories
+ */
+ public static int countNumOfFiles(File[] folderNames, String... extensions) {
+     int total = 0;
+     for (File folder : folderNames) {
+         Collection<File> matches = FileUtils.listFiles(folder, extensions, true);
+         total += matches.size();
+     }
+     return total;
+ }
+
}