You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 03:02:45 UTC

[pulsar] 14/15: [Transaction]Txn client check timeout (#12521)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b244d0c3bc625bb4d4d3108735fd2a219d328a50
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Sat Dec 18 08:35:18 2021 +0800

    [Transaction]Txn client check timeout (#12521)
    
    ### Motivation
    Optimize the logic on the Transaction Client side.
    Avoid sending and acking messages with timed-out transactions.
    
    ### Modifications
    
    * TransactionImpl
    
         *  Add a helper field, STATE_UPDATE, that is used to change State with CAS (compare-and-set).
    **When committing or aborting, only a successful CAS operation proceeds to the subsequent checks; otherwise a failed future is returned.**
         *   Implement TimerTask to run a task that changes the state of the transaction from OPEN to TIMEOUT.
    * TransactionBuilderImpl
         * In the callback of the build method, use the timer of PulsarClient to start a Timeout, passing in the newly created TransactionImpl (which implements TimerTask).
    
    (cherry picked from commit c5d7a84c8e5c27e48022df8c7082496840cd3be9)
---
 .../client/impl/ConsumerAckResponseTest.java       |  2 +
 .../client/impl/TransactionEndToEndTest.java       | 67 +++++++++++++++++++++-
 .../TransactionCoordinatorClientException.java     | 20 +++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  4 ++
 .../client/impl/PartitionedProducerImpl.java       |  5 ++
 .../apache/pulsar/client/impl/ProducerImpl.java    |  4 ++
 .../pulsar/client/impl/PulsarClientImpl.java       |  1 +
 .../impl/transaction/TransactionBuilderImpl.java   |  9 ++-
 .../client/impl/transaction/TransactionImpl.java   | 38 +++++++++---
 9 files changed, 137 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
index 0378c53..6981865 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
@@ -50,8 +50,10 @@ public class ConsumerAckResponseTest extends ProducerConsumerBase {
         super.producerBaseSetup();
         doReturn(1L).when(transaction).getTxnIdLeastBits();
         doReturn(1L).when(transaction).getTxnIdMostBits();
+        doReturn(TransactionImpl.State.OPEN).when(transaction).getState();
         CompletableFuture<Void> completableFuture = CompletableFuture.completedFuture(null);
         doNothing().when(transaction).registerAckOp(any());
+        doReturn(true).when(transaction).checkIfOpen(any());
         doReturn(completableFuture).when(transaction).registerAckedTopic(any(), any());
 
         Thread.sleep(1000 * 3);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 4630449..f52d319 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -23,7 +23,9 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
+import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.ArrayList;
@@ -770,19 +772,46 @@ public class TransactionEndToEndTest extends TransactionTestBase {
             }
         });
 
+        Class<TransactionImpl> transactionClass = TransactionImpl.class;
+        Constructor<TransactionImpl> constructor = transactionClass
+                .getDeclaredConstructor(PulsarClientImpl.class, long.class, long.class, long.class);
+        constructor.setAccessible(true);
+
+        TransactionImpl timeoutTxnSkipClientTimeout = constructor.newInstance(pulsarClient, 5,
+                        timeoutTxn.getTxnID().getLeastSigBits(), timeoutTxn.getTxnID().getMostSigBits());
+
         try {
-            timeoutTxn.commit().get();
+            timeoutTxnSkipClientTimeout.commit().get();
             fail();
         } catch (Exception e) {
             assertTrue(e.getCause() instanceof TransactionNotFoundException);
         }
         Field field = TransactionImpl.class.getDeclaredField("state");
         field.setAccessible(true);
-        TransactionImpl.State state = (TransactionImpl.State) field.get(timeoutTxn);
+        TransactionImpl.State state = (TransactionImpl.State) field.get(timeoutTxnSkipClientTimeout);
         assertEquals(state, TransactionImpl.State.ERROR);
     }
 
     @Test
