You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/10 04:04:32 UTC

[pulsar] 09/13: [Transaction] Fix subscription ack transaction marker. (#14170)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5ebd5b6e56c21ec7f3b12c15dbdef076a21887e6
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Feb 9 15:41:48 2022 +0800

    [Transaction] Fix subscription ack transaction marker. (#14170)
    
    (cherry picked from commit 603d252ed6e405d2015d2d0fb73047bb9a96b268)
---
 .../broker/service/AbstractBaseDispatcher.java     |   4 +
 .../service/persistent/PersistentSubscription.java |  83 ----------
 .../org/apache/pulsar/broker/BrokerTestUtil.java   |  15 ++
 .../service/TransactionMarkerDeleteTest.java       | 184 ++++++++++-----------
 4 files changed, 104 insertions(+), 182 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index f98cfe5..ba757b5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -126,6 +126,10 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
             if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits()
                     && msgMetadata.hasTxnidLeastBits()) {
                 if (Markers.isTxnMarker(msgMetadata)) {
+                    // because consumer can receive message is smaller than maxReadPosition,
+                    // so this marker is useless for this subscription
+                    subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), AckType.Individual,
+                            Collections.emptyMap());
                     entries.set(i, null);
                     entry.release();
                     continue;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index cd9f462..f69db68 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -69,7 +69,6 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.naming.TopicName;
@@ -77,8 +76,6 @@ import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,26 +111,15 @@ public class PersistentSubscription implements Subscription {
     private static final Map<String, Long> NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Collections.emptyMap();
 
     private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
-    private volatile Position lastMarkDeleteForTransactionMarker;
     private final PendingAckHandle pendingAckHandle;
 
     private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
     private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
 
-    private DeleteTransactionMarkerState deleteTransactionMarkerState = DeleteTransactionMarkerState.None;
-
-    private final Object waitObject = new Object();
-
     static {
         REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
     }
 
-    public enum DeleteTransactionMarkerState {
-        Process,
-        Wait,
-        None
-    }
-
     static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
         return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
     }
@@ -419,8 +405,6 @@ public class PersistentSubscription implements Subscription {
             }
         }
 
-        deleteTransactionMarker(properties);
-
         if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
             // Notify all consumer that the end of topic was reached
             if (dispatcher != null) {
@@ -429,73 +413,6 @@ public class PersistentSubscription implements Subscription {
         }
     }
 
-    private void deleteTransactionMarker(Map<String, Long> properties) {
-
-        if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            PositionImpl currentMarkDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition();
-            if ((lastMarkDeleteForTransactionMarker == null
-                    || ((PositionImpl) lastMarkDeleteForTransactionMarker)
-                    .compareTo(currentMarkDeletePosition) < 0)) {
-                if (currentMarkDeletePosition != null) {
-                    ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger());
-                    PositionImpl nextPosition = managedLedger.getNextValidPosition(currentMarkDeletePosition);
-                    if (nextPosition != null
-                            && nextPosition.compareTo((PositionImpl) managedLedger.getLastConfirmedEntry()) <= 0) {
-                        synchronized (waitObject) {
-                            if (deleteTransactionMarkerState == DeleteTransactionMarkerState.None) {
-                                deleteTransactionMarkerState = DeleteTransactionMarkerState.Process;
-                                managedLedger.asyncReadEntry(nextPosition, new ReadEntryCallback() {
-                                    @Override
-                                    public void readEntryComplete(Entry entry, Object ctx) {
-                                        try {
-                                            MessageMetadata messageMetadata =
-                                                    Commands.parseMessageMetadata(entry.getDataBuffer());
-                                            if (Markers.isTxnCommitMarker(messageMetadata)
-                                                    || Markers.isTxnAbortMarker(messageMetadata)) {
-                                                synchronized (waitObject) {
-                                                    deleteTransactionMarkerState = DeleteTransactionMarkerState.None;
-                                                }
-                                                lastMarkDeleteForTransactionMarker = currentMarkDeletePosition;
-                                                acknowledgeMessage(Collections.singletonList(nextPosition),
-                                                        AckType.Individual, properties);
-                                            } else {
-                                                synchronized (waitObject) {
-                                                    if (deleteTransactionMarkerState
-                                                            == DeleteTransactionMarkerState.Wait) {
-                                                        deleteTransactionMarkerState =
-                                                                DeleteTransactionMarkerState.None;
-                                                        deleteTransactionMarker(properties);
-                                                    } else {
-                                                        deleteTransactionMarkerState =
-                                                                DeleteTransactionMarkerState.None;
-                                                    }
-                                                }
-                                            }
-                                        } finally {
-                                            entry.release();
-                                        }
-                                    }
-
-                                    @Override
-                                    public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
-                                        synchronized (waitObject) {
-                                            deleteTransactionMarkerState =
-                                                    DeleteTransactionMarkerState.None;
-                                        }
-                                        log.error("Fail to read transaction marker! Position : {}",
-                                                currentMarkDeletePosition, exception);
-                                    }
-                                }, null);
-                            } else if (deleteTransactionMarkerState == DeleteTransactionMarkerState.Process) {
-                                deleteTransactionMarkerState = DeleteTransactionMarkerState.Wait;
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
     public CompletableFuture<Void> transactionIndividualAcknowledge(
             TxnID txnId,
             List<MutablePair<PositionImpl, Integer>> positions) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
index ff03e42..224060c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker;
 
 import java.util.UUID;
+import org.mockito.Mockito;
 
 /**
  * Holds util methods used in test.
@@ -29,4 +30,18 @@ public class BrokerTestUtil {
         return prefix + "-" + UUID.randomUUID();
     }
 
+    /**
+     * Creates a Mockito spy directly without an intermediate instance to spy.
+     * This is to address flaky test issue where a spy created with a given instance fails with
+     * {@link org.mockito.exceptions.misusing.WrongTypeOfReturnValue} exception.
+     *
+     * @param classToSpy the class to spy
+     * @param args the constructor arguments to use when creating the spy instance
+     * @return a spy of the provided class created with given constructor arguments
+     */
+    public static <T> T spyWithClassAndConstructorArgs(Class<T> classToSpy, Object... args) {
+        return Mockito.mock(classToSpy, Mockito.withSettings()
+                .useConstructor(args)
+                .defaultAnswer(Mockito.CALLS_REAL_METHODS));
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
index f25b346..aa2a8d4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -25,45 +26,39 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
+import static org.testng.Assert.assertNull;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.protocol.Markers;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import org.testng.collections.Sets;
 
 @Test(groups = "broker")
-public class TransactionMarkerDeleteTest extends BrokerTestBase {
+public class TransactionMarkerDeleteTest extends TransactionTestBase {
 
+    private static final int TOPIC_PARTITION = 3;
+    private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
+    private static final int NUM_PARTITIONS = 16;
     @BeforeMethod
-    @Override
     protected void setup() throws Exception {
-        conf.setTransactionCoordinatorEnabled(true);
-        super.baseSetup();
-        admin.tenants().createTenant("public",
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
-
-        admin.namespaces().createNamespace("public/default");
+        setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -74,7 +69,8 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase {
 
     @Test
     public void testMarkerDeleteTimes() throws Exception {
-        ManagedLedgerImpl managedLedger = spy((ManagedLedgerImpl) pulsar.getManagedLedgerFactory().open("test"));
+        ManagedLedgerImpl managedLedger =
+                spy((ManagedLedgerImpl) getPulsarServiceList().get(0).getManagedLedgerFactory().open("test"));
         PersistentTopic topic = mock(PersistentTopic.class);
         BrokerService brokerService = mock(BrokerService.class);
         PulsarService pulsarService = mock(PulsarService.class);
@@ -85,8 +81,8 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase {
         doReturn(false).when(configuration).isTransactionCoordinatorEnabled();
         doReturn(managedLedger).when(topic).getManagedLedger();
         ManagedCursor cursor = managedLedger.openCursor("test");
-        PersistentSubscription persistentSubscription = spy(new PersistentSubscription(topic, "test",
-                cursor, false));
+        PersistentSubscription persistentSubscription =
+                spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "test", cursor, false);
         Position position = managedLedger.addEntry("test".getBytes());
         persistentSubscription.acknowledgeMessage(Collections.singletonList(position),
                 AckType.Individual, Collections.emptyMap());
@@ -96,84 +92,74 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase {
 
     @Test
     public void testMarkerDelete() throws Exception {
-
-        MessageMetadata msgMetadata = new MessageMetadata().clear()
-                .setPublishTime(1)
-                .setProducerName("test")
-                .setSequenceId(0);
-
-        ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(0);
-
-        payload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
-                msgMetadata, payload);
-
-        ManagedLedger managedLedger = pulsar.getManagedLedgerFactory().open("test");
-        PersistentTopic topic = mock(PersistentTopic.class);
-        doReturn(pulsar.getBrokerService()).when(topic).getBrokerService();
-        doReturn(managedLedger).when(topic).getManagedLedger();
-        doReturn("test").when(topic).getName();
-        ManagedCursor cursor = managedLedger.openCursor("test");
-        PersistentSubscription persistentSubscription = new PersistentSubscription(topic, "test",
-                managedLedger.openCursor("test"), false);
-
-        byte[] payloadBytes = toBytes(payload);
-        Position position1 = managedLedger.addEntry(payloadBytes);
-        Position markerPosition1 = managedLedger.addEntry(toBytes(Markers
-                .newTxnCommitMarker(1, 1, 1)));
-
-        Position position2 = managedLedger.addEntry(payloadBytes);
-        Position markerPosition2 = managedLedger.addEntry(toBytes(Markers
-                .newTxnAbortMarker(1, 1, 1)));
-
-        Position position3 = managedLedger.addEntry(payloadBytes);
-
-        assertEquals(cursor.getNumberOfEntriesInBacklog(true), 5);
-        assertTrue(((PositionImpl) cursor.getMarkDeletedPosition()).compareTo((PositionImpl) position1) < 0);
-
-        // ack position1, markerDeletePosition to markerPosition1
-        persistentSubscription.acknowledgeMessage(Collections.singletonList(position1),
-                AckType.Individual, Collections.emptyMap());
-
-        // ack position1, markerDeletePosition to markerPosition1
-        Awaitility.await().during(1, TimeUnit.SECONDS).until(() ->
-                ((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition())
-                        .compareTo((PositionImpl) markerPosition1) == 0);
-
-        // ack position2, markerDeletePosition to markerPosition2
-        persistentSubscription.acknowledgeMessage(Collections.singletonList(position2),
-                AckType.Individual, Collections.emptyMap());
-
-        Awaitility.await().until(() ->
-                ((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition())
-                        .compareTo((PositionImpl) markerPosition2) == 0);
-
-        // add consequent marker
-        managedLedger.addEntry(toBytes(Markers
-                .newTxnCommitMarker(1, 1, 1)));
-
-        managedLedger.addEntry(toBytes(Markers
-                .newTxnAbortMarker(1, 1, 1)));
-
-        Position markerPosition3 = managedLedger.addEntry(toBytes(Markers
-                .newTxnAbortMarker(1, 1, 1)));
-
-        // ack with transaction, then commit this transaction
-        persistentSubscription.transactionIndividualAcknowledge(new TxnID(0, 0),
-                Collections.singletonList(MutablePair.of((PositionImpl) position3, 0))).get();
-
-        persistentSubscription.endTxn(0, 0, 0, 0).get();
-
-        // ack with transaction, then commit this transaction
-        Awaitility.await().until(() ->
-                ((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition())
-                        .compareTo((PositionImpl) markerPosition3) == 0);
-
+        final String subName = "testMarkerDelete";
+        final String topicName = NAMESPACE1 + "/testMarkerDelete";
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient
+                .newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .isAckReceiptEnabled(true)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .topic(topicName)
+                .create();
+
+        Transaction txn1 = getTxn();
+        Transaction txn2 = getTxn();
+        Transaction txn3 = getTxn();
+        Transaction txn4 = getTxn();
+
+        MessageIdImpl msgId1 = (MessageIdImpl) producer.newMessage(txn1).send();
+        MessageIdImpl msgId2 = (MessageIdImpl) producer.newMessage(txn2).send();
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+        txn1.commit().get();
+
+        consumer.acknowledgeAsync(consumer.receive()).get();
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        // maxReadPosition move to msgId1, msgId2 have not be committed
+        assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+                PositionImpl.get(msgId1.getLedgerId(), msgId1.getEntryId()).toString());
+
+        MessageIdImpl msgId3 = (MessageIdImpl) producer.newMessage(txn3).send();
+        txn2.commit().get();
+
+        consumer.acknowledgeAsync(consumer.receive()).get();
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        // maxReadPosition move to txn1 marker, so entryId is msgId2.getEntryId() + 1,
+        // because send msgId2 before commit txn1
+        assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+                PositionImpl.get(msgId2.getLedgerId(), msgId2.getEntryId() + 1).toString());
+
+        MessageIdImpl msgId4 = (MessageIdImpl) producer.newMessage(txn4).send();
+        txn3.commit().get();
+
+        consumer.acknowledgeAsync(consumer.receive()).get();
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        // maxReadPosition move to txn2 marker, because msgId4 have not be committed
+        assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+                PositionImpl.get(msgId3.getLedgerId(), msgId3.getEntryId() + 1).toString());
+
+        txn4.abort().get();
+
+        // maxReadPosition move to txn4 abort marker, so entryId is msgId4.getEntryId() + 2
+        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getInternalStats(topicName)
+                .cursors.get(subName).markDeletePosition, PositionImpl.get(msgId4.getLedgerId(),
+                msgId4.getEntryId() + 2).toString()));
     }
 
-    static byte[] toBytes(ByteBuf byteBuf) {
-        byte[] buf = new byte[byteBuf.readableBytes()];
-        byteBuf.readBytes(buf);
-        byteBuf.release();
-        return buf;
+    private Transaction getTxn() throws Exception {
+        return pulsarClient
+                .newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS)
+                .build()
+                .get();
     }
 }