You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ho...@apache.org on 2022/09/08 03:08:58 UTC
[pulsar] branch master updated: [fix][storage]fix OpAddEntry release error when exception in ManagedLedgerInterceptor (#17394)
This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c6a1875db2f [fix][storage]fix OpAddEntry release error when exception in ManagedLedgerInterceptor (#17394)
c6a1875db2f is described below
commit c6a1875db2f7a9e5cab187dc98e90487cf08a805
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Thu Sep 8 11:08:49 2022 +0800
[fix][storage]fix OpAddEntry release error when exception in ManagedLedgerInterceptor (#17394)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 -
.../intercept/MangedLedgerInterceptorImplTest.java | 64 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 2 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 84a7377841c..892c0e73aea 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -32,7 +32,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
-import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
@@ -851,7 +850,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
} catch (Exception e) {
addOperation.failed(
new ManagedLedgerInterceptException("Interceptor managed ledger before add to bookie failed."));
- ReferenceCountUtil.release(addOperation.data);
log.error("[{}] Failed to intercept adding an entry to bookie.", name, e);
return false;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
index e42db517f8d..37c49b79ed0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
@@ -20,13 +20,18 @@ package org.apache.pulsar.broker.intercept;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import java.util.concurrent.CountDownLatch;
import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.OpAddEntry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -256,6 +261,65 @@ public class MangedLedgerInterceptorImplTest extends MockedBookKeeperTestCase {
ledger.close();
}
+ @Test
+ public void testBeforeAddEntryWithException() throws Exception {
+ final int MOCK_BATCH_SIZE = 2;
+ final String ledgerAndCursorName = "testBeforeAddEntryWithException";
+
+ ManagedLedgerInterceptor interceptor =
+ new MockManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null);
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(2);
+ config.setManagedLedgerInterceptor(interceptor);
+
+ ByteBuf buffer = Unpooled.wrappedBuffer("message".getBytes());
+ ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ try {
+ ledger.asyncAddEntry(buffer, MOCK_BATCH_SIZE, new AsyncCallbacks.AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object ctx) {
+ countDownLatch.countDown();
+ }
+ }, null);
+ countDownLatch.await();
+ assertEquals(buffer.refCnt(), 1);
+ } finally {
+ ledger.close();
+ factory.shutdown();
+ }
+ }
+
+ private class MockManagedLedgerInterceptorImpl extends ManagedLedgerInterceptorImpl {
+ private final Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;
+
+ public MockManagedLedgerInterceptorImpl(
+ Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors,
+ Set<ManagedLedgerPayloadProcessor> brokerEntryPayloadProcessors) {
+ super(brokerEntryMetadataInterceptors, brokerEntryPayloadProcessors);
+ this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors;
+ }
+
+ @Override
+ public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
+ if (op == null || numberOfMessages <= 0) {
+ return op;
+ }
+ op.setData(Commands.addBrokerEntryMetadata(op.getData(), brokerEntryMetadataInterceptors,
+ numberOfMessages));
+ if (op != null) {
+ throw new RuntimeException("throw exception before add entry for test");
+ }
+ return op;
+ }
+ }
+
public static Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
Set<String> interceptorNames = new HashSet<>();
interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");