You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/07 03:29:18 UTC

[pulsar] branch branch-2.9 updated: [fix][storage]fix OpAddEntry release error when exception in ManagedLedgerInterceptor (#17929)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 3a5c1d1ac9c [fix][storage]fix OpAddEntry release error when exception in ManagedLedgerInterceptor (#17929)
3a5c1d1ac9c is described below

commit 3a5c1d1ac9c23832073350a61f2f19f0e244fa33
Author: Qiang Huang <qi...@gmail.com>
AuthorDate: Mon Nov 14 11:00:07 2022 +0800

    [fix][storage]fix OpAddEntry release error when exception in ManagedLedgerInterceptor (#17929)
    
    This PR is fully based on https://github.com/apache/pulsar/pull/17394.
    
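    - Remove the duplicate release in `ManagedLedgerImpl` when the `ManagedLedgerInterceptor` throws (the explicit `ReferenceCountUtil.release(addOperation.data)` call made after `addOperation.failed(...)`)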
    
    (cherry picked from commit e6364bc41f0588586b62af4ed433e39b5c25d7ec)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  2 -
 .../intercept/MangedLedgerInterceptorImplTest.java | 64 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 280d75f51e7..e2ef3fe5262 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -34,7 +34,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
-import io.netty.util.ReferenceCountUtil;
 import java.time.Clock;
 import java.util.Collections;
 import java.util.HashMap;
@@ -811,7 +810,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         } catch (Exception e) {
             addOperation.failed(
                     new ManagedLedgerInterceptException("Interceptor managed ledger before add to bookie failed."));
-            ReferenceCountUtil.release(addOperation.data);
             log.error("[{}] Failed to intercept adding an entry to bookie.", name, e);
             return false;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
index 60067a7016c..830e1894832 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
@@ -20,13 +20,18 @@ package org.apache.pulsar.broker.intercept;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import java.util.concurrent.CountDownLatch;
 import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.OpAddEntry;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -256,6 +261,65 @@ public class MangedLedgerInterceptorImplTest  extends MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testBeforeAddEntryWithException() throws Exception {
+        final int MOCK_BATCH_SIZE = 2;
+        final String ledgerAndCursorName = "testBeforeAddEntryWithException";
+
+        ManagedLedgerInterceptor interceptor =
+                new MockManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null);
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(2);
+        config.setManagedLedgerInterceptor(interceptor);
+
+        ByteBuf buffer = Unpooled.wrappedBuffer("message".getBytes());
+        ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        try {
+            ledger.asyncAddEntry(buffer, MOCK_BATCH_SIZE, new AsyncCallbacks.AddEntryCallback() {
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                    countDownLatch.countDown();
+                }
+
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object ctx) {
+                    countDownLatch.countDown();
+                }
+            }, null);
+            countDownLatch.await();
+            assertEquals(buffer.refCnt(), 1);
+        } finally {
+            ledger.close();
+            factory.shutdown();
+        }
+    }
+
+    private class MockManagedLedgerInterceptorImpl extends ManagedLedgerInterceptorImpl {
+        private final Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;
+
+        public MockManagedLedgerInterceptorImpl(
+                Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors,
+                Set<ManagedLedgerPayloadProcessor> brokerEntryPayloadProcessors) {
+            super(brokerEntryMetadataInterceptors, brokerEntryPayloadProcessors);
+            this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors;
+        }
+
+        @Override
+        public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
+            if (op == null || numberOfMessages <= 0) {
+                return op;
+            }
+            op.setData(Commands.addBrokerEntryMetadata(op.getData(), brokerEntryMetadataInterceptors,
+                    numberOfMessages));
+            if (op != null) {
+                throw new RuntimeException("throw exception before add entry for test");
+            }
+            return op;
+        }
+    }
+
     public static Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
         Set<String> interceptorNames = new HashSet<>();
         interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");