You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/30 00:38:25 UTC
[pulsar] 03/04: [fix][transaction] Properly close transaction-buffer-sub non durable cursor (#14900)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a9938558a8a09414d9adadc520f4c797c750c8a7
Author: gaozhangmin <ga...@gmail.com>
AuthorDate: Tue Mar 29 01:00:20 2022 +0800
[fix][transaction] Properly close transaction-buffer-sub non durable cursor (#14900)
Fixes #14880
### Motivation
Non durable cursor was not closed properly.
### Modifications
For non durable cursor, `cursor.asyncClose` did nothing. The proper way is `topic.getManagedLedger().asyncDeleteCursor`
(cherry picked from commit 4e62ffc15714cfa49ed441f3ba7ededb866b9062)
---
.../transaction/buffer/impl/TopicTransactionBuffer.java | 13 ++++++++-----
.../apache/pulsar/broker/transaction/TransactionTest.java | 7 +++++--
2 files changed, 13 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 66ce8f5..e2888d9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.Codec;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
@@ -639,7 +640,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
}
}
- closeCursor(managedCursor);
+ closeCursor(SUBSCRIPTION_NAME);
callBack.recoverComplete();
}, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
.getExecutor(this)).exceptionally(e -> {
@@ -656,17 +657,19 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
});
}
- private void closeCursor(ManagedCursor cursor) {
- cursor.asyncClose(new AsyncCallbacks.CloseCallback() {
+ private void closeCursor(String subscriptionName) {
+ topic.getManagedLedger().asyncDeleteCursor(Codec.encode(subscriptionName),
+ new AsyncCallbacks.DeleteCursorCallback() {
@Override
- public void closeComplete(Object ctx) {
+ public void deleteCursorComplete(Object ctx) {
log.info("[{}]Transaction buffer snapshot recover cursor close complete.", topic.getName());
}
@Override
- public void closeFailed(ManagedLedgerException exception, Object ctx) {
+ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}]Transaction buffer snapshot recover cursor close fail.", topic.getName());
}
+
}, null);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 231c183..ae5e8c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -538,7 +538,7 @@ public class TransactionTest extends TransactionTestBase {
.getTopic("persistent://" + topic, false).get().get();
persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
- ManagedCursor managedCursor = mock(ManagedCursor.class);
+ ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class);
doReturn("transaction-buffer-sub").when(managedCursor).getName();
doReturn(true).when(managedCursor).hasMoreEntries();
doAnswer(invocation -> {
@@ -579,6 +579,9 @@ public class TransactionTest extends TransactionTestBase {
TransactionBuffer buffer3 = new TopicTransactionBuffer(persistentTopic);
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(buffer3.getStats().state, "Ready"));
+ persistentTopic.getInternalStats(false).thenAccept(internalStats -> {
+ assertTrue(internalStats.cursors.isEmpty());
+ });
managedCursors.removeCursor("transaction-buffer-sub");
}
@@ -893,4 +896,4 @@ public class TransactionTest extends TransactionTestBase {
pulsarServiceList.forEach((pulsarService ->
pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(true)));
}
-}
\ No newline at end of file
+}