You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/03/16 16:41:26 UTC

[GitHub] merlimat closed pull request #1254: Fixes for multiple journals recovery

merlimat closed pull request #1254: Fixes for multiple journals recovery
URL: https://github.com/apache/bookkeeper/pull/1254
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index e6d090e63..c9b2c8ae9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -686,7 +686,7 @@ public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
         // instantiate the journals
         journals = Lists.newArrayList();
         for (int i = 0; i < journalDirectories.size(); i++) {
-            journals.add(new Journal(journalDirectories.get(i),
+            journals.add(new Journal(i, journalDirectories.get(i),
                          conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE)));
         }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index d35684ecf..dcc474568 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -2930,8 +2930,9 @@ protected void scanEntryLog(long logId, EntryLogScanner scanner) throws IOExcept
     private synchronized List<Journal> getJournals() throws IOException {
         if (null == journals) {
             journals = Lists.newArrayListWithCapacity(bkConf.getJournalDirs().length);
+            int idx = 0;
             for (File journalDir : bkConf.getJournalDirs()) {
-                journals.add(new Journal(new File(journalDir, BookKeeperConstants.CURRENT_DIR), bkConf,
+                journals.add(new Journal(idx++, new File(journalDir, BookKeeperConstants.CURRENT_DIR), bkConf,
                     new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
                         new DiskChecker(bkConf.getDiskUsageThreshold(), bkConf.getDiskUsageWarnThreshold()))));
             }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
index d499ce357..180b2d703 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
@@ -85,7 +85,7 @@ private boolean containsIndexFiles(File dir, String name) {
 
             public boolean accept(File dir, String name) {
                 if (name.endsWith(".txn") || name.endsWith(".log")
-                    || name.equals("lastId") || name.equals("lastMark")) {
+                    || name.equals("lastId") || name.startsWith("lastMark")) {
                     return true;
                 }
                 if (containsIndexFiles(dir, name)) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 5a816a016..e2213fb32 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -182,7 +182,7 @@ void rollLog(LastLogMark lastMark) throws NoWritableLedgerDirException {
             List<File> writableLedgerDirs = ledgerDirsManager
                     .getWritableLedgerDirs();
             for (File dir : writableLedgerDirs) {
-                File file = new File(dir, "lastMark");
+                File file = new File(dir, lastMarkFileName);
                 FileOutputStream fos = null;
                 try {
                     fos = new FileOutputStream(file);
@@ -211,7 +211,7 @@ void readLog() {
             ByteBuffer bb = ByteBuffer.wrap(buff);
             LogMark mark = new LogMark();
             for (File dir: ledgerDirsManager.getAllLedgerDirs()) {
-                File file = new File(dir, "lastMark");
+                File file = new File(dir, lastMarkFileName);
                 try {
                     try (FileInputStream fis = new FileInputStream(file)) {
                         int bytesRead = fis.read(buff);
@@ -576,6 +576,10 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour
 
     private final LastLogMark lastLogMark = new LastLogMark(0, 0);
 
+    private static final String LAST_MARK_DEFAULT_NAME = "lastMark";
+
+    private final String lastMarkFileName;
+
     /**
      * The thread pool used to handle callback.
      */
@@ -606,12 +610,13 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour
     private final Counter flushEmptyQueueCounter;
     private final Counter journalWriteBytes;
 
-    public Journal(File journalDirectory, ServerConfiguration conf, LedgerDirsManager ledgerDirsManager) {
-        this(journalDirectory, conf, ledgerDirsManager, NullStatsLogger.INSTANCE);
+    public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
+            LedgerDirsManager ledgerDirsManager) {
+        this(journalIndex, journalDirectory, conf, ledgerDirsManager, NullStatsLogger.INSTANCE);
     }
 
-    public Journal(File journalDirectory, ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
-                   StatsLogger statsLogger) {
+    public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
+            LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger) {
         super("BookieJournal-" + conf.getBookiePort());
         this.ledgerDirsManager = ledgerDirsManager;
         this.conf = conf;
@@ -634,6 +639,11 @@ public Journal(File journalDirectory, ServerConfiguration conf, LedgerDirsManage
 
         this.removePagesFromCache = conf.getJournalRemovePagesFromCache();
         // read last log mark
+        if (conf.getJournalDirs().length == 1) {
+            lastMarkFileName = LAST_MARK_DEFAULT_NAME;
+        } else {
+            lastMarkFileName = LAST_MARK_DEFAULT_NAME + "." + journalIndex;
+        }
         lastLogMark.readLog();
         if (LOG.isDebugEnabled()) {
             LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLastLogMarkService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLastLogMarkService.java
index 31268cd98..4cbd1dc78 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLastLogMarkService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLastLogMarkService.java
@@ -77,8 +77,9 @@ public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
                 Map<String, String> output = Maps.newHashMap();
 
                 List<Journal> journals = Lists.newArrayListWithCapacity(conf.getJournalDirs().length);
+                int idx = 0;
                 for (File journalDir : conf.getJournalDirs()) {
-                    journals.add(new Journal(journalDir, conf, new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                    journals.add(new Journal(idx++, journalDir, conf, new LedgerDirsManager(conf, conf.getLedgerDirs(),
                       new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()))));
                 }
                 for (Journal journal : journals) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommand.java
index a83f195d5..f31885c8f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommand.java
@@ -19,8 +19,9 @@
 package org.apache.bookkeeper.tools.cli.commands.bookie;
 
 import com.beust.jcommander.Parameters;
-import com.google.common.collect.Lists;
-import java.util.List;
+
+import java.io.File;
+
 import org.apache.bookkeeper.bookie.Journal;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.LogMark;
@@ -44,14 +45,10 @@ public void run(ServerConfiguration conf) throws Exception {
         LedgerDirsManager dirsManager = new LedgerDirsManager(
             conf, conf.getJournalDirs(),
             new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
-        List<Journal> journals = Lists.transform(
-            Lists.newArrayList(conf.getJournalDirs()),
-            dir -> new Journal(
-                dir,
-                conf,
-                dirsManager)
-        );
-        for (Journal journal : journals) {
+        File[] journalDirs = conf.getJournalDirs();
+
+        for (int idx = 0; idx < journalDirs.length; idx++) {
+            Journal journal = new Journal(idx, journalDirs[idx], conf, dirsManager);
             LogMark lastLogMark = journal.getLastLogMark().getCurMark();
             System.out.println("LastLogMark : Journal Id - " + lastLogMark.getLogFileId() + "("
                 + Long.toHexString(lastLogMark.getLogFileId()) + ".txn), Pos - "
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
index a55bef77a..0b022fedc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
@@ -84,7 +84,7 @@ public void testAckAfterSync() throws Exception {
         whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
 
         LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
-        Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+        Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
 
         // machinery to suspend ForceWriteThread
         CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
@@ -146,7 +146,7 @@ public void testAckBeforeSync() throws Exception {
         whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
 
         LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
-        Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+        Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
 
         // machinery to suspend ForceWriteThread
         CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
@@ -201,7 +201,7 @@ public void testAckBeforeSyncWithJournalBufferedEntriesThreshold() throws Except
         whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
 
         LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
-        Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+        Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
 
         // machinery to suspend ForceWriteThread
         CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
@@ -260,7 +260,7 @@ public void testInterleavedRequests() throws Exception {
         whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
 
         LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
-        Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+        Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
         journal.start();
 
         final int numEntries = 100;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
new file mode 100644
index 000000000..6c0682ba4
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+/**
+ * Test the bookie with multiple journals.
+ */
+public class BookieMultipleJournalsTest extends BookKeeperClusterTestCase {
+
+    public BookieMultipleJournalsTest() {
+        super(1);
+    }
+
+    protected ServerConfiguration newServerConfiguration(int port, String zkServers, File journalDir,
+            File[] ledgerDirs) {
+        ServerConfiguration conf = super.newServerConfiguration(port, zkServers, journalDir, ledgerDirs);
+
+        // Use 4 journals
+        String[] journalDirs = new String[4];
+        for (int i = 0; i < 4; i++) {
+            journalDirs[i] = journalDir.getAbsolutePath() + "/journal-" + i;
+        }
+        conf.setJournalDirsName(journalDirs);
+
+        return conf;
+    }
+
+    @Test
+    public void testMultipleWritesAndBookieRestart() throws Exception {
+        // Creates few ledgers so that writes are spread across all journals
+        final int numLedgers = 16;
+        final int numEntriesPerLedger = 30;
+        List<LedgerHandle> writeHandles = new ArrayList<>();
+
+        for (int i = 0; i < numLedgers; i++) {
+            writeHandles.add(bkc.createLedger(1, 1, DigestType.CRC32, new byte[0]));
+        }
+
+        for (int i = 0; i < numEntriesPerLedger; i++) {
+            for (int j = 0; j < numLedgers; j++) {
+                writeHandles.get(j).addEntry(("entry-" + i).getBytes());
+            }
+        }
+
+        restartBookies();
+
+        // Write another set of entries
+        for (int i = numEntriesPerLedger; i < 2 * numEntriesPerLedger; i++) {
+            for (int j = 0; j < numLedgers; j++) {
+                writeHandles.get(j).addEntry(("entry-" + i).getBytes());
+            }
+        }
+
+        restartBookies();
+
+        List<LedgerHandle> readHandles = new ArrayList<>();
+
+        for (int i = 0; i < numLedgers; i++) {
+            readHandles.add(bkc.openLedger(writeHandles.get(i).getId(), DigestType.CRC32, new byte[0]));
+        }
+
+        for (int i = 0; i < numLedgers; i++) {
+            Enumeration<LedgerEntry> entries = readHandles.get(i).readEntries(0, numEntriesPerLedger - 1);
+
+            for (int j = 0; j < numEntriesPerLedger; j++) {
+                LedgerEntry entry = entries.nextElement();
+                assertEquals("entry-" + j, new String(entry.getEntry()));
+            }
+        }
+    }
+
+}
diff --git a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommandTest.java b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommandTest.java
index c5c366a8e..20899b710 100644
--- a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommandTest.java
+++ b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommandTest.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.tools.cli.commands.bookie;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyObject;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -77,10 +78,12 @@ public void setup() throws Exception {
         when(journal.getLastLogMark()).thenReturn(lastLogMark);
         PowerMockito.whenNew(Journal.class)
             .withParameterTypes(
+                int.class,
                 File.class,
                 ServerConfiguration.class,
                 LedgerDirsManager.class)
             .withArguments(
+                any(int.class),
                 any(File.class),
                 eq(conf),
                 any(LedgerDirsManager.class))
@@ -96,7 +99,7 @@ public void testCommand() throws Exception {
         PowerMockito.verifyNew(LedgerDirsManager.class, times(1))
             .withArguments(eq(conf), any(File[].class), any(DiskChecker.class));
         PowerMockito.verifyNew(Journal.class, times(3))
-            .withArguments(any(File.class), eq(conf), any(LedgerDirsManager.class));
+            .withArguments(any(int.class), any(File.class), eq(conf), any(LedgerDirsManager.class));
         verify(journal, times(3)).getLastLogMark();
         verify(lastLogMark, times(3)).getCurMark();
         verify(logMark, times(3 * 2)).getLogFileId();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services