You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/14 08:02:47 UTC

[pulsar] branch master updated: [Transaction] Guarantee transaction metadata handlers connect (#8563)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8394dca  [Transaction] Guarantee transaction metadata handlers connect (#8563)
8394dca is described below

commit 8394dcadd348d8b73be2301e7d31bbef91091c6d
Author: ran <ga...@126.com>
AuthorDate: Sat Nov 14 16:02:21 2020 +0800

    [Transaction] Guarantee transaction metadata handlers connect (#8563)
    
    ### Motivation
    
    Currently, the transaction metadata handlers are created when the Pulsar client starts, but they connect to the broker asynchronously. If the client is restarted and used immediately, the metadata handlers may not be connected yet and are therefore unavailable.
    
    ### Modifications
    
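    Add a connection future to each metadata handler. `TransactionCoordinatorClientImpl.startAsync()` now returns a future that completes only after every handler has connected, and fails if a handler's connection fails.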
---
 .../client/impl/TransactionEndToEndTest.java       | 56 ++++++++++++++++++++++
 .../client/impl/TransactionMetaStoreHandler.java   |  8 +++-
 .../TransactionCoordinatorClientImpl.java          | 23 +++++++--
 3 files changed, 82 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 9bd12e8..f108bc6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -23,12 +23,17 @@ import static org.testng.Assert.fail;
 
 import com.google.common.collect.Sets;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -36,7 +41,10 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -504,4 +512,52 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         }
     }
 
+    @Test
+    public void txnMetadataHandlerRecoverTest() throws Exception {
+        String topic = NAMESPACE1 + "/tc-metadata-handler-recover";
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        Map<TxnID, List<MessageId>> txnIDListMap = new HashMap<>();
+
+        int txnCnt = 20;
+        int messageCnt = 10;
+        for (int i = 0; i < txnCnt; i++) {
+            TransactionImpl txn = (TransactionImpl) pulsarClient.newTransaction()
+                    .withTransactionTimeout(5, TimeUnit.MINUTES)
+                    .build().get();
+            List<MessageId> messageIds = new ArrayList<>();
+            for (int j = 0; j < messageCnt; j++) {
+                MessageId messageId = producer.newMessage(txn).value("Hello".getBytes()).sendAsync().get();
+                messageIds.add(messageId);
+            }
+            txnIDListMap.put(new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()), messageIds);
+        }
+
+        pulsarClient.close();
+        PulsarClientImpl recoverPulsarClient = (PulsarClientImpl) PulsarClient.builder()
+                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .enableTransaction(true)
+                .build();
+
+        TransactionCoordinatorClient tcClient = recoverPulsarClient.getTcClient();
+        for (Map.Entry<TxnID, List<MessageId>> entry : txnIDListMap.entrySet()) {
+            tcClient.commit(entry.getKey(), entry.getValue());
+        }
+
+        Consumer<byte[]> consumer = recoverPulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("test")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        for (int i = 0; i < txnCnt * messageCnt; i++) {
+            Message<byte[]> message = consumer.receive();
+            Assert.assertNotNull(message);
+        }
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 646a73a..2b4fa48 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -71,7 +71,10 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
 
     private Timeout requestTimeout;
 
-    public TransactionMetaStoreHandler(long transactionCoordinatorId, PulsarClientImpl pulsarClient, String topic) {
+    private CompletableFuture<Void> connectFuture;
+
+    public TransactionMetaStoreHandler(long transactionCoordinatorId, PulsarClientImpl pulsarClient, String topic,
+                                       CompletableFuture<Void> connectFuture) {
         super(pulsarClient, topic);
         this.transactionCoordinatorId = transactionCoordinatorId;
         this.timeoutQueue = new ConcurrentLinkedQueue<>();
@@ -87,6 +90,7 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                 .create(),
             this);
         this.connectionHandler.grabCnx();
+        this.connectFuture = connectFuture;
     }
 
     @Override
@@ -94,6 +98,7 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         LOG.error("Transaction meta handler with transaction coordinator id {} connection failed.",
             transactionCoordinatorId, exception);
         setState(State.Failed);
+        this.connectFuture.completeExceptionally(exception);
     }
 
     @Override
@@ -105,6 +110,7 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         if (!changeToReadyState()) {
             cnx.channel().close();
         }
+        this.connectFuture.complete(null);
     }
 
     public CompletableFuture<TxnID> newTransactionAsync(long timeout, TimeUnit unit) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index c0092b3..5693ede 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -75,34 +76,48 @@ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorC
     public CompletableFuture<Void> startAsync() {
         if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
             return pulsarClient.getLookup().getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN)
-                .thenAccept(partitionMeta -> {
+                .thenCompose(partitionMeta -> {
+                    List<CompletableFuture<Void>> connectFutureList = new ArrayList<>();
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Transaction meta store assign partition is {}.", partitionMeta.partitions);
                     }
                     if (partitionMeta.partitions > 0) {
                         handlers = new TransactionMetaStoreHandler[partitionMeta.partitions];
                         for (int i = 0; i < partitionMeta.partitions; i++) {
-                            TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(i, pulsarClient,
-                                    TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + i);
+                            CompletableFuture<Void> connectFuture = new CompletableFuture<>();
+                            connectFutureList.add(connectFuture);
+                            TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(
+                                    i, pulsarClient, getTCAssignTopicName(i), connectFuture);
                             handlers[i] = handler;
                             handlerMap.put(i, handler);
                         }
                     } else {
                         handlers = new TransactionMetaStoreHandler[1];
+                        CompletableFuture<Void> connectFuture = new CompletableFuture<>();
+                        connectFutureList.add(connectFuture);
                         TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(0, pulsarClient,
-                                TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+                                getTCAssignTopicName(-1), connectFuture);
                         handlers[0] = handler;
                         handlerMap.put(0, handler);
                     }
 
                     STATE_UPDATER.set(TransactionCoordinatorClientImpl.this, State.READY);
 
+                    return FutureUtil.waitForAll(connectFutureList);
                 });
         } else {
             return FutureUtil.failedFuture(new CoordinatorClientStateException("Can not start while current state is " + state));
         }
     }
 
+    private String getTCAssignTopicName(int partition) {
+        if (partition >= 0) {
+            return TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
+        } else {
+            return TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString();
+        }
+    }
+
     @Override
     public void close() throws TransactionCoordinatorClientException {
         try {