You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2023/06/26 01:26:46 UTC
[bookkeeper] 01/02: Fix data lost when configured multiple ledger directories (#3329)
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 14dbfd2d787e73c1b2c2469841c2b98e1370f2da
Author: Hang Chen <ch...@apache.org>
AuthorDate: Sun Jun 25 23:58:57 2023 +0800
Fix data lost when configured multiple ledger directories (#3329)
(cherry picked from commit 8a76703ee44b1f5af9eaedd68a53368dbf5855f0)
---
.../org/apache/bookkeeper/bookie/BookieImpl.java | 5 +
.../java/org/apache/bookkeeper/bookie/Journal.java | 2 +-
.../ldb/SingleDirectoryDbLedgerStorage.java | 6 +-
.../bookie/storage/ldb/DbLedgerStorageTest.java | 183 +++++++++++++++++++++
4 files changed, 194 insertions(+), 2 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index 0628ec28af..e5520185f3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -1285,4 +1285,9 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
}
}
}
+
+ @VisibleForTesting
+ public List<Journal> getJournals() {
+ return this.journals;
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index d8df972878..9a056ca67b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -226,7 +226,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
* The last mark should first be max journal log id,
* and then max log position in max journal log.
*/
- void readLog() {
+ public void readLog() {
byte[] buff = new byte[16];
ByteBuffer bb = ByteBuffer.wrap(buff);
LogMark mark = new LogMark();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index a140db7010..c0fff1ec46 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -142,6 +142,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
private final long maxReadAheadBytesSize;
private final Counter flushExecutorTime;
+ private final boolean singleLedgerDirs;
public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
@@ -172,6 +173,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
this.writeCacheMaxSize = writeCacheSize;
this.writeCache = new WriteCache(allocator, writeCacheMaxSize / 2);
this.writeCacheBeingFlushed = new WriteCache(allocator, writeCacheMaxSize / 2);
+ this.singleLedgerDirs = conf.getLedgerDirs().length == 1;
readCacheMaxSize = readCacheSize;
this.readAheadCacheBatchSize = readAheadCacheBatchSize;
@@ -895,7 +897,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
public void flush() throws IOException {
Checkpoint cp = checkpointSource.newCheckpoint();
checkpoint(cp);
- checkpointSource.checkpointComplete(cp, true);
+ if (singleLedgerDirs) {
+ checkpointSource.checkpointComplete(cp, true);
+ }
}
@Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
index 2a7e8e2869..65f11e5d6a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -31,16 +31,21 @@ import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.BookieImpl;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSourceList;
import org.apache.bookkeeper.bookie.DefaultEntryLogger;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.LogMark;
import org.apache.bookkeeper.bookie.TestBookieImpl;
import org.apache.bookkeeper.bookie.storage.EntryLogger;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -639,4 +644,182 @@ public class DbLedgerStorageTest {
storage = (DbLedgerStorage) new TestBookieImpl(conf).getLedgerStorage();
}
+
+ @Test
+ public void testMultiLedgerDirectoryCheckpoint() throws Exception {
+ int gcWaitTime = 1000;
+ File firstDir = new File(tmpDir, "dir1");
+ File secondDir = new File(tmpDir, "dir2");
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+ conf.setGcWaitTime(gcWaitTime);
+ conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
+ conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
+ conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+ conf.setLedgerDirNames(new String[] { firstDir.getCanonicalPath(), secondDir.getCanonicalPath() });
+
+ BookieImpl bookie = new TestBookieImpl(conf);
+ ByteBuf entry1 = Unpooled.buffer(1024);
+ entry1.writeLong(1); // ledger id
+ entry1.writeLong(2); // entry id
+ entry1.writeBytes("entry-1".getBytes());
+
+ bookie.getLedgerStorage().addEntry(entry1);
+ // write one entry to the first ledger directory, set the journal logMark to (1, 2) and flush that
+ // directory's storage; with two ledger directories configured the flush must not write lastMark
+ bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2);
+ ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush();
+
+ File firstDirMark = new File(firstDir + "/current", "lastMark");
+ File secondDirMark = new File(secondDir + "/current", "lastMark");
+
+ // LedgerStorage flush won't trigger lastMark update due to two ledger directories configured
+ try {
+ readLogMark(firstDirMark);
+ readLogMark(secondDirMark);
+ fail();
+ } catch (Exception e) {
+ //
+ }
+
+ // write the second entry to the second ledger directory and flush with logMark(4, 5);
+ // neither ledger directory should have a lastMark yet, since flush skips checkpointComplete here
+ ByteBuf entry2 = Unpooled.buffer(1024);
+ entry2.writeLong(2); // ledger id
+ entry2.writeLong(1); // entry id
+ entry2.writeBytes("entry-2".getBytes());
+
+ bookie.getLedgerStorage().addEntry(entry2);
+ // set the journal logMark to (4, 5) and flush only the second ledger directory's storage,
+ // which again must not write lastMark into either ledger directory
+ bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5);
+ ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(1).flush();
+
+ // LedgerStorage flush won't trigger lastMark update due to two ledger directories configured
+ try {
+ readLogMark(firstDirMark);
+ readLogMark(secondDirMark);
+ fail();
+ } catch (Exception e) {
+ //
+ }
+
+ // The dbLedgerStorage flush also won't trigger lastMark update due to two ledger directories configured.
+ bookie.getLedgerStorage().flush();
+ try {
+ readLogMark(firstDirMark);
+ readLogMark(secondDirMark);
+ fail();
+ } catch (Exception e) {
+ //
+ }
+
+ // trigger a checkpoint to simulate the SyncThread doing a checkpoint.
+ CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals());
+ bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8);
+ CheckpointSource.Checkpoint checkpoint = checkpointSource.newCheckpoint();
+ checkpointSource.checkpointComplete(checkpoint, false);
+
+ try {
+ LogMark firstLogMark = readLogMark(firstDirMark);
+ LogMark secondLogMark = readLogMark(secondDirMark);
+ assertEquals(7, firstLogMark.getLogFileId());
+ assertEquals(8, firstLogMark.getLogFileOffset());
+ assertEquals(7, secondLogMark.getLogFileId());
+ assertEquals(8, secondLogMark.getLogFileOffset());
+ } catch (Exception e) {
+ fail();
+ }
+
+ // test replay journal lastMark, to make sure we get the right LastMark position
+ bookie.getJournals().get(0).getLastLogMark().readLog();
+ LogMark logMark = bookie.getJournals().get(0).getLastLogMark().getCurMark();
+ assertEquals(7, logMark.getLogFileId());
+ assertEquals(8, logMark.getLogFileOffset());
+ }
+
+ private LogMark readLogMark(File file) throws IOException {
+ byte[] buff = new byte[16];
+ ByteBuffer bb = ByteBuffer.wrap(buff);
+ LogMark mark = new LogMark();
+ try (FileInputStream fis = new FileInputStream(file)) {
+ int bytesRead = fis.read(buff);
+ if (bytesRead != 16) {
+ throw new IOException("Couldn't read enough bytes from lastMark."
+ + " Wanted " + 16 + ", got " + bytesRead);
+ }
+ }
+ bb.clear();
+ mark.readLogMark(bb);
+
+ return mark;
+ }
+
+ @Test
+ public void testSingleLedgerDirectoryCheckpoint() throws Exception {
+ int gcWaitTime = 1000;
+ File ledgerDir = new File(tmpDir, "dir");
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+ conf.setGcWaitTime(gcWaitTime);
+ conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
+ conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
+ conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+ conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() });
+
+ BookieImpl bookie = new TestBookieImpl(conf);
+ ByteBuf entry1 = Unpooled.buffer(1024);
+ entry1.writeLong(1); // ledger id
+ entry1.writeLong(2); // entry id
+ entry1.writeBytes("entry-1".getBytes());
+ bookie.getLedgerStorage().addEntry(entry1);
+
+ bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2);
+ ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush();
+
+ File ledgerDirMark = new File(ledgerDir + "/current", "lastMark");
+ try {
+ LogMark logMark = readLogMark(ledgerDirMark);
+ assertEquals(1, logMark.getLogFileId());
+ assertEquals(2, logMark.getLogFileOffset());
+ } catch (Exception e) {
+ fail();
+ }
+
+ ByteBuf entry2 = Unpooled.buffer(1024);
+ entry2.writeLong(2); // ledger id
+ entry2.writeLong(1); // entry id
+ entry2.writeBytes("entry-2".getBytes());
+
+ bookie.getLedgerStorage().addEntry(entry2);
+ // write the second entry and flush the whole ledger storage with logMark(4, 5);
+ // with a single ledger directory the flush itself updates lastMark
+ bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5);
+
+ bookie.getLedgerStorage().flush();
+ try {
+ LogMark logMark = readLogMark(ledgerDirMark);
+ assertEquals(4, logMark.getLogFileId());
+ assertEquals(5, logMark.getLogFileOffset());
+ } catch (Exception e) {
+ fail();
+ }
+
+ CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals());
+ bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8);
+ CheckpointSource.Checkpoint checkpoint = checkpointSource.newCheckpoint();
+ checkpointSource.checkpointComplete(checkpoint, false);
+
+ try {
+ LogMark firstLogMark = readLogMark(ledgerDirMark);
+ assertEquals(7, firstLogMark.getLogFileId());
+ assertEquals(8, firstLogMark.getLogFileOffset());
+ } catch (Exception e) {
+ fail();
+ }
+
+ // test replay journal lastMark, to make sure we get the right LastMark position
+ bookie.getJournals().get(0).getLastLogMark().readLog();
+ LogMark logMark = bookie.getJournals().get(0).getLastLogMark().getCurMark();
+ assertEquals(7, logMark.getLogFileId());
+ assertEquals(8, logMark.getLogFileOffset());
+ }
}