+    public void testTxnTimeoutAtTransactionMetadataStore() throws Exception{
+        TxnID txnID = pulsarServiceList.get(0).getTransactionMetadataStoreService()
+                .newTransaction(new TransactionCoordinatorID(0), 1).get();
+        Awaitility.await().until(() -> {
+            try {
+               getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txnID).get();
+                return false;
+            } catch (Exception e) {
+                return true;
+            }
+        });
+        Collection<TransactionMetadataStore> transactionMetadataStores =
+                getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores().values();
+        long timeoutCount = transactionMetadataStores.stream()
+                .mapToLong(store -> store.getMetadataStoreStats().timeoutCount).sum();
+        Assert.assertEquals(timeoutCount, 1);
+    }
+
+    @Test
     public void transactionTimeoutTest() throws Exception {
         String topic = NAMESPACE1 + "/txn-timeout";
 
@@ -943,4 +972,38 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         }
         assertTrue(flag);
     }
+
+    @Test
+    public void testTxnTimeOutInClient() throws Exception{
+        String topic = NAMESPACE1 + "/testTxnTimeOutInClient";
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).producerName("testTxnTimeOut_producer")
+                .topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).consumerName("testTxnTimeOut_consumer")
+                .topic(topic).subscriptionName("testTxnTimeOut_sub").subscribe();
+
+        Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
+                .build().get();
+        producer.newMessage().send();
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIMEOUT);
+        });
+
+        try {
+            producer.newMessage(transaction).send();
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertTrue(e.getCause().getCause() instanceof TransactionCoordinatorClientException
+                    .InvalidTxnStatusException);
+        }
+        try {
+            Message<String> message = consumer.receive();
+            consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertTrue(e.getCause() instanceof TransactionCoordinatorClientException
+                    .InvalidTxnStatusException);
+        }
+    }
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
index 0e1f6c7..d7df4e3 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
@@ -68,6 +68,11 @@ public class TransactionCoordinatorClientException extends IOException {
         public InvalidTxnStatusException(String message) {
             super(message);
         }
+
+        public InvalidTxnStatusException(String txnId, String actualState, String expectState) {
+            super("["+ txnId +"] with unexpected state : "
+                    + actualState + ", expect " + expectState + " state!");
+        }
     }
 
     /**
@@ -93,6 +98,21 @@ public class TransactionCoordinatorClientException extends IOException {
         }
     }
 
+
+    /**
+     * Thrown when transaction meta was timeout.
+     */
+    public static class TransactionTimeotException extends TransactionCoordinatorClientException {
+
+        public TransactionTimeotException(Throwable t) {
+            super(t);
+        }
+
+        public TransactionTimeotException(String transactionId) {
+            super("The transaction " +  transactionId + " is timeout.");
+        }
+    }
+
     /**
      * Thrown when send request to transaction meta store but the transaction meta store handler not ready.
      */
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 1251593..0f47208 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -487,6 +487,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         if (null != txn) {
             checkArgument(txn instanceof TransactionImpl);
             txnImpl = (TransactionImpl) txn;
+            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+           if (!txnImpl.checkIfOpen(completableFuture)) {
+               return completableFuture;
+           }
         }
         return doAcknowledgeWithTxn(messageId, AckType.Individual, Collections.emptyMap(), txnImpl);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 3311050..216d775 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -191,6 +192,10 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
 
     @Override
     CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
