You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/23 03:21:07 UTC
[pulsar] branch master updated: [Transaction] Fix transaction
messages order error and deduplication error (#9024)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a5c9c15 [Transaction] Fix transaction messages order error and deduplication error (#9024)
a5c9c15 is described below
commit a5c9c159adfdb5102b490e42fdb189e9c3c4ecaf
Author: ran <ga...@126.com>
AuthorDate: Wed Dec 23 11:20:39 2020 +0800
[Transaction] Fix transaction messages order error and deduplication error (#9024)
### Motivation
Currently, the transaction messages would be produced in the wrong order, and the deduplication check is not work well.
### Modifications
*Describe the modifications you've done.*
### Verifying this change
This change added tests and can be verified as follows:
- *org.apache.pulsar.client.impl.TransactionEndToEndTest#produceTxnMessageOrderTest*
---
.../buffer/impl/TopicTransactionBuffer.java | 58 ++++++++++++++--------
.../broker/transaction/TransactionTestBase.java | 1 +
.../client/impl/TransactionEndToEndTest.java | 39 +++++++++++++++
.../client/impl/TypedMessageBuilderImpl.java | 4 +-
.../client/impl/transaction/TransactionImpl.java | 51 +++++++++----------
5 files changed, 103 insertions(+), 50 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index c63d5d1..c8f4fbc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -23,8 +23,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
@@ -54,14 +55,18 @@ public class TopicTransactionBuffer implements TransactionBuffer {
@Override
public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
CompletableFuture<Position> completableFuture = new CompletableFuture<>();
- topic.publishMessage(buffer, (e, ledgerId, entryId) -> {
- if (e != null) {
- log.error("Failed to append buffer to txn {}", txnId, e);
- completableFuture.completeExceptionally(e);
- return;
+ topic.getManagedLedger().asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, Object ctx) {
+ completableFuture.complete(position);
}
- completableFuture.complete(PositionImpl.get(ledgerId, entryId));
- });
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object ctx) {
+ log.error("Failed to append buffer to txn {}", txnId, exception);
+ completableFuture.completeExceptionally(exception);
+ }
+ }, null);
return completableFuture;
}
@@ -78,14 +83,19 @@ public class TopicTransactionBuffer implements TransactionBuffer {
ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L, txnID.getMostSigBits(),
txnID.getLeastSigBits(), getMessageIdDataList(sendMessageIdList));
- topic.publishMessage(commitMarker, (e, ledgerId, entryId) -> {
- if (e != null) {
- log.error("Failed to commit for txn {}", txnID, e);
- completableFuture.completeExceptionally(e);
- return;
+
+ topic.getManagedLedger().asyncAddEntry(commitMarker, new AsyncCallbacks.AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, Object ctx) {
+ completableFuture.complete(null);
}
- completableFuture.complete(null);
- });
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object ctx) {
+ log.error("Failed to commit for txn {}", txnID, exception);
+ completableFuture.completeExceptionally(exception);
+ }
+ }, null);
return completableFuture;
}
@@ -98,14 +108,18 @@ public class TopicTransactionBuffer implements TransactionBuffer {
ByteBuf abortMarker = Markers.newTxnAbortMarker(
-1L, txnID.getMostSigBits(), txnID.getLeastSigBits(), getMessageIdDataList(sendMessageIdList));
- topic.publishMessage(abortMarker, (e, ledgerId, entryId) -> {
- if (e != null) {
- log.error("Failed to abort for txn {}", txnID, e);
- completableFuture.completeExceptionally(e);
- return;
+ topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, Object ctx) {
+ completableFuture.complete(null);
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object ctx) {
+ log.error("Failed to abort for txn {}", txnID, exception);
+ completableFuture.completeExceptionally(exception);
}
- completableFuture.complete(null);
- });
+ }, null);
return completableFuture;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 8e1a605..d061c82 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -116,6 +116,7 @@ public class TransactionTestBase {
conf.setWebServicePort(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
conf.setTransactionCoordinatorEnabled(true);
+ conf.setBrokerDeduplicationEnabled(true);
serviceConfigurationList.add(conf);
PulsarService pulsar = spy(new PulsarService(conf));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index db3d9fe..104eb97 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -570,6 +570,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
@Test
public void txnMetadataHandlerRecoverTest() throws Exception {
String topic = NAMESPACE1 + "/tc-metadata-handler-recover";
+ @Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
@@ -592,6 +593,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
}
pulsarClient.close();
+ @Cleanup
PulsarClientImpl recoverPulsarClient = (PulsarClientImpl) PulsarClient.builder()
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
@@ -603,6 +605,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
tcClient.commit(entry.getKey(), entry.getValue());
}
+ @Cleanup
Consumer<byte[]> consumer = recoverPulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
@@ -615,4 +618,40 @@ public class TransactionEndToEndTest extends TransactionTestBase {
}
}
+ @Test
+ public void produceTxnMessageOrderTest() throws Exception {
+ String topic = NAMESPACE1 + "/txn-produce-order";
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("test")
+ .subscribe();
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .producerName("txn-publish-order")
+ .create();
+
+ for (int ti = 0; ti < 10; ti++) {
+ Transaction txn = pulsarClient
+ .newTransaction()
+ .withTransactionTimeout(2, TimeUnit.SECONDS)
+ .build().get();
+
+ for (int i = 0; i < 1000; i++) {
+ producer.newMessage(txn).value(("" + i).getBytes()).sendAsync();
+ }
+ txn.commit().get();
+
+ for (int i = 0; i < 1000; i++) {
+ Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(Integer.valueOf(new String(message.getData())), new Integer(i));
+ }
+ }
+ }
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index c922b74..b911266 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -74,9 +74,7 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
}
msgMetadataBuilder.setTxnidLeastBits(txn.getTxnIdLeastBits());
msgMetadataBuilder.setTxnidMostBits(txn.getTxnIdMostBits());
- long sequenceId = txn.nextSequenceId();
- msgMetadataBuilder.setSequenceId(sequenceId);
- return sequenceId;
+ return -1L;
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 57986d8..a122ff7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -25,6 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.Lists;
@@ -53,10 +55,9 @@ public class TransactionImpl implements Transaction {
private final long transactionTimeoutMs;
private final long txnIdLeastBits;
private final long txnIdMostBits;
- private final AtomicLong sequenceId = new AtomicLong(0L);
- private final Set<String> producedTopics;
- private final Set<String> ackedTopics;
+ private final Map<String, CompletableFuture<Void>> registerPartitionMap;
+ private final Map<String, CompletableFuture<Void>> registerSubscriptionMap;
private final TransactionCoordinatorClientImpl tcClient;
private Map<ConsumerImpl<?>, Integer> cumulativeAckConsumers;
@@ -72,29 +73,27 @@ public class TransactionImpl implements Transaction {
this.txnIdLeastBits = txnIdLeastBits;
this.txnIdMostBits = txnIdMostBits;
- this.producedTopics = new HashSet<>();
- this.ackedTopics = new HashSet<>();
+ this.registerPartitionMap = new ConcurrentHashMap<>();
+ this.registerSubscriptionMap = new ConcurrentHashMap<>();
this.tcClient = client.getTcClient();
this.sendFutureList = new ArrayList<>();
this.ackFutureList = new ArrayList<>();
}
- public long nextSequenceId() {
- return sequenceId.getAndIncrement();
- }
-
// register the topics that will be modified by this transaction
public synchronized CompletableFuture<Void> registerProducedTopic(String topic) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
- if (producedTopics.add(topic)) {
- // we need to issue the request to TC to register the produced topic
- completableFuture = tcClient.addPublishPartitionToTxnAsync(
- new TxnID(txnIdMostBits, txnIdLeastBits), Lists.newArrayList(topic));
- } else {
- completableFuture.complete(null);
- }
- return completableFuture;
+ // we need to issue the request to TC to register the produced topic
+ return registerPartitionMap.compute(topic, (key, future) -> {
+ if (future != null) {
+ return future.thenCompose(ignored -> CompletableFuture.completedFuture(null));
+ } else {
+ return tcClient.addPublishPartitionToTxnAsync(
+ new TxnID(txnIdMostBits, txnIdLeastBits), Lists.newArrayList(topic))
+ .thenCompose(ignored -> CompletableFuture.completedFuture(null));
+ }
+ });
}
public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
@@ -104,14 +103,16 @@ public class TransactionImpl implements Transaction {
// register the topics that will be modified by this transaction
public synchronized CompletableFuture<Void> registerAckedTopic(String topic, String subscription) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
- if (ackedTopics.add(topic)) {
- // we need to issue the request to TC to register the acked topic
- completableFuture = tcClient.addSubscriptionToTxnAsync(
- new TxnID(txnIdMostBits, txnIdLeastBits), topic, subscription);
- } else {
- completableFuture.complete(null);
- }
- return completableFuture;
+ // we need to issue the request to TC to register the acked topic
+ return registerSubscriptionMap.compute(topic, (key, future) -> {
+ if (future != null) {
+ return future.thenCompose(ignored -> CompletableFuture.completedFuture(null));
+ } else {
+ return tcClient.addSubscriptionToTxnAsync(
+ new TxnID(txnIdMostBits, txnIdLeastBits), topic, subscription)
+ .thenCompose(ignored -> CompletableFuture.completedFuture(null));
+ }
+ });
}
public synchronized void registerAckOp(CompletableFuture<Void> ackFuture) {