You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/01 16:37:43 UTC

[GitHub] [pulsar] merlimat commented on a change in pull request #10755: [ML] Fix ByteBuf leaks when execution fails

merlimat commented on a change in pull request #10755:
URL: https://github.com/apache/pulsar/pull/10755#discussion_r643271542



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
##########
@@ -178,8 +178,21 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)
     @Override
     public void safeRun() {
         // Remove this entry from the head of the pending queue
-        OpAddEntry firstInQueue = ml.pendingAddEntries.poll();
-        checkArgument(this == firstInQueue);

Review comment:
       I'd leave this change out of this PR. This is a "logic" check. If this fails it means we really have a code bug issue and we cannot make many assumptions on how to recover from it.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
##########
@@ -201,42 +214,47 @@ public void safeRun() {
             // `data` will be released in `closeComplete`
             ledger.asyncClose(this, ctx);
         } else {
-            updateLatency();
-            AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
-            if (cb != null) {
-                cb.addComplete(lastEntry, data.asReadOnly(), ctx);
-                ml.notifyCursors();
-                ml.notifyWaitingEntryCallBacks();
-                ReferenceCountUtil.release(data);
-                this.recycle();
-            } else {
+            ByteBuf data = this.data;
+            try {

Review comment:
       The change here LGTM, although I'd prefer to get it in a separate PR. Also, do you have a stack trace on why the exception would have been triggered here?

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1504,6 +1504,8 @@ public synchronized void updateLedgersIdsComplete(Stat stat) {
                 if (existsOp.ledger != null) {
                     existsOp.close();
                     existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
+                    // release the extra retain
+                    ReferenceCountUtil.release(existsOp.data);

Review comment:
       👍  I think this was introduced in #5942. 

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
##########
@@ -201,42 +214,47 @@ public void safeRun() {
             // `data` will be released in `closeComplete`
             ledger.asyncClose(this, ctx);
         } else {
-            updateLatency();
-            AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
-            if (cb != null) {
-                cb.addComplete(lastEntry, data.asReadOnly(), ctx);
-                ml.notifyCursors();
-                ml.notifyWaitingEntryCallBacks();
-                ReferenceCountUtil.release(data);
-                this.recycle();
-            } else {
+            ByteBuf data = this.data;
+            try {
+                updateLatency();
+                AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
+                if (cb != null) {
+                    cb.addComplete(lastEntry, data.asReadOnly(), ctx);
+                    ml.notifyCursors();
+                    ml.notifyWaitingEntryCallBacks();
+                    this.recycle();
+                }
+            } finally {
                 ReferenceCountUtil.release(data);
             }
         }
     }
 
     @Override
     public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
-        checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(),
-                lh.getId());
-
-        if (rc == BKException.Code.OK) {
-            log.debug("Successfully closed ledger {}", lh.getId());
-        } else {
-            log.warn("Error when closing ledger {}. Status={}", lh.getId(), BKException.getMessage(rc));
-        }
+        ByteBuf data = this.data;
+        try {
+            checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s",

Review comment:
       Same comment for the other assertion above. This is really a code-sanity check. If this fails, it means things are **very** broken. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org