Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/03/23 23:07:23 UTC

sijie commented on a change in pull request #1289: Allow multiple directories in DbLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1289#discussion_r176884131
 
 

 ##########
 File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
 ##########
 @@ -20,323 +20,114 @@
  */
 package org.apache.bookkeeper.bookie.storage.ldb;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
+import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.SortedMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.StampedLock;
 
-import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.Checkpointer;
-import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
-import org.apache.bookkeeper.bookie.EntryLocation;
-import org.apache.bookkeeper.bookie.EntryLogger;
-import org.apache.bookkeeper.bookie.GarbageCollectorThread;
 import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.LedgerStorage;
 import org.apache.bookkeeper.bookie.StateManager;
-import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
-import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
-import org.apache.bookkeeper.common.util.Watchable;
+import org.apache.bookkeeper.bookie.storage.ldb.SingleDirectoryDbLedgerStorage.LedgerLoggerProcessor;
+import org.apache.bookkeeper.common.util.MathUtils;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.DiskChecker;
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+
 
 /**
  * Implementation of LedgerStorage that uses RocksDB to keep the indexes for entries stored in EntryLogs.
  */
-public class DbLedgerStorage implements CompactableLedgerStorage {
-
-    private static final long NOT_ASSIGNED_LAC = Long.MIN_VALUE;
-
-    /**
-     * This class borrows the logic from FileInfo.
-     *
-     * <p>This class is used for holding all the transient states for a given ledger.
-     */
-    private static class TransientLedgerInfo extends Watchable<LastAddConfirmedUpdateNotification>
-            implements AutoCloseable {
-
-        // lac
-        private volatile long lac = NOT_ASSIGNED_LAC;
-        // request from explicit lac requests
-        private ByteBuffer explicitLac = null;
-        // is the ledger info closed?
-        private boolean isClosed;
-
-        private final long ledgerId;
-        // reference to LedgerMetadataIndex
-        private final LedgerMetadataIndex ledgerIndex;
-
-        private long lastAccessed;
-
-        /**
-         * Construct an Watchable with zero watchers.
-         */
-        public TransientLedgerInfo(long ledgerId, LedgerMetadataIndex ledgerIndex) {
-            super(WATCHER_RECYCLER);
-            this.ledgerId = ledgerId;
-            this.ledgerIndex = ledgerIndex;
-            this.lastAccessed = System.currentTimeMillis();
-        }
-
-        long getLastAddConfirmed() {
-            return lac;
-        }
-
-        long setLastAddConfirmed(long lac) {
-            long lacToReturn;
-            boolean changed = false;
-            synchronized (this) {
-                if (this.lac == NOT_ASSIGNED_LAC || this.lac < lac) {
-                    this.lac = lac;
-                    changed = true;
-                    lastAccessed = System.currentTimeMillis();
-                }
-                lacToReturn = this.lac;
-            }
-            if (changed) {
-                notifyWatchers(lacToReturn);
-            }
-            return lacToReturn;
-        }
-
-        synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
-                Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
-            lastAccessed = System.currentTimeMillis();
-            if ((lac != NOT_ASSIGNED_LAC && lac > previousLAC) || isClosed || ledgerIndex.get(ledgerId).getFenced()) {
-                return false;
-            }
-
-            addWatcher(watcher);
-            return true;
-        }
-
-        public ByteBuf getExplicitLac() {
-            ByteBuf retLac = null;
-            synchronized (this) {
-                if (explicitLac != null) {
-                    retLac = Unpooled.buffer(explicitLac.capacity());
-                    explicitLac.rewind(); // copy from the beginning
-                    retLac.writeBytes(explicitLac);
-                    explicitLac.rewind();
-                    return retLac;
-                }
-            }
-            return retLac;
-        }
-
-        public void setExplicitLac(ByteBuf lac) {
-            long explicitLacValue;
-            synchronized (this) {
-                if (explicitLac == null) {
-                    explicitLac = ByteBuffer.allocate(lac.capacity());
-                }
-                lac.readBytes(explicitLac);
-                explicitLac.rewind();
-
-                // skip the ledger id
-                explicitLac.getLong();
-                explicitLacValue = explicitLac.getLong();
-                explicitLac.rewind();
-
-                lastAccessed = System.currentTimeMillis();
-            }
-            setLastAddConfirmed(explicitLacValue);
-        }
-
-        boolean isStale() {
-            return (lastAccessed + TimeUnit.MINUTES.toMillis(LEDGER_INFO_CACHING_TIME_MINUTES)) < System
-                    .currentTimeMillis();
-        }
-
-        void notifyWatchers(long lastAddConfirmed) {
-            notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, lastAddConfirmed);
-        }
-
-        @Override
-        public void close() {
-            synchronized (this) {
-                if (isClosed) {
-                    return;
-                }
-                isClosed = true;
-            }
-            // notify watchers
-            notifyWatchers(Long.MAX_VALUE);
-        }
-
-    }
-
-    private EntryLogger entryLogger;
-
-    private LedgerMetadataIndex ledgerIndex;
-    private EntryLocationIndex entryLocationIndex;
-
-    private static final long LEDGER_INFO_CACHING_TIME_MINUTES = 10;
-    private ConcurrentLongHashMap<TransientLedgerInfo> transientLedgerInfoCache;
-
-    private GarbageCollectorThread gcThread;
-
-    // Write cache where all new entries are inserted into
-    protected volatile WriteCache writeCache;
-
-    // Write cache that is used to swap with writeCache during flushes
-    protected volatile WriteCache writeCacheBeingFlushed;
-
-    // Cache where we insert entries for speculative reading
-    private ReadCache readCache;
-
-    private final StampedLock writeCacheRotationLock = new StampedLock();
-
-    protected final ReentrantLock flushMutex = new ReentrantLock();
-
-    protected final AtomicBoolean hasFlushBeenTriggered = new AtomicBoolean(false);
-    private final AtomicBoolean isFlushOngoing = new AtomicBoolean(false);
-
-    private final ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("db-storage"));
-
-    // Executor used to for db index cleanup
-    private final ScheduledExecutorService cleanupExecutor = Executors
-            .newSingleThreadScheduledExecutor(new DefaultThreadFactory("db-storage-cleanup"));
+@Slf4j
+public class DbLedgerStorage implements LedgerStorage {
 
     static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb";
-    static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
+
     static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb";
 
     static final String MAX_THROTTLE_TIME_MILLIS = "dbStorage_maxThrottleTimeMs";
 
     private static final long DEFAULT_WRITE_CACHE_MAX_SIZE_MB = 16;
     private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = 16;
-    private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
-
-    private static final long DEFAUL_MAX_THROTTLE_TIME_MILLIS = TimeUnit.SECONDS.toMillis(10);
 
     private static final int MB = 1024 * 1024;
 
-    private final CopyOnWriteArrayList<LedgerDeletionListener> ledgerDeletionListeners = Lists
-            .newCopyOnWriteArrayList();
-
-    private long writeCacheMaxSize;
-
-    private CheckpointSource checkpointSource = null;
-    private Checkpoint lastCheckpoint = Checkpoint.MIN;
-
-    private long readCacheMaxSize;
-    private int readAheadCacheBatchSize;
-
-    private long maxThrottleTimeNanos;
-
-    private StatsLogger stats;
+    private int numberOfDirs;
+    private List<SingleDirectoryDbLedgerStorage> ledgerStorageList;
 
-    private OpStatsLogger addEntryStats;
-    private OpStatsLogger readEntryStats;
-    private OpStatsLogger readCacheHitStats;
-    private OpStatsLogger readCacheMissStats;
-    private OpStatsLogger readAheadBatchCountStats;
-    private OpStatsLogger readAheadBatchSizeStats;
-    private OpStatsLogger flushStats;
-    private OpStatsLogger flushSizeStats;
-
-    private Counter throttledWriteRequests;
-    private Counter rejectedWriteRequests;
+    // Keep 1 single Bookie GC thread so that the compactions from multiple individual directories are serialized
+    private ScheduledExecutorService gcExecutor;
 
     @Override
     public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
             LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
             Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
-        checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
-                "Db implementation only allows for one storage dir");
-
-        String baseDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
-
-        writeCacheMaxSize = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
-
-        writeCache = new WriteCache(writeCacheMaxSize / 2);
-        writeCacheBeingFlushed = new WriteCache(writeCacheMaxSize / 2);
-
-        this.checkpointSource = checkpointSource;
-
-        readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
-        readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
-
-        long maxThrottleTimeMillis = conf.getLong(MAX_THROTTLE_TIME_MILLIS, DEFAUL_MAX_THROTTLE_TIME_MILLIS);
-        maxThrottleTimeNanos = TimeUnit.MILLISECONDS.toNanos(maxThrottleTimeMillis);
-
-        readCache = new ReadCache(readCacheMaxSize);
+        long writeCacheMaxSize = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
+        long readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
 
-        this.stats = statsLogger;
+        this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size();
 
         log.info("Started Db Ledger Storage");
+        log.info(" - Number of directories: {}", numberOfDirs);
         log.info(" - Write cache size: {} MB", writeCacheMaxSize / MB);
         log.info(" - Read Cache: {} MB", readCacheMaxSize / MB);
-        log.info(" - Read Ahead Batch size: : {}", readAheadCacheBatchSize);
 
-        ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
-        entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
+        long perDirectoryWriteCacheSize = writeCacheMaxSize / numberOfDirs;
+        long perDirectoryReadCacheSize = readCacheMaxSize / numberOfDirs;
 
-        transientLedgerInfoCache = new ConcurrentLongHashMap<>(16 * 1024,
-                Runtime.getRuntime().availableProcessors() * 2);
-        cleanupExecutor.scheduleAtFixedRate(this::cleanupStaleTransientLedgerInfo, LEDGER_INFO_CACHING_TIME_MINUTES,
-                LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);
+        gcExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("GarbageCollector"));
 
-        entryLogger = new EntryLogger(conf, ledgerDirsManager);
-        gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger);
+        ledgerStorageList = Lists.newArrayList();
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            // Create a ledger dirs manager for the single directory
+            File[] dirs = new File[1];
+            // Remove the `/current` suffix which will be appended again by LedgerDirsManager
+            dirs[0] = ledgerDir.getParentFile();
+            LedgerDirsManager ldm = new LedgerDirsManager(conf, dirs, ledgerDirsManager.getDiskChecker(), statsLogger);
+            ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm, indexDirsManager,
 
 Review comment:
   There is a problem here: if you run with Pulsar 1.22.0 (with the Yahoo branch), upgrade to Pulsar 2.0 (with BookKeeper 4.7), and then roll back, the rolled-back version only supports a single storage directory and cannot read the new multi-directory layout, so it will cause data loss.
   
   So you might consider adding a defensive check to prevent this from happening, or you have to document this well to make sure users are aware of it.
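   
   As a sketch of what a defensive check could look like (everything below is hypothetical: the marker file name, the layout version constant, and the verifyLayout helper are assumptions, not existing BookKeeper APIs), the bookie could stamp each ledger directory with a layout version on first use and refuse to start when the on-disk version is newer than what the running binary supports:
   
```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

public class DbStorageLayoutCheck {

    // Hypothetical marker file and version; the real constants would live in DbLedgerStorage.
    private static final String LAYOUT_FILE = "dbstorage.layout";
    private static final int CURRENT_LAYOUT_VERSION = 2; // bumped when multi-directory support lands

    /**
     * Invoked once per ledger directory before the storage is opened.
     * Fails fast instead of letting an incompatible binary silently lose data.
     */
    static void verifyLayout(File ledgerDir) throws IOException {
        File layoutFile = new File(ledgerDir, LAYOUT_FILE);
        if (!layoutFile.exists()) {
            // First start on this directory: stamp the current layout version so a
            // later binary that ships this check can detect a downgrade mismatch.
            Files.write(layoutFile.toPath(),
                    Integer.toString(CURRENT_LAYOUT_VERSION).getBytes(StandardCharsets.UTF_8));
            return;
        }
        int onDiskVersion = Integer.parseInt(
                new String(Files.readAllBytes(layoutFile.toPath()), StandardCharsets.UTF_8).trim());
        if (onDiskVersion > CURRENT_LAYOUT_VERSION) {
            throw new IOException("Ledger dir " + ledgerDir + " has layout version " + onDiskVersion
                    + ", but this bookie only supports up to " + CURRENT_LAYOUT_VERSION
                    + ". Refusing to start to avoid data loss after a rollback.");
        }
    }
}
```
   
   Note that a check like this only protects binaries that already ship it, so for the Pulsar 1.22.0 -> 2.0 -> 1.22.0 path, documenting the risk is still the only safeguard.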

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services