You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/31 04:50:17 UTC

[GitHub] [pulsar] Jason918 commented on a diff in pull request #17241: [managed-ledger] Do not send duplicate reads to BK/offloaders

Jason918 commented on code in PR #17241:
URL: https://github.com/apache/pulsar/pull/17241#discussion_r959149660


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java:
##########
@@ -0,0 +1,443 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import io.prometheus.client.Counter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.AllArgsConstructor;
+import lombok.Value;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+
+/**
+ * PendingReadsManager tries to prevent sending duplicate reads to BK.
+ */
+public class PendingReadsManager {
+
+    private static final Counter COUNT_ENTRIES_READ_FROM_BK = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_entries_read")
+            .help("Total number of entries read from BK")
+            .register();
+
+    private static final Counter COUNT_ENTRIES_NOTREAD_FROM_BK = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_entries_notread")
+            .help("Total number of entries not read from BK")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched")
+            .help("Pending reads reused with perfect range match")
+            .register();
+    private static final Counter COUNT_PENDING_READS_MATCHED_INCLUDED = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_included")
+            .help("Pending reads reused by attaching to a read with a larger range")
+            .register();
+    private static final Counter COUNT_PENDING_READS_MISSED = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_missed")
+            .help("Pending reads that didn't find a match")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_OVERLAPPING_MISS_LEFT = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_left")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_RIGHT = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_right")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_BOTH = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_both")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register();
+
+    private final RangeEntryCacheImpl rangeEntryCache;
+    private final ConcurrentHashMap<Long, ConcurrentHashMap<PendingReadKey, PendingRead>> cachedPendingReads =
+            new ConcurrentHashMap<>();
+
+    public PendingReadsManager(RangeEntryCacheImpl rangeEntryCache) {
+        this.rangeEntryCache = rangeEntryCache;
+    }
+
+    @Value
+    private static class PendingReadKey {
+        private final long startEntry;
+        private final long endEntry;
+        long size() {
+            return endEntry - startEntry + 1;
+        }
+
+
+        boolean includes(PendingReadKey other) {
+            return startEntry <= other.startEntry && other.endEntry <= endEntry;
+        }
+
+        boolean overlaps(PendingReadKey other) {
+            return (other.startEntry <= startEntry && startEntry <= other.endEntry)
+                    || (other.startEntry <= endEntry && endEntry <= other.endEntry);
+        }
+
+        PendingReadKey reminderOnLeft(PendingReadKey other) {
+            //   S******-----E
+            //          S----------E
+            if (other.startEntry <= endEntry && endEntry <= other.endEntry
+                    && other.startEntry > startEntry) {
+                return new PendingReadKey(startEntry, other.startEntry - 1);
+            }
+            return null;
+        }
+
+        PendingReadKey reminderOnRight(PendingReadKey other) {
+            //          S-----*******E
+            //   S-----------E
+            if (other.startEntry <= startEntry && startEntry <= other.endEntry
+                    && other.endEntry < endEntry) {
+                return new PendingReadKey(other.endEntry + 1, endEntry);
+            }
+            return null;
+        }
+
+    }
+
+    @AllArgsConstructor
+    private static final class ReadEntriesCallbackWithContext {
+        final AsyncCallbacks.ReadEntriesCallback callback;
+        final Object ctx;
+        final long startEntry;
+        final long endEntry;
+    }
+
+    @AllArgsConstructor
+    private static final class FindPendingReadOutcome {
+        final PendingRead pendingRead;
+        final PendingReadKey missingOnLeft;
+        final PendingReadKey missingOnRight;
+        boolean needsAdditionalReads() {
+            return missingOnLeft != null || missingOnRight != null;
+        }
+    }
+
+    private FindPendingReadOutcome findPendingRead(PendingReadKey key, Map<PendingReadKey,
+            PendingRead> ledgerCache, AtomicBoolean created) {
+        synchronized (ledgerCache) {
+            PendingRead existing = ledgerCache.get(key);
+            if (existing != null) {
+                COUNT_PENDING_READS_MATCHED.inc(key.size());
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(key.size());
+                return new FindPendingReadOutcome(existing, null, null);
+            }
+            FindPendingReadOutcome foundButMissingSomethingOnLeft = null;
+            FindPendingReadOutcome foundButMissingSomethingOnRight = null;
+            FindPendingReadOutcome foundButMissingSomethingOnBoth = null;
+
+            for (Map.Entry<PendingReadKey, PendingRead> entry : ledgerCache.entrySet()) {
+                PendingReadKey entryKey = entry.getKey();
+                if (entryKey.includes(key)) {
+                    COUNT_PENDING_READS_MATCHED_INCLUDED.inc(key.size());
+                    COUNT_ENTRIES_NOTREAD_FROM_BK.inc(key.size());
+                    return new FindPendingReadOutcome(entry.getValue(), null, null);
+                }
+                if (entryKey.overlaps(key)) {
+                    PendingReadKey reminderOnLeft = key.reminderOnLeft(entryKey);
+                    PendingReadKey reminderOnRight = key.reminderOnRight(entryKey);
+                    if (reminderOnLeft != null && reminderOnRight != null) {
+                        foundButMissingSomethingOnBoth = new FindPendingReadOutcome(entry.getValue(),
+                                reminderOnLeft, reminderOnRight);
+                    } else if (reminderOnLeft != null && reminderOnRight == null) {
+                        foundButMissingSomethingOnLeft = new FindPendingReadOutcome(entry.getValue(),
+                                reminderOnLeft, null);
+                    } else if (reminderOnRight != null && reminderOnLeft == null) {
+                        foundButMissingSomethingOnRight = new FindPendingReadOutcome(entry.getValue(),
+                                null, reminderOnRight);
+                    }

Review Comment:
   we can `break` the iteration if overlaps



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -295,51 +299,61 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo
             }
 
             // Read all the entries from bookkeeper
-            lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
-                    ledgerEntries -> {
-                        requireNonNull(ml.getName());
-                        requireNonNull(ml.getExecutor());
+            pendingReadsManager.readEntries(lh, firstEntry, lastEntry,
+                    shouldCacheEntry, callback, ctx);
 
-                        try {
-                            // We got the entries, we need to transform them to a List<> type
-                            long totalSize = 0;
-                            final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
-                            for (LedgerEntry e : ledgerEntries) {
-                                EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
-                                entriesToReturn.add(entry);
-                                totalSize += entry.getLength();
-                                if (shouldCacheEntry) {
-                                    EntryImpl cacheEntry = EntryImpl.create(entry);
-                                    insert(cacheEntry);
-                                    cacheEntry.release();
+        }
+    }
+
+    CompletableFuture readFromStorage(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry) {

Review Comment:
   ```suggestion
       CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry) {
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java:
##########
@@ -0,0 +1,443 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import io.prometheus.client.Counter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.AllArgsConstructor;
+import lombok.Value;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+
+/**
+ * PendingReadsManager tries to prevent sending duplicate reads to BK.
+ */
+public class PendingReadsManager {
+
+    private static final Counter COUNT_ENTRIES_READ_FROM_BK = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_entries_read")
+            .help("Total number of entries read from BK")
+            .register();
+
+    private static final Counter COUNT_ENTRIES_NOTREAD_FROM_BK = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_entries_notread")
+            .help("Total number of entries not read from BK")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched")
+            .help("Pending reads reused with perfect range match")
+            .register();
+    private static final Counter COUNT_PENDING_READS_MATCHED_INCLUDED = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_included")
+            .help("Pending reads reused by attaching to a read with a larger range")
+            .register();
+    private static final Counter COUNT_PENDING_READS_MISSED = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_missed")
+            .help("Pending reads that didn't find a match")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_OVERLAPPING_MISS_LEFT = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_left")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_RIGHT = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_right")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_BOTH = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_both")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register();
+
+    private final RangeEntryCacheImpl rangeEntryCache;
+    private final ConcurrentHashMap<Long, ConcurrentHashMap<PendingReadKey, PendingRead>> cachedPendingReads =
+            new ConcurrentHashMap<>();
+
+    public PendingReadsManager(RangeEntryCacheImpl rangeEntryCache) {
+        this.rangeEntryCache = rangeEntryCache;
+    }
+
+    @Value
+    private static class PendingReadKey {
+        private final long startEntry;
+        private final long endEntry;
+        long size() {
+            return endEntry - startEntry + 1;
+        }
+
+
+        boolean includes(PendingReadKey other) {
+            return startEntry <= other.startEntry && other.endEntry <= endEntry;
+        }
+
+        boolean overlaps(PendingReadKey other) {
+            return (other.startEntry <= startEntry && startEntry <= other.endEntry)
+                    || (other.startEntry <= endEntry && endEntry <= other.endEntry);
+        }
+
+        PendingReadKey reminderOnLeft(PendingReadKey other) {
+            //   S******-----E
+            //          S----------E
+            if (other.startEntry <= endEntry && endEntry <= other.endEntry
+                    && other.startEntry > startEntry) {
+                return new PendingReadKey(startEntry, other.startEntry - 1);
+            }
+            return null;
+        }
+
+        PendingReadKey reminderOnRight(PendingReadKey other) {
+            //          S-----*******E
+            //   S-----------E
+            if (other.startEntry <= startEntry && startEntry <= other.endEntry
+                    && other.endEntry < endEntry) {
+                return new PendingReadKey(other.endEntry + 1, endEntry);
+            }
+            return null;
+        }
+
+    }
+
+    @AllArgsConstructor
+    private static final class ReadEntriesCallbackWithContext {
+        final AsyncCallbacks.ReadEntriesCallback callback;
+        final Object ctx;
+        final long startEntry;
+        final long endEntry;
+    }
+
+    @AllArgsConstructor
+    private static final class FindPendingReadOutcome {
+        final PendingRead pendingRead;
+        final PendingReadKey missingOnLeft;
+        final PendingReadKey missingOnRight;
+        boolean needsAdditionalReads() {
+            return missingOnLeft != null || missingOnRight != null;
+        }
+    }
+
+    private FindPendingReadOutcome findPendingRead(PendingReadKey key, Map<PendingReadKey,
+            PendingRead> ledgerCache, AtomicBoolean created) {
+        synchronized (ledgerCache) {
+            PendingRead existing = ledgerCache.get(key);
+            if (existing != null) {
+                COUNT_PENDING_READS_MATCHED.inc(key.size());
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(key.size());
+                return new FindPendingReadOutcome(existing, null, null);
+            }
+            FindPendingReadOutcome foundButMissingSomethingOnLeft = null;
+            FindPendingReadOutcome foundButMissingSomethingOnRight = null;
+            FindPendingReadOutcome foundButMissingSomethingOnBoth = null;
+
+            for (Map.Entry<PendingReadKey, PendingRead> entry : ledgerCache.entrySet()) {
+                PendingReadKey entryKey = entry.getKey();
+                if (entryKey.includes(key)) {
+                    COUNT_PENDING_READS_MATCHED_INCLUDED.inc(key.size());
+                    COUNT_ENTRIES_NOTREAD_FROM_BK.inc(key.size());
+                    return new FindPendingReadOutcome(entry.getValue(), null, null);
+                }
+                if (entryKey.overlaps(key)) {
+                    PendingReadKey reminderOnLeft = key.reminderOnLeft(entryKey);
+                    PendingReadKey reminderOnRight = key.reminderOnRight(entryKey);
+                    if (reminderOnLeft != null && reminderOnRight != null) {
+                        foundButMissingSomethingOnBoth = new FindPendingReadOutcome(entry.getValue(),
+                                reminderOnLeft, reminderOnRight);
+                    } else if (reminderOnLeft != null && reminderOnRight == null) {
+                        foundButMissingSomethingOnLeft = new FindPendingReadOutcome(entry.getValue(),
+                                reminderOnLeft, null);
+                    } else if (reminderOnRight != null && reminderOnLeft == null) {
+                        foundButMissingSomethingOnRight = new FindPendingReadOutcome(entry.getValue(),
+                                null, reminderOnRight);
+                    }
+                }
+            }
+
+            if (foundButMissingSomethingOnLeft != null) {
+                long delta = key.size()
+                        - foundButMissingSomethingOnLeft.missingOnLeft.size();
+                COUNT_PENDING_READS_MATCHED_OVERLAPPING_MISS_LEFT.inc(delta);
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(delta);
+                return foundButMissingSomethingOnLeft;
+            } else if (foundButMissingSomethingOnRight != null) {
+                long delta = key.size()
+                        - foundButMissingSomethingOnRight.missingOnRight.size();
+                COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_RIGHT.inc(delta);
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(delta);
+                return foundButMissingSomethingOnRight;
+            } else if (foundButMissingSomethingOnBoth != null) {
+                long delta = key.size()
+                        - foundButMissingSomethingOnBoth.missingOnRight.size()
+                        - foundButMissingSomethingOnBoth.missingOnLeft.size();
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(delta);
+                COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_BOTH.inc(delta);
+                return foundButMissingSomethingOnBoth;
+            }
+
+            created.set(true);
+            PendingRead newRead = new PendingRead(key, ledgerCache);
+            ledgerCache.put(key, newRead);
+            long delta = key.size();
+            COUNT_PENDING_READS_MISSED.inc(delta);
+            COUNT_ENTRIES_READ_FROM_BK.inc(delta);
+            return new FindPendingReadOutcome(newRead, null, null);
+        }
+    }
+
+    private class PendingRead {
+        final PendingReadKey key;
+        final Map<PendingReadKey, PendingRead> ledgerCache;
+        final List<ReadEntriesCallbackWithContext> callbacks = new ArrayList<>(1);
+        boolean completed = false;
+
+        public PendingRead(PendingReadKey key,
+                           Map<PendingReadKey, PendingRead> ledgerCache) {
+            this.key = key;
+            this.ledgerCache = ledgerCache;
+        }
+
+        private List<EntryImpl> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
+            List<EntryImpl> result = new ArrayList<>((int) (endEntry - startEntry));
+            for (EntryImpl entry : list) {
+                long entryId = entry.getEntryId();
+                if (startEntry <= entryId && entryId <= endEntry) {
+                    result.add(entry);
+                } else {
+                    entry.release();
+                }
+            }
+            return result;
+        }
+
+        public void attach(CompletableFuture<List<EntryImpl>> handle) {
+            // when the future is done remove this from the map
+            // new reads will go to a new instance
+            // this is required because we are going to do refcount management
+            // on the results of the callback
+            handle.whenComplete((___, error) -> {
+                synchronized (PendingRead.this) {
+                    completed = true;
+                    synchronized (ledgerCache) {
+                        ledgerCache.remove(key, this);
+                    }
+                }
+            });
+
+            handle.thenAcceptAsync(entriesToReturn -> {
+                synchronized (PendingRead.this) {
+                    if (callbacks.size() == 1) {
+                        ReadEntriesCallbackWithContext first = callbacks.get(0);
+                        if (first.startEntry == key.startEntry
+                                && first.endEntry == key.endEntry) {
+                            // perfect match, no copy, this is the most common case
+                            first.callback.readEntriesComplete((List) entriesToReturn,
+                                    first.ctx);
+                        } else {
+                            first.callback.readEntriesComplete(
+                                    (List) keepEntries(entriesToReturn, first.startEntry, first.endEntry),
+                                    first.ctx);
+                        }
+                    } else {
+                        for (ReadEntriesCallbackWithContext callback : callbacks) {
+                            long callbackStartEntry = callback.startEntry;
+                            long callbackEndEntry = callback.endEntry;
+                            List<EntryImpl> copy = new ArrayList<>((int) (callbackEndEntry - callbackEndEntry + 1));

Review Comment:
   ```suggestion
                               List<EntryImpl> copy = new ArrayList<>((int) (callbackEndEntry - callbackStartEntry + 1));
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java:
##########
@@ -0,0 +1,443 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import io.prometheus.client.Counter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.AllArgsConstructor;
+import lombok.Value;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+
+/**
+ * PendingReadsManager tries to prevent sending duplicate reads to BK.
+ */
+public class PendingReadsManager {
+
+    private static final Counter COUNT_ENTRIES_READ_FROM_BK = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_entries_read")
+            .help("Total number of entries read from BK")
+            .register();
+
+    private static final Counter COUNT_ENTRIES_NOTREAD_FROM_BK = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_entries_notread")
+            .help("Total number of entries not read from BK")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched")
+            .help("Pending reads reused with perfect range match")
+            .register();
+    private static final Counter COUNT_PENDING_READS_MATCHED_INCLUDED = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_included")
+            .help("Pending reads reused by attaching to a read with a larger range")
+            .register();
+    private static final Counter COUNT_PENDING_READS_MISSED = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_missed")
+            .help("Pending reads that didn't find a match")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_OVERLAPPING_MISS_LEFT = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_left")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_RIGHT = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_right")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_BOTH = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_both")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register();
+
+    private final RangeEntryCacheImpl rangeEntryCache;
+    private final ConcurrentHashMap<Long, ConcurrentHashMap<PendingReadKey, PendingRead>> cachedPendingReads =
+            new ConcurrentHashMap<>();
+
+    public PendingReadsManager(RangeEntryCacheImpl rangeEntryCache) {
+        this.rangeEntryCache = rangeEntryCache;
+    }
+
+    @Value
+    private static class PendingReadKey {
+        private final long startEntry;
+        private final long endEntry;
+        long size() {
+            return endEntry - startEntry + 1;
+        }
+
+
+        boolean includes(PendingReadKey other) {
+            return startEntry <= other.startEntry && other.endEntry <= endEntry;
+        }
+
+        boolean overlaps(PendingReadKey other) {
+            return (other.startEntry <= startEntry && startEntry <= other.endEntry)
+                    || (other.startEntry <= endEntry && endEntry <= other.endEntry);
+        }
+
+        PendingReadKey reminderOnLeft(PendingReadKey other) {
+            //   S******-----E
+            //          S----------E
+            if (other.startEntry <= endEntry && endEntry <= other.endEntry
+                    && other.startEntry > startEntry) {
+                return new PendingReadKey(startEntry, other.startEntry - 1);
+            }
+            return null;
+        }
+
+        PendingReadKey reminderOnRight(PendingReadKey other) {
+            //          S-----*******E
+            //   S-----------E
+            if (other.startEntry <= startEntry && startEntry <= other.endEntry
+                    && other.endEntry < endEntry) {
+                return new PendingReadKey(other.endEntry + 1, endEntry);
+            }
+            return null;
+        }
+
+    }
+
+    @AllArgsConstructor
+    private static final class ReadEntriesCallbackWithContext {
+        final AsyncCallbacks.ReadEntriesCallback callback;
+        final Object ctx;
+        final long startEntry;
+        final long endEntry;
+    }
+
+    @AllArgsConstructor
+    private static final class FindPendingReadOutcome {
+        final PendingRead pendingRead;
+        final PendingReadKey missingOnLeft;
+        final PendingReadKey missingOnRight;
+        boolean needsAdditionalReads() {
+            return missingOnLeft != null || missingOnRight != null;
+        }
+    }
+
+    private FindPendingReadOutcome findPendingRead(PendingReadKey key, Map<PendingReadKey,
+            PendingRead> ledgerCache, AtomicBoolean created) {
+        synchronized (ledgerCache) {
+            PendingRead existing = ledgerCache.get(key);
+            if (existing != null) {
+                COUNT_PENDING_READS_MATCHED.inc(key.size());
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(key.size());
+                return new FindPendingReadOutcome(existing, null, null);
+            }
+            FindPendingReadOutcome foundButMissingSomethingOnLeft = null;
+            FindPendingReadOutcome foundButMissingSomethingOnRight = null;
+            FindPendingReadOutcome foundButMissingSomethingOnBoth = null;
+
+            for (Map.Entry<PendingReadKey, PendingRead> entry : ledgerCache.entrySet()) {
+                PendingReadKey entryKey = entry.getKey();
+                if (entryKey.includes(key)) {
+                    COUNT_PENDING_READS_MATCHED_INCLUDED.inc(key.size());
+                    COUNT_ENTRIES_NOTREAD_FROM_BK.inc(key.size());
+                    return new FindPendingReadOutcome(entry.getValue(), null, null);
+                }
+                if (entryKey.overlaps(key)) {
+                    PendingReadKey reminderOnLeft = key.reminderOnLeft(entryKey);
+                    PendingReadKey reminderOnRight = key.reminderOnRight(entryKey);
+                    if (reminderOnLeft != null && reminderOnRight != null) {
+                        foundButMissingSomethingOnBoth = new FindPendingReadOutcome(entry.getValue(),
+                                reminderOnLeft, reminderOnRight);
+                    } else if (reminderOnLeft != null && reminderOnRight == null) {
+                        foundButMissingSomethingOnLeft = new FindPendingReadOutcome(entry.getValue(),
+                                reminderOnLeft, null);
+                    } else if (reminderOnRight != null && reminderOnLeft == null) {
+                        foundButMissingSomethingOnRight = new FindPendingReadOutcome(entry.getValue(),
+                                null, reminderOnRight);
+                    }
+                }
+            }
+
+            if (foundButMissingSomethingOnLeft != null) {
+                long delta = key.size()
+                        - foundButMissingSomethingOnLeft.missingOnLeft.size();
+                COUNT_PENDING_READS_MATCHED_OVERLAPPING_MISS_LEFT.inc(delta);
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(delta);
+                return foundButMissingSomethingOnLeft;
+            } else if (foundButMissingSomethingOnRight != null) {
+                long delta = key.size()
+                        - foundButMissingSomethingOnRight.missingOnRight.size();
+                COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_RIGHT.inc(delta);
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(delta);
+                return foundButMissingSomethingOnRight;
+            } else if (foundButMissingSomethingOnBoth != null) {
+                long delta = key.size()
+                        - foundButMissingSomethingOnBoth.missingOnRight.size()
+                        - foundButMissingSomethingOnBoth.missingOnLeft.size();
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(delta);
+                COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_BOTH.inc(delta);
+                return foundButMissingSomethingOnBoth;
+            }
+
+            created.set(true);
+            PendingRead newRead = new PendingRead(key, ledgerCache);
+            ledgerCache.put(key, newRead);
+            long delta = key.size();
+            COUNT_PENDING_READS_MISSED.inc(delta);
+            COUNT_ENTRIES_READ_FROM_BK.inc(delta);
+            return new FindPendingReadOutcome(newRead, null, null);
+        }
+    }
+
+    private class PendingRead {
+        final PendingReadKey key;
+        final Map<PendingReadKey, PendingRead> ledgerCache;
+        final List<ReadEntriesCallbackWithContext> callbacks = new ArrayList<>(1);
+        boolean completed = false;
+
+        public PendingRead(PendingReadKey key,
+                           Map<PendingReadKey, PendingRead> ledgerCache) {
+            this.key = key;
+            this.ledgerCache = ledgerCache;
+        }
+
+        private List<EntryImpl> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
+            List<EntryImpl> result = new ArrayList<>((int) (endEntry - startEntry));
+            for (EntryImpl entry : list) {
+                long entryId = entry.getEntryId();
+                if (startEntry <= entryId && entryId <= endEntry) {
+                    result.add(entry);
+                } else {
+                    entry.release();
+                }
+            }
+            return result;
+        }
+
+        public void attach(CompletableFuture<List<EntryImpl>> handle) {
+            // when the future is done remove this from the map
+            // new reads will go to a new instance
+            // this is required because we are going to do refcount management
+            // on the results of the callback
+            handle.whenComplete((___, error) -> {
+                synchronized (PendingRead.this) {
+                    completed = true;
+                    synchronized (ledgerCache) {
+                        ledgerCache.remove(key, this);
+                    }
+                }
+            });
+
+            handle.thenAcceptAsync(entriesToReturn -> {
+                synchronized (PendingRead.this) {
+                    if (callbacks.size() == 1) {
+                        ReadEntriesCallbackWithContext first = callbacks.get(0);
+                        if (first.startEntry == key.startEntry
+                                && first.endEntry == key.endEntry) {
+                            // perfect match, no copy, this is the most common case
+                            first.callback.readEntriesComplete((List) entriesToReturn,
+                                    first.ctx);
+                        } else {
+                            first.callback.readEntriesComplete(
+                                    (List) keepEntries(entriesToReturn, first.startEntry, first.endEntry),
+                                    first.ctx);
+                        }
+                    } else {
+                        for (ReadEntriesCallbackWithContext callback : callbacks) {
+                            long callbackStartEntry = callback.startEntry;
+                            long callbackEndEntry = callback.endEntry;
+                            List<EntryImpl> copy = new ArrayList<>((int) (callbackEndEntry - callbackEndEntry + 1));
+                            for (EntryImpl entry : entriesToReturn) {
+                                long entryId = entry.getEntryId();
+                                if (callbackStartEntry <= entryId && entryId <= callbackEndEntry) {
+                                    EntryImpl entryCopy = EntryImpl.create(entry);
+                                    copy.add(entryCopy);
+                                }
+                            }
+                            callback.callback.readEntriesComplete((List) copy, callback.ctx);
+                        }
+                        for (EntryImpl entry : entriesToReturn) {
+                            entry.release();
+                        }
+                    }
+                }
+            }, rangeEntryCache.ml.getExecutor()
+                    .chooseThread(rangeEntryCache.ml.getName())).exceptionally(exception -> {
+                synchronized (PendingRead.this) {
+                    for (ReadEntriesCallbackWithContext callback : callbacks) {
+                        ManagedLedgerException mlException = createManagedLedgerException(exception);
+                        callback.callback.readEntriesFailed(mlException, callback.ctx);
+                    }
+                }
+                return null;
+            });
+        }
+
+        synchronized boolean addListener(AsyncCallbacks.ReadEntriesCallback callback,
+                                         Object ctx, long startEntry, long endEntry) {
+            if (completed) {
+                return false;
+            }
+            callbacks.add(new ReadEntriesCallbackWithContext(callback, ctx, startEntry, endEntry));
+            return true;
+        }
+    }
+
+
+    void readEntries(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
+                     final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
+
+
+        final PendingReadKey key = new PendingReadKey(firstEntry, lastEntry);
+
+        Map<PendingReadKey, PendingRead> pendingReadsForLedger =
+                cachedPendingReads.computeIfAbsent(lh.getId(), (l) -> new ConcurrentHashMap<>());

Review Comment:
   The size of `cachedPendingReads` seem only grow. Maybe we need a callback here to delete it when the ledger is deleted, or just change the type to `LoadingCache`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org