Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2022/04/09 01:15:56 UTC

[GitHub] [bookkeeper] ArvinDevel commented on a diff in pull request #3111: BP-49: Support read ahead async

ArvinDevel commented on code in PR #3111:
URL: https://github.com/apache/bookkeeper/pull/3111#discussion_r846555140


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java:
##########
@@ -527,42 +547,51 @@ private ByteBuf doGetEntry(long ledgerId, long entryId) throws IOException, Book
 
         dbLedgerStorageStats.getWriteCacheMissCounter().inc();
 
-        // Try reading from read-ahead cache
-        entry = readCache.get(ledgerId, entryId);
-        if (entry != null) {
-            dbLedgerStorageStats.getReadCacheHitCounter().inc();
-            return entry;
-        }
+        // Get entry from storage and trigger read-ahead
+        long readAheadTotalStartNano = MathUtils.nowInNano();
+        if (enableReadAheadAsync) {
+            // Async mode
+            entry = readAheadManager.readEntryOrWait(ledgerId, entryId);
+        } else {
+            // Sync mode
+            // Try reading from read-ahead cache
+            entry = readCache.get(ledgerId, entryId);
+            if (entry != null) {
+                dbLedgerStorageStats.getReadCacheHitCounter().inc();
+                return entry;
+            }
 
-        dbLedgerStorageStats.getReadCacheMissCounter().inc();
+            dbLedgerStorageStats.getReadCacheMissCounter().inc();

Review Comment:
   There are redundant leading spaces on this line; please fix the indentation. The same applies to the following lines.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java:
##########
@@ -0,0 +1,655 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A management tool that supports asynchronous read-ahead operations for {@link SingleDirectoryDbLedgerStorage}.
+ **/
+public class ReadAheadManager {
+
+    private static final Logger log = LoggerFactory.getLogger(ReadAheadManager.class);
+
+    public static final String READ_AHEAD_MAX_MESSAGES = "dbStorage_readAheadMaxMessages";
+    public static final String READ_AHEAD_MAX_BYTES = "dbStorage_readAheadMaxBytes";
+    public static final String READ_AHEAD_PRE_TRIGGER_RATIO = "dbStorage_readAheadPreTriggerRatio";
+    public static final String READ_AHEAD_TASK_EXPIRED_TIME_MS = "dbStorage_readAheadTaskExpiredTimeMs";
+    public static final String READ_AHEAD_TIMEOUT_MS = "dbStorage_readAheadTimeoutMs";
+
+    // operation behavior indicator
+    public static final String SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = "dbStorage_submitReadAheadTaskImmediately";
+    public static final String READ_AHEAD_TASK_POOL_SIZE = "dbStorage_readAheadTaskPoolSize";
+
+    private static final int DEFAULT_READ_AHEAD_MESSAGES = 1000;
+    private static final int DEFAULT_READ_AHEAD_BYTES = 256 * 1024;
+    private static final double DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO = 0.75;
+
+    private static final long DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS = 60 * 1000;
+    private static final long DEFAULT_READ_AHEAD_TIMEOUT_MS = 30 * 1000;
+
+    private static final boolean DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = false;
+    private static final int DEFAULT_READ_AHEAD_TASK_POOL_SIZE = 8;
+
+    private static final class LedgerEntryPosition {

Review Comment:
   Would it be more suitable to move the class ```LedgerEntryPosition``` into a separate file? It looks like a common definition that could be reused in other places.
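
   For illustration only, a minimal sketch of what a standalone class in its own file might look like. The body of the PR's class is not shown in this hunk, so the two fields (`ledgerId`, `entryId`), the accessors, and the package-private visibility are all assumptions:

```java
import java.util.Objects;

// Hypothetical standalone version of the position type. The field set
// (ledgerId, entryId) is an assumption; the PR's class body is not shown.
final class LedgerEntryPosition {

    private final long ledgerId;
    private final long entryId;

    LedgerEntryPosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    long getLedgerId() {
        return ledgerId;
    }

    long getEntryId() {
        return entryId;
    }

    // Value equality so the type can be used as a map key.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof LedgerEntryPosition)) {
            return false;
        }
        LedgerEntryPosition other = (LedgerEntryPosition) o;
        return ledgerId == other.ledgerId && entryId == other.entryId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(ledgerId, entryId);
    }

    @Override
    public String toString() {
        return "LedgerEntryPosition(" + ledgerId + ", " + entryId + ")";
    }
}
```

   Keeping it an immutable value type with `equals`/`hashCode` would let other classes in the package use it as a key without depending on `ReadAheadManager`.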



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java:
##########
@@ -527,42 +547,51 @@ private ByteBuf doGetEntry(long ledgerId, long entryId) throws IOException, Book
 
         dbLedgerStorageStats.getWriteCacheMissCounter().inc();
 
-        // Try reading from read-ahead cache
-        entry = readCache.get(ledgerId, entryId);
-        if (entry != null) {
-            dbLedgerStorageStats.getReadCacheHitCounter().inc();
-            return entry;
-        }
+        // Get entry from storage and trigger read-ahead
+        long readAheadTotalStartNano = MathUtils.nowInNano();
+        if (enableReadAheadAsync) {
+            // Async mode
+            entry = readAheadManager.readEntryOrWait(ledgerId, entryId);
+        } else {
+            // Sync mode

Review Comment:
   Could you encapsulate the original sync-mode implementation in a single, meaningfully named method?
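
   A rough sketch of the shape I have in mind, not the actual storage code. The class `SyncReadSketch`, the method names `readEntrySync`/`readEntryAsync`, the string-keyed maps, and the `AtomicLong` counters are all hypothetical stand-ins for the real read cache, entry log, and stats:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for the storage class. Every name here is
// hypothetical and exists only to show the extracted-method structure.
final class SyncReadSketch {

    private final Map<String, byte[]> readCache = new HashMap<>();
    private final Map<String, byte[]> entryLog = new HashMap<>();
    final AtomicLong readCacheHits = new AtomicLong();
    final AtomicLong readCacheMisses = new AtomicLong();
    private final boolean enableReadAheadAsync;

    SyncReadSketch(boolean enableReadAheadAsync) {
        this.enableReadAheadAsync = enableReadAheadAsync;
    }

    void addToEntryLog(long ledgerId, long entryId, byte[] data) {
        entryLog.put(key(ledgerId, entryId), data);
    }

    // The caller only chooses a mode; each mode lives in its own method.
    byte[] doGetEntry(long ledgerId, long entryId) {
        if (enableReadAheadAsync) {
            return readEntryAsync(ledgerId, entryId);
        }
        return readEntrySync(ledgerId, entryId);
    }

    // Sync path: try the read cache first, then fall back to storage.
    private byte[] readEntrySync(long ledgerId, long entryId) {
        String k = key(ledgerId, entryId);
        byte[] entry = readCache.get(k);
        if (entry != null) {
            readCacheHits.incrementAndGet();
            return entry;
        }
        readCacheMisses.incrementAndGet();
        entry = entryLog.get(k);
        if (entry != null) {
            // Stand-in for filling the read cache after a miss.
            readCache.put(k, entry);
        }
        return entry;
    }

    // Placeholder for the async path; here it just reads the stand-in log.
    private byte[] readEntryAsync(long ledgerId, long entryId) {
        return entryLog.get(key(ledgerId, entryId));
    }

    private static String key(long ledgerId, long entryId) {
        return ledgerId + ":" + entryId;
    }
}
```

   With this split, `doGetEntry` stays short, the sync branch no longer adds a nesting level, and the indentation-only changes in the diff go away.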



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
