You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/05/19 07:55:17 UTC

[pulsar] branch branch-2.10 updated (d0af6d20082 -> 58bd9bc1035)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from d0af6d20082 Upgrade Netty to 4.1.77.Final and netty-tcnative to 2.0.52.Final (#15646)
     new 48402853178 [Fix][txn] Make transaction stats consistent at end txn (#15472)
     new 65ab3aba1dd [Fix][Txn]Fix transaction component recover fillQueue (#15418)
     new 58bd9bc1035 [Fix][Txn] Fix transaction PendingAck lowWaterMark (#15530)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../buffer/impl/TopicTransactionBuffer.java        |   8 +-
 .../pendingack/impl/MLPendingAckStore.java         |   6 +-
 .../pendingack/impl/PendingAckHandleImpl.java      |  22 ++--
 .../pulsar/broker/transaction/TransactionTest.java |  24 +++++
 .../pendingack/PendingAckPersistentTest.java       | 117 +++++++++++++++++++++
 .../coordinator/impl/MLTransactionLogImpl.java     |   6 +-
 .../impl/MLTransactionMetadataStore.java           |  29 +++--
 7 files changed, 175 insertions(+), 37 deletions(-)


[pulsar] 03/03: [Fix][Txn] Fix transaction PendingAck lowWaterMark (#15530)

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 58bd9bc1035a74f9f6656a2f8d1048d705e7cece
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri May 13 13:48:34 2022 +0800

    [Fix][Txn] Fix transaction PendingAck lowWaterMark (#15530)
    
    ### Motivation
    Now, PendingAckHandle use the ending transaction ID to append abort mark, but it is wrong. We should append abort mark for the first transaction in the  individualAckOfTransaction after judgment.
    ### Modification
    Append abort mark for the first transaction in the  individualAckOfTransaction after judgment.
    
    (cherry picked from commit 498cde9ad3dd62142d73e024ea424bd76726dfaa)
---
 .../pendingack/impl/PendingAckHandleImpl.java      |  22 ++--
 .../pendingack/PendingAckPersistentTest.java       | 117 +++++++++++++++++++++
 2 files changed, 124 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index f001b39972e..7e11592e063 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -600,21 +600,13 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
 
             if (firstTxn.getMostSigBits() == txnID.getMostSigBits()
                     && firstTxn.getLeastSigBits() <= lowWaterMark) {
-                this.pendingAckStoreFuture.whenComplete((pendingAckStore, throwable) -> {
-                    if (throwable == null) {
-                        pendingAckStore.appendAbortMark(txnID, AckType.Individual).thenAccept(v -> {
-                            synchronized (PendingAckHandleImpl.this) {
-                                log.warn("[{}] Transaction pending ack handle low water mark success! txnId : [{}], "
-                                        + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
-                                individualAckOfTransaction.remove(firstTxn);
-                                handleLowWaterMark(txnID, lowWaterMark);
-                            }
-                        }).exceptionally(e -> {
-                            log.warn("[{}] Transaction pending ack handle low water mark fail! txnId : [{}], "
-                                    + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
-                            return null;
-                        });
-                    }
+                abortTxn(firstTxn, null, lowWaterMark).thenRun(() -> {
+                    log.warn("[{}] Transaction pending ack handle low water mark success! txnId : [{}], "
+                            + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
+                }).exceptionally(e -> {
+                    log.warn("[{}] Transaction pending ack handle low water mark fail! txnId : [{}], "
+                            + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
+                    return null;
                 });
             }
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 58408cdee10..7bc562ca62b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -23,7 +23,9 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -32,6 +34,7 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
@@ -44,6 +47,8 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
@@ -345,4 +350,116 @@ public class PendingAckPersistentTest extends TransactionTestBase {
         assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subName2)));
         assertFalse(topics.contains(topic));
     }
+
+    @Test
+    public void testPendingAckLowWaterMarkRemoveFirstTxn() throws Exception {
+        String topic = TopicName.get(TopicDomain.persistent.toString(),
+                NamespaceName.get(NAMESPACE1), "test").toString();
+
+        String subName = "subName";
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Failover)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().send();
+        }
+
+        Transaction transaction1 = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        Message<byte[]> message1 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message1.getMessageId(), transaction1);
+        transaction1.commit().get();
+
+
+        Transaction transaction2 = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build()
+                .get();
+        while (transaction1.getTxnID().getMostSigBits() != transaction2.getTxnID().getMostSigBits()) {
+            transaction2 = pulsarClient.newTransaction()
+                    .withTransactionTimeout(5, TimeUnit.SECONDS)
+                    .build()
+                    .get();
+        }
+
+        Transaction transaction3 = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build()
+                .get();
+        while (transaction1.getTxnID().getMostSigBits() != transaction3.getTxnID().getMostSigBits()) {
+            transaction3 = pulsarClient.newTransaction()
+                    .withTransactionTimeout(5, TimeUnit.SECONDS)
+                    .build()
+                    .get();
+        }
+
+        Message<byte[]> message3 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message3.getMessageId(), transaction2);
+        transaction2.commit().get();
+
+        Message<byte[]> message2 = consumer.receive(5, TimeUnit.SECONDS);
+
+        Field field = TransactionImpl.class.getDeclaredField("state");
+        field.setAccessible(true);
+        field.set(transaction1, TransactionImpl.State.OPEN);
+
+        consumer.acknowledgeAsync(message2.getMessageId(), transaction1).get();
+        Message<byte[]> message4 = consumer.receive(5, TimeUnit.SECONDS);
+        field.set(transaction2, TransactionImpl.State.OPEN);
+        consumer.acknowledgeAsync(message4.getMessageId(), transaction2).get();
+
+        Message<byte[]> message5 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message5.getMessageId(), transaction3);
+        transaction3.commit().get();
+
+
+        PersistentTopic persistentTopic =
+                (PersistentTopic) getPulsarServiceList()
+                        .get(0)
+                        .getBrokerService()
+                        .getTopic(topic, false)
+                        .get()
+                        .get();
+
+        PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName);
+        PendingAckHandleImpl pendingAckHandle = new PendingAckHandleImpl(persistentSubscription);
+
+        Method method = PendingAckHandleImpl.class.getDeclaredMethod("initPendingAckStore");
+        method.setAccessible(true);
+        method.invoke(pendingAckHandle);
+
+        Field field1 = PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
+        field1.setAccessible(true);
+        CompletableFuture<PendingAckStore> completableFuture =
+                (CompletableFuture<PendingAckStore>) field1.get(pendingAckHandle);
+
+        Awaitility.await().until(() -> {
+            completableFuture.get();
+            return true;
+        });
+
+        Field field2 = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
+        field2.setAccessible(true);
+        LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>> individualAckOfTransaction =
+                (LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>) field2.get(pendingAckHandle);
+
+        assertFalse(individualAckOfTransaction.containsKey(transaction1.getTxnID()));
+        assertFalse(individualAckOfTransaction.containsKey(transaction2.getTxnID()));
+
+    }
 }


[pulsar] 02/03: [Fix][Txn]Fix transaction component recover fillQueue (#15418)

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 65ab3aba1dd0c5a52ec6a9674f68d19925c5f9ef
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri May 6 17:00:35 2022 +0800

    [Fix][Txn]Fix transaction component recover fillQueue (#15418)
    
    ### Motivation & Modification
    The queue size + NUMBER_OF_PER_READ_ENTRY should <= the capacity of queue instead of the queue size <= the capacity of queue. If the processing speed is less than the read speed, the part that exceeds the queue capacity will be ignored.
    ### About test
    It hard to add test for this change.
    This class of replay/recover/FillEntryQueueCallback is not public, so it can not be import by test.
    And then their flied can not be changed by reflection.
    With `Thread.sleep` allowed, we  can only test the recover of  TC by call a `Thread.sleep` in `transactionLogReplayCallback` and can not test TP/TB.
    
    (cherry picked from commit 51190ba9187645d9d39cdd787112f9172a38e58f)
---
 .../broker/transaction/buffer/impl/TopicTransactionBuffer.java    | 8 ++++++--
 .../broker/transaction/pendingack/impl/MLPendingAckStore.java     | 6 ++++--
 .../pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java | 6 ++++--
 3 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index b0749757e8c..3cbc3f14ea5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -698,6 +698,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
 
         private volatile boolean isReadable = true;
 
+        private static final int NUMBER_OF_PER_READ_ENTRY = 100;
+
         private FillEntryQueueCallback(SpscArrayQueue<Entry> entryQueue, ManagedCursor cursor,
                                        TopicTransactionBufferRecover recover) {
             this.entryQueue = entryQueue;
@@ -705,10 +707,12 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
             this.recover = recover;
         }
         boolean fillQueue() {
-            if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
+            if (entryQueue.size() + NUMBER_OF_PER_READ_ENTRY < entryQueue.capacity()
+                    && outstandingReadsRequests.get() == 0) {
                 if (cursor.hasMoreEntries()) {
                     outstandingReadsRequests.incrementAndGet();
-                    cursor.asyncReadEntries(100, this, System.nanoTime(), PositionImpl.LATEST);
+                    cursor.asyncReadEntries(NUMBER_OF_PER_READ_ENTRY,
+                            this, System.nanoTime(), PositionImpl.LATEST);
                 } else {
                     if (entryQueue.size() == 0) {
                         isReadable = false;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index ccc1628154c..ee5cc22f340 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -367,12 +367,14 @@ public class MLPendingAckStore implements PendingAckStore {
 
         private volatile boolean isReadable = true;
         private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
+        private static final int NUMBER_OF_PER_READ_ENTRY = 100;
 
         boolean fillQueue() {
-            if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
+            if (entryQueue.size() + NUMBER_OF_PER_READ_ENTRY < entryQueue.capacity()
+                    && outstandingReadsRequests.get() == 0) {
                 if (cursor.hasMoreEntries()) {
                     outstandingReadsRequests.incrementAndGet();
-                    readAsync(100, this);
+                    readAsync(NUMBER_OF_PER_READ_ENTRY, this);
                 }
             }
             return isReadable;
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 3da19eccf14..f9fd728f24b 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -237,12 +237,14 @@ public class MLTransactionLogImpl implements TransactionLog {
 
         private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
         private volatile boolean isReadable = true;
+        private static final int NUMBER_OF_PER_READ_ENTRY = 100;
 
         boolean fillQueue() {
-            if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
+            if (entryQueue.size() + NUMBER_OF_PER_READ_ENTRY < entryQueue.capacity()
+                    && outstandingReadsRequests.get() == 0) {
                 if (cursor.hasMoreEntries()) {
                     outstandingReadsRequests.incrementAndGet();
-                    readAsync(100, this);
+                    readAsync(NUMBER_OF_PER_READ_ENTRY, this);
                     return isReadable;
                 } else {
                     return false;


[pulsar] 01/03: [Fix][txn] Make transaction stats consistent at end txn (#15472)

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 48402853178f2ba1cdc0fe1df7dcee223456a9a5
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Sat May 7 20:24:15 2022 +0800

    [Fix][txn] Make transaction stats consistent at end txn (#15472)
    
    ### Motivation
    When the end transaction log is appended to the transaction log, the transaction ended. The transaction should be removed from the `txnMetaMap`. If transactionLog fails to delete the location, we only need to log it.
    ### Modification
    Not complete exceptionally, but only give a warn log.
    
    (cherry picked from commit 5e6580abc8aea515581f0d23964b46bb58e493f4)
---
 .../pulsar/broker/transaction/TransactionTest.java | 24 ++++++++++++++++++
 .../impl/MLTransactionMetadataStore.java           | 29 ++++++++++------------
 2 files changed, 37 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 777d5c90363..7ab49481466 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -101,6 +101,7 @@ import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
 import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
@@ -966,4 +967,27 @@ public class TransactionTest extends TransactionTestBase {
                     Integer.parseInt(lastConfirmedEntry[1]) - 2);
         });
     }
+
+    @Test
+    public void testConsistencyOfTransactionStatsAtEndTxn() throws Exception {
+        TransactionMetadataStore transactionMetadataStore = getPulsarServiceList().get(0)
+                .getTransactionMetadataStoreService()
+                .getStores()
+                .get(new TransactionCoordinatorID(0));
+
+        Field field = MLTransactionMetadataStore.class.getDeclaredField("transactionLog");
+        field.setAccessible(true);
+        MLTransactionLogImpl transactionLog = (MLTransactionLogImpl) field.get(transactionMetadataStore);
+        Field field1 = MLTransactionLogImpl.class.getDeclaredField("cursor");
+        field1.setAccessible(true);
+        ManagedCursorImpl managedCursor = (ManagedCursorImpl) field1.get(transactionLog);
+        managedCursor.close();
+
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        transaction.commit().get();
+    }
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index f93de8b0175..685d57e664e 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -372,23 +372,20 @@ public class MLTransactionMetadataStore
                                     this.transactionTimeoutCount.increment();
                                 }
                                 if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
-                                    transactionLog.deletePosition(txnMetaListPair.getRight()).whenComplete((v, ex) -> {
-                                        if (ex != null) {
-                                            promise.completeExceptionally(ex);
-                                            return;
-                                        }
-                                        this.transactionMetadataStoreStats
-                                                .addTransactionExecutionLatencySample(System.currentTimeMillis()
-                                                        - txnMetaListPair.getLeft().getOpenTimestamp());
-                                        if (newStatus == TxnStatus.COMMITTED) {
-                                            committedTransactionCount.increment();
-                                        } else {
-                                            abortedTransactionCount.increment();
-                                        }
-                                        txnMetaMap.remove(txnID.getLeastSigBits());
-                                        promise.complete(null);
+                                    this.transactionMetadataStoreStats
+                                            .addTransactionExecutionLatencySample(System.currentTimeMillis()
+                                                    - txnMetaListPair.getLeft().getOpenTimestamp());
+                                    if (newStatus == TxnStatus.COMMITTED) {
+                                        committedTransactionCount.increment();
+                                    } else {
+                                        abortedTransactionCount.increment();
+                                    }
+                                    txnMetaMap.remove(txnID.getLeastSigBits());
+                                    transactionLog.deletePosition(txnMetaListPair.getRight()).exceptionally(ex -> {
+                                        log.warn("Failed to delete transaction log position "
+                                                + "at end transaction [{}]", txnID);
+                                        return null;
                                     });
-                                    return;
                                 }
                                 promise.complete(null);
                             } catch (InvalidTxnStatusException e) {