You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/31 19:29:13 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #10755: [ML] Fix ByteBuf leaks when execution fails

eolivelli commented on a change in pull request #10755:
URL: https://github.com/apache/pulsar/pull/10755#discussion_r642649741



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
##########
@@ -201,42 +214,47 @@ public void safeRun() {
             // `data` will be released in `closeComplete`
             ledger.asyncClose(this, ctx);
         } else {
-            updateLatency();
-            AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
-            if (cb != null) {
-                cb.addComplete(lastEntry, data.asReadOnly(), ctx);
-                ml.notifyCursors();
-                ml.notifyWaitingEntryCallBacks();
-                ReferenceCountUtil.release(data);
-                this.recycle();
-            } else {
+            ByteBuf data = this.data;
+            try {
+                updateLatency();
+                AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
+                if (cb != null) {
+                    cb.addComplete(lastEntry, data.asReadOnly(), ctx);
+                    ml.notifyCursors();
+                    ml.notifyWaitingEntryCallBacks();
+                    this.recycle();

Review comment:
       It looks like we are now calling `recycle` before `ReferenceCountUtil.release(data)`.
   Could that be a problem?

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
##########
@@ -201,42 +214,47 @@ public void safeRun() {
             // `data` will be released in `closeComplete`
             ledger.asyncClose(this, ctx);
         } else {
-            updateLatency();
-            AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
-            if (cb != null) {
-                cb.addComplete(lastEntry, data.asReadOnly(), ctx);
-                ml.notifyCursors();
-                ml.notifyWaitingEntryCallBacks();
-                ReferenceCountUtil.release(data);
-                this.recycle();
-            } else {
+            ByteBuf data = this.data;
+            try {
+                updateLatency();
+                AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
+                if (cb != null) {
+                    cb.addComplete(lastEntry, data.asReadOnly(), ctx);
+                    ml.notifyCursors();
+                    ml.notifyWaitingEntryCallBacks();
+                    this.recycle();
+                }
+            } finally {
                 ReferenceCountUtil.release(data);
             }
         }
     }
 
     @Override
     public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
-        checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(),
-                lh.getId());
-
-        if (rc == BKException.Code.OK) {
-            log.debug("Successfully closed ledger {}", lh.getId());
-        } else {
-            log.warn("Error when closing ledger {}. Status={}", lh.getId(), BKException.getMessage(rc));
-        }
+        ByteBuf data = this.data;
+        try {
+            checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s",
+                    ledger.getId(),
+                    lh.getId());
+
+            if (rc == BKException.Code.OK) {
+                log.debug("Successfully closed ledger {}", lh.getId());
+            } else {
+                log.warn("Error when closing ledger {}. Status={}", lh.getId(), BKException.getMessage(rc));
+            }
 
-        ml.ledgerClosed(lh);
-        updateLatency();
+            ml.ledgerClosed(lh);
+            updateLatency();
 
-        AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
-        if (cb != null) {
-            cb.addComplete(PositionImpl.get(lh.getId(), entryId), data.asReadOnly(), ctx);
-            ml.notifyCursors();
-            ml.notifyWaitingEntryCallBacks();
-            ReferenceCountUtil.release(data);
-            this.recycle();
-        } else {
+            AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
+            if (cb != null) {
+                cb.addComplete(PositionImpl.get(lh.getId(), entryId), data.asReadOnly(), ctx);
+                ml.notifyCursors();
+                ml.notifyWaitingEntryCallBacks();
+                this.recycle();

Review comment:
       The same concern applies here: `recycle` is now called before `ReferenceCountUtil.release(data)`.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1536,6 +1538,11 @@ public synchronized void updateLedgersIdsComplete(Stat stat) {
         }
     }
 
+    synchronized OpAddEntry pollPendingAddEntry() {
+        return pendingAddEntries.poll();

Review comment:
       Should we make `pendingAddEntries` private?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org