You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2021/05/31 03:23:11 UTC

[pulsar] branch master updated: [Transaction] Fix transaction ack delete marker position when don't have transaction ack. (#10741)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cdb421  [Transaction] Fix transaction ack delete marker position when don't have transaction ack. (#10741)
5cdb421 is described below

commit 5cdb421d0e5043dd579f752ae15a79054d87d0c9
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Mon May 31 11:22:33 2021 +0800

    [Transaction] Fix transaction ack delete marker position when don't have transaction ack. (#10741)
    
    ## Motivation
    Now when broke enable transaction, every ack will check the next position is transaction marker position whether or not. When don't have transaction ack, we don' need to check the next position is transaction marker position.
    
    ## implement
    add the judgement for pending ack delete transaction marker position.
---
 .../service/persistent/PersistentSubscription.java |  3 +-
 .../transaction/pendingack/PendingAckHandle.java   |  7 ++++
 .../pendingack/impl/PendingAckHandleDisabled.java  |  5 +++
 .../pendingack/impl/PendingAckHandleImpl.java      | 10 ++++++
 .../service/TransactionMarkerDeleteTest.java       | 41 +++++++++++++---------
 5 files changed, 49 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 99f2820..9cd9e2d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -395,7 +395,8 @@ public class PersistentSubscription implements Subscription {
             }
         }
 
-        if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
+        if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
+                && this.pendingAckHandle.isTransactionAckPresent()) {
             Position currentMarkDeletePosition = cursor.getMarkDeletedPosition();
             if ((lastMarkDeleteForTransactionMarker == null
                     || ((PositionImpl) lastMarkDeleteForTransactionMarker)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
index 10e4452..405e700 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
@@ -152,4 +152,11 @@ public interface PendingAckHandle {
      * @return the future of this operation.
      */
     CompletableFuture<Void> close();
+
+    /**
+     * Is transaction ack present.
+     *
+     * @return the the boolean of transaction ack present.
+     */
+    boolean isTransactionAckPresent();
 }
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
index 1ef3ce3..9b7d857 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
@@ -96,4 +96,9 @@ public class PendingAckHandleDisabled implements PendingAckHandle {
         return CompletableFuture.completedFuture(null);
     }
 
+    @Override
+    public boolean isTransactionAckPresent() {
+        return false;
+    }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index bf6e574..61dfb04 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -735,6 +735,16 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
         return this.pendingAckStoreFuture.thenAccept(PendingAckStore::closeAsync);
     }
 
+    @Override
+    public boolean isTransactionAckPresent() {
+        if ((this.cumulativeAckOfTransaction == null
+                && (this.individualAckOfTransaction == null || this.individualAckOfTransaction.isEmpty()))) {
+            return false;
+        } else {
+            return true;
+        }
+    }
+
     public CompletableFuture<ManagedLedger> getStoreManageLedger() {
         if (this.pendingAckStoreFuture.isDone()) {
             return this.pendingAckStoreFuture.thenCompose(pendingAckStore -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
index 65b6bd9..4abb911 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
@@ -27,19 +27,24 @@ import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.MarkersMessageIdData;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Markers;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -50,6 +55,7 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase{
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
+        conf.setTransactionCoordinatorEnabled(true);
         super.baseSetup();
     }
 
@@ -63,33 +69,36 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase{
     public void testTransactionMarkerDelete() throws Exception {
         ManagedLedger managedLedger = pulsar.getManagedLedgerFactory().open("test");
         PersistentTopic topic = mock(PersistentTopic.class);
-        BrokerService brokerService = mock(BrokerService.class);
-        PulsarService pulsarService = mock(PulsarService.class);
-        ServiceConfiguration configuration = mock(ServiceConfiguration.class);
-        doReturn(brokerService).when(topic).getBrokerService();
-        doReturn(pulsarService).when(brokerService).getPulsar();
-        doReturn(configuration).when(pulsarService).getConfig();
-        doReturn(true).when(configuration).isTransactionCoordinatorEnabled();
+        doReturn(pulsar.getBrokerService()).when(topic).getBrokerService();
         doReturn(managedLedger).when(topic).getManagedLedger();
-        doReturn(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()).when(topic).getName();
+        doReturn("test").when(topic).getName();
         ManagedCursor cursor = managedLedger.openCursor("test");
         PersistentSubscription persistentSubscription = new PersistentSubscription(topic, "test",
                 managedLedger.openCursor("test"), false);
-        MarkersMessageIdData messageIdData = new MarkersMessageIdData()
-                .setLedgerId(1)
-                .setEntryId(1);
+
         Position position1 = managedLedger.addEntry("test".getBytes());
         managedLedger.addEntry(Markers
                 .newTxnCommitMarker(1, 1, 1).array());
-        Position position3 = managedLedger.addEntry(Markers
-                .newTxnCommitMarker(1, 1, 1).array());
+
+        Position position3 = managedLedger.addEntry("test".getBytes());
+
         assertEquals(cursor.getNumberOfEntriesInBacklog(true), 3);
         assertTrue(((PositionImpl) cursor.getMarkDeletedPosition()).compareTo((PositionImpl) position1) < 0);
+
         persistentSubscription.acknowledgeMessage(Collections.singletonList(position1),
                 AckType.Individual, Collections.emptyMap());
-        Thread.sleep(1000L);
-        assertEquals(((PositionImpl) persistentSubscription.getCursor()
-                .getMarkDeletedPosition()).compareTo((PositionImpl) position3), 0);
+
+        Awaitility.await().during(1, TimeUnit.SECONDS).until(() ->
+                ((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition())
+                        .compareTo((PositionImpl) position1) == 0);
+        persistentSubscription.transactionIndividualAcknowledge(new TxnID(0, 0),
+                Collections.singletonList(MutablePair.of((PositionImpl) position3, 0))).get();
+
+        persistentSubscription.endTxn(0, 0, 0, 0).get();
+
+        Awaitility.await().until(() ->
+                ((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition())
+                        .compareTo((PositionImpl) position3) == 0);
     }
 
     @Test