You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/14 08:02:47 UTC
[pulsar] branch master updated: [Transaction] Guarantee transaction
metadata handlers connect (#8563)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8394dca [Transaction] Guarantee transaction metadata handlers connect (#8563)
8394dca is described below
commit 8394dcadd348d8b73be2301e7d31bbef91091c6d
Author: ran <ga...@126.com>
AuthorDate: Sat Nov 14 16:02:21 2020 +0800
[Transaction] Guarantee transaction metadata handlers connect (#8563)
### Motivation
Currently, the transaction metadata handlers are started when the Pulsar client starts, but they connect to the broker asynchronously. As a result, if the client is restarted, the metadata handlers may not be available yet.
### Modifications
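Add a connection future to each metadata handler, so that `TransactionCoordinatorClientImpl#startAsync` completes only after all handlers have connected.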
---
.../client/impl/TransactionEndToEndTest.java | 56 ++++++++++++++++++++++
.../client/impl/TransactionMetaStoreHandler.java | 8 +++-
.../TransactionCoordinatorClientImpl.java | 23 +++++++--
3 files changed, 82 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 9bd12e8..f108bc6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -23,12 +23,17 @@ import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
@@ -36,7 +41,10 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -504,4 +512,52 @@ public class TransactionEndToEndTest extends TransactionTestBase {
}
}
+ @Test
+ public void txnMetadataHandlerRecoverTest() throws Exception {
+ String topic = NAMESPACE1 + "/tc-metadata-handler-recover";
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .create();
+
+ Map<TxnID, List<MessageId>> txnIDListMap = new HashMap<>();
+
+ int txnCnt = 20;
+ int messageCnt = 10;
+ for (int i = 0; i < txnCnt; i++) {
+ TransactionImpl txn = (TransactionImpl) pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.MINUTES)
+ .build().get();
+ List<MessageId> messageIds = new ArrayList<>();
+ for (int j = 0; j < messageCnt; j++) {
+ MessageId messageId = producer.newMessage(txn).value("Hello".getBytes()).sendAsync().get();
+ messageIds.add(messageId);
+ }
+ txnIDListMap.put(new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()), messageIds);
+ }
+
+ pulsarClient.close();
+ PulsarClientImpl recoverPulsarClient = (PulsarClientImpl) PulsarClient.builder()
+ .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .enableTransaction(true)
+ .build();
+
+ TransactionCoordinatorClient tcClient = recoverPulsarClient.getTcClient();
+ for (Map.Entry<TxnID, List<MessageId>> entry : txnIDListMap.entrySet()) {
+ tcClient.commit(entry.getKey(), entry.getValue());
+ }
+
+ Consumer<byte[]> consumer = recoverPulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("test")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ for (int i = 0; i < txnCnt * messageCnt; i++) {
+ Message<byte[]> message = consumer.receive();
+ Assert.assertNotNull(message);
+ }
+ }
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 646a73a..2b4fa48 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -71,7 +71,10 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
private Timeout requestTimeout;
- public TransactionMetaStoreHandler(long transactionCoordinatorId, PulsarClientImpl pulsarClient, String topic) {
+ private CompletableFuture<Void> connectFuture;
+
+ public TransactionMetaStoreHandler(long transactionCoordinatorId, PulsarClientImpl pulsarClient, String topic,
+ CompletableFuture<Void> connectFuture) {
super(pulsarClient, topic);
this.transactionCoordinatorId = transactionCoordinatorId;
this.timeoutQueue = new ConcurrentLinkedQueue<>();
@@ -87,6 +90,7 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
.create(),
this);
this.connectionHandler.grabCnx();
+ this.connectFuture = connectFuture;
}
@Override
@@ -94,6 +98,7 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
LOG.error("Transaction meta handler with transaction coordinator id {} connection failed.",
transactionCoordinatorId, exception);
setState(State.Failed);
+ this.connectFuture.completeExceptionally(exception);
}
@Override
@@ -105,6 +110,7 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
if (!changeToReadyState()) {
cnx.channel().close();
}
+ this.connectFuture.complete(null);
}
public CompletableFuture<TxnID> newTransactionAsync(long timeout, TimeUnit unit) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index c0092b3..5693ede 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -75,34 +76,48 @@ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorC
public CompletableFuture<Void> startAsync() {
if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
return pulsarClient.getLookup().getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN)
- .thenAccept(partitionMeta -> {
+ .thenCompose(partitionMeta -> {
+ List<CompletableFuture<Void>> connectFutureList = new ArrayList<>();
if (LOG.isDebugEnabled()) {
LOG.debug("Transaction meta store assign partition is {}.", partitionMeta.partitions);
}
if (partitionMeta.partitions > 0) {
handlers = new TransactionMetaStoreHandler[partitionMeta.partitions];
for (int i = 0; i < partitionMeta.partitions; i++) {
- TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(i, pulsarClient,
- TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + i);
+ CompletableFuture<Void> connectFuture = new CompletableFuture<>();
+ connectFutureList.add(connectFuture);
+ TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(
+ i, pulsarClient, getTCAssignTopicName(i), connectFuture);
handlers[i] = handler;
handlerMap.put(i, handler);
}
} else {
handlers = new TransactionMetaStoreHandler[1];
+ CompletableFuture<Void> connectFuture = new CompletableFuture<>();
+ connectFutureList.add(connectFuture);
TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(0, pulsarClient,
- TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+ getTCAssignTopicName(-1), connectFuture);
handlers[0] = handler;
handlerMap.put(0, handler);
}
STATE_UPDATER.set(TransactionCoordinatorClientImpl.this, State.READY);
+ return FutureUtil.waitForAll(connectFutureList);
});
} else {
return FutureUtil.failedFuture(new CoordinatorClientStateException("Can not start while current state is " + state));
}
}
+ private String getTCAssignTopicName(int partition) {
+ if (partition >= 0) {
+ return TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
+ } else {
+ return TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString();
+ }
+ }
+
@Override
public void close() throws TransactionCoordinatorClientException {
try {