You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/01 06:50:20 UTC

[GitHub] [pulsar] eolivelli commented on a diff in pull request #17241: [managed-ledger] Do not send duplicate reads to BK/offloaders

eolivelli commented on code in PR #17241:
URL: https://github.com/apache/pulsar/pull/17241#discussion_r960278994


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java:
##########
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import io.prometheus.client.Counter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.AllArgsConstructor;
+import lombok.Value;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+
+/**
+ * PendingReadsManager tries to prevent sending duplicate reads to BK.
+ */
+public class PendingReadsManager {
+
+    private static final Counter COUNT_ENTRIES_READ_FROM_BK = Counter
+            .build() // process-wide Prometheus counter, registered once at class load via register()
+            .name("pulsar_ml_cache_pendingreads_entries_read")
+            .help("Total number of entries read from BK")
+            .register();
+
+    private static final Counter COUNT_ENTRIES_NOTREAD_FROM_BK = Counter
+            .build() // findPendingRead adds key.size() here whenever an existing pending read is reused
+            .name("pulsar_ml_cache_pendingreads_entries_notread")
+            .help("Total number of entries not read from BK")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED = Counter
+            .build() // findPendingRead adds key.size() on an exact key match, so this counts entries, not reads
+            .name("pulsar_ml_cache_pendingreads_matched")
+            .help("Pending reads reused with perfect range match")
+            .register();
+    private static final Counter COUNT_PENDING_READS_MATCHED_INCLUDED = Counter
+            .build() // findPendingRead adds key.size() when an existing key includes() the requested range
+            .name("pulsar_ml_cache_pendingreads_matched_included")
+            .help("Pending reads reused by attaching to a read with a larger range")
+            .register();
+    private static final Counter COUNT_PENDING_READS_MISSED = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_missed")
+            .help("Pending reads that didn't find a match")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_OVERLAPPING_MISS_LEFT = Counter
+            .build() // NOTE(review): constant name lacks the "_BUT_" used by the MISS_RIGHT / MISS_BOTH siblings
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_left")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_RIGHT = Counter
+            .build() // NOTE(review): help text is identical for all three overlapping_miss_* counters
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_right")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register();
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_BOTH = Counter
+            .build() // only the metric name distinguishes this from the miss_left / miss_right counters
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_both")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register();
+
+    private final RangeEntryCacheImpl rangeEntryCache;
+    private final ConcurrentHashMap<Long, ConcurrentHashMap<PendingReadKey, PendingRead>> cachedPendingReads =
+            new ConcurrentHashMap<>();
+
+    public PendingReadsManager(RangeEntryCacheImpl rangeEntryCache) {
+        this.rangeEntryCache = rangeEntryCache; // reference stored as-is; no null check is performed here
+    }
+
+    /**
+     * Inclusive range of entry ids, {@code [startEntry, endEntry]}, used as the key of a pending read.
+     * Lombok's {@code @Value} generates the all-args constructor, getters and {@code equals}/{@code hashCode},
+     * which is what makes instances usable as map keys.
+     */
+    @Value
+    private static class PendingReadKey {
+        private final long startEntry;
+        private final long endEntry;
+
+        /** Returns the number of entries in the range; both bounds are inclusive. */
+        long size() {
+            return endEntry - startEntry + 1;
+        }
+
+        /** Returns {@code true} when {@code other} lies entirely within this range. */
+        boolean includes(PendingReadKey other) {
+            return startEntry <= other.startEntry && other.endEntry <= endEntry;
+        }
+
+        /**
+         * Returns {@code true} when the two ranges share at least one entry.
+         *
+         * <p>This is the symmetric interval-intersection test. The previous form only checked whether one of
+         * this range's end points fell inside {@code other}, so it answered {@code false} when this range
+         * strictly contained {@code other} (for example {@code [1,10]} versus {@code [3,5]}) while answering
+         * {@code true} with the arguments swapped. Assumes both ranges satisfy
+         * {@code startEntry <= endEntry}.
+         */
+        boolean overlaps(PendingReadKey other) {
+            return startEntry <= other.endEntry && other.startEntry <= endEntry;
+        }
+
+        /**
+         * Returns the leading part of this range that {@code other} does not cover, or {@code null}.
+         *
+         * <p>A non-null result requires that {@code other} starts after this range's first entry and ends at or
+         * after this range's last entry. The method name (sic, "remainder") is kept because callers use it.
+         *
+         * <p>NOTE(review): this method requires {@code endEntry <= other.endEntry} while
+         * {@link #reminderOnRight} requires {@code other.endEntry < endEntry}, so for any given pair at most one
+         * of the two is non-null; when {@code other} sits strictly inside this range both return {@code null}.
+         */
+        PendingReadKey reminderOnLeft(PendingReadKey other) {
+            //   S******-----E
+            //          S----------E
+            if (other.startEntry <= endEntry && endEntry <= other.endEntry
+                    && other.startEntry > startEntry) {
+                return new PendingReadKey(startEntry, other.startEntry - 1);
+            }
+            return null;
+        }
+
+        /**
+         * Returns the trailing part of this range that {@code other} does not cover, or {@code null}.
+         *
+         * <p>A non-null result requires that {@code other} starts at or before this range's first entry and
+         * ends before this range's last entry. The method name (sic, "remainder") is kept because callers
+         * use it.
+         */
+        PendingReadKey reminderOnRight(PendingReadKey other) {
+            //          S-----*******E
+            //   S-----------E
+            if (other.startEntry <= startEntry && startEntry <= other.endEntry
+                    && other.endEntry < endEntry) {
+                return new PendingReadKey(other.endEntry + 1, endEntry);
+            }
+            return null;
+        }
+
+    }
+
+    @AllArgsConstructor // Lombok generates a constructor taking the fields below in declaration order
+    private static final class ReadEntriesCallbackWithContext {
+        final AsyncCallbacks.ReadEntriesCallback callback; // callback held together with its context object
+        final Object ctx; // opaque context value; NOTE(review): presumably handed back to the callback - confirm
+        final long startEntry; // NOTE(review): presumably the first entry id this caller asked for - confirm
+        final long endEntry; // NOTE(review): presumably the last entry id this caller asked for - confirm
+    }
+
+    /**
+     * Result of a lookup in the pending-reads map: the pending read selected for reuse, plus the parts of the
+     * requested range (if any) that lie to its left and right and are therefore not covered by it.
+     * Field order matters: Lombok derives the constructor signature from it.
+     */
+    @AllArgsConstructor
+    private static final class FindPendingReadOutcome {
+        final PendingRead pendingRead;
+        final PendingReadKey missingOnLeft;
+        final PendingReadKey missingOnRight;
+
+        /** Returns {@code true} when at least one side of the requested range is still missing. */
+        boolean needsAdditionalReads() {
+            final boolean nothingMissing = missingOnLeft == null && missingOnRight == null;
+            return !nothingMissing;
+        }
+    }
+
+    private FindPendingReadOutcome findPendingRead(PendingReadKey key, Map<PendingReadKey,
+            PendingRead> ledgerCache, AtomicBoolean created) {
+        synchronized (ledgerCache) {
+            PendingRead existing = ledgerCache.get(key);
+            if (existing != null) {
+                COUNT_PENDING_READS_MATCHED.inc(key.size());
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(key.size());
+                return new FindPendingReadOutcome(existing, null, null);
+            }
+            FindPendingReadOutcome foundButMissingSomethingOnLeft = null;
+            FindPendingReadOutcome foundButMissingSomethingOnRight = null;
+            FindPendingReadOutcome foundButMissingSomethingOnBoth = null;
+
+            for (Map.Entry<PendingReadKey, PendingRead> entry : ledgerCache.entrySet()) {
+                PendingReadKey entryKey = entry.getKey();
+                if (entryKey.includes(key)) {
+                    COUNT_PENDING_READS_MATCHED_INCLUDED.inc(key.size());
+                    COUNT_ENTRIES_NOTREAD_FROM_BK.inc(key.size());
+                    return new FindPendingReadOutcome(entry.getValue(), null, null);
+                }
+                if (entryKey.overlaps(key)) {
+                    PendingReadKey reminderOnLeft = key.reminderOnLeft(entryKey);
+                    PendingReadKey reminderOnRight = key.reminderOnRight(entryKey);
+                    if (reminderOnLeft != null && reminderOnRight != null) {
+                        foundButMissingSomethingOnBoth = new FindPendingReadOutcome(entry.getValue(),
+                                reminderOnLeft, reminderOnRight);
+                    } else if (reminderOnRight != null && reminderOnLeft == null) {
+                        foundButMissingSomethingOnRight = new FindPendingReadOutcome(entry.getValue(),
+                                null, reminderOnRight);
+                        // we can exit the loop in this case, as below we are not going to
+                        // consider the other options
+                        break;
+                    } else if (reminderOnLeft != null && reminderOnRight == null) {
+                        foundButMissingSomethingOnLeft = new FindPendingReadOutcome(entry.getValue(),

Review Comment:
   below we have to choose what happens in case we have:
   - foundButMissingSomethingOnLeft
   - foundButMissingSomethingOnRight
   - foundButMissingSomethingOnBoth
   
   Currently we give preference to foundButMissingSomethingOnRight, so we can break out of the loop as soon as we find it.
   
   In the future we can add something more sophisticated to decide which pending read is "the best" one to reuse.
   
   I did a few tests and "missing on right" is the case that happens most frequently, both on tailing reads and when I have many subscriptions that are catching up (even if I give preference to the other options).
   This makes sense: the subscriptions are moving forward, so when a new read arrives while another one is already pending, it is common for the new read to be missing the newest entries "on the right" (at the tail of the topic).
   
   After re-reading the code: if we break here as soon as we find "anything", we could lose the chance to find the "match included" case, which is absolutely the best one, as it needs no further reads.
   So I am going to remove this "break".
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org