You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2022/06/28 11:23:12 UTC
[bookkeeper] branch master updated: Basic function: directories to store index files (#3352)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 27b6cb9c96 Basic function: directories to store index files (#3352)
27b6cb9c96 is described below
commit 27b6cb9c9601cfd0ce3f498b2f107aadad231e9e
Author: StevenLuMT <42...@users.noreply.github.com>
AuthorDate: Tue Jun 28 19:23:06 2022 +0800
Basic function: directories to store index files (#3352)
Co-authored-by: lushiji <lu...@didiglobal.com>
---
.../bookie/storage/ldb/DbLedgerStorage.java | 26 ++-
.../ldb/SingleDirectoryDbLedgerStorage.java | 15 +-
.../storage/ldb/DbLedgerStorageIndexDirTest.java | 255 +++++++++++++++++++++
3 files changed, 287 insertions(+), 9 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index 7d33e5eb5e..26b2e2c395 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -140,6 +140,10 @@ public class DbLedgerStorage implements LedgerStorage {
throw new IOException("Read and write cache sizes exceed the configured max direct memory size");
}
+ if (ledgerDirsManager.getAllLedgerDirs().size() != indexDirsManager.getAllLedgerDirs().size()) {
+ throw new IOException("ledger and index dirs size not matched");
+ }
+
long perDirectoryWriteCacheSize = writeCacheMaxSize / numberOfDirs;
long perDirectoryReadCacheSize = readCacheMaxSize / numberOfDirs;
int readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
@@ -147,16 +151,28 @@ public class DbLedgerStorage implements LedgerStorage {
gcExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("GarbageCollector"));
ledgerStorageList = Lists.newArrayList();
- for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+ for (int i = 0; i < ledgerDirsManager.getAllLedgerDirs().size(); i++) {
+ File ledgerDir = ledgerDirsManager.getAllLedgerDirs().get(i);
+ File indexDir = indexDirsManager.getAllLedgerDirs().get(i);
// Create a ledger dirs manager for the single directory
- File[] dirs = new File[1];
+ File[] lDirs = new File[1];
+ // Remove the `/current` suffix which will be appended again by LedgersDirManager
+ lDirs[0] = ledgerDir.getParentFile();
+ LedgerDirsManager ldm = new LedgerDirsManager(conf, lDirs, ledgerDirsManager.getDiskChecker(), statsLogger);
+
+ // Create a index dirs manager for the single directory
+ File[] iDirs = new File[1];
// Remove the `/current` suffix which will be appended again by LedgersDirManager
- dirs[0] = ledgerDir.getParentFile();
- LedgerDirsManager ldm = new LedgerDirsManager(conf, dirs, ledgerDirsManager.getDiskChecker(), statsLogger);
- ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm, indexDirsManager,
+ iDirs[0] = indexDir.getParentFile();
+ LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, indexDirsManager.getDiskChecker(), statsLogger);
+
+ ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm, idm,
statsLogger, gcExecutor, perDirectoryWriteCacheSize,
perDirectoryReadCacheSize, readAheadCacheBatchSize));
ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener);
+ if (!lDirs[0].getPath().equals(iDirs[0].getPath())) {
+ idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener);
+ }
}
// parent DbLedgerStorage stats (not per directory)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index e496c84a14..303f131406 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -82,6 +82,7 @@ import org.apache.bookkeeper.stats.ThreadRegistry;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -152,8 +153,14 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
"Db implementation only allows for one storage dir");
- String baseDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
- log.info("Creating single directory db ledger storage on {}", baseDir);
+ String indexBaseDir = indexDirsManager.getAllLedgerDirs().get(0).toString();
+ if (StringUtils.isBlank(indexBaseDir)) {
+ indexBaseDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+ log.info("indexDir is not specified, use default, creating single directory db ledger storage on {}",
+ indexBaseDir);
+ } else {
+ log.info("indexDir is specified, creating single directory db ledger storage on {}", indexBaseDir);
+ }
StatsLogger ledgerDirStatsLogger = statsLogger.scopeLabel("ledgerDir",
ledgerDirsManager.getAllLedgerDirs().get(0).getPath());
@@ -174,9 +181,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
readCache = new ReadCache(allocator, readCacheMaxSize);
- ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, ledgerDirStatsLogger);
+ ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, indexBaseDir, ledgerDirStatsLogger);
entryLocationIndex = new EntryLocationIndex(conf,
- KeyValueStorageRocksDB.factory, baseDir, ledgerDirStatsLogger);
+ KeyValueStorageRocksDB.factory, indexBaseDir, ledgerDirStatsLogger);
transientLedgerInfoCache = ConcurrentLongHashMap.<TransientLedgerInfo>newBuilder()
.expectedItems(16 * 1024)
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageIndexDirTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageIndexDirTest.java
new file mode 100644
index 0000000000..77c4f038b2
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageIndexDirTest.java
@@ -0,0 +1,255 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.BookieImpl;
+import org.apache.bookkeeper.bookie.TestBookieImpl;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit test for {@link DbLedgerStorage}.
+ */
+public class DbLedgerStorageIndexDirTest {
+
+ private DbLedgerStorage storage;
+ private File tmpLedgerDir;
+ private File tmpIndexDir;
+ private static final String LOCATION_INDEX_SUB_PATH = "locations";
+ private static final String METADATA_INDEX_SUB_PATH = "ledgers";
+
+ @BeforeEach
+ public void setup() throws Exception {
+ tmpLedgerDir = File.createTempFile("ledgerDir", ".dir");
+ tmpLedgerDir.delete();
+ tmpLedgerDir.mkdir();
+ File curLedgerDir = BookieImpl.getCurrentDirectory(tmpLedgerDir);
+ BookieImpl.checkDirectoryStructure(curLedgerDir);
+
+ tmpIndexDir = File.createTempFile("indexDir", ".dir");
+ tmpIndexDir.delete();
+ tmpIndexDir.mkdir();
+ File curIndexDir = BookieImpl.getCurrentDirectory(tmpIndexDir);
+ BookieImpl.checkDirectoryStructure(curIndexDir);
+
+ int gcWaitTime = 1000;
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+ conf.setGcWaitTime(gcWaitTime);
+ conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+ conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 1);
+ conf.setProperty(DbLedgerStorage.MAX_THROTTLE_TIME_MILLIS, 1000);
+ conf.setLedgerDirNames(new String[]{tmpLedgerDir.toString()});
+ conf.setIndexDirName(new String[]{tmpIndexDir.toString()});
+ Bookie bookie = new TestBookieImpl(conf);
+
+ storage = (DbLedgerStorage) bookie.getLedgerStorage();
+ }
+
+ @AfterEach
+ public void teardown() throws Exception {
+ storage.shutdown();
+ tmpLedgerDir.delete();
+ tmpIndexDir.delete();
+ }
+
+ public boolean hasIndexStructure(File tmpDir) {
+ File indexParentDir = BookieImpl.getCurrentDirectory(tmpDir);
+ String[] indexSubPaths = indexParentDir.list(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ if (LOCATION_INDEX_SUB_PATH.equals(name) || METADATA_INDEX_SUB_PATH.equals(name)) {
+ return true;
+ }
+ return false;
+ }
+ });
+
+ if (indexSubPaths.length == 0) {
+ return false;
+ }
+ long hasIndexPathCount = Arrays.stream(indexSubPaths).filter(isp -> {
+ String[] indexFiles = new File(indexParentDir, isp).list(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ if ("LOCK".equals(name) || "IDENTITY".equals(name) || "CURRENT".equals(name)) {
+ return true;
+ }
+ return false;
+ }
+ });
+ if (indexFiles.length == 3) {
+ return true;
+ }
+ return false;
+ }).count();
+
+ if (hasIndexPathCount == indexSubPaths.length) {
+ return true;
+ }
+ return false;
+ }
+
+ @Test
+ public void checkIndexDirectoryStructure() {
+ assertEquals(false, hasIndexStructure(tmpLedgerDir));
+ assertEquals(true, hasIndexStructure(tmpIndexDir));
+ }
+
+ @Test
+ public void simpleRegressionTest() throws Exception {
+ assertEquals(false, storage.ledgerExists(3));
+ try {
+ storage.isFenced(3);
+ fail("should have failed");
+ } catch (Bookie.NoLedgerException nle) {
+ // OK
+ }
+ assertEquals(false, storage.ledgerExists(3));
+ try {
+ storage.setFenced(3);
+ fail("should have failed");
+ } catch (Bookie.NoLedgerException nle) {
+ // OK
+ }
+ storage.setMasterKey(3, "key".getBytes());
+ try {
+ storage.setMasterKey(3, "other-key".getBytes());
+ fail("should have failed");
+ } catch (IOException ioe) {
+ assertTrue(ioe.getCause() instanceof BookieException.BookieIllegalOpException);
+ }
+ // setting the same key is NOOP
+ storage.setMasterKey(3, "key".getBytes());
+ assertEquals(true, storage.ledgerExists(3));
+ assertEquals(true, storage.setFenced(3));
+ assertEquals(true, storage.isFenced(3));
+ assertEquals(false, storage.setFenced(3));
+
+ storage.setMasterKey(4, "key".getBytes());
+ assertEquals(false, storage.isFenced(4));
+ assertEquals(true, storage.ledgerExists(4));
+
+ assertEquals("key", new String(storage.readMasterKey(4)));
+
+ assertEquals(Lists.newArrayList(4L, 3L), Lists.newArrayList(storage.getActiveLedgersInRange(0, 100)));
+ assertEquals(Lists.newArrayList(4L, 3L), Lists.newArrayList(storage.getActiveLedgersInRange(3, 100)));
+ assertEquals(Lists.newArrayList(3L), Lists.newArrayList(storage.getActiveLedgersInRange(0, 4)));
+
+ // Add / read entries
+ ByteBuf entry = Unpooled.buffer(1024);
+ entry.writeLong(4); // ledger id
+ entry.writeLong(1); // entry id
+ entry.writeLong(0); // lac
+ entry.writeBytes("entry-1".getBytes());
+
+ assertEquals(false, ((DbLedgerStorage) storage).isFlushRequired());
+
+ assertEquals(1, storage.addEntry(entry));
+
+ assertEquals(true, ((DbLedgerStorage) storage).isFlushRequired());
+
+ // Read from write cache
+ assertTrue(storage.entryExists(4, 1));
+ ByteBuf res = storage.getEntry(4, 1);
+ assertEquals(entry, res);
+
+ storage.flush();
+
+ assertEquals(false, ((DbLedgerStorage) storage).isFlushRequired());
+
+ // Read from db
+ assertTrue(storage.entryExists(4, 1));
+ res = storage.getEntry(4, 1);
+ assertEquals(entry, res);
+
+ try {
+ storage.getEntry(4, 2);
+ fail("Should have thrown exception");
+ } catch (Bookie.NoEntryException e) {
+ // ok
+ }
+
+ ByteBuf entry2 = Unpooled.buffer(1024);
+ entry2.writeLong(4); // ledger id
+ entry2.writeLong(2); // entry id
+ entry2.writeLong(1); // lac
+ entry2.writeBytes("entry-2".getBytes());
+
+ storage.addEntry(entry2);
+
+ // Read last entry in ledger
+ res = storage.getEntry(4, BookieProtocol.LAST_ADD_CONFIRMED);
+ assertEquals(entry2, res);
+
+ // Read last add confirmed in ledger
+ assertEquals(1L, storage.getLastAddConfirmed(4));
+
+ ByteBuf entry3 = Unpooled.buffer(1024);
+ entry3.writeLong(4); // ledger id
+ entry3.writeLong(3); // entry id
+ entry3.writeLong(2); // lac
+ entry3.writeBytes("entry-3".getBytes());
+ storage.addEntry(entry3);
+
+ ByteBuf entry4 = Unpooled.buffer(1024);
+ entry4.writeLong(4); // ledger id
+ entry4.writeLong(4); // entry id
+ entry4.writeLong(3); // lac
+ entry4.writeBytes("entry-4".getBytes());
+ storage.addEntry(entry4);
+
+ res = storage.getEntry(4, 4);
+ assertEquals(entry4, res);
+
+ assertEquals(3, storage.getLastAddConfirmed(4));
+
+ // Delete
+ assertEquals(true, storage.ledgerExists(4));
+ storage.deleteLedger(4);
+ assertEquals(false, storage.ledgerExists(4));
+
+ // remove entries for ledger 4 from cache
+ storage.flush();
+
+ try {
+ storage.getEntry(4, 4);
+ fail("Should have thrown exception since the ledger was deleted");
+ } catch (Bookie.NoLedgerException e) {
+ // ok
+ }
+ }
+}