+        CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
+        if (txn != null && !((TransactionImpl)txn).checkIfOpen(completableFuture)) {
+            return completableFuture;
+        }
         int partition = routerPolicy.choosePartition(message, topicMetadata);
         checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
                 "Illegal partition index chosen by the message routing policy: " + partition);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index a3ca386..5944c8f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -371,6 +371,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         if (txn == null) {
             return internalSendAsync(message);
         } else {
+            CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
+            if (!((TransactionImpl)txn).checkIfOpen(completableFuture)) {
+               return completableFuture;
+            }
             return ((TransactionImpl) txn).registerProducedTopic(topic)
                         .thenCompose(ignored -> internalSendAsync(message));
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 7703afc..c5195c9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -95,6 +95,7 @@ public class PulsarClientImpl implements PulsarClient {
     protected final ClientConfigurationData conf;
     private LookupService lookup;
     private final ConnectionPool cnxPool;
+    @Getter
     private final Timer timer;
     private boolean needStopTimer;
     private final ExecutorProvider externalExecutorProvider;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
index 84be46f..3ac8676 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl.transaction;
 
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
@@ -67,8 +69,11 @@ public class TransactionBuilderImpl implements TransactionBuilder {
                         future.completeExceptionally(throwable);
                         return;
                     }
-                    future.complete(new TransactionImpl(client, txnTimeout,
-                            txnID.getLeastSigBits(), txnID.getMostSigBits()));
+                    TransactionImpl transaction = new TransactionImpl(client, txnTimeout,
+                            txnID.getLeastSigBits(), txnID.getMostSigBits());
+                    client.getTimer().newTimeout(transaction,
+                            txnTimeout, timeUnit);
+                    future.complete(transaction);
                 });
         return future;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 458976a..4128a6f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl.transaction;
 
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -25,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import com.google.common.collect.Lists;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
@@ -48,7 +51,7 @@ import org.apache.pulsar.common.util.FutureUtil;
  */
 @Slf4j
 @Getter
-public class TransactionImpl implements Transaction {
+public class TransactionImpl implements Transaction , TimerTask {
 
     private final PulsarClientImpl client;
     private final long transactionTimeoutMs;
@@ -63,6 +66,13 @@ public class TransactionImpl implements Transaction {
     private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
     private final ArrayList<CompletableFuture<Void>> ackFutureList;
     private volatile State state;
+    private final AtomicReferenceFieldUpdater<TransactionImpl, State> STATE_UPDATE =
+        AtomicReferenceFieldUpdater.newUpdater(TransactionImpl.class, State.class, "state");
+
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIMEOUT);
+    }
 
     public enum State {
         OPEN,
@@ -70,7 +80,8 @@ public class TransactionImpl implements Transaction {
         ABORTING,
         COMMITTED,
         ABORTED,
-        ERROR
+        ERROR,
+        TIMEOUT
     }
 
     TransactionImpl(PulsarClientImpl client,
@@ -93,7 +104,8 @@ public class TransactionImpl implements Transaction {
 
     // register the topics that will be modified by this transaction
     public CompletableFuture<Void> registerProducedTopic(String topic) {
-        return checkIfOpen().thenCompose(value -> {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        if (checkIfOpen(completableFuture)) {
             synchronized (TransactionImpl.this) {
                 // we need to issue the request to TC to register the produced topic
                 return registerPartitionMap.compute(topic, (key, future) -> {
@@ -106,7 +118,9 @@ public class TransactionImpl implements Transaction {
                     }
                 });
             }
-        });
+        } else {
+            return completableFuture;
+        }
     }
 
     public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
@@ -115,7 +129,8 @@ public class TransactionImpl implements Transaction {
 
     // register the topics that will be modified by this transaction
     public CompletableFuture<Void> registerAckedTopic(String topic, String subscription) {
-        return checkIfOpen().thenCompose(value -> {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        if (checkIfOpen(completableFuture)) {
             synchronized (TransactionImpl.this) {
                 // we need to issue the request to TC to register the acked topic
                 return registerSubscriptionMap.compute(Pair.of(topic, subscription), (key, future) -> {
@@ -128,7 +143,9 @@ public class TransactionImpl implements Transaction {
                     }
                 });
             }
-        });
+        } else {
+            return completableFuture;
+        }
     }
 
     public synchronized void registerAckOp(CompletableFuture<Void> ackFuture) {
@@ -213,11 +230,14 @@ public class TransactionImpl implements Transaction {
         return new TxnID(txnIdMostBits, txnIdLeastBits);
     }
 
-    private CompletableFuture<Void> checkIfOpen() {
+    public <T> boolean checkIfOpen(CompletableFuture<T> completableFuture) {
         if (state == State.OPEN) {
-            return CompletableFuture.completedFuture(null);
+            return true;
         } else {
-            return invalidTxnStatusFuture();
+            completableFuture
+                    .completeExceptionally(new InvalidTxnStatusException(
+                            new TxnID(txnIdMostBits, txnIdLeastBits).toString(), state.name(), State.OPEN.name()));
+            return false;
         }
     }