Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/13 14:49:06 UTC

[GitHub] [pulsar] eolivelli commented on a diff in pull request #17953: [improve][broker] Issue 17952: Limit the number of pending requests to BookKeeper to save the broker from OODM

eolivelli commented on code in PR #17953:
URL: https://github.com/apache/pulsar/pull/17953#discussion_r994749999


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java:
##########
@@ -437,6 +458,60 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
         }
     }
 
+    private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle lh,
+                                                                long firstEntry, long lastEntry,
+                                                                boolean shouldCacheEntry,
+                                                                AsyncCallbacks.ReadEntriesCallback originalCallback,
+                                                                Object ctx, PendingReadsLimiter.Handle handle) {
+        if (pendingReadsLimiter.isDisabled()) {
+            return originalCallback;
+        }
+        long estimatedReadSize = (1 + lastEntry - firstEntry)
+                * (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
+        final AsyncCallbacks.ReadEntriesCallback callback;
+        PendingReadsLimiter.Handle newHandle = pendingReadsLimiter.acquire(estimatedReadSize, handle);
+        if (!newHandle.success) {
+            long now = System.currentTimeMillis();
+            if (now - newHandle.creationTime > readEntryTimeoutMillis) {
+                String message = "Time-out elapsed reading from ledger "
+                        + lh.getId()
+                        + ", " + rangeEntryCache.getName()
+                        + ", estimated read size " + estimatedReadSize + " bytes"
+                        + " for " + (1 + lastEntry - firstEntry) + " entries";
+                log.error(message);
+                pendingReadsLimiter.release(newHandle);
+                originalCallback.readEntriesFailed(
+                        new ManagedLedgerException.TooManyRequestsException(message), ctx);
+                return null;
+            }
+            this.rangeEntryCache.ml.getExecutor().submitOrdered(lh.getId(), () -> {
+                readEntriesInternal(lh, firstEntry, lastEntry, shouldCacheEntry,
+                        originalCallback, ctx, newHandle);
+                return null;
+            });
+            return null;

Review Comment:
   > Can we make this reactive by queuing cursors that have requested more entries and then feed those cursors as memory gets freed? 
   
   
   I thought about building some kind of "async rate limiter", but in the end you would still have to keep a list (or priority queue) of the reads that are waiting for enough permits to become available.
   
   The problem then becomes implementing a "fair" algorithm that:
   - does not let the pending reads starve
   - tries to serve the pending reads in FIFO order.
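
   To make the trade-off concrete, here is a minimal sketch of what such a queue-based limiter could look like. It is not code from this PR: `FifoAsyncLimiter` and its methods are hypothetical names, and the sketch assumes a strict FIFO policy in which a large read at the head of the queue blocks smaller reads behind it. That avoids starvation, but it also shows the cost: one global queue and one lock shared by every reader.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a global, strictly FIFO async permit limiter.
final class FifoAsyncLimiter {
    private static final class Waiter {
        final long permits;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        Waiter(long permits) {
            this.permits = permits;
        }
    }

    private final ArrayDeque<Waiter> waiters = new ArrayDeque<>();
    private long available;

    FifoAsyncLimiter(long totalPermits) {
        this.available = totalPermits;
    }

    // Returns a future that completes once the permits are held.
    // A request larger than the total budget would wait forever in this sketch.
    CompletableFuture<Void> acquire(long permits) {
        Waiter waiter = new Waiter(permits);
        synchronized (this) {
            // Queue behind existing waiters even if this request would fit,
            // otherwise small reads could starve a large read at the head.
            if (!waiters.isEmpty() || available < permits) {
                waiters.addLast(waiter);
                return waiter.future;
            }
            available -= permits;
        }
        waiter.future.complete(null);
        return waiter.future;
    }

    // Returns permits and wakes waiters from the head of the queue, in order.
    void release(long permits) {
        List<Waiter> ready = new ArrayList<>();
        synchronized (this) {
            available += permits;
            while (!waiters.isEmpty() && waiters.peekFirst().permits <= available) {
                Waiter next = waiters.pollFirst();
                available -= next.permits;
                ready.add(next);
            }
        }
        // Complete outside the lock so callbacks cannot block other callers.
        for (Waiter waiter : ready) {
            waiter.future.complete(null);
        }
    }
}
```

   With a budget of 10 permits, a held read of 8 and a queued read of 5, a later read of 1 also has to wait even though 2 permits are free. That is the fairness versus utilization choice a global queue forces.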
   
   
   With the current mechanism we schedule the read on the thread that is pinned to the ManagedLedger (ML), and this naturally adds some back pressure that depends on the read demand of that ML.
   
   Within one broker all the MLs compete for the available memory, and I don't want to keep a single global list of pending reads, because it would be really hard to make it "fair".
   
   With this approach, each ML effectively has its own list of waiting pending reads (the internal queue of the pinned executor).
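
   As a rough illustration of that idea (a simplified sketch, not the PR's code), the retry loop can be modelled as follows. `PinnedRetryReader`, the shared `AtomicLong` budget and the `Consumer<Boolean>` callback are all hypothetical stand-ins, and a plain single-thread `ExecutorService` plays the role of the ML's pinned executor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical sketch: one instance per ManagedLedger, all sharing one budget.
final class PinnedRetryReader {
    private final AtomicLong sharedBudget;   // bytes still available broker-wide
    private final ExecutorService pinned;    // stand-in for the ML's pinned thread
    private final long timeoutMillis;

    PinnedRetryReader(AtomicLong sharedBudget, ExecutorService pinned, long timeoutMillis) {
        this.sharedBudget = sharedBudget;
        this.pinned = pinned;
        this.timeoutMillis = timeoutMillis;
    }

    // Calls onResult with true once the bytes are reserved, or false on timeout.
    void read(long bytes, Consumer<Boolean> onResult) {
        attempt(bytes, System.currentTimeMillis(), onResult);
    }

    private void attempt(long bytes, long startMillis, Consumer<Boolean> onResult) {
        if (tryAcquire(bytes)) {
            onResult.accept(true);
        } else if (System.currentTimeMillis() - startMillis > timeoutMillis) {
            onResult.accept(false);
        } else {
            // Re-queue on this ML's own executor. The executor's internal queue
            // is the per-ML list of waiting reads; no global list is needed.
            pinned.execute(() -> attempt(bytes, startMillis, onResult));
        }
    }

    // Reserves bytes from the shared budget without ever going negative.
    private boolean tryAcquire(long bytes) {
        while (true) {
            long current = sharedBudget.get();
            if (current < bytes) {
                return false;
            }
            if (sharedBudget.compareAndSet(current, current - bytes)) {
                return true;
            }
        }
    }

    void release(long bytes) {
        sharedBudget.addAndGet(bytes);
    }
}
```

   Note that the sketch retries immediately, so a starved read keeps its executor busy until memory is freed or the timeout elapses. Ordering is only guaranteed among tasks on the same executor, not across MLs.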


