You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/07 03:29:18 UTC
[pulsar] branch branch-2.9 updated: [fix][storage]fix OpAddEntry release error when exception in ManagedLedgerInterceptor (#17929)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 3a5c1d1ac9c [fix][storage]fix OpAddEntry release error when exception in ManagedLedgerInterceptor (#17929)
3a5c1d1ac9c is described below
commit 3a5c1d1ac9c23832073350a61f2f19f0e244fa33
Author: Qiang Huang <qi...@gmail.com>
AuthorDate: Mon Nov 14 11:00:07 2022 +0800
[fix][storage]fix OpAddEntry release error when exception in ManagedLedgerInterceptor (#17929)
This PR is fully based on https://github.com/apache/pulsar/pull/17394.
- Remove duplicate release in `ManagedLedgerInterceptor`
(cherry picked from commit e6364bc41f0588586b62af4ed433e39b5c25d7ec)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 -
.../intercept/MangedLedgerInterceptorImplTest.java | 64 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 2 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 280d75f51e7..e2ef3fe5262 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -34,7 +34,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
-import io.netty.util.ReferenceCountUtil;
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
@@ -811,7 +810,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
} catch (Exception e) {
addOperation.failed(
new ManagedLedgerInterceptException("Interceptor managed ledger before add to bookie failed."));
- ReferenceCountUtil.release(addOperation.data);
log.error("[{}] Failed to intercept adding an entry to bookie.", name, e);
return false;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
index 60067a7016c..830e1894832 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
@@ -20,13 +20,18 @@ package org.apache.pulsar.broker.intercept;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import java.util.concurrent.CountDownLatch;
import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.OpAddEntry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -256,6 +261,65 @@ public class MangedLedgerInterceptorImplTest extends MockedBookKeeperTestCase {
ledger.close();
}
+ @Test
+ public void testBeforeAddEntryWithException() throws Exception { // adds one entry through an interceptor that throws, then checks the payload's reference count
+ final int MOCK_BATCH_SIZE = 2; // second argument of asyncAddEntry below
+ final String ledgerAndCursorName = "testBeforeAddEntryWithException";
+
+ ManagedLedgerInterceptor interceptor =
+ new MockManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null); // mock whose beforeAddEntry throws; null = no payload processors
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(2);
+ config.setManagedLedgerInterceptor(interceptor); // install the throwing interceptor on the ledger under test
+
+ ByteBuf buffer = Unpooled.wrappedBuffer("message".getBytes());
+ ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
+ CountDownLatch countDownLatch = new CountDownLatch(1); // released by whichever callback fires
+ try {
+ ledger.asyncAddEntry(buffer, MOCK_BATCH_SIZE, new AsyncCallbacks.AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+ countDownLatch.countDown(); // success path also releases the latch so the test cannot hang on it
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object ctx) {
+ countDownLatch.countDown(); // failure path; the exception itself is not inspected
+ }
+ }, null);
+ countDownLatch.await(); // blocks until one of the callbacks has run (no timeout)
+ assertEquals(buffer.refCnt(), 1); // the caller's buffer must still hold exactly one reference
+ } finally {
+ ledger.close();
+ factory.shutdown();
+ }
+ }
+
+ private class MockManagedLedgerInterceptorImpl extends ManagedLedgerInterceptorImpl { // test double: beforeAddEntry throws after calling Commands.addBrokerEntryMetadata
+ private final Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors; // local copy of the constructor argument, used in beforeAddEntry
+
+ public MockManagedLedgerInterceptorImpl(
+ Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors,
+ Set<ManagedLedgerPayloadProcessor> brokerEntryPayloadProcessors) {
+ super(brokerEntryMetadataInterceptors, brokerEntryPayloadProcessors); // both sets are forwarded unchanged to the superclass
+ this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors;
+ }
+
+ @Override
+ public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
+ if (op == null || numberOfMessages <= 0) { // nothing to intercept: hand the op back unchanged
+ return op;
+ }
+ op.setData(Commands.addBrokerEntryMetadata(op.getData(), brokerEntryMetadataInterceptors, // replace the op's data with the result of addBrokerEntryMetadata
+ numberOfMessages));
+ if (op != null) { // always true here (null already returned above), so the throw below is unconditional
+ throw new RuntimeException("throw exception before add entry for test");
+ }
+ return op; // not reached on this path; NOTE(review): the guard above presumably exists only so this return still compiles
+ }
+ }
+
public static Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
Set<String> interceptorNames = new HashSet<>();
interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");