You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2023/06/26 01:26:46 UTC

[bookkeeper] 01/02: Fix data lost when configured multiple ledger directories (#3329)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 14dbfd2d787e73c1b2c2469841c2b98e1370f2da
Author: Hang Chen <ch...@apache.org>
AuthorDate: Sun Jun 25 23:58:57 2023 +0800

    Fix data lost when configured multiple ledger directories (#3329)
    
    (cherry picked from commit 8a76703ee44b1f5af9eaedd68a53368dbf5855f0)
---
 .../org/apache/bookkeeper/bookie/BookieImpl.java   |   5 +
 .../java/org/apache/bookkeeper/bookie/Journal.java |   2 +-
 .../ldb/SingleDirectoryDbLedgerStorage.java        |   6 +-
 .../bookie/storage/ldb/DbLedgerStorageTest.java    | 183 +++++++++++++++++++++
 4 files changed, 194 insertions(+), 2 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index 0628ec28af..e5520185f3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -1285,4 +1285,9 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
             }
         }
     }
+
+    @VisibleForTesting
+    public List<Journal> getJournals() {
+        return this.journals;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index d8df972878..9a056ca67b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -226,7 +226,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
          * The last mark should first be max journal log id,
          * and then max log position in max journal log.
          */
-        void readLog() {
+        public void readLog() {
             byte[] buff = new byte[16];
             ByteBuffer bb = ByteBuffer.wrap(buff);
             LogMark mark = new LogMark();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index a140db7010..c0fff1ec46 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -142,6 +142,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
     private final long maxReadAheadBytesSize;
 
     private final Counter flushExecutorTime;
+    private final boolean singleLedgerDirs;
 
     public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
                                           LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
@@ -172,6 +173,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
         this.writeCacheMaxSize = writeCacheSize;
         this.writeCache = new WriteCache(allocator, writeCacheMaxSize / 2);
         this.writeCacheBeingFlushed = new WriteCache(allocator, writeCacheMaxSize / 2);
+        this.singleLedgerDirs = conf.getLedgerDirs().length == 1;
 
         readCacheMaxSize = readCacheSize;
         this.readAheadCacheBatchSize = readAheadCacheBatchSize;
@@ -895,7 +897,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
     public void flush() throws IOException {
         Checkpoint cp = checkpointSource.newCheckpoint();
         checkpoint(cp);
-        checkpointSource.checkpointComplete(cp, true);
+        if (singleLedgerDirs) {
+            checkpointSource.checkpointComplete(cp, true);
+        }
     }
 
     @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
index 2a7e8e2869..65f11e5d6a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -31,16 +31,21 @@ import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
 import io.netty.util.ReferenceCountUtil;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.BookieImpl;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSourceList;
 import org.apache.bookkeeper.bookie.DefaultEntryLogger;
 import org.apache.bookkeeper.bookie.EntryLocation;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.LogMark;
 import org.apache.bookkeeper.bookie.TestBookieImpl;
 import org.apache.bookkeeper.bookie.storage.EntryLogger;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -639,4 +644,182 @@ public class DbLedgerStorageTest {
 
         storage = (DbLedgerStorage) new TestBookieImpl(conf).getLedgerStorage();
     }
+
+    @Test
+    public void testMultiLedgerDirectoryCheckpoint() throws Exception {
+        int gcWaitTime = 1000;
+        File firstDir = new File(tmpDir, "dir1");
+        File secondDir = new File(tmpDir, "dir2");
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
+        conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setLedgerDirNames(new String[] { firstDir.getCanonicalPath(), secondDir.getCanonicalPath() });
+
+        BookieImpl bookie = new TestBookieImpl(conf);
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(2); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        bookie.getLedgerStorage().addEntry(entry1);
+        // write one entry to first ledger directory and flush with logMark(1, 2),
+        // only the first ledger directory should have lastMark
+        bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2);
+        ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush();
+
+        File firstDirMark = new File(firstDir + "/current", "lastMark");
+        File secondDirMark = new File(secondDir + "/current", "lastMark");
+
+        // LedgerStorage flush won't trigger lastMark update due to two ledger directories configured
+        try {
+            readLogMark(firstDirMark);
+            readLogMark(secondDirMark);
+            fail();
+        } catch (Exception e) {
+            //
+        }
+
+        // write the second entry to the second ledger directory and flush with logMark(4, 5),
+        // the first ledger directory's lastMark is (1, 2) and the second ledger directory's lastMark is (4, 5);
+        ByteBuf entry2 = Unpooled.buffer(1024);
+        entry2.writeLong(2); // ledger id
+        entry2.writeLong(1); // entry id
+        entry2.writeBytes("entry-2".getBytes());
+
+        bookie.getLedgerStorage().addEntry(entry2);
+        // set the first journal's current mark to logMark(4, 5) and flush only the second storage
+        // in the ledger storage list
+        bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5);
+        ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(1).flush();
+
+        // LedgerStorage flush won't trigger lastMark update due to two ledger directories configured
+        try {
+            readLogMark(firstDirMark);
+            readLogMark(secondDirMark);
+            fail();
+        } catch (Exception e) {
+            //
+        }
+
+        // The dbLedgerStorage flush also won't trigger lastMark update due to two ledger directories configured.
+        bookie.getLedgerStorage().flush();
+        try {
+            readLogMark(firstDirMark);
+            readLogMark(secondDirMark);
+            fail();
+        } catch (Exception e) {
+            //
+        }
+
+        // trigger a checkpoint to simulate the SyncThread performing a checkpoint.
+        CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals());
+        bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8);
+        CheckpointSource.Checkpoint checkpoint = checkpointSource.newCheckpoint();
+        checkpointSource.checkpointComplete(checkpoint, false);
+
+        try {
+            LogMark firstLogMark = readLogMark(firstDirMark);
+            LogMark secondLogMark = readLogMark(secondDirMark);
+            assertEquals(7, firstLogMark.getLogFileId());
+            assertEquals(8, firstLogMark.getLogFileOffset());
+            assertEquals(7, secondLogMark.getLogFileId());
+            assertEquals(8, secondLogMark.getLogFileOffset());
+        } catch (Exception e) {
+            fail();
+        }
+
+        // test replay journal lastMark, to make sure we get the right LastMark position
+        bookie.getJournals().get(0).getLastLogMark().readLog();
+        LogMark logMark = bookie.getJournals().get(0).getLastLogMark().getCurMark();
+        assertEquals(7, logMark.getLogFileId());
+        assertEquals(8, logMark.getLogFileOffset());
+    }
+
+    private LogMark readLogMark(File file) throws IOException {
+        byte[] buff = new byte[16];
+        ByteBuffer bb = ByteBuffer.wrap(buff);
+        LogMark mark = new LogMark();
+        try (FileInputStream fis = new FileInputStream(file)) {
+            int bytesRead = fis.read(buff);
+            if (bytesRead != 16) {
+                throw new IOException("Couldn't read enough bytes from lastMark."
+                    + " Wanted " + 16 + ", got " + bytesRead);
+            }
+        }
+        bb.clear();
+        mark.readLogMark(bb);
+
+        return mark;
+    }
+
+    @Test
+    public void testSingleLedgerDirectoryCheckpoint() throws Exception {
+        int gcWaitTime = 1000;
+        File ledgerDir = new File(tmpDir, "dir");
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
+        conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() });
+
+        BookieImpl bookie = new TestBookieImpl(conf);
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(2); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+        bookie.getLedgerStorage().addEntry(entry1);
+
+        bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2);
+        ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush();
+
+        File ledgerDirMark = new File(ledgerDir + "/current", "lastMark");
+        try {
+            LogMark logMark = readLogMark(ledgerDirMark);
+            assertEquals(1, logMark.getLogFileId());
+            assertEquals(2, logMark.getLogFileOffset());
+        } catch (Exception e) {
+            fail();
+        }
+
+        ByteBuf entry2 = Unpooled.buffer(1024);
+        entry2.writeLong(2); // ledger id
+        entry2.writeLong(1); // entry id
+        entry2.writeBytes("entry-2".getBytes());
+
+        bookie.getLedgerStorage().addEntry(entry2);
+        // set the journal's current mark to logMark(4, 5) and flush the whole ledger storage;
+        // with a single ledger directory the flush is expected to update lastMark to (4, 5)
+        bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5);
+
+        bookie.getLedgerStorage().flush();
+        try {
+            LogMark logMark = readLogMark(ledgerDirMark);
+            assertEquals(4, logMark.getLogFileId());
+            assertEquals(5, logMark.getLogFileOffset());
+        } catch (Exception e) {
+            fail();
+        }
+
+        CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals());
+        bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8);
+        CheckpointSource.Checkpoint checkpoint = checkpointSource.newCheckpoint();
+        checkpointSource.checkpointComplete(checkpoint, false);
+
+        try {
+            LogMark firstLogMark = readLogMark(ledgerDirMark);
+            assertEquals(7, firstLogMark.getLogFileId());
+            assertEquals(8, firstLogMark.getLogFileOffset());
+        } catch (Exception e) {
+            fail();
+        }
+
+        // test replay journal lastMark, to make sure we get the right LastMark position
+        bookie.getJournals().get(0).getLastLogMark().readLog();
+        LogMark logMark = bookie.getJournals().get(0).getLastLogMark().getCurMark();
+        assertEquals(7, logMark.getLogFileId());
+        assertEquals(8, logMark.getLogFileOffset());
+    }
 }