You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2022/05/14 09:08:39 UTC

[GitHub] [bookkeeper] ArvinDevel commented on a diff in pull request #3111: BP-49: Support read ahead async

ArvinDevel commented on code in PR #3111:
URL: https://github.com/apache/bookkeeper/pull/3111#discussion_r872955033


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java:
##########
@@ -0,0 +1,806 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A management tool that supports asynchronous read-ahead operations for {@link SingleDirectoryDbLedgerStorage}.
+ **/
+public class ReadAheadManager {
+
+    private static final Logger log = LoggerFactory.getLogger(ReadAheadManager.class);
+
+    // ---- Configuration keys: read-ahead sizing and timing ----
+    public static final String READ_AHEAD_MAX_ENTRIES = "dbStorage_readAheadMaxEntries";
+    public static final String READ_AHEAD_MAX_BYTES = "dbStorage_readAheadMaxBytes";
+    public static final String READ_AHEAD_PRE_TRIGGER_RATIO = "dbStorage_readAheadPreTriggerRatio";
+    public static final String READ_AHEAD_TASK_EXPIRED_TIME_MS = "dbStorage_readAheadTaskExpiredTimeMs";
+    public static final String READ_AHEAD_TIMEOUT_MS = "dbStorage_readAheadTimeoutMs";
+
+    // ---- Configuration keys: operating mode and executor sizing ----
+    public static final String ENABLE_READ_AHEAD_ASYNC = "dbStorage_enableReadAheadAsync";
+    public static final String SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = "dbStorage_submitReadAheadTaskImmediately";
+    public static final String READ_AHEAD_TASK_POOL_SIZE = "dbStorage_readAheadTaskPoolSize";
+
+    // ---- Defaults used when the matching key is absent from ServerConfiguration ----
+    public static final int DEFAULT_READ_AHEAD_ENTRIES = 1000;
+    public static final int DEFAULT_READ_AHEAD_BYTES = 256 * 1024; // 256 KiB
+    public static final double DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO = 0.75;
+
+    public static final long DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS = 60_000L; // 1 minute
+    public static final long DEFAULT_READ_AHEAD_TIMEOUT_MS = 5_000L; // 5 seconds
+
+    public static final boolean DEFAULT_ENABLE_READ_AHEAD_ASYNC = false;
+    public static final boolean DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = false;
+    public static final int DEFAULT_READ_AHEAD_TASK_POOL_SIZE = 8;
+
+    /**
+     * Immutable (ledgerId, entryId) key for pending read-ahead positions.
+     * The fields are final because instances serve as {@link ConcurrentHashMap} keys and are queued
+     * for later removal; a key whose hash changed after insertion could never be found again.
+     */
+    private static final class LedgerEntryPosition {
+
+        private final long ledgerId;
+        private final long entryId;
+
+        public LedgerEntryPosition(long ledgerId, long entryId) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            LedgerEntryPosition that = (LedgerEntryPosition) o;
+            return ledgerId == that.ledgerId
+                    && entryId == that.entryId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(ledgerId, entryId);
+        }
+    }
+
+    /**
+     * A deferred read-ahead start point: the ledger, entry and entry-log location at which
+     * read-ahead should begin, plus the time after which this record counts as expired.
+     */
+    private static final class ReadAheadPos {
+
+        private final long ledgerId;
+        private final long entryId;
+        private final long location;
+        // Monotonic timestamp. System.nanoTime() is unaffected by wall-clock adjustments (NTP steps,
+        // manual changes) that could make a currentTimeMillis()-based expiry fire early or never.
+        private final long createTimeNanos;
+        private final long readAheadTaskExpiredTimeNanos;
+
+        public ReadAheadPos(long ledgerId, long entryId, long location, long readAheadTaskExpiredTimeMs) {
+
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.location = location;
+
+            this.createTimeNanos = System.nanoTime();
+            this.readAheadTaskExpiredTimeNanos = TimeUnit.MILLISECONDS.toNanos(readAheadTaskExpiredTimeMs);
+        }
+
+        /**
+         * @return true once more than the configured expiry time has elapsed since construction
+         */
+        public boolean isExpired() {
+            // Compare an elapsed difference, never absolute nanoTime values, so wrap-around is harmless.
+            return (System.nanoTime() - createTimeNanos) > readAheadTaskExpiredTimeNanos;
+        }
+
+        public long getLedgerId() {
+            return ledgerId;
+        }
+
+        public long getEntryId() {
+            return entryId;
+        }
+
+        public long getLocation() {
+            return location;
+        }
+    }
+
+    // Collaborators supplied by the owning storage.
+    private final EntryLogger entryLogger;
+    private final EntryLocationIndex entryLocationIndex;
+    private final ReadCache cache;
+    private final DbLedgerStorageStats dbLedgerStorageStats;
+
+    // Selects the code path taken by readEntry; fixed for the lifetime of the manager.
+    private final boolean enableReadAheadAsync;
+
+    // Read-ahead batch limits, honoured in both sync and async mode.
+    private int readAheadEntries;
+    private int readAheadBytes;
+
+    // ---- Everything below is initialised only when enableReadAheadAsync is true ----
+
+    // One executor runs read-ahead tasks; the other periodically purges expired bookkeeping.
+    private ExecutorService readAheadExecutor;
+    private ScheduledExecutorService cleanupExecutor;
+
+    // Deferred read-ahead triggers, plus a queue of their keys consumed by the cleanup task.
+    private ConcurrentHashMap<LedgerEntryPosition, ReadAheadPos> pendingReadAheadPositions;
+    private ConcurrentLinkedQueue<LedgerEntryPosition> pendingDeletePositions;
+
+    // Read-ahead task statuses keyed by ledger id, then by start entry id, plus a cleanup queue.
+    private ConcurrentHashMap<Long, NavigableMap<Long, ReadAheadTaskStatus>> inProgressReadAheadTaskStatuses;
+    private ConcurrentLinkedQueue<ReadAheadTaskStatus> pendingDeleteReadAheadTaskStatuses;
+
+    private boolean submitReadAheadTaskImmediately;
+    private double preTriggerReadAheadRatio;
+
+    private long readAheadTaskExpiredTimeMs;
+    private long readAheadTimeoutMs;
+
+    /**
+     * Creates a manager for tests. Async read-ahead is forced on, and every other setting
+     * takes its {@code DEFAULT_*} value.
+     *
+     * @param entryLogger          source of entry data
+     * @param entryLocationIndex   maps (ledgerId, entryId) to an entry-log location
+     * @param cache                read cache that read-ahead populates
+     * @param dbLedgerStorageStats metrics sink
+     */
+    public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex,
+                            ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats) {
+        this(entryLogger, entryLocationIndex, cache, dbLedgerStorageStats,
+                true /* enableReadAheadAsync */,
+                DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY,
+                DEFAULT_READ_AHEAD_TASK_POOL_SIZE,
+                DEFAULT_READ_AHEAD_ENTRIES,
+                DEFAULT_READ_AHEAD_BYTES,
+                DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO,
+                DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS,
+                DEFAULT_READ_AHEAD_TIMEOUT_MS);
+    }
+
+    /**
+     * Creates a manager configured from {@code conf}. Any key that is not set falls back to the
+     * matching {@code DEFAULT_*} constant, so async read-ahead stays off unless
+     * {@code dbStorage_enableReadAheadAsync} is enabled.
+     *
+     * @param entryLogger          source of entry data
+     * @param entryLocationIndex   maps (ledgerId, entryId) to an entry-log location
+     * @param cache                read cache that read-ahead populates
+     * @param dbLedgerStorageStats metrics sink
+     * @param conf                 server configuration holding the {@code dbStorage_*} read-ahead keys
+     */
+    public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex,
+                            ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats, ServerConfiguration conf) {
+        this(entryLogger, entryLocationIndex, cache, dbLedgerStorageStats,
+                // mode and executor settings
+                conf.getBoolean(ENABLE_READ_AHEAD_ASYNC, DEFAULT_ENABLE_READ_AHEAD_ASYNC),
+                conf.getBoolean(SUBMIT_READ_AHEAD_TASK_IMMEDIATELY, DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY),
+                conf.getInt(READ_AHEAD_TASK_POOL_SIZE, DEFAULT_READ_AHEAD_TASK_POOL_SIZE),
+                // batch limits
+                conf.getInt(READ_AHEAD_MAX_ENTRIES, DEFAULT_READ_AHEAD_ENTRIES),
+                conf.getInt(READ_AHEAD_MAX_BYTES, DEFAULT_READ_AHEAD_BYTES),
+                // async tuning
+                conf.getDouble(READ_AHEAD_PRE_TRIGGER_RATIO, DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO),
+                conf.getLong(READ_AHEAD_TASK_EXPIRED_TIME_MS, DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS),
+                conf.getLong(READ_AHEAD_TIMEOUT_MS, DEFAULT_READ_AHEAD_TIMEOUT_MS));
+    }
+
+    /**
+     * Creates a manager with every setting given explicitly.
+     * When {@code enableReadAheadAsync} is false, only the settings shared by both modes are stored:
+     * the executors, tracking structures and async-only settings stay unset, and the remaining
+     * async arguments are ignored. When it is true, a fixed pool of {@code readAheadTaskPoolSize}
+     * "read-ahead" threads is created, plus a single "read-ahead-cleanup" thread that runs
+     * removeExpiredReadAheadTasks every 30 seconds.
+     */
+    public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex,
+                            ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats, boolean enableReadAheadAsync,
+                            boolean submitReadAheadTaskImmediately, int readAheadTaskPoolSize,
+                            int readAheadEntries, int readAheadBytes, double preTriggerReadAheadRatio,
+                            long readAheadTaskExpiredTimeMs, long readAheadTimeoutMs) {
+        // Collaborators and settings shared by both modes.
+        this.entryLogger = entryLogger;
+        this.entryLocationIndex = entryLocationIndex;
+        this.cache = cache;
+        this.dbLedgerStorageStats = dbLedgerStorageStats;
+        this.enableReadAheadAsync = enableReadAheadAsync;
+        this.readAheadEntries = readAheadEntries;
+        this.readAheadBytes = readAheadBytes;
+
+        if (!enableReadAheadAsync) {
+            // Sync mode needs no executors or tracking structures.
+            return;
+        }
+
+        this.submitReadAheadTaskImmediately = submitReadAheadTaskImmediately;
+        this.preTriggerReadAheadRatio = preTriggerReadAheadRatio;
+        this.readAheadTaskExpiredTimeMs = readAheadTaskExpiredTimeMs;
+        this.readAheadTimeoutMs = readAheadTimeoutMs;
+
+        this.pendingReadAheadPositions = new ConcurrentHashMap<>();
+        this.pendingDeletePositions = new ConcurrentLinkedQueue<>();
+        this.inProgressReadAheadTaskStatuses = new ConcurrentHashMap<>();
+        this.pendingDeleteReadAheadTaskStatuses = new ConcurrentLinkedQueue<>();
+
+        this.readAheadExecutor = Executors.newFixedThreadPool(
+                readAheadTaskPoolSize, new DefaultThreadFactory("read-ahead"));
+        this.cleanupExecutor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("read-ahead-cleanup"));
+        // Scheduled last, so the cleanup task only ever observes fully initialised state.
+        this.cleanupExecutor.scheduleAtFixedRate(
+                SafeRunnable.safeRun(this::removeExpiredReadAheadTasks), 30, 30, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Shuts down the async-mode executors via {@link ExecutorService#shutdown()}, which does not
+     * wait for termination. Does nothing in sync mode, where the executors were never created.
+     */
+    public void shutdown() {
+        if (!enableReadAheadAsync) {
+            return;
+        }
+        readAheadExecutor.shutdown();
+        cleanupExecutor.shutdown();
+    }
+
+    /**
+     * Trigger read-ahead and return the corresponding entry.
+     *
+     * @param ledgerId
+     * @param entryId
+     * @return
+     * @throws IOException
+     */
+    public ByteBuf readEntry(long ledgerId, long entryId) throws IOException {
+        if (enableReadAheadAsync) {
+            return readEntryUnderAsyncReadAhead(ledgerId, entryId);
+        } else {
+            return readEntryUnderSyncReadAhead(ledgerId, entryId);

Review Comment:
   @wuzhanpeng could you split this PR into several smaller PRs, so that it's easier to review?
   1) The first PR focuses on reorganizing the current sync-mode read-ahead, e.g. moving `fillReadAheadCache` and related methods into `ReadAheadManager`.
   This PR doesn't introduce the new feature; it only prepares for it, and it should have no impact on the system.
   2) Follow-up PRs focus on the new functionality. One or more PRs may be needed; for example, the core functionality and the monitoring could be split into different PRs.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java:
##########
@@ -0,0 +1,806 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A management tool that supports asynchronous read-ahead operations for {@link SingleDirectoryDbLedgerStorage}.
+ **/
+public class ReadAheadManager {
+
+    private static final Logger log = LoggerFactory.getLogger(ReadAheadManager.class);
+
+    // ---- Configuration keys: read-ahead sizing and timing ----
+    public static final String READ_AHEAD_MAX_ENTRIES = "dbStorage_readAheadMaxEntries";
+    public static final String READ_AHEAD_MAX_BYTES = "dbStorage_readAheadMaxBytes";
+    public static final String READ_AHEAD_PRE_TRIGGER_RATIO = "dbStorage_readAheadPreTriggerRatio";
+    public static final String READ_AHEAD_TASK_EXPIRED_TIME_MS = "dbStorage_readAheadTaskExpiredTimeMs";
+    public static final String READ_AHEAD_TIMEOUT_MS = "dbStorage_readAheadTimeoutMs";
+
+    // ---- Configuration keys: operating mode and executor sizing ----
+    public static final String ENABLE_READ_AHEAD_ASYNC = "dbStorage_enableReadAheadAsync";
+    public static final String SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = "dbStorage_submitReadAheadTaskImmediately";
+    public static final String READ_AHEAD_TASK_POOL_SIZE = "dbStorage_readAheadTaskPoolSize";
+
+    // ---- Defaults used when the matching key is absent from ServerConfiguration ----
+    public static final int DEFAULT_READ_AHEAD_ENTRIES = 1000;
+    public static final int DEFAULT_READ_AHEAD_BYTES = 256 * 1024; // 256 KiB
+    public static final double DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO = 0.75;
+
+    public static final long DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS = 60_000L; // 1 minute
+    public static final long DEFAULT_READ_AHEAD_TIMEOUT_MS = 5_000L; // 5 seconds
+
+    public static final boolean DEFAULT_ENABLE_READ_AHEAD_ASYNC = false;
+    public static final boolean DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = false;
+    public static final int DEFAULT_READ_AHEAD_TASK_POOL_SIZE = 8;
+
+    /**
+     * Immutable (ledgerId, entryId) key for pending read-ahead positions.
+     * The fields are final because instances serve as {@link ConcurrentHashMap} keys and are queued
+     * for later removal; a key whose hash changed after insertion could never be found again.
+     */
+    private static final class LedgerEntryPosition {
+
+        private final long ledgerId;
+        private final long entryId;
+
+        public LedgerEntryPosition(long ledgerId, long entryId) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            LedgerEntryPosition that = (LedgerEntryPosition) o;
+            return ledgerId == that.ledgerId
+                    && entryId == that.entryId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(ledgerId, entryId);
+        }
+    }
+
+    /**
+     * A deferred read-ahead start point: the ledger, entry and entry-log location at which
+     * read-ahead should begin, plus the time after which this record counts as expired.
+     */
+    private static final class ReadAheadPos {
+
+        private final long ledgerId;
+        private final long entryId;
+        private final long location;
+        // Monotonic timestamp. System.nanoTime() is unaffected by wall-clock adjustments (NTP steps,
+        // manual changes) that could make a currentTimeMillis()-based expiry fire early or never.
+        private final long createTimeNanos;
+        private final long readAheadTaskExpiredTimeNanos;
+
+        public ReadAheadPos(long ledgerId, long entryId, long location, long readAheadTaskExpiredTimeMs) {
+
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.location = location;
+
+            this.createTimeNanos = System.nanoTime();
+            this.readAheadTaskExpiredTimeNanos = TimeUnit.MILLISECONDS.toNanos(readAheadTaskExpiredTimeMs);
+        }
+
+        /**
+         * @return true once more than the configured expiry time has elapsed since construction
+         */
+        public boolean isExpired() {
+            // Compare an elapsed difference, never absolute nanoTime values, so wrap-around is harmless.
+            return (System.nanoTime() - createTimeNanos) > readAheadTaskExpiredTimeNanos;
+        }
+
+        public long getLedgerId() {
+            return ledgerId;
+        }
+
+        public long getEntryId() {
+            return entryId;
+        }
+
+        public long getLocation() {
+            return location;
+        }
+    }
+
+    // Collaborators supplied by the owning storage.
+    private final EntryLogger entryLogger;
+    private final EntryLocationIndex entryLocationIndex;
+    private final ReadCache cache;
+    private final DbLedgerStorageStats dbLedgerStorageStats;
+
+    // Selects the code path taken by readEntry; fixed for the lifetime of the manager.
+    private final boolean enableReadAheadAsync;
+
+    // Read-ahead batch limits, honoured in both sync and async mode.
+    private int readAheadEntries;
+    private int readAheadBytes;
+
+    // ---- Everything below is initialised only when enableReadAheadAsync is true ----
+
+    // One executor runs read-ahead tasks; the other periodically purges expired bookkeeping.
+    private ExecutorService readAheadExecutor;
+    private ScheduledExecutorService cleanupExecutor;
+
+    // Deferred read-ahead triggers, plus a queue of their keys consumed by the cleanup task.
+    private ConcurrentHashMap<LedgerEntryPosition, ReadAheadPos> pendingReadAheadPositions;
+    private ConcurrentLinkedQueue<LedgerEntryPosition> pendingDeletePositions;
+
+    // Read-ahead task statuses keyed by ledger id, then by start entry id, plus a cleanup queue.
+    private ConcurrentHashMap<Long, NavigableMap<Long, ReadAheadTaskStatus>> inProgressReadAheadTaskStatuses;
+    private ConcurrentLinkedQueue<ReadAheadTaskStatus> pendingDeleteReadAheadTaskStatuses;
+
+    private boolean submitReadAheadTaskImmediately;
+    private double preTriggerReadAheadRatio;
+
+    private long readAheadTaskExpiredTimeMs;
+    private long readAheadTimeoutMs;
+
+    /**
+     * Creates a manager for tests. Async read-ahead is forced on, and every other setting
+     * takes its {@code DEFAULT_*} value.
+     *
+     * @param entryLogger          source of entry data
+     * @param entryLocationIndex   maps (ledgerId, entryId) to an entry-log location
+     * @param cache                read cache that read-ahead populates
+     * @param dbLedgerStorageStats metrics sink
+     */
+    public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex,
+                            ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats) {
+        this(entryLogger, entryLocationIndex, cache, dbLedgerStorageStats,
+                true /* enableReadAheadAsync */,
+                DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY,
+                DEFAULT_READ_AHEAD_TASK_POOL_SIZE,
+                DEFAULT_READ_AHEAD_ENTRIES,
+                DEFAULT_READ_AHEAD_BYTES,
+                DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO,
+                DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS,
+                DEFAULT_READ_AHEAD_TIMEOUT_MS);
+    }
+
+    /**
+     * Creates a manager configured from {@code conf}. Any key that is not set falls back to the
+     * matching {@code DEFAULT_*} constant, so async read-ahead stays off unless
+     * {@code dbStorage_enableReadAheadAsync} is enabled.
+     *
+     * @param entryLogger          source of entry data
+     * @param entryLocationIndex   maps (ledgerId, entryId) to an entry-log location
+     * @param cache                read cache that read-ahead populates
+     * @param dbLedgerStorageStats metrics sink
+     * @param conf                 server configuration holding the {@code dbStorage_*} read-ahead keys
+     */
+    public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex,
+                            ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats, ServerConfiguration conf) {
+        this(entryLogger, entryLocationIndex, cache, dbLedgerStorageStats,
+                // mode and executor settings
+                conf.getBoolean(ENABLE_READ_AHEAD_ASYNC, DEFAULT_ENABLE_READ_AHEAD_ASYNC),
+                conf.getBoolean(SUBMIT_READ_AHEAD_TASK_IMMEDIATELY, DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY),
+                conf.getInt(READ_AHEAD_TASK_POOL_SIZE, DEFAULT_READ_AHEAD_TASK_POOL_SIZE),
+                // batch limits
+                conf.getInt(READ_AHEAD_MAX_ENTRIES, DEFAULT_READ_AHEAD_ENTRIES),
+                conf.getInt(READ_AHEAD_MAX_BYTES, DEFAULT_READ_AHEAD_BYTES),
+                // async tuning
+                conf.getDouble(READ_AHEAD_PRE_TRIGGER_RATIO, DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO),
+                conf.getLong(READ_AHEAD_TASK_EXPIRED_TIME_MS, DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS),
+                conf.getLong(READ_AHEAD_TIMEOUT_MS, DEFAULT_READ_AHEAD_TIMEOUT_MS));
+    }
+
+    /**
+     * Creates a manager with every setting given explicitly.
+     * When {@code enableReadAheadAsync} is false, only the settings shared by both modes are stored:
+     * the executors, tracking structures and async-only settings stay unset, and the remaining
+     * async arguments are ignored. When it is true, a fixed pool of {@code readAheadTaskPoolSize}
+     * "read-ahead" threads is created, plus a single "read-ahead-cleanup" thread that runs
+     * removeExpiredReadAheadTasks every 30 seconds.
+     */
+    public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex,
+                            ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats, boolean enableReadAheadAsync,
+                            boolean submitReadAheadTaskImmediately, int readAheadTaskPoolSize,
+                            int readAheadEntries, int readAheadBytes, double preTriggerReadAheadRatio,
+                            long readAheadTaskExpiredTimeMs, long readAheadTimeoutMs) {
+        // Collaborators and settings shared by both modes.
+        this.entryLogger = entryLogger;
+        this.entryLocationIndex = entryLocationIndex;
+        this.cache = cache;
+        this.dbLedgerStorageStats = dbLedgerStorageStats;
+        this.enableReadAheadAsync = enableReadAheadAsync;
+        this.readAheadEntries = readAheadEntries;
+        this.readAheadBytes = readAheadBytes;
+
+        if (!enableReadAheadAsync) {
+            // Sync mode needs no executors or tracking structures.
+            return;
+        }
+
+        this.submitReadAheadTaskImmediately = submitReadAheadTaskImmediately;
+        this.preTriggerReadAheadRatio = preTriggerReadAheadRatio;
+        this.readAheadTaskExpiredTimeMs = readAheadTaskExpiredTimeMs;
+        this.readAheadTimeoutMs = readAheadTimeoutMs;
+
+        this.pendingReadAheadPositions = new ConcurrentHashMap<>();
+        this.pendingDeletePositions = new ConcurrentLinkedQueue<>();
+        this.inProgressReadAheadTaskStatuses = new ConcurrentHashMap<>();
+        this.pendingDeleteReadAheadTaskStatuses = new ConcurrentLinkedQueue<>();
+
+        this.readAheadExecutor = Executors.newFixedThreadPool(
+                readAheadTaskPoolSize, new DefaultThreadFactory("read-ahead"));
+        this.cleanupExecutor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("read-ahead-cleanup"));
+        // Scheduled last, so the cleanup task only ever observes fully initialised state.
+        this.cleanupExecutor.scheduleAtFixedRate(
+                SafeRunnable.safeRun(this::removeExpiredReadAheadTasks), 30, 30, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Shuts down the async-mode executors via {@link ExecutorService#shutdown()}, which does not
+     * wait for termination. Does nothing in sync mode, where the executors were never created.
+     */
+    public void shutdown() {
+        if (!enableReadAheadAsync) {
+            return;
+        }
+        readAheadExecutor.shutdown();
+        cleanupExecutor.shutdown();
+    }
+
+    /**
+     * Returns the requested entry, triggering read-ahead according to the configured mode
+     * (async when {@code dbStorage_enableReadAheadAsync} is set, sync otherwise).
+     *
+     * @param ledgerId ledger to read from
+     * @param entryId  entry to read
+     * @return the entry data
+     * @throws IOException if the entry cannot be located or read
+     *                     (e.g. {@link Bookie.NoEntryException} when the location index has no entry)
+     */
+    public ByteBuf readEntry(long ledgerId, long entryId) throws IOException {
+        return enableReadAheadAsync
+                ? readEntryUnderAsyncReadAhead(ledgerId, entryId)
+                : readEntryUnderSyncReadAhead(ledgerId, entryId);
+    }
+
+    private static void recordStatsInNano(OpStatsLogger logger, long startTimeNanos) {
+        logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+    }
+
+    private static void recordStatsInNano(Counter counter, long startTimeNanos) {
+        counter.add(MathUtils.elapsedNanos(startTimeNanos));
+    }
+
+    /**
+     * Registers a deferred read-ahead. A later hit on (expectedLedgerId, expectedEntryId) in
+     * hitInReadAheadPositions starts read-ahead at (actualStartLedgerId, actualStartEntryId, location).
+     * The key is also queued so the cleanup task can discard it once it expires.
+     * Only valid in async mode; the tracking structures are not created otherwise.
+     */
+    public void addNextReadPosition(long expectedLedgerId, long expectedEntryId,
+                                    long actualStartLedgerId, long actualStartEntryId, long location) {
+        final LedgerEntryPosition triggerKey = new LedgerEntryPosition(expectedLedgerId, expectedEntryId);
+        final ReadAheadPos startPos =
+                new ReadAheadPos(actualStartLedgerId, actualStartEntryId, location, readAheadTaskExpiredTimeMs);
+        pendingReadAheadPositions.put(triggerKey, startPos);
+        pendingDeletePositions.add(triggerKey);
+    }
+
+    /**
+     * Looks up the tracked read-ahead task of {@code ledgerId} with the greatest start entry
+     * that is less than or equal to {@code entryId}. Does not check whether that task's range
+     * actually covers {@code entryId}.
+     *
+     * @return the matching task, or {@code null} if the ledger has none starting at or before entryId
+     */
+    protected ReadAheadTaskStatus getNearestTask(long ledgerId, long entryId) {
+        NavigableMap<Long, ReadAheadTaskStatus> tasksByStartEntry =
+                inProgressReadAheadTaskStatuses.get(ledgerId);
+        if (tasksByStartEntry == null) {
+            return null;
+        }
+        Map.Entry<Long, ReadAheadTaskStatus> floor = tasksByStartEntry.floorEntry(entryId);
+        return floor == null ? null : floor.getValue();
+    }
+
+    /**
+     * Remove those read-ahead tasks which have already exceeded expired time.
+     * NOTE: this method is NOT thread-safe, thus it should be kept in a single thread.
+     * The constructor schedules it on the single-threaded cleanupExecutor every 30 seconds.
+     */
+    private void removeExpiredReadAheadTasks() {
+        // cleanup read-ahead pos.
+        // Keys are queued in creation order, and every ReadAheadPos gets the same expiry time, so once
+        // the head has not expired the later entries (almost certainly) have not either: stop scanning.
+        int reclaimedPositions = 0;
+        while (!pendingDeletePositions.isEmpty()) {
+            ReadAheadPos pos = pendingReadAheadPositions.computeIfPresent(
+                    pendingDeletePositions.peek(),
+                    (lep, rap) -> {
+                        if (rap.isExpired()) {
+                            return null;
+                        }
+                        return rap;
+                    });
+            // null: the position was already consumed by a hit, or has just been expired here.
+            if (pos == null) {
+                pendingDeletePositions.poll();
+                reclaimedPositions++;
+            } else {
+                break;
+            }
+        }
+
+        // cleanup read-ahead task
+        int reclaimedTasks = 0;
+        while (!pendingDeleteReadAheadTaskStatuses.isEmpty()
+                && pendingDeleteReadAheadTaskStatuses.peek().isExpired()) {
+            ReadAheadTaskStatus readAheadTaskStatus = pendingDeleteReadAheadTaskStatuses.poll();
+            reclaimedTasks++;
+            inProgressReadAheadTaskStatuses.computeIfPresent(
+                    readAheadTaskStatus.ledgerId,
+                    (lid, ledgerReadAheadTaskStatuses) -> {
+                        // Remove only if this exact status is still registered: a newer task stored under
+                        // the same start entry must not be dropped when the older one expires.
+                        ledgerReadAheadTaskStatuses.remove(
+                                readAheadTaskStatus.startEntryId, readAheadTaskStatus);
+                        // Drop the per-ledger map once it is empty so idle ledgers do not accumulate.
+                        return ledgerReadAheadTaskStatuses.isEmpty() ? null : ledgerReadAheadTaskStatuses;
+                    });
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Pending position map reclaimed {} positions, now is {}. "
+                            + "Read-ahead task map reclaimed {} tasks, now is {}",
+                    reclaimedPositions, pendingDeletePositions.size(),
+                    reclaimedTasks, pendingDeleteReadAheadTaskStatuses.size());
+        }
+    }
+
+    /**
+     * Checks whether (ledgerId, entryId) was registered via addNextReadPosition. If it was,
+     * consumes that registration and calls readAheadAsync from the recorded start position.
+     * This method could be invoked frequently. Please make it short and simple.
+     *
+     * @return true if a pending read-ahead position existed for this entry (it is now removed)
+     */
+    public boolean hitInReadAheadPositions(long ledgerId, long entryId) {
+        // Set from inside the remapping lambda, which cannot assign a plain local variable.
+        AtomicBoolean isHit = new AtomicBoolean(false);
+        // NOTE(review): readAheadAsync runs inside computeIfPresent, i.e. while this key's map bin is
+        // locked. A concurrent hit on the same key blocks until it returns and then finds the position
+        // gone. This may be deliberate (the task is registered before other readers can proceed);
+        // confirm before moving the call out of the lambda, and keep readAheadAsync cheap, since
+        // ConcurrentHashMap compute callbacks are meant to be short.
+        pendingReadAheadPositions.computeIfPresent(
+                new LedgerEntryPosition(ledgerId, entryId),
+                (lep, rap) -> {
+                    isHit.set(true);
+                    readAheadAsync(rap.getLedgerId(), rap.getEntryId(), rap.getLocation(),
+                            readAheadEntries, readAheadBytes);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("Submitted read-ahead task. Info: hit-pos=[L{} E{}] / actual-start-pos=[L{} E{}]",
+                                ledgerId, entryId, rap.getLedgerId(), rap.getEntryId());
+                    }
+                    // Returning null removes the mapping, so each registration triggers at most once.
+                    return null;
+                });
+        return isHit.get();
+    }
+
+    private ByteBuf readAndAddNextReadAheadPosition(long ledgerId, long entryId) throws IOException {
+        ByteBuf entry;
+        long entryLocation;
+
+        long getLocationIndexStartNanos = MathUtils.nowInNano();
+        try {
+            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
+            if (entryLocation == 0) {
+                throw new Bookie.NoEntryException(ledgerId, entryId);
+            }
+        } catch (Bookie.NoEntryException e) {
+            log.warn("[L{} E{}] Entry not found", ledgerId, entryId);
+            throw e;
+        } finally {
+            recordStatsInNano(dbLedgerStorageStats.getReadFromLocationIndexTime(), getLocationIndexStartNanos);
+        }
+
+        long readEntryStartNanos = MathUtils.nowInNano();
+        try {
+            entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
+        } finally {
+            recordStatsInNano(dbLedgerStorageStats.getReadFromEntryLogTime(), readEntryStartNanos);
+        }
+
+        cache.put(ledgerId, entryId, entry);
+        // init position
+        if (submitReadAheadTaskImmediately) {
+            // submit the read-ahead task immediately
+            readAheadAsync(ledgerId, entryId + 1, entryLocation + 4 + entry.readableBytes(),
+                    readAheadEntries, readAheadBytes);
+        } else {
+            // actually execute read-ahead task after hitting this position next time
+            addNextReadPosition(ledgerId, entryId + 1,
+                    ledgerId, entryId + 1, entryLocation + 4 + entry.readableBytes());
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("[L{} E{}] Read {} bytes from local storage, and put L{} E{} to the pending map"
+                            + " or submit task immediately according to submitReadAheadTaskImmediately={}.",
+                    ledgerId, entryId, entry.readableBytes(), ledgerId, entryId + 1, submitReadAheadTaskImmediately);
+        }
+        return entry;
+    }
+
+    /**
+     * Synchronously reads the entries that follow a just-read entry and inserts them into the read cache.
+     * It stops at whichever comes first:
+     * readAheadEntries entries, readAheadBytes bytes, the end of the current entry log, or an entry
+     * belonging to another ledger. Failures are best-effort: they are logged at debug level and simply
+     * end the read-ahead, without being reported to the caller.
+     *
+     * @param originalLedgerId   ledger whose entries are being read ahead
+     * @param firstEntryId       id of the entry expected at {@code firstEntryLocation}
+     * @param firstEntryLocation entry-log location of that entry (log id in the upper 32 bits)
+     */
+    private void fillReadAheadCache(long originalLedgerId, long firstEntryId, long firstEntryLocation) {
+        long readAheadStartNano = MathUtils.nowInNano();
+        int count = 0;
+        long size = 0;
+
+        try {
+            long firstEntryLogId = (firstEntryLocation >> 32);
+            long currentEntryLogId = firstEntryLogId;
+            long currentEntryLocation = firstEntryLocation;
+            // Entry id expected at currentEntryLocation, handed to internalReadEntry on each read.
+            long expectedEntryId = firstEntryId;
+
+            while (count < readAheadEntries
+                    && size < readAheadBytes
+                    && currentEntryLogId == firstEntryLogId) {
+                ByteBuf entry = entryLogger.internalReadEntry(originalLedgerId, expectedEntryId,
+                        currentEntryLocation, false /* validateEntry */);
+
+                try {
+                    // The serialized entry starts with its ledgerId (offset 0) and entryId (offset 8).
+                    long currentEntryLedgerId = entry.getLong(0);
+                    long currentEntryId = entry.getLong(8);
+
+                    if (currentEntryLedgerId != originalLedgerId) {
+                        // Found an entry belonging to a different ledger, stopping read-ahead
+                        break;
+                    }
+
+                    // Insert entry in read cache
+                    cache.put(originalLedgerId, currentEntryId, entry);
+
+                    count++;
+                    expectedEntryId++;
+                    size += entry.readableBytes();
+
+                    // Advance past the 4-byte size header and the entry payload.
+                    currentEntryLocation += 4 + entry.readableBytes();
+                    currentEntryLogId = currentEntryLocation >> 32;
+                } finally {
+                    entry.release();
+                }
+            }
+        } catch (Exception e) {
+            // Read-ahead is only an optimization: swallow the failure, keeping the stack trace at debug level.
+            if (log.isDebugEnabled()) {
+                log.debug("Exception during read ahead for ledger: {}", originalLedgerId, e);
+            }
+        } finally {
+            dbLedgerStorageStats.getReadAheadBatchCountCounter().add(count);
+            dbLedgerStorageStats.getReadAheadBatchSizeCounter().add(size);
+            dbLedgerStorageStats.getReadAheadTime().add(MathUtils.elapsedNanos(readAheadStartNano));
+        }
+    }
+
+    /**
+     * Read an entry under sync mode.
+     * This method is moved from {@link SingleDirectoryDbLedgerStorage} to here,
+     * in order to better unify the process of reading an entry.
+     *
+     * @param ledgerId
+     * @param entryId
+     * @return
+     * @throws IOException
+     */
+    private ByteBuf readEntryUnderSyncReadAhead(long ledgerId, long entryId) throws IOException {
+        ByteBuf entry;
+        // Try reading from read-ahead cache
+        entry = cache.get(ledgerId, entryId);
+        if (entry != null) {
+            dbLedgerStorageStats.getReadCacheHitCounter().inc();
+            return entry;
+        }
+
+        dbLedgerStorageStats.getReadCacheMissCounter().inc();
+
+        // Read from main storage
+        long entryLocation;
+        long locationIndexStartNano = MathUtils.nowInNano();
+        try {
+            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
+            if (entryLocation == 0) {
+                throw new Bookie.NoEntryException(ledgerId, entryId);
+            }
+        } finally {
+            dbLedgerStorageStats.getReadFromLocationIndexTime().add(MathUtils.elapsedNanos(locationIndexStartNano));
+        }
+
+        long readEntryStartNano = MathUtils.nowInNano();
+        try {
+            entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
+        } finally {
+            dbLedgerStorageStats.getReadFromEntryLogTime().add(MathUtils.elapsedNanos(readEntryStartNano));
+        }
+
+        cache.put(ledgerId, entryId, entry);
+
+        // Try to read more entries
+        long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes();
+        fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);
+
+        return entry;
+    }
+
+    /**
+     * Read an entry under async mode.
+     *
+     * @param ledgerId
+     * @param entryId
+     * @return
+     * @throws IOException
+     */
+    private ByteBuf readEntryUnderAsyncReadAhead(long ledgerId, long entryId) throws IOException {

Review Comment:
   The method name is a little redundant. How about just `readEntryAsync`?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java:
##########
@@ -0,0 +1,806 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A management tool that supports asynchronous read-ahead operations for {@link SingleDirectoryDbLedgerStorage}.
+ **/
+public class ReadAheadManager {
+
+    private static final Logger log = LoggerFactory.getLogger(ReadAheadManager.class);
+
+    // ServerConfiguration keys read by the configuration-based constructor.
+    public static final String READ_AHEAD_MAX_ENTRIES = "dbStorage_readAheadMaxEntries";
+    public static final String READ_AHEAD_MAX_BYTES = "dbStorage_readAheadMaxBytes";
+    public static final String READ_AHEAD_PRE_TRIGGER_RATIO = "dbStorage_readAheadPreTriggerRatio";
+    public static final String READ_AHEAD_TASK_EXPIRED_TIME_MS = "dbStorage_readAheadTaskExpiredTimeMs";
+    public static final String READ_AHEAD_TIMEOUT_MS = "dbStorage_readAheadTimeoutMs";
+
+    // operation behavior indicator
+    public static final String ENABLE_READ_AHEAD_ASYNC = "dbStorage_enableReadAheadAsync";
+    public static final String SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = "dbStorage_submitReadAheadTaskImmediately";
+    public static final String READ_AHEAD_TASK_POOL_SIZE = "dbStorage_readAheadTaskPoolSize";
+
+    // Defaults used when the corresponding configuration key is not set.
+    // Batch limits: at most 1000 entries / 256 KiB per read-ahead batch.
+    public static final int DEFAULT_READ_AHEAD_ENTRIES = 1000;
+    public static final int DEFAULT_READ_AHEAD_BYTES = 256 * 1024;
+    // NOTE(review): consumed by async read-ahead code outside this chunk; presumably the fraction of a
+    // batch after which the next batch is pre-triggered — confirm.
+    public static final double DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO = 0.75;
+
+    // 60 s lifetime for pending read-ahead positions; 5 s read-ahead timeout.
+    public static final long DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS = 60 * 1000;
+    public static final long DEFAULT_READ_AHEAD_TIMEOUT_MS = 5 * 1000;
+
+    // Via ServerConfiguration, async read-ahead is opt-in and deferred submission is the default.
+    public static final boolean DEFAULT_ENABLE_READ_AHEAD_ASYNC = false;
+    public static final boolean DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = false;
+    public static final int DEFAULT_READ_AHEAD_TASK_POOL_SIZE = 8;
+
+    /**
+     * Immutable (ledgerId, entryId) key identifying a deferred read-ahead trigger.
+     * Instances are stored as {@link ConcurrentHashMap} keys and queue elements, so the fields are
+     * final: equals/hashCode must never change after an instance has been inserted.
+     */
+    private static final class LedgerEntryPosition {
+
+        private final long ledgerId;
+        private final long entryId;
+
+        public LedgerEntryPosition(long ledgerId, long entryId) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            LedgerEntryPosition that = (LedgerEntryPosition) o;
+            return ledgerId == that.ledgerId
+                    && entryId == that.entryId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(ledgerId, entryId);
+        }
+
+        @Override
+        public String toString() {
+            return "L" + ledgerId + " E" + entryId;
+        }
+    }
+
+    /**
+     * Start position (ledger, entry, entry-log location) of a read-ahead task whose submission has
+     * been deferred until its trigger position is hit. It expires after a configurable lifetime.
+     */
+    private static final class ReadAheadPos {
+
+        private final long ledgerId;
+        private final long entryId;
+        private final long location;
+        // Creation time on the monotonic clock. System.currentTimeMillis() can jump (NTP
+        // corrections, manual clock changes) and would make positions expire early or never.
+        private final long createTimeNanos;
+
+        private final long readAheadTaskExpiredTimeMs;
+
+        public ReadAheadPos(long ledgerId, long entryId, long location, long readAheadTaskExpiredTimeMs) {
+
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.location = location;
+
+            this.createTimeNanos = System.nanoTime();
+            this.readAheadTaskExpiredTimeMs = readAheadTaskExpiredTimeMs;
+        }
+
+        /**
+         * @return true once more than {@code readAheadTaskExpiredTimeMs} milliseconds have elapsed
+         *         since this position was created
+         */
+        public boolean isExpired() {
+            // Compare as a nanoTime difference, which is the overflow-safe way to use System.nanoTime().
+            return (System.nanoTime() - createTimeNanos)
+                    > TimeUnit.MILLISECONDS.toNanos(readAheadTaskExpiredTimeMs);
+        }
+
+        public long getLedgerId() {
+            return ledgerId;
+        }
+
+        public long getEntryId() {
+            return entryId;
+        }
+
+        /** @return entry-log location of the first entry the read-ahead task should read */
+        public long getLocation() {
+            return location;
+        }
+    }
+
+    private final EntryLogger entryLogger;
+    private final EntryLocationIndex entryLocationIndex;
+    private final ReadCache cache;
+
+    private final DbLedgerStorageStats dbLedgerStorageStats;
+
+    // Selects readEntryUnderAsyncReadAhead (true) or readEntryUnderSyncReadAhead (false).
+    private final boolean enableReadAheadAsync;
+
+    /**
+     * The following parameters apply to both sync and async read-ahead mode.
+     */
+    // Upper bounds on a single read-ahead batch: number of entries and total payload bytes.
+    private int readAheadEntries;
+    private int readAheadBytes;
+
+    /**
+     * The following parameters only apply to async read-ahead mode.
+     * The constructor assigns them only when async mode is enabled, so in sync mode the
+     * references below are null and the primitives keep their default values.
+     */
+    private ExecutorService readAheadExecutor;
+    private ScheduledExecutorService cleanupExecutor;
+
+    // Deferred read-ahead triggers, keyed by the entry whose read fires them, plus a FIFO of the
+    // same keys that the periodic cleanup walks to drop expired triggers.
+    private ConcurrentHashMap<LedgerEntryPosition, ReadAheadPos> pendingReadAheadPositions;
+    private ConcurrentLinkedQueue<LedgerEntryPosition> pendingDeletePositions;
+
+    // Per-ledger read-ahead task statuses keyed by start entry id, plus a FIFO that the periodic
+    // cleanup walks to reclaim expired statuses.
+    private ConcurrentHashMap<Long, NavigableMap<Long, ReadAheadTaskStatus>> inProgressReadAheadTaskStatuses;
+    private ConcurrentLinkedQueue<ReadAheadTaskStatus> pendingDeleteReadAheadTaskStatuses;
+
+    // true: submit the next read-ahead right after a storage read; false: defer it until the
+    // next position is actually hit by a reader.
+    private boolean submitReadAheadTaskImmediately;
+    private double preTriggerReadAheadRatio;
+
+    // Lifetime handed to every ReadAheadPos created by addNextReadPosition.
+    private long readAheadTaskExpiredTimeMs;
+    private long readAheadTimeoutMs;
+
+    /**
+     * Creates a manager in async read-ahead mode with every tunable at its {@code DEFAULT_*} value.
+     * Intended for test cases.
+     *
+     * @param entryLogger          source the entry bodies are read from
+     * @param entryLocationIndex   index resolving (ledgerId, entryId) to an entry-log location
+     * @param cache                read cache that fetched and read-ahead entries are stored in
+     * @param dbLedgerStorageStats metrics sink
+     */
+    public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex,
+                            ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats) {
+        this(entryLogger,
+                entryLocationIndex,
+                cache,
+                dbLedgerStorageStats,
+                true /* enableReadAheadAsync */,
+                DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY,
+                DEFAULT_READ_AHEAD_TASK_POOL_SIZE,
+                DEFAULT_READ_AHEAD_ENTRIES,
+                DEFAULT_READ_AHEAD_BYTES,
+                DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO,
+                DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS,
+                DEFAULT_READ_AHEAD_TIMEOUT_MS);
+    }
+
+    /**
+     * Creates a manager whose mode and tunables come from the server configuration. Each setting is
+     * looked up under this class's configuration-key constants and falls back to the matching
+     * {@code DEFAULT_*} constant when the key is absent.
+     *
+     * @param entryLogger          source the entry bodies are read from
+     * @param entryLocationIndex   index resolving (ledgerId, entryId) to an entry-log location
+     * @param cache                read cache that fetched and read-ahead entries are stored in
+     * @param dbLedgerStorageStats metrics sink
+     * @param conf                 server configuration holding the read-ahead settings
+     */
+    public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex,
+                            ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats, ServerConfiguration conf) {
+        this(entryLogger,
+                entryLocationIndex,
+                cache,
+                dbLedgerStorageStats,
+                // mode switches
+                conf.getBoolean(ENABLE_READ_AHEAD_ASYNC, DEFAULT_ENABLE_READ_AHEAD_ASYNC),
+                conf.getBoolean(SUBMIT_READ_AHEAD_TASK_IMMEDIATELY, DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY),
+                conf.getInt(READ_AHEAD_TASK_POOL_SIZE, DEFAULT_READ_AHEAD_TASK_POOL_SIZE),
+                // batch limits
+                conf.getInt(READ_AHEAD_MAX_ENTRIES, DEFAULT_READ_AHEAD_ENTRIES),
+                conf.getInt(READ_AHEAD_MAX_BYTES, DEFAULT_READ_AHEAD_BYTES),
+                conf.getDouble(READ_AHEAD_PRE_TRIGGER_RATIO, DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO),
+                // timing
+                conf.getLong(READ_AHEAD_TASK_EXPIRED_TIME_MS, DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS),
+                conf.getLong(READ_AHEAD_TIMEOUT_MS, DEFAULT_READ_AHEAD_TIMEOUT_MS));
+    }
+
+    /**
+     * Fully parameterised constructor.
+     *
+     * @param entryLogger                    source the entry bodies are read from
+     * @param entryLocationIndex             index resolving (ledgerId, entryId) to an entry-log location
+     * @param cache                          read cache that fetched and read-ahead entries are stored in
+     * @param dbLedgerStorageStats           metrics sink
+     * @param enableReadAheadAsync           true to use async read-ahead; false for the synchronous path
+     * @param submitReadAheadTaskImmediately async only: submit read-ahead right after a storage read
+     * @param readAheadTaskPoolSize          async only: number of read-ahead worker threads
+     * @param readAheadEntries               max entries per read-ahead batch
+     * @param readAheadBytes                 max bytes per read-ahead batch
+     * @param preTriggerReadAheadRatio       async only: pre-trigger ratio
+     * @param readAheadTaskExpiredTimeMs     async only: lifetime of pending read-ahead positions
+     * @param readAheadTimeoutMs             async only: read-ahead timeout
+     */
+    public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex,
+                            ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats, boolean enableReadAheadAsync,
+                            boolean submitReadAheadTaskImmediately, int readAheadTaskPoolSize,
+                            int readAheadEntries, int readAheadBytes, double preTriggerReadAheadRatio,
+                            long readAheadTaskExpiredTimeMs, long readAheadTimeoutMs) {
+        // Collaborators and metrics.
+        this.entryLogger = entryLogger;
+        this.entryLocationIndex = entryLocationIndex;
+        this.cache = cache;
+        this.dbLedgerStorageStats = dbLedgerStorageStats;
+
+        // Mode switch plus the batch limits shared by both modes.
+        this.enableReadAheadAsync = enableReadAheadAsync;
+        this.readAheadEntries = readAheadEntries;
+        this.readAheadBytes = readAheadBytes;
+
+        if (!enableReadAheadAsync) {
+            // Sync mode needs no threads or bookkeeping structures; async-only fields stay unset.
+            return;
+        }
+
+        // Async-only tunables.
+        this.submitReadAheadTaskImmediately = submitReadAheadTaskImmediately;
+        this.preTriggerReadAheadRatio = preTriggerReadAheadRatio;
+        this.readAheadTaskExpiredTimeMs = readAheadTaskExpiredTimeMs;
+        this.readAheadTimeoutMs = readAheadTimeoutMs;
+
+        // Async-only bookkeeping structures.
+        this.pendingReadAheadPositions = new ConcurrentHashMap<>();
+        this.pendingDeletePositions = new ConcurrentLinkedQueue<>();
+        this.inProgressReadAheadTaskStatuses = new ConcurrentHashMap<>();
+        this.pendingDeleteReadAheadTaskStatuses = new ConcurrentLinkedQueue<>();
+
+        // Worker pool for read-ahead tasks and a single thread for periodic cleanup.
+        this.readAheadExecutor = Executors.newFixedThreadPool(
+                readAheadTaskPoolSize, new DefaultThreadFactory("read-ahead"));
+        this.cleanupExecutor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("read-ahead-cleanup"));
+        // Reclaim expired positions / task statuses every 30 seconds (first run after 30 seconds).
+        this.cleanupExecutor.scheduleAtFixedRate(
+                SafeRunnable.safeRun(this::removeExpiredReadAheadTasks), 30, 30, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Initiates an orderly shutdown ({@link ExecutorService#shutdown()}) of the read-ahead and cleanup
+     * executors created in async mode. Does not wait for running tasks to finish. No-op in sync
+     * mode, where no executors exist.
+     */
+    public void shutdown() {
+        if (!enableReadAheadAsync) {
+            return;
+        }
+        this.readAheadExecutor.shutdown();
+        this.cleanupExecutor.shutdown();
+    }
+
+    /**
+     * Returns the requested entry. Read-ahead is performed through either the async or the sync
+     * implementation, depending on {@code enableReadAheadAsync}.
+     *
+     * @param ledgerId ledger to read from
+     * @param entryId  entry to read
+     * @return the entry payload
+     * @throws IOException propagated from the underlying read path, e.g.
+     *                     {@link Bookie.NoEntryException} when the location index has no location for it
+     */
+    public ByteBuf readEntry(long ledgerId, long entryId) throws IOException {
+        return enableReadAheadAsync
+                ? readEntryUnderAsyncReadAhead(ledgerId, entryId)
+                : readEntryUnderSyncReadAhead(ledgerId, entryId);
+    }
+
+    private static void recordStatsInNano(OpStatsLogger logger, long startTimeNanos) {
+        logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+    }
+
+    private static void recordStatsInNano(Counter counter, long startTimeNanos) {
+        counter.add(MathUtils.elapsedNanos(startTimeNanos));
+    }
+
+    /**
+     * Registers a deferred read-ahead trigger. When (expectedLedgerId, expectedEntryId) is later
+     * looked up via {@link #hitInReadAheadPositions}, a read-ahead task starting at
+     * (actualStartLedgerId, actualStartEntryId, location) is submitted. The key is also queued so
+     * the periodic cleanup can reclaim it once it expires.
+     * NOTE: async mode only; the backing collections are not created in sync mode.
+     */
+    public void addNextReadPosition(long expectedLedgerId, long expectedEntryId,
+                                    long actualStartLedgerId, long actualStartEntryId, long location) {
+        final LedgerEntryPosition trigger = new LedgerEntryPosition(expectedLedgerId, expectedEntryId);
+        final ReadAheadPos startPosition = new ReadAheadPos(
+                actualStartLedgerId, actualStartEntryId, location, readAheadTaskExpiredTimeMs);
+        pendingReadAheadPositions.put(trigger, startPosition);
+        pendingDeletePositions.add(trigger);
+    }
+
+    /**
+     * Finds, among the in-progress read-ahead tasks of {@code ledgerId}, the one with the greatest
+     * start entry id that is still {@code <= entryId}.
+     * NOTE(review): the per-ledger map is read here outside any ConcurrentHashMap lock; if it is a
+     * plain TreeMap mutated concurrently, confirm this read is safe.
+     *
+     * @return the matching task status, or null if the ledger has no task starting at or before entryId
+     */
+    protected ReadAheadTaskStatus getNearestTask(long ledgerId, long entryId) {
+        NavigableMap<Long, ReadAheadTaskStatus> tasksByStartEntry =
+                inProgressReadAheadTaskStatuses.get(ledgerId);
+        if (tasksByStartEntry == null) {
+            return null;
+        }
+        Map.Entry<Long, ReadAheadTaskStatus> candidate = tasksByStartEntry.floorEntry(entryId);
+        return candidate == null ? null : candidate.getValue();
+    }
+
+    /**
+     * Remove those read-ahead tasks which have already exceeded expired time.
+     * Runs two passes: (1) pending read-ahead positions, (2) read-ahead task statuses queued for
+     * deletion. Each pass walks its FIFO queue from the head and stops at the first element that is
+     * still live; presumably this relies on elements being enqueued in roughly creation order.
+     * NOTE: this method is NOT thread-safe, thus it should be kept in a single thread
+     * (the constructor schedules it on the single-threaded cleanup executor).
+     */
+    private void removeExpiredReadAheadTasks() {
+        // cleanup read-ahead pos
+        // computeIfPresent yields null both when the key is no longer in the map (already consumed
+        // by a hit) and when its position has expired (returning null from the remapping function
+        // deletes the mapping). Either way the key is dropped from the queue.
+        // NOTE(review): a head key re-registered with a fresh position blocks this pass until that
+        // position expires.
+        int reclaimedPositions = 0;
+        while (!pendingDeletePositions.isEmpty()) {
+            ReadAheadPos pos = pendingReadAheadPositions.computeIfPresent(
+                    pendingDeletePositions.peek(),
+                    (lep, rap) -> {
+                        if (rap.isExpired()) {
+                            return null;
+                        }
+                        return rap;
+                    });
+            if (pos == null) {
+                pendingDeletePositions.poll();
+                reclaimedPositions++;
+            } else {
+                break;
+            }
+        }
+
+        // cleanup read-ahead task
+        // Each expired status is removed from its ledger's map by start entry id; a ledger map left
+        // empty is dropped entirely (returning null from computeIfPresent removes the mapping).
+        int reclaimedTasks = 0;
+        while (!pendingDeleteReadAheadTaskStatuses.isEmpty()
+                && pendingDeleteReadAheadTaskStatuses.peek().isExpired()) {
+            ReadAheadTaskStatus readAheadTaskStatus = pendingDeleteReadAheadTaskStatuses.poll();
+            reclaimedTasks++;
+            inProgressReadAheadTaskStatuses.computeIfPresent(
+                    readAheadTaskStatus.ledgerId,
+                    (lid, ledgerReadAheadTaskStatuses) -> {
+                       ledgerReadAheadTaskStatuses.remove(readAheadTaskStatus.startEntryId);
+                       return ledgerReadAheadTaskStatuses.isEmpty() ? null : ledgerReadAheadTaskStatuses;
+                    });
+        }
+
+        // The "now is" figures are the remaining sizes of the two deletion queues, not of the maps.
+        // ConcurrentLinkedQueue.size() is O(n), so it is only evaluated when debug logging is enabled.
+        if (log.isDebugEnabled()) {
+            log.debug("Pending position map reclaimed {} positions, now is {}. "
+                            + "Read-ahead task map reclaimed {} tasks, now is {}",
+                    reclaimedPositions, pendingDeletePositions.size(),
+                    reclaimedTasks, pendingDeleteReadAheadTaskStatuses.size());
+        }
+    }
+
+    /**
+     * Checks whether (ledgerId, entryId) is a registered deferred read-ahead trigger. If it is, the
+     * trigger is consumed and the corresponding read-ahead task is submitted.
+     * This method could be invoked frequently. Please make it short and simple.
+     *
+     * @param ledgerId ledger of the entry being read
+     * @param entryId  entry being read
+     * @return true if a pending position was found and its read-ahead task submitted
+     */
+    public boolean hitInReadAheadPositions(long ledgerId, long entryId) {
+        // remove() is atomic, so at most one concurrent caller obtains the position and submits the
+        // task. Submitting after removal, rather than inside a computeIfPresent remapping function,
+        // keeps readAheadAsync from running while a ConcurrentHashMap bin lock is held.
+        ReadAheadPos rap = pendingReadAheadPositions.remove(new LedgerEntryPosition(ledgerId, entryId));
+        if (rap == null) {
+            return false;
+        }
+
+        readAheadAsync(rap.getLedgerId(), rap.getEntryId(), rap.getLocation(),
+                readAheadEntries, readAheadBytes);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Submitted read-ahead task. Info: hit-pos=[L{} E{}] / actual-start-pos=[L{} E{}]",
+                    ledgerId, entryId, rap.getLedgerId(), rap.getEntryId());
+        }
+        return true;
+    }
+
+    private ByteBuf readAndAddNextReadAheadPosition(long ledgerId, long entryId) throws IOException {
+        ByteBuf entry;
+        long entryLocation;
+
+        long getLocationIndexStartNanos = MathUtils.nowInNano();
+        try {
+            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
+            if (entryLocation == 0) {
+                throw new Bookie.NoEntryException(ledgerId, entryId);
+            }
+        } catch (Bookie.NoEntryException e) {
+            log.warn("[L{} E{}] Entry not found", ledgerId, entryId);
+            throw e;
+        } finally {
+            recordStatsInNano(dbLedgerStorageStats.getReadFromLocationIndexTime(), getLocationIndexStartNanos);
+        }
+
+        long readEntryStartNanos = MathUtils.nowInNano();
+        try {
+            entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
+        } finally {
+            recordStatsInNano(dbLedgerStorageStats.getReadFromEntryLogTime(), readEntryStartNanos);
+        }
+
+        cache.put(ledgerId, entryId, entry);
+        // init position
+        if (submitReadAheadTaskImmediately) {
+            // submit the read-ahead task immediately
+            readAheadAsync(ledgerId, entryId + 1, entryLocation + 4 + entry.readableBytes(),
+                    readAheadEntries, readAheadBytes);
+        } else {
+            // actually execute read-ahead task after hitting this position next time
+            addNextReadPosition(ledgerId, entryId + 1,
+                    ledgerId, entryId + 1, entryLocation + 4 + entry.readableBytes());
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("[L{} E{}] Read {} bytes from local storage, and put L{} E{} to the pending map"
+                            + " or submit task immediately according to submitReadAheadTaskImmediately={}.",
+                    ledgerId, entryId, entry.readableBytes(), ledgerId, entryId + 1, submitReadAheadTaskImmediately);
+        }
+        return entry;
+    }
+
+    private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long firstEntryLocation) {
+        long readAheadStartNano = MathUtils.nowInNano();
+        int count = 0;
+        long size = 0;
+
+        try {
+            long firstEntryLogId = (firstEntryLocation >> 32);
+            long currentEntryLogId = firstEntryLogId;
+            long currentEntryLocation = firstEntryLocation;
+
+            while (count < readAheadEntries
+                    && size < readAheadBytes
+                    && currentEntryLogId == firstEntryLogId) {
+                ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, firstEntryId, currentEntryLocation,
+                        false /* validateEntry */);
+
+                try {
+                    long currentEntryLedgerId = entry.getLong(0);
+                    long currentEntryId = entry.getLong(8);
+
+                    if (currentEntryLedgerId != orginalLedgerId) {
+                        // Found an entry belonging to a different ledger, stopping read-ahead
+                        break;
+                    }
+
+                    // Insert entry in read cache
+                    cache.put(orginalLedgerId, currentEntryId, entry);
+
+                    count++;
+                    firstEntryId++;
+                    size += entry.readableBytes();
+
+                    currentEntryLocation += 4 + entry.readableBytes();
+                    currentEntryLogId = currentEntryLocation >> 32;
+                } finally {
+                    entry.release();
+                }
+            }
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Exception during read ahead for ledger: {}: e", orginalLedgerId, e);
+            }
+        } finally {
+            dbLedgerStorageStats.getReadAheadBatchCountCounter().add(count);
+            dbLedgerStorageStats.getReadAheadBatchSizeCounter().add(size);
+            dbLedgerStorageStats.getReadAheadTime().add(MathUtils.elapsedNanos(readAheadStartNano));
+        }
+    }
+
+    /**
+     * Read an entry under sync mode.
+     * This method is moved from {@link SingleDirectoryDbLedgerStorage} to here,
+     * in order to better unify the process of reading an entry.
+     *
+     * @param ledgerId
+     * @param entryId
+     * @return
+     * @throws IOException
+     */
+    private ByteBuf readEntryUnderSyncReadAhead(long ledgerId, long entryId) throws IOException {

Review Comment:
   Just use `readEntrySync` or a similar name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@bookkeeper.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org