You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/09/04 06:07:40 UTC
[pulsar] branch master updated: [Transaction] Handle Acknowledge In The Transaction (#7856)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1fdffe7 [Transaction] Handle Acknowledge In The Transaction (#7856)
1fdffe7 is described below
commit 1fdffe753bcc6ebe2c7888234cdd7ccd12766250
Author: ran <ga...@126.com>
AuthorDate: Fri Sep 4 14:07:19 2020 +0800
[Transaction] Handle Acknowledge In The Transaction (#7856)
Fix https://github.com/streamnative/pulsar/issues/1307
Master Issue: #2664
### Motivation
Handle message acknowledgements within a transaction.
### Modifications
1. Register subscription to the transaction metadata store.
2. Make the ack command carry the transaction information.
3. When committing a transaction, commit every subscription in the transaction.
---
.../broker/TransactionMetadataStoreService.java | 25 +++++-
.../org/apache/pulsar/broker/service/Consumer.java | 30 ++++++-
.../apache/pulsar/broker/service/ServerCnx.java | 57 +++++++++++--
.../apache/pulsar/broker/service/Subscription.java | 2 +
.../nonpersistent/NonPersistentSubscription.java | 7 ++
.../service/persistent/PersistentSubscription.java | 34 ++++++--
.../TransactionMetadataStoreServiceTest.java | 9 +-
.../broker/transaction/TransactionProduceTest.java | 97 ++++++++++++++++++++++
.../buffer/TransactionBufferClientTest.java | 7 ++
.../transaction/TransactionCoordinatorClient.java | 21 +++++
.../impl/AcknowledgmentsGroupingTracker.java | 6 +-
.../org/apache/pulsar/client/impl/ClientCnx.java | 8 ++
.../apache/pulsar/client/impl/ConsumerBase.java | 6 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 16 ++--
...NonPersistentAcknowledgmentGroupingTracker.java | 6 +-
.../PersistentAcknowledgmentsGroupingTracker.java | 44 ++++++----
.../client/impl/TransactionMetaStoreHandler.java | 39 ++++++++-
.../TransactionCoordinatorClientImpl.java | 26 ++++++
.../client/impl/transaction/TransactionImpl.java | 7 +-
.../impl/AcknowledgementsGroupingTrackerTest.java | 16 ++--
.../apache/pulsar/common/protocol/Commands.java | 6 +-
.../coordinator/TransactionMetadataStore.java | 2 +-
.../coordinator/TransactionSubscription.java | 61 ++++++++++++++
.../pulsar/transaction/coordinator/TxnMeta.java | 4 +-
.../impl/InMemTransactionMetadataStore.java | 3 +-
.../transaction/coordinator/impl/TxnMetaImpl.java | 11 +--
.../TransactionMetadataStoreProviderTest.java | 40 +++++----
27 files changed, 500 insertions(+), 90 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index b1ef17b..fd7ebc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
import org.apache.pulsar.transaction.impl.common.TxnStatus;
@@ -148,7 +149,7 @@ public class TransactionMetadataStoreService {
return store.addProducedPartitionToTxn(txnId, partitions);
}
- public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnId, List<String> partitions) {
+ public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnId, List<TransactionSubscription> partitions) {
TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
TransactionMetadataStore store = stores.get(tcId);
if (store == null) {
@@ -207,12 +208,27 @@ public class TransactionMetadataStoreService {
private CompletableFuture<Void> endToTB(TxnID txnID, int txnAction) {
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
- List<CompletableFuture<TxnID>> commitFutureList = new ArrayList<>();
+ List<CompletableFuture<TxnID>> completableFutureList = new ArrayList<>();
this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
if (throwable != null) {
resultFuture.completeExceptionally(throwable);
return;
}
+
+ txnMeta.ackedPartitions().forEach(tbSub -> {
+ CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
+ if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
+ actionFuture = tbClient.commitTxnOnSubscription(
+ tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
+ } else if (PulsarApi.TxnAction.ABORT_VALUE == txnAction) {
+ actionFuture = tbClient.abortTxnOnSubscription(
+ tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
+ } else {
+ actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
+ }
+ completableFutureList.add(actionFuture);
+ });
+
txnMeta.producedPartitions().forEach(partition -> {
CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
@@ -222,10 +238,11 @@ public class TransactionMetadataStoreService {
} else {
actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
}
- commitFutureList.add(actionFuture);
+ completableFutureList.add(actionFuture);
});
+
try {
- FutureUtil.waitForAll(commitFutureList).whenComplete((ignored, waitThrowable) -> {
+ FutureUtil.waitForAll(completableFutureList).whenComplete((ignored, waitThrowable) -> {
if (waitThrowable != null) {
resultFuture.completeExceptionally(waitThrowable);
return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 4a78943..1fb8e74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -46,6 +46,8 @@ import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.Long
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionEntryImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
@@ -59,6 +61,7 @@ import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.SafeCollectionUtils;
+import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -413,7 +416,12 @@ public class Consumer {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
}
}
- subscription.acknowledgeMessage(Collections.singletonList(position), AckType.Cumulative, properties);
+ List<Position> positionsAcked = Collections.singletonList(position);
+ if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
+ transactionAcknowledge(ack.getTxnidMostBits(), ack.getTxnidLeastBits(), positionsAcked, AckType.Cumulative);
+ } else {
+ subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties);
+ }
} else {
// Individual ack
List<Position> positionsAcked = new ArrayList<>();
@@ -436,7 +444,25 @@ public class Consumer {
consumerId, position, ack.getValidationError());
}
}
- subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties);
+ if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
+ transactionAcknowledge(ack.getTxnidMostBits(), ack.getTxnidLeastBits(), positionsAcked, AckType.Individual);
+ } else {
+ subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties);
+ }
+ }
+ }
+
+ private void transactionAcknowledge(long txnidMostBits, long txnidLeastBits,
+ List<Position> positionList, AckType ackType) {
+ if (subscription instanceof PersistentSubscription) {
+ TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
+ try {
+ ((PersistentSubscription) subscription).acknowledgeMessage(txnID, positionList, ackType);
+ } catch (TransactionConflictException e) {
+ log.error("Transaction acknowledge failed for txn " + txnID, e);
+ }
+ } else {
+ log.error("Transaction acknowledge only support the `PersistentSubscription`.");
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 071ef80..e8d644d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -36,6 +36,7 @@ import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslHandler;
import java.net.SocketAddress;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -69,6 +70,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataExc
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
@@ -125,6 +127,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1740,7 +1743,48 @@ public class ServerCnx extends PulsarHandler {
@Override
protected void handleEndTxnOnSubscription(PulsarApi.CommandEndTxnOnSubscription command) {
- ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(command.getRequestId(), command.getTxnidLeastBits(), command.getTxnidMostBits()));
+ final long requestId = command.getRequestId();
+ final long txnidMostBits = command.getTxnidMostBits();
+ final long txnidLeastBits = command.getTxnidLeastBits();
+ final String topic = command.getSubscription().getTopic();
+ final String subName = command.getSubscription().getSubscription();
+
+ service.getTopics().get(command.getSubscription().getTopic())
+ .thenAccept(optionalTopic -> {
+ if (!optionalTopic.isPresent()) {
+ log.error("The topic {} is not exist in broker.", command.getSubscription().getTopic());
+ ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+ requestId, txnidLeastBits, txnidMostBits,
+ ServerError.UnknownError,
+ "The topic " + topic + " is not exist in broker."));
+ return;
+ }
+
+ Subscription subscription = optionalTopic.get().getSubscription(subName);
+ if (subscription == null) {
+ log.error("Topic {} subscription {} is not exist.", optionalTopic.get().getName(), subName);
+ ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+ requestId, txnidLeastBits, txnidMostBits,
+ ServerError.UnknownError,
+ "Topic " + optionalTopic.get().getName() + " subscription " + subName + " is not exist."));
+ return;
+ }
+
+ CompletableFuture<Void> completableFuture =
+ subscription.endTxn(txnidMostBits, txnidLeastBits, command.getTxnAction().getNumber());
+ completableFuture.whenComplete((ignored, throwable) -> {
+ if (throwable != null) {
+ log.error("Handle end txn on subscription failed for request {}", requestId);
+ ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+ requestId, txnidLeastBits, txnidMostBits,
+ ServerError.UnknownError,
+ "Handle end txn on subscription failed."));
+ return;
+ }
+ ctx.writeAndFlush(
+ Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
+ });
+ });
}
private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) {
@@ -1769,8 +1813,11 @@ public class ServerCnx extends PulsarHandler {
log.debug("Receive add published partition to txn request {} from {} with txnId {}",
command.getRequestId(), remoteAddress, txnID);
}
- List<String> subscriptionList = command.getSubscriptionList().stream()
- .map(subscription -> subscription.getTopic() + "|" + subscription.getSubscription())
+ List<TransactionSubscription> subscriptionList = command.getSubscriptionList().stream()
+ .map(subscription -> TransactionSubscription.builder()
+ .topic(subscription.getTopic())
+ .subscription(subscription.getSubscription())
+ .build())
.collect(Collectors.toList());
service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID, subscriptionList)
.whenComplete(((v, ex) -> {
@@ -1779,7 +1826,7 @@ public class ServerCnx extends PulsarHandler {
log.debug("Send response success for add published partition to txn request {}",
command.getRequestId());
}
- ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(command.getRequestId(),
+ ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(command.getRequestId(),
txnID.getLeastSigBits(), txnID.getMostSigBits()));
log.info("handle add partition to txn finish.");
} else {
@@ -1787,7 +1834,7 @@ public class ServerCnx extends PulsarHandler {
log.debug("Send response error for add published partition to txn request {}",
command.getRequestId(), ex);
}
- ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(command.getRequestId(),
+ ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(command.getRequestId(),
txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 85b5415..8f6b5fc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -101,6 +101,8 @@ public interface Subscription {
// Default is no-op
}
+ CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, int txnAction);
+
// Subscription utils
static boolean isCumulativeAckMode(SubType subType) {
return SubType.Exclusive.equals(subType) || SubType.Failover.equals(subType);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index fecfd09..8675703 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -474,6 +474,13 @@ public class NonPersistentSubscription implements Subscription {
return CompletableFuture.completedFuture(null);
}
+ @Override
+ public CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, int txnAction) {
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ completableFuture.completeExceptionally(new Exception("Unsupported operation end txn for NonPersistentSubscription"));
+ return completableFuture;
+ }
+
private static final Logger log = LoggerFactory.getLogger(NonPersistentSubscription.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 5476b8d..7abd21d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -59,6 +59,7 @@ import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
@@ -66,6 +67,7 @@ import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsS
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
@@ -1158,14 +1160,22 @@ public class PersistentSubscription implements Subscription {
List<Position> positions = pendingAckMessagesMap != null ? this.pendingAckMessagesMap.remove(txnId).values() :
Collections.emptyList();
// Materialize all single acks.
- cursor.asyncDelete(positions, deleteCallback, positions);
- if (pendingAckMessages != null) {
- positions.forEach(position -> this.pendingAckMessages.remove(position));
+ if (positions != null) {
+ cursor.asyncDelete(positions, deleteCallback, positions);
+ if (pendingAckMessages != null) {
+ positions.forEach(position -> this.pendingAckMessages.remove(position));
+ }
+ } else {
+ deleteFuture.complete(null);
}
// Materialize cumulative ack.
- cursor.asyncMarkDelete(this.pendingCumulativeAckMessage, (null == properties)?
- Collections.emptyMap() : properties, markDeleteCallback, this.pendingCumulativeAckMessage);
+ if (this.pendingCumulativeAckMessage != null) {
+ cursor.asyncMarkDelete(this.pendingCumulativeAckMessage, (null == properties)?
+ Collections.emptyMap() : properties, markDeleteCallback, this.pendingCumulativeAckMessage);
+ } else {
+ marketDeleteFuture.complete(null);
+ }
// Reset txdID and position for cumulative ack.
PENDING_CUMULATIVE_ACK_TXNID_UPDATER.set(this, null);
@@ -1238,6 +1248,20 @@ public class PersistentSubscription implements Subscription {
}
}
+ @Override
+ public CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, int txnAction) {
+ TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ if (PulsarApi.TxnAction.COMMIT.getNumber() == txnAction) {
+ completableFuture = commitTxn(txnID, Collections.emptyMap());
+ } else if (PulsarApi.TxnAction.ABORT.getNumber() == txnAction) {
+ completableFuture = abortTxn(txnID, null);
+ } else {
+ completableFuture.completeExceptionally(new Exception("Unsupported txnAction " + txnAction));
+ }
+ return completableFuture;
+ }
+
@VisibleForTesting
public ManagedCursor getCursor() {
return cursor;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
index 12a664c..a99f918 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.impl.common.TxnStatus;
import org.junit.Assert;
@@ -102,10 +103,10 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1);
TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0)).get();
- List<String> partitions = new ArrayList<>();
- partitions.add("ptn-0");
- partitions.add("ptn-1");
- partitions.add("ptn-2");
+ List<TransactionSubscription> partitions = new ArrayList<>();
+ partitions.add(TransactionSubscription.builder().topic("ptn-1").subscription("sub-1").build());
+ partitions.add(TransactionSubscription.builder().topic("ptn-2").subscription("sub-1").build());
+ partitions.add(TransactionSubscription.builder().topic("ptn-3").subscription("sub-1").build());
transactionMetadataStoreService.addAckedPartitionToTxn(txnID, partitions);
TxnMeta txn = transactionMetadataStoreService.getTxnMeta(txnID).get();
assertEquals(txn.status(), TxnStatus.OPEN);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index edcfd17..6bdc3f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.transaction;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -32,12 +33,20 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.transaction.buffer.impl.PersistentTransactionBuffer;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -50,6 +59,7 @@ import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -330,4 +340,91 @@ public class TransactionProduceTest extends TransactionTestBase {
}
}
+ @Test
+ public void ackCommitTest() throws Exception {
+ final String subscriptionName = "ackCommitTest";
+ Transaction txn = ((PulsarClientImpl) pulsarClient)
+ .newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+ log.info("init transaction {}.", txn);
+
+ Producer<byte[]> incomingProducer = pulsarClient.newProducer()
+ .topic(TOPIC_OUTPUT)
+ .batchingMaxMessages(1)
+ .roundRobinRouterBatchingPartitionSwitchFrequency(1)
+ .create();
+ int incomingMessageCnt = 10;
+ for (int i = 0; i < incomingMessageCnt; i++) {
+ incomingProducer.newMessage().value("Hello Txn.".getBytes()).sendAsync();
+ }
+ log.info("prepare incoming messages finished.");
+
+ MultiTopicsConsumerImpl<byte[]> consumer = (MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
+ .topic(TOPIC_OUTPUT)
+ .subscriptionName(subscriptionName)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .enableBatchIndexAcknowledgment(true)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ for (int i = 0; i < incomingMessageCnt; i++) {
+ Message<byte[]> message = consumer.receive();
+ log.info("receive messageId: {}", message.getMessageId());
+ consumer.acknowledgeAsync(message.getMessageId(), txn);
+ }
+
+ Thread.sleep(1000);
+
+ // The pending messages count should be the incomingMessageCnt
+ Assert.assertEquals(getPendingAckCount(subscriptionName), incomingMessageCnt);
+
+ consumer.redeliverUnacknowledgedMessages();
+ for (int i = 0; i < incomingMessageCnt; i++) {
+ Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
+ Assert.assertNull(message);
+ }
+
+ // The pending messages count should be the incomingMessageCnt
+ Assert.assertEquals(getPendingAckCount(subscriptionName), incomingMessageCnt);
+
+ txn.commit().get();
+
+ Thread.sleep(1000);
+
+ // After commit, the pending messages count should be 0
+ Assert.assertEquals(getPendingAckCount(subscriptionName), 0);
+
+ consumer.redeliverUnacknowledgedMessages();
+ for (int i = 0; i < incomingMessageCnt; i++) {
+ Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
+ Assert.assertNull(message);
+ }
+
+ log.info("finish test ackCommitTest");
+ }
+
+ private int getPendingAckCount(String subscriptionName) throws Exception {
+ Class<PersistentSubscription> clazz = PersistentSubscription.class;
+ Field field = clazz.getDeclaredField("pendingAckMessages");
+ field.setAccessible(true);
+
+ int pendingAckCount = 0;
+ for (PulsarService pulsarService : getPulsarServiceList()) {
+ for (String key : pulsarService.getBrokerService().getTopics().keys()) {
+ if (key.startsWith("persistent://" + TOPIC_OUTPUT)) {
+ PersistentSubscription subscription =
+ (PersistentSubscription) pulsarService.getBrokerService()
+ .getTopics().get(key).get().get().getSubscription(subscriptionName);
+ ConcurrentOpenHashSet<Position> set = (ConcurrentOpenHashSet<Position>) field.get(subscription);
+ if (set != null) {
+ pendingAckCount += set.size();
+ }
+ }
+ }
+ }
+ log.info("pendingAckCount: {}", pendingAckCount);
+ return pendingAckCount;
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 159b5c1..8e666a9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.transaction.buffer;
import com.google.common.collect.Sets;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase;
@@ -31,6 +32,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.mockito.Mock;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,9 +71,14 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
public void afterPulsarStart() throws Exception {
super.afterPulsarStart();
for (int i = 0; i < pulsarServices.length; i++) {
+ Subscription mockSubscription = Mockito.mock(Subscription.class);
+ Mockito.when(mockSubscription.endTxn(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyInt()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+
Topic mockTopic = Mockito.mock(Topic.class);
Mockito.when(mockTopic.endTxn(Mockito.any(), Mockito.anyInt()))
.thenReturn(CompletableFuture.completedFuture(null));
+ Mockito.when(mockTopic.getSubscription(Mockito.any())).thenReturn(mockSubscription);
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topicMap =
Mockito.mock(ConcurrentOpenHashMap.class);
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClient.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClient.java
index 18454ce..90e0be6 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClient.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClient.java
@@ -126,6 +126,27 @@ public interface TransactionCoordinatorClient extends Closeable {
CompletableFuture<Void> addPublishPartitionToTxnAsync(TxnID txnID, List<String> partitions);
/**
+ * Add ack subscription to txn.
+ *
+ * @param txnID transaction id
+ * @param topic topic name
+ * @param subscription subscription name
+ * @throws TransactionCoordinatorClientException
+ */
+ void addSubscriptionToTxn(TxnID txnID, String topic, String subscription)
+ throws TransactionCoordinatorClientException;
+
+ /**
+ * Add ack subscription to txn asynchronously.
+ *
+ * @param txnID transaction id
+ * @param topic topic name
+ * @param subscription subscription name
+ * @return
+ */
+ CompletableFuture<Void> addSubscriptionToTxnAsync(TxnID txnID, String topic, String subscription);
+
+ /**
* Commit txn.
* @param txnID txn id to commit.
*/
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
index a624e15..bf821c5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
@@ -30,11 +30,13 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable {
boolean isDuplicate(MessageId messageId);
- void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties);
+ void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties,
+ long txnidMostBits, long txnidLeastBits);
void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType ackType, Map<String, Long> properties);
- void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType, Map<String, Long> properties);
+ void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType,
+ Map<String, Long> properties, long txnidMostBits, long txnidLeastBits);
void flush();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 975ac37..b32dc99 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -868,6 +868,14 @@ public class ClientCnx extends PulsarHandler {
}
@Override
+ protected void handleAddSubscriptionToTxnResponse(PulsarApi.CommandAddSubscriptionToTxnResponse command) {
+ TransactionMetaStoreHandler handler = checkAndGetTransactionMetaStoreHandler(command.getTxnidMostBits());
+ if (handler != null) {
+ handler.handleAddSubscriptionToTxnResponse(command);
+ }
+ }
+
+ @Override
protected void handleEndTxnOnPartitionResponse(PulsarApi.CommandEndTxnOnPartitionResponse command) {
log.info("handleEndTxnOnPartitionResponse");
TransactionBufferHandler handler = checkAndGetTransactionBufferHandler();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 57fc65b..d762c2b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -428,7 +428,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
TransactionImpl txn) {
CompletableFuture<Void> ackFuture = doAcknowledge(messageIdList, ackType, properties, txn);
if (txn != null) {
- txn.registerAckedTopic(getTopic());
+ txn.registerAckedTopic(getTopic(), subscription);
return txn.registerAckOp(ackFuture);
} else {
return ackFuture;
@@ -439,11 +439,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
Map<String,Long> properties,
TransactionImpl txn) {
CompletableFuture<Void> ackFuture = doAcknowledge(messageId, ackType, properties, txn);
- if (txn != null) {
+ if (txn != null && (this instanceof ConsumerImpl)) {
// it is okay that we register acked topic after sending the acknowledgements. because
// the transactional ack will not be visiable for consumers until the transaction is
// committed
- txn.registerAckedTopic(getTopic());
+ txn.registerAckedTopic(getTopic(), subscription);
// register the ackFuture as part of the transaction
return txn.registerAckOp(ackFuture);
} else {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index cfaaa89..a327c4d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -537,7 +537,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
if (messageId instanceof BatchMessageIdImpl) {
- if (markAckForBatchMessage((BatchMessageIdImpl) messageId, ackType, properties)) {
+ if (markAckForBatchMessage((BatchMessageIdImpl) messageId, ackType, properties) && txnImpl == null) {
// all messages in batch have been acked so broker can be acked via sendAcknowledge()
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] acknowledging message - {}, acktype {}", subscription, consumerName, messageId,
@@ -546,8 +546,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
} else {
if (conf.isBatchIndexAckEnabled()) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
- acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, batchMessageId.getBatchIndex(),
- batchMessageId.getBatchSize(), ackType, properties);
+ acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId,
+ batchMessageId.getBatchIndex(), batchMessageId.getBatchSize(), ackType, properties,
+ txnImpl == null ? -1 : txnImpl.getTxnIdMostBits(),
+ txnImpl == null ? -1 : txnImpl.getTxnIdLeastBits());
}
// other messages in batch are still pending ack.
return CompletableFuture.completedFuture(null);
@@ -578,7 +580,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
messageIdImpl = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId()
, batchMessageId.getPartitionIndex());
acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, batchMessageId.getBatchIndex(),
- batchMessageId.getBatchSize(), ackType, properties);
+ batchMessageId.getBatchSize(), ackType, properties,
+ txn == null ? -1 : txn.getTxnIdMostBits(),
+ txn == null ? -1 : txn.getTxnIdLeastBits());
stats.incrementNumAcksSent(batchMessageId.getBatchSize());
} else {
messageIdImpl = (MessageIdImpl) messageId;
@@ -751,7 +755,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(msgId));
}
- acknowledgmentsGroupingTracker.addAcknowledgment(msgId, ackType, properties);
+ acknowledgmentsGroupingTracker.addAcknowledgment(msgId, ackType, properties,
+ txnImpl == null ? -1 : txnImpl.getTxnIdMostBits(),
+ txnImpl == null ? -1 : txnImpl.getTxnIdLeastBits());
// Consumer acknowledgment operation immediately succeeds. In any case, if we're not able to send ack to broker,
// the messages will be re-delivered
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
index d70c7ce..425204d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
@@ -42,7 +42,8 @@ public class NonPersistentAcknowledgmentGroupingTracker implements Acknowledgmen
}
@Override
- public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties) {
+ public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties,
+ long txnidMostBits, long txnidLeastBits) {
// no-op
}
@@ -52,7 +53,8 @@ public class NonPersistentAcknowledgmentGroupingTracker implements Acknowledgmen
}
@Override
- public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int BatchSize, AckType ackType, Map<String, Long> properties) {
+ public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int BatchSize, AckType ackType,
+ Map<String, Long> properties, long txnidMostSets, long txnidLeastSets) {
// no-op
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index fd61c42..b226d40 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -134,11 +134,14 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
}
@Override
- public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties) {
- if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
+ public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties,
+ long txnidMostBits, long txnidLeastBits) {
+ if (txnidMostBits != -1 && txnidLeastBits != -1) {
+ doImmediateAck(msgId, ackType, properties, txnidMostBits, txnidLeastBits);
+ } else if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
// We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
// uncommon condition since it's only used for the compaction subscription.
- doImmediateAck(msgId, ackType, properties);
+ doImmediateAck(msgId, ackType, properties, -1, -1);
} else if (ackType == AckType.Cumulative) {
doCumulativeAck(msgId, null);
} else {
@@ -157,9 +160,12 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
}
@Override
- public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType, Map<String, Long> properties) {
- if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
- doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, properties);
+ public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType,
+ Map<String, Long> properties, long txnidMostBits, long txnidLeastBits) {
+ if (txnidMostBits != -1 && txnidLeastBits != -1) {
+ doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, properties, txnidMostBits, txnidLeastBits);
+ } else if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
+ doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, properties, -1, -1);
} else if (ackType == AckType.Cumulative) {
BitSetRecyclable bitSet = BitSetRecyclable.create();
bitSet.set(0, batchSize);
@@ -209,18 +215,21 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
}
}
- private boolean doImmediateAck(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties) {
+ private boolean doImmediateAck(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties,
+ long txnidMostBits, long txnidLeastBits) {
ClientCnx cnx = consumer.getClientCnx();
if (cnx == null) {
return false;
}
- newAckCommand(consumer.consumerId, msgId, null, ackType, null, properties, cnx, true /* flush */);
+ newAckCommand(consumer.consumerId, msgId, null, ackType, null,
+ properties, cnx, true /* flush */, txnidMostBits, txnidLeastBits);
return true;
}
- private boolean doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType, Map<String, Long> properties) {
+ private boolean doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType,
+ Map<String, Long> properties, long txnidMostBits, long txnidLeastBits) {
ClientCnx cnx = consumer.getClientCnx();
if (cnx == null) {
@@ -239,7 +248,8 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
bitSet.clear(batchIndex);
}
- final ByteBuf cmd = Commands.newAck(consumer.consumerId, msgId.ledgerId, msgId.entryId, bitSet, ackType, null, properties);
+ final ByteBuf cmd = Commands.newAck(consumer.consumerId, msgId.ledgerId, msgId.entryId, bitSet, ackType,
+ null, properties, txnidLeastBits, txnidMostBits);
bitSet.recycle();
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
return true;
@@ -261,8 +271,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
boolean shouldFlush = false;
if (cumulativeAckFlushRequired) {
- newAckCommand(consumer.consumerId, lastCumulativeAck, lastCumulativeAckSet, AckType.Cumulative, null, Collections.emptyMap(), cnx,
- false /* flush */);
+ newAckCommand(consumer.consumerId, lastCumulativeAck, lastCumulativeAckSet, AckType.Cumulative, null, Collections.emptyMap(), cnx, false /* flush */, -1, -1);
shouldFlush=true;
cumulativeAckFlushRequired = false;
}
@@ -301,8 +310,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
break;
}
- newAckCommand(consumer.consumerId, msgId, null, AckType.Individual, null, Collections.emptyMap(),
- cnx, false);
+ newAckCommand(consumer.consumerId, msgId, null, AckType.Individual, null, Collections.emptyMap(), cnx, false, -1, -1);
shouldFlush = true;
}
}
@@ -349,11 +357,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
}
private void newAckCommand(long consumerId, MessageIdImpl msgId, BitSetRecyclable lastCumulativeAckSet,
- AckType ackType, ValidationError validationError, Map<String, Long> map, ClientCnx cnx, boolean flush) {
+ AckType ackType, ValidationError validationError, Map<String, Long> map, ClientCnx cnx,
+ boolean flush, long txnidMostBits, long txnidLeastBits) {
MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunckedMessageIdSequenceMap.get(msgId);
- if (chunkMsgIds != null) {
- if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()) && ackType != AckType.Cumulative) {
+ if (chunkMsgIds != null && txnidLeastBits < 0 && txnidMostBits < 0) {
+ if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())
+ && ackType != AckType.Cumulative) {
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null && chunkMsgIds.length > 1) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 37d85c8..a95c937 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -180,6 +180,43 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
onResponse(op);
}
+ public CompletableFuture<Void> addSubscriptionToTxn(TxnID txnID, List<PulsarApi.Subscription> subscriptionList) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Add subscription {} to txn {}.", subscriptionList, txnID);
+ }
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ long requestId = client.newRequestId();
+ ByteBuf cmd = Commands.newAddSubscriptionToTxn(
+ requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), subscriptionList);
+ OpForVoidCallBack op = OpForVoidCallBack.create(cmd, completableFuture);
+ pendingRequests.put(requestId, op);
+ timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
+ cmd.retain();
+ cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise());
+ return completableFuture;
+ }
+
+ public void handleAddSubscriptionToTxnResponse(PulsarApi.CommandAddSubscriptionToTxnResponse response) {
+ OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(response.getRequestId());
+ if (op == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Add subscription to txn timeout for request {}.", response.getRequestId());
+ }
+ return;
+ }
+ if (!response.hasError()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Add subscription to txn success for request {}.", response.getRequestId());
+ }
+ op.callback.complete(null);
+ } else {
+ LOG.error("Add subscription to txn failed for request {} error {}.",
+ response.getRequestId(), response.getError());
+ op.callback.completeExceptionally(getExceptionByServerError(response.getError(), response.getMessage()));
+ }
+ onResponse(op);
+ }
+
public CompletableFuture<Void> commitAsync(TxnID txnID) {
if (LOG.isDebugEnabled()) {
LOG.debug("Commit txn {}", txnID);
@@ -388,7 +425,7 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
RequestTime lastPolled = timeoutQueue.poll();
if (lastPolled != null) {
OpBase<?> op = pendingRequests.remove(lastPolled.requestId);
- if (!op.callback.isDone()) {
+ if (op != null && !op.callback.isDone()) {
op.callback.completeExceptionally(new PulsarClientException.TimeoutException(
"Could not get response from transaction meta store within given timeout."));
if (LOG.isDebugEnabled()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index 7ead17d..9d79bb2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
import org.apache.pulsar.client.util.MathUtils;
+import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
@@ -33,6 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -177,6 +179,30 @@ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorC
}
@Override
+ public void addSubscriptionToTxn(TxnID txnID, String topic, String subscription)
+ throws TransactionCoordinatorClientException {
+ try {
+ addSubscriptionToTxnAsync(txnID, topic, subscription).get();
+ } catch (Exception e) {
+ throw TransactionCoordinatorClientException.unwrap(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> addSubscriptionToTxnAsync(TxnID txnID, String topic, String subscription) {
+ TransactionMetaStoreHandler handler = handlerMap.get(txnID.getMostSigBits());
+ if (handler == null) {
+ return FutureUtil.failedFuture(
+ new TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException(txnID.getMostSigBits()));
+ }
+ PulsarApi.Subscription sub = PulsarApi.Subscription.newBuilder()
+ .setTopic(topic)
+ .setSubscription(subscription)
+ .build();
+ return handler.addSubscriptionToTxn(txnID, Collections.singletonList(sub));
+ }
+
+ @Override
public void commit(TxnID txnID) throws TransactionCoordinatorClientException {
try {
commitAsync(txnID).get();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 38530b7..7c04357 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -92,7 +92,7 @@ public class TransactionImpl implements Transaction {
// register the topics that will be modified by this transaction
public synchronized void registerProducedTopic(String topic) {
if (producedTopics.add(topic)) {
- // TODO: we need to issue the request to TC to register the produced topic
+ // we need to issue the request to TC to register the produced topic
tcClient.addPublishPartitionToTxnAsync(new TxnID(txnIdMostBits, txnIdLeastBits), Lists.newArrayList(topic));
}
}
@@ -109,9 +109,10 @@ public class TransactionImpl implements Transaction {
}
// register the topics that will be modified by this transaction
- public synchronized void registerAckedTopic(String topic) {
+ public synchronized void registerAckedTopic(String topic, String subscription) {
if (ackedTopics.add(topic)) {
- // TODO: we need to issue the request to TC to register the acked topic
+ // we need to issue the request to TC to register the acked topic
+ tcClient.addSubscriptionToTxnAsync(new TxnID(txnIdMostBits, txnIdLeastBits), topic, subscription);
}
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index 1eb5b42..151f2ae 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -74,12 +74,12 @@ public class AcknowledgementsGroupingTrackerTest {
assertFalse(tracker.isDuplicate(msg1));
- tracker.addAcknowledgment(msg1, AckType.Individual, Collections.emptyMap());
+ tracker.addAcknowledgment(msg1, AckType.Individual, Collections.emptyMap(), -1, -1);
assertTrue(tracker.isDuplicate(msg1));
assertFalse(tracker.isDuplicate(msg2));
- tracker.addAcknowledgment(msg5, AckType.Cumulative, Collections.emptyMap());
+ tracker.addAcknowledgment(msg5, AckType.Cumulative, Collections.emptyMap(), -1, -1);
assertTrue(tracker.isDuplicate(msg1));
assertTrue(tracker.isDuplicate(msg2));
assertTrue(tracker.isDuplicate(msg3));
@@ -99,7 +99,7 @@ public class AcknowledgementsGroupingTrackerTest {
assertTrue(tracker.isDuplicate(msg5));
assertFalse(tracker.isDuplicate(msg6));
- tracker.addAcknowledgment(msg6, AckType.Individual, Collections.emptyMap());
+ tracker.addAcknowledgment(msg6, AckType.Individual, Collections.emptyMap(), -1, -1);
assertTrue(tracker.isDuplicate(msg6));
when(consumer.getClientCnx()).thenReturn(cnx);
@@ -188,7 +188,7 @@ public class AcknowledgementsGroupingTrackerTest {
when(consumer.getClientCnx()).thenReturn(null);
- tracker.addAcknowledgment(msg1, AckType.Individual, Collections.emptyMap());
+ tracker.addAcknowledgment(msg1, AckType.Individual, Collections.emptyMap(), -1, -1);
assertFalse(tracker.isDuplicate(msg1));
when(consumer.getClientCnx()).thenReturn(cnx);
@@ -196,7 +196,7 @@ public class AcknowledgementsGroupingTrackerTest {
tracker.flush();
assertFalse(tracker.isDuplicate(msg1));
- tracker.addAcknowledgment(msg2, AckType.Individual, Collections.emptyMap());
+ tracker.addAcknowledgment(msg2, AckType.Individual, Collections.emptyMap(), -1, -1);
// Since we were connected, the ack went out immediately
assertFalse(tracker.isDuplicate(msg2));
tracker.close();
@@ -248,12 +248,12 @@ public class AcknowledgementsGroupingTrackerTest {
assertFalse(tracker.isDuplicate(msg1));
- tracker.addAcknowledgment(msg1, AckType.Individual, Collections.emptyMap());
+ tracker.addAcknowledgment(msg1, AckType.Individual, Collections.emptyMap(), -1, -1);
assertTrue(tracker.isDuplicate(msg1));
assertFalse(tracker.isDuplicate(msg2));
- tracker.addAcknowledgment(msg5, AckType.Cumulative, Collections.emptyMap());
+ tracker.addAcknowledgment(msg5, AckType.Cumulative, Collections.emptyMap(), -1, -1);
assertTrue(tracker.isDuplicate(msg1));
assertTrue(tracker.isDuplicate(msg2));
assertTrue(tracker.isDuplicate(msg3));
@@ -273,7 +273,7 @@ public class AcknowledgementsGroupingTrackerTest {
assertTrue(tracker.isDuplicate(msg5));
assertFalse(tracker.isDuplicate(msg6));
- tracker.addAcknowledgment(msg6, AckType.Individual, Collections.emptyMap());
+ tracker.addAcknowledgment(msg6, AckType.Individual, Collections.emptyMap(), -1, -1);
assertTrue(tracker.isDuplicate(msg6));
when(consumer.getClientCnx()).thenReturn(cnx);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index b33ff6d..7838067 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -932,7 +932,7 @@ public class Commands {
public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, AckType ackType,
ValidationError validationError, Map<String, Long> properties) {
- return newAck(consumerId, ledgerId, entryId, ackSet, ackType, validationError, properties, 0, 0);
+ return newAck(consumerId, ledgerId, entryId, ackSet, ackType, validationError, properties, -1, -1);
}
public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, AckType ackType,
@@ -952,10 +952,10 @@ public class Commands {
if (validationError != null) {
ackBuilder.setValidationError(validationError);
}
- if (txnIdMostBits > 0) {
+ if (txnIdMostBits >= 0) {
ackBuilder.setTxnidMostBits(txnIdMostBits);
}
- if (txnIdLeastBits > 0) {
+ if (txnIdLeastBits >= 0) {
ackBuilder.setTxnidLeastBits(txnIdLeastBits);
}
for (Map.Entry<String, Long> e : properties.entrySet()) {
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
index 357aa9b..8d4b69f 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
@@ -79,7 +79,7 @@ public interface TransactionMetadataStore {
* @return a future represents the result of the operation
*/
CompletableFuture<Void> addAckedPartitionToTxn(
- TxnID txnid, List<String> partitions);
+ TxnID txnid, List<TransactionSubscription> partitions);
/**
* Update the transaction from <tt>expectedStatus</tt> to <tt>newStatus</tt>.
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionSubscription.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionSubscription.java
new file mode 100644
index 0000000..bcda38e
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionSubscription.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import com.google.common.base.Objects;
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * A class for representing acked topic subscription info.
+ *
+ * <p>Identity is the (topic, subscription) pair: {@link #equals(Object)} and
+ * {@link #hashCode()} both accept unset (null) fields, while
+ * {@link #compareTo(TransactionSubscription)} orders by topic and then by subscription.
+ */
+@Data
+@Builder
+public class TransactionSubscription implements Comparable<TransactionSubscription> {
+
+    private String topic;
+    private String subscription;
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(topic, subscription);
+    }
+
+    /**
+     * Compares topic and subscription for equality.
+     *
+     * @param o the object to compare with
+     * @return true if {@code o} is a TransactionSubscription with the same topic and subscription
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TransactionSubscription that = (TransactionSubscription) o;
+        // Null-safe: the builder may leave either field unset, and hashCode() already tolerates
+        // nulls, so equals() must not throw for such instances.
+        return Objects.equal(topic, that.topic)
+                && Objects.equal(subscription, that.subscription);
+    }
+
+    /**
+     * Orders by topic, then by subscription.
+     *
+     * @param o the subscription to compare against
+     * @return a negative, zero, or positive value per {@link Comparable#compareTo(Object)}
+     * @throws NullPointerException if {@code o} is null or a compared topic or subscription is null
+     */
+    @Override
+    public int compareTo(TransactionSubscription o) {
+        int topicCompare = topic.compareTo(o.topic);
+        if (topicCompare == 0) {
+            return subscription.compareTo(o.subscription);
+        }
+        return topicCompare;
+    }
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
index 39d3bc7..cbc8dc9 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
@@ -59,7 +59,7 @@ public interface TxnMeta {
* @return the list of partitions that this transaction acknowledges to.
* the returned list is sorted by partition name.
*/
- List<String> ackedPartitions();
+ List<TransactionSubscription> ackedPartitions();
/**
* Add the list of produced partitions to the transaction.
@@ -78,7 +78,7 @@ public interface TxnMeta {
* @throws InvalidTxnStatusException if the transaction is not in
* {@link TxnStatus#OPEN}
*/
- TxnMeta addAckedPartitions(List<String> partitions)
+ TxnMeta addAckedPartitions(List<TransactionSubscription> partitions)
throws InvalidTxnStatusException;
/**
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
index e72cbf5..15bcd41 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
@@ -86,7 +87,7 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore {
}
@Override
- public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnid, List<String> partitions) {
+ public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnid, List<TransactionSubscription> partitions) {
return getTxnMeta(txnid).thenCompose(txn -> {
try {
txn.addAckedPartitions(partitions);
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
index ec9c021..0b1ae74 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Set;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
import org.apache.pulsar.transaction.impl.common.TxnStatus;
@@ -37,7 +38,7 @@ class TxnMetaImpl implements TxnMeta {
private final TxnID txnID;
private final Set<String> producedPartitions = new HashSet<>();
- private final Set<String> ackedPartitions = new HashSet<>();
+ private final Set<TransactionSubscription> ackedPartitions = new HashSet<>();
private TxnStatus txnStatus;
TxnMetaImpl(TxnID txnID) {
@@ -72,8 +73,8 @@ class TxnMetaImpl implements TxnMeta {
}
@Override
- public List<String> ackedPartitions() {
- List<String> returnedPartitions;
+ public List<TransactionSubscription> ackedPartitions() {
+ List<TransactionSubscription> returnedPartitions;
synchronized (this) {
returnedPartitions = new ArrayList<>(ackedPartitions.size());
returnedPartitions.addAll(ackedPartitions);
@@ -118,9 +119,9 @@ class TxnMetaImpl implements TxnMeta {
* @throws InvalidTxnStatusException
*/
@Override
- public synchronized TxnMetaImpl addAckedPartitions(List<String> partitions) throws InvalidTxnStatusException {
+ public synchronized TxnMetaImpl addAckedPartitions(List<TransactionSubscription> partitions)
+ throws InvalidTxnStatusException {
checkTxnStatus(TxnStatus.OPEN);
-
this.ackedPartitions.addAll(partitions);
return this;
}
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
index c5ae97c..ff4bd31 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -195,10 +195,14 @@ public class TransactionMetadataStoreProviderTest {
TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
assertEquals(txnStatus, TxnStatus.OPEN);
- List<String> partitions = new ArrayList<>();
- partitions.add("ptn-0");
- partitions.add("ptn-1");
- partitions.add("ptn-2");
+ String topicPartition1 = "persistent://public/default/txn-ack-partition-0";
+ String topicPartition2 = "persistent://public/default/txn-ack-partition-1";
+ String topicPartition3 = "persistent://public/default/txn-ack-partition-2";
+ List<TransactionSubscription> partitions = new ArrayList<>();
+ partitions.add(TransactionSubscription.builder().topic(topicPartition1).subscription("sub-0").build());
+ partitions.add(TransactionSubscription.builder().topic(topicPartition1).subscription("sub-1").build());
+ partitions.add(TransactionSubscription.builder().topic(topicPartition2).subscription("sub-2").build());
+ partitions.add(TransactionSubscription.builder().topic(topicPartition3).subscription("sub-4").build());
// add the list of partitions to the transaction
this.store.addAckedPartitionToTxn(txnID, partitions).get();
@@ -208,29 +212,31 @@ public class TransactionMetadataStoreProviderTest {
assertEquals(txn.ackedPartitions(), partitions);
// add another list of partition. duplicated partitions should be removed
- List<String> newPartitions = new ArrayList<>();
- newPartitions.add("ptn-2");
- newPartitions.add("ptn-3");
- newPartitions.add("ptn-4");
+ List<TransactionSubscription> newPartitions = new ArrayList<>();
+ newPartitions.add(TransactionSubscription.builder().topic(topicPartition1).subscription("sub-0").build());
+ newPartitions.add(TransactionSubscription.builder().topic(topicPartition1).subscription("sub-1").build());
+ newPartitions.add(TransactionSubscription.builder().topic(topicPartition2).subscription("sub-5").build());
+ newPartitions.add(TransactionSubscription.builder().topic(topicPartition3).subscription("sub-6").build());
this.store.addAckedPartitionToTxn(txnID, newPartitions);
txn = this.store.getTxnMeta(txnID).get();
assertEquals(txn.status(), TxnStatus.OPEN);
- List<String> finalPartitions = new ArrayList<>();
- finalPartitions.add("ptn-0");
- finalPartitions.add("ptn-1");
- finalPartitions.add("ptn-2");
- finalPartitions.add("ptn-3");
- finalPartitions.add("ptn-4");
+ List<TransactionSubscription> finalPartitions = new ArrayList<>();
+ finalPartitions.add(TransactionSubscription.builder().topic(topicPartition1).subscription("sub-0").build());
+ finalPartitions.add(TransactionSubscription.builder().topic(topicPartition1).subscription("sub-1").build());
+ finalPartitions.add(TransactionSubscription.builder().topic(topicPartition2).subscription("sub-2").build());
+ finalPartitions.add(TransactionSubscription.builder().topic(topicPartition2).subscription("sub-5").build());
+ finalPartitions.add(TransactionSubscription.builder().topic(topicPartition3).subscription("sub-4").build());
+ finalPartitions.add(TransactionSubscription.builder().topic(topicPartition3).subscription("sub-6").build());
assertEquals(txn.ackedPartitions(), finalPartitions);
// change the transaction to `COMMITTING`
this.store.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN).get();
// add partitions should fail if it is already committing.
- List<String> newPartitions2 = new ArrayList<>();
- newPartitions2.add("ptn-5");
- newPartitions2.add("ptn-6");
+ List<TransactionSubscription> newPartitions2 = new ArrayList<>();
+ newPartitions2.add(TransactionSubscription.builder().topic(topicPartition2).subscription("sub-7").build());
+ newPartitions2.add(TransactionSubscription.builder().topic(topicPartition3).subscription("sub-8").build());
try {
this.store.addAckedPartitionToTxn(txnID, newPartitions2).get();
fail("Should fail to add acked partitions if the transaction is not in OPEN status");