You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/10 04:04:32 UTC
[pulsar] 09/13: [Transaction] Fix subscription ack transaction marker. (#14170)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5ebd5b6e56c21ec7f3b12c15dbdef076a21887e6
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Feb 9 15:41:48 2022 +0800
[Transaction] Fix subscription ack transaction marker. (#14170)
(cherry picked from commit 603d252ed6e405d2015d2d0fb73047bb9a96b268)
---
.../broker/service/AbstractBaseDispatcher.java | 4 +
.../service/persistent/PersistentSubscription.java | 83 ----------
.../org/apache/pulsar/broker/BrokerTestUtil.java | 15 ++
.../service/TransactionMarkerDeleteTest.java | 184 ++++++++++-----------
4 files changed, 104 insertions(+), 182 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index f98cfe5..ba757b5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -126,6 +126,10 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
if (Markers.isTxnMarker(msgMetadata)) {
+ // consumers can only receive messages whose position is smaller than maxReadPosition,
+ // so this marker is useless for this subscription
+ subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), AckType.Individual,
+ Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index cd9f462..f69db68 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -69,7 +69,6 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.TopicName;
@@ -77,8 +76,6 @@ import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,26 +111,15 @@ public class PersistentSubscription implements Subscription {
private static final Map<String, Long> NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Collections.emptyMap();
private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
- private volatile Position lastMarkDeleteForTransactionMarker;
private final PendingAckHandle pendingAckHandle;
private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
- private DeleteTransactionMarkerState deleteTransactionMarkerState = DeleteTransactionMarkerState.None;
-
- private final Object waitObject = new Object();
-
static {
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
}
- public enum DeleteTransactionMarkerState {
- Process,
- Wait,
- None
- }
-
static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
}
@@ -419,8 +405,6 @@ public class PersistentSubscription implements Subscription {
}
}
- deleteTransactionMarker(properties);
-
if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
// Notify all consumer that the end of topic was reached
if (dispatcher != null) {
@@ -429,73 +413,6 @@ public class PersistentSubscription implements Subscription {
}
}
- private void deleteTransactionMarker(Map<String, Long> properties) {
-
- if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
- PositionImpl currentMarkDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition();
- if ((lastMarkDeleteForTransactionMarker == null
- || ((PositionImpl) lastMarkDeleteForTransactionMarker)
- .compareTo(currentMarkDeletePosition) < 0)) {
- if (currentMarkDeletePosition != null) {
- ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger());
- PositionImpl nextPosition = managedLedger.getNextValidPosition(currentMarkDeletePosition);
- if (nextPosition != null
- && nextPosition.compareTo((PositionImpl) managedLedger.getLastConfirmedEntry()) <= 0) {
- synchronized (waitObject) {
- if (deleteTransactionMarkerState == DeleteTransactionMarkerState.None) {
- deleteTransactionMarkerState = DeleteTransactionMarkerState.Process;
- managedLedger.asyncReadEntry(nextPosition, new ReadEntryCallback() {
- @Override
- public void readEntryComplete(Entry entry, Object ctx) {
- try {
- MessageMetadata messageMetadata =
- Commands.parseMessageMetadata(entry.getDataBuffer());
- if (Markers.isTxnCommitMarker(messageMetadata)
- || Markers.isTxnAbortMarker(messageMetadata)) {
- synchronized (waitObject) {
- deleteTransactionMarkerState = DeleteTransactionMarkerState.None;
- }
- lastMarkDeleteForTransactionMarker = currentMarkDeletePosition;
- acknowledgeMessage(Collections.singletonList(nextPosition),
- AckType.Individual, properties);
- } else {
- synchronized (waitObject) {
- if (deleteTransactionMarkerState
- == DeleteTransactionMarkerState.Wait) {
- deleteTransactionMarkerState =
- DeleteTransactionMarkerState.None;
- deleteTransactionMarker(properties);
- } else {
- deleteTransactionMarkerState =
- DeleteTransactionMarkerState.None;
- }
- }
- }
- } finally {
- entry.release();
- }
- }
-
- @Override
- public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
- synchronized (waitObject) {
- deleteTransactionMarkerState =
- DeleteTransactionMarkerState.None;
- }
- log.error("Fail to read transaction marker! Position : {}",
- currentMarkDeletePosition, exception);
- }
- }, null);
- } else if (deleteTransactionMarkerState == DeleteTransactionMarkerState.Process) {
- deleteTransactionMarkerState = DeleteTransactionMarkerState.Wait;
- }
- }
- }
- }
- }
- }
- }
-
public CompletableFuture<Void> transactionIndividualAcknowledge(
TxnID txnId,
List<MutablePair<PositionImpl, Integer>> positions) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
index ff03e42..224060c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker;
import java.util.UUID;
+import org.mockito.Mockito;
/**
* Holds util methods used in test.
@@ -29,4 +30,18 @@ public class BrokerTestUtil {
return prefix + "-" + UUID.randomUUID();
}
+ /**
+ * Creates a Mockito spy directly without an intermediate instance to spy.
+ * This is to address a flaky test issue where a spy created with a given instance fails with
+ * {@link org.mockito.exceptions.misusing.WrongTypeOfReturnValue} exception.
+ *
+ * @param classToSpy the class to spy
+ * @param args the constructor arguments to use when creating the spy instance
+ * @return a spy of the provided class created with given constructor arguments
+ */
+ public static <T> T spyWithClassAndConstructorArgs(Class<T> classToSpy, Object... args) {
+ return Mockito.mock(classToSpy, Mockito.withSettings()
+ .useConstructor(args)
+ .defaultAnswer(Mockito.CALLS_REAL_METHODS));
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
index f25b346..aa2a8d4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
+import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -25,45 +26,39 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
+import static org.testng.Assert.assertNull;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.protocol.Markers;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import org.testng.collections.Sets;
@Test(groups = "broker")
-public class TransactionMarkerDeleteTest extends BrokerTestBase {
+public class TransactionMarkerDeleteTest extends TransactionTestBase {
+ private static final int TOPIC_PARTITION = 3;
+ private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
+ private static final int NUM_PARTITIONS = 16;
@BeforeMethod
- @Override
protected void setup() throws Exception {
- conf.setTransactionCoordinatorEnabled(true);
- super.baseSetup();
- admin.tenants().createTenant("public",
- new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
-
- admin.namespaces().createNamespace("public/default");
+ setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
}
@AfterMethod(alwaysRun = true)
@@ -74,7 +69,8 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase {
@Test
public void testMarkerDeleteTimes() throws Exception {
- ManagedLedgerImpl managedLedger = spy((ManagedLedgerImpl) pulsar.getManagedLedgerFactory().open("test"));
+ ManagedLedgerImpl managedLedger =
+ spy((ManagedLedgerImpl) getPulsarServiceList().get(0).getManagedLedgerFactory().open("test"));
PersistentTopic topic = mock(PersistentTopic.class);
BrokerService brokerService = mock(BrokerService.class);
PulsarService pulsarService = mock(PulsarService.class);
@@ -85,8 +81,8 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase {
doReturn(false).when(configuration).isTransactionCoordinatorEnabled();
doReturn(managedLedger).when(topic).getManagedLedger();
ManagedCursor cursor = managedLedger.openCursor("test");
- PersistentSubscription persistentSubscription = spy(new PersistentSubscription(topic, "test",
- cursor, false));
+ PersistentSubscription persistentSubscription =
+ spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "test", cursor, false);
Position position = managedLedger.addEntry("test".getBytes());
persistentSubscription.acknowledgeMessage(Collections.singletonList(position),
AckType.Individual, Collections.emptyMap());
@@ -96,84 +92,74 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase {
@Test
public void testMarkerDelete() throws Exception {
-
- MessageMetadata msgMetadata = new MessageMetadata().clear()
- .setPublishTime(1)
- .setProducerName("test")
- .setSequenceId(0);
-
- ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(0);
-
- payload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
- msgMetadata, payload);
-
- ManagedLedger managedLedger = pulsar.getManagedLedgerFactory().open("test");
- PersistentTopic topic = mock(PersistentTopic.class);
- doReturn(pulsar.getBrokerService()).when(topic).getBrokerService();
- doReturn(managedLedger).when(topic).getManagedLedger();
- doReturn("test").when(topic).getName();
- ManagedCursor cursor = managedLedger.openCursor("test");
- PersistentSubscription persistentSubscription = new PersistentSubscription(topic, "test",
- managedLedger.openCursor("test"), false);
-
- byte[] payloadBytes = toBytes(payload);
- Position position1 = managedLedger.addEntry(payloadBytes);
- Position markerPosition1 = managedLedger.addEntry(toBytes(Markers
- .newTxnCommitMarker(1, 1, 1)));
-
- Position position2 = managedLedger.addEntry(payloadBytes);
- Position markerPosition2 = managedLedger.addEntry(toBytes(Markers
- .newTxnAbortMarker(1, 1, 1)));
-
- Position position3 = managedLedger.addEntry(payloadBytes);
-
- assertEquals(cursor.getNumberOfEntriesInBacklog(true), 5);
- assertTrue(((PositionImpl) cursor.getMarkDeletedPosition()).compareTo((PositionImpl) position1) < 0);
-
- // ack position1, markerDeletePosition to markerPosition1
- persistentSubscription.acknowledgeMessage(Collections.singletonList(position1),
- AckType.Individual, Collections.emptyMap());
-
- // ack position1, markerDeletePosition to markerPosition1
- Awaitility.await().during(1, TimeUnit.SECONDS).until(() ->
- ((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition())
- .compareTo((PositionImpl) markerPosition1) == 0);
-
- // ack position2, markerDeletePosition to markerPosition2
- persistentSubscription.acknowledgeMessage(Collections.singletonList(position2),
- AckType.Individual, Collections.emptyMap());
-
- Awaitility.await().until(() ->
- ((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition())
- .compareTo((PositionImpl) markerPosition2) == 0);
-
- // add consequent marker
- managedLedger.addEntry(toBytes(Markers
- .newTxnCommitMarker(1, 1, 1)));
-
- managedLedger.addEntry(toBytes(Markers
- .newTxnAbortMarker(1, 1, 1)));
-
- Position markerPosition3 = managedLedger.addEntry(toBytes(Markers
- .newTxnAbortMarker(1, 1, 1)));
-
- // ack with transaction, then commit this transaction
- persistentSubscription.transactionIndividualAcknowledge(new TxnID(0, 0),
- Collections.singletonList(MutablePair.of((PositionImpl) position3, 0))).get();
-
- persistentSubscription.endTxn(0, 0, 0, 0).get();
-
- // ack with transaction, then commit this transaction
- Awaitility.await().until(() ->
- ((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition())
- .compareTo((PositionImpl) markerPosition3) == 0);
-
+ final String subName = "testMarkerDelete";
+ final String topicName = NAMESPACE1 + "/testMarkerDelete";
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient
+ .newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .isAckReceiptEnabled(true)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient
+ .newProducer()
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .topic(topicName)
+ .create();
+
+ Transaction txn1 = getTxn();
+ Transaction txn2 = getTxn();
+ Transaction txn3 = getTxn();
+ Transaction txn4 = getTxn();
+
+ MessageIdImpl msgId1 = (MessageIdImpl) producer.newMessage(txn1).send();
+ MessageIdImpl msgId2 = (MessageIdImpl) producer.newMessage(txn2).send();
+ assertNull(consumer.receive(1, TimeUnit.SECONDS));
+ txn1.commit().get();
+
+ consumer.acknowledgeAsync(consumer.receive()).get();
+ assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+ // maxReadPosition moves to msgId1, because msgId2 has not been committed
+ assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+ PositionImpl.get(msgId1.getLedgerId(), msgId1.getEntryId()).toString());
+
+ MessageIdImpl msgId3 = (MessageIdImpl) producer.newMessage(txn3).send();
+ txn2.commit().get();
+
+ consumer.acknowledgeAsync(consumer.receive()).get();
+ assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+ // maxReadPosition moves to the txn1 marker, so entryId is msgId2.getEntryId() + 1,
+ // because msgId2 was sent before txn1 was committed
+ assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+ PositionImpl.get(msgId2.getLedgerId(), msgId2.getEntryId() + 1).toString());
+
+ MessageIdImpl msgId4 = (MessageIdImpl) producer.newMessage(txn4).send();
+ txn3.commit().get();
+
+ consumer.acknowledgeAsync(consumer.receive()).get();
+ assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+ // maxReadPosition moves to the txn2 marker, because msgId4 has not been committed
+ assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+ PositionImpl.get(msgId3.getLedgerId(), msgId3.getEntryId() + 1).toString());
+
+ txn4.abort().get();
+
+ // maxReadPosition moves to the txn4 abort marker, so entryId is msgId4.getEntryId() + 2
+ Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getInternalStats(topicName)
+ .cursors.get(subName).markDeletePosition, PositionImpl.get(msgId4.getLedgerId(),
+ msgId4.getEntryId() + 2).toString()));
}
- static byte[] toBytes(ByteBuf byteBuf) {
- byte[] buf = new byte[byteBuf.readableBytes()];
- byteBuf.readBytes(buf);
- byteBuf.release();
- return buf;
+ private Transaction getTxn() throws Exception {
+ return pulsarClient
+ .newTransaction()
+ .withTransactionTimeout(10, TimeUnit.SECONDS)
+ .build()
+ .get();
}
}