You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/22 05:08:14 UTC
[pulsar] branch master updated: [Transaction] Transaction log
(#8658)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c25bab8 [Transaction] Transaction log (#8658)
c25bab8 is described below
commit c25bab8f28ef08c6a82acf371e71264022d574e8
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Sun Nov 22 13:07:43 2020 +0800
[Transaction] Transaction log (#8658)
Master Issue: [PIP31](https://github.com/apache/pulsar/wiki/PIP-31%3A-Transaction-Support)
### Motivation
Implemention of [PIP31](https://github.com/apache/pulsar/wiki/PIP-31%3A-Transaction-Support), transaction log
### Modifications
Add TransactionMetadataStore implemention by managed ledger
### Verifying this change
Add the tests for it
---
pom.xml | 2 +
.../broker/TransactionMetadataStoreService.java | 31 +-
.../apache/pulsar/broker/service/ServerCnx.java | 15 +-
.../broker/transaction/buffer/TransactionMeta.java | 2 +-
.../exceptions/TransactionStatusException.java | 2 +-
.../buffer/impl/InMemTransactionBuffer.java | 8 +-
.../TransactionMetadataStoreServiceTest.java | 12 +-
.../transaction/buffer/TransactionBufferTest.java | 2 +-
.../coordinator/generate_protobuf.sh | 23 +
.../coordinator/generate_protobuf_docker.sh | 42 +
pulsar-transaction/coordinator/pom.xml | 32 +-
.../transaction/coordinator/TransactionLog.java | 51 +
.../coordinator/TransactionLogReplayCallback.java | 31 +-
.../coordinator/TransactionMetadataStore.java | 14 +-
.../coordinator/TransactionMetadataStoreState.java | 71 ++
.../pulsar/transaction/coordinator/TxnMeta.java | 5 +-
.../exceptions/CoordinatorException.java | 28 +-
.../impl/InMemTransactionMetadataStore.java | 14 +-
.../coordinator/impl/MLTransactionLogImpl.java | 251 +++++
.../impl/MLTransactionMetadataStore.java | 375 +++++++
.../impl/MLTransactionMetadataStoreProvider.java | 52 +
.../transaction/coordinator/impl/TxnMetaImpl.java | 40 +-
.../proto/PulsarTransactionMetadata.java | 1123 ++++++++++++++++++++
.../coordinator/util/TransactionUtil.java} | 28 +-
.../transaction/coordinator/util/package-info.java | 19 +-
.../src/main/proto/PulsarTransactionMetadata.proto | 53 +
.../MLTransactionMetadataStoreTest.java | 262 +++++
.../TransactionMetadataStoreProviderTest.java | 14 +-
.../transaction/coordinator}/TxnStatusTest.java | 17 +-
.../coordinator/test/MockedBookKeeperTestCase.java | 139 +++
30 files changed, 2643 insertions(+), 115 deletions(-)
diff --git a/pom.xml b/pom.xml
index 02ba4b3..4d0e507 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1149,6 +1149,7 @@ flexible messaging model and an intuitive client API.</description>
<exclude>**/*.graffle</exclude>
<exclude>**/*.hgrm</exclude>
<exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
+ <exclude>src/main/java/org/apache/pulsar/transaction/coordinator/proto/PulsarTransactionMetadata.java</exclude>
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/*.java</exclude>
<exclude>src/test/java/org/apache/pulsar/common/api/proto/*.java</exclude>
@@ -1227,6 +1228,7 @@ flexible messaging model and an intuitive client API.</description>
and are included in source tree for convenience -->
<exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
+ <exclude>src/main/java/org/apache/pulsar/transaction/coordinator/proto/PulsarTransactionMetadata.java</exclude>
<exclude>src/test/java/org/apache/pulsar/common/api/proto/TestApi.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java</exclude>
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 9ec9184..2a65b14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -20,12 +20,17 @@ package org.apache.pulsar.broker;
import com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
+
import org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TxnID;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnAction;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+
import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.api.proto.PulsarApi;
+
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -36,7 +41,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvide
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -135,12 +140,12 @@ public class TransactionMetadataStoreService {
}
}
- public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId) {
+ public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId, long timeoutInMills) {
TransactionMetadataStore store = stores.get(tcId);
if (store == null) {
return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
}
- return store.newTransaction();
+ return store.newTransaction(timeoutInMills);
}
public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, List<String> partitions) {
@@ -179,14 +184,14 @@ public class TransactionMetadataStoreService {
return store.updateTxnStatus(txnId, newStatus, expectedStatus);
}
- public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, List<PulsarApi.MessageIdData> messageIdDataList) {
+ public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, List<MessageIdData> messageIdDataList) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
TxnStatus newStatus;
switch (txnAction) {
- case PulsarApi.TxnAction.COMMIT_VALUE:
+ case TxnAction.COMMIT_VALUE:
newStatus = TxnStatus.COMMITTING;
break;
- case PulsarApi.TxnAction.ABORT_VALUE:
+ case TxnAction.ABORT_VALUE:
newStatus = TxnStatus.ABORTING;
break;
default:
@@ -210,7 +215,7 @@ public class TransactionMetadataStoreService {
}
private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int txnAction,
- List<PulsarApi.MessageIdData> messageIdDataList) {
+ List<MessageIdData> messageIdDataList) {
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
List<CompletableFuture<TxnID>> completableFutureList = new ArrayList<>();
this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
@@ -221,10 +226,10 @@ public class TransactionMetadataStoreService {
txnMeta.ackedPartitions().forEach(tbSub -> {
CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
- if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
+ if (TxnAction.COMMIT_VALUE == txnAction) {
actionFuture = tbClient.commitTxnOnSubscription(
tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
- } else if (PulsarApi.TxnAction.ABORT_VALUE == txnAction) {
+ } else if (TxnAction.ABORT_VALUE == txnAction) {
actionFuture = tbClient.abortTxnOnSubscription(
tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
} else {
@@ -234,7 +239,7 @@ public class TransactionMetadataStoreService {
});
List<MessageId> messageIdList = new ArrayList<>();
- for (PulsarApi.MessageIdData messageIdData : messageIdDataList) {
+ for (MessageIdData messageIdData : messageIdDataList) {
messageIdList.add(new MessageIdImpl(
messageIdData.getLedgerId(), messageIdData.getEntryId(), messageIdData.getPartition()));
messageIdData.recycle();
@@ -242,12 +247,12 @@ public class TransactionMetadataStoreService {
txnMeta.producedPartitions().forEach(partition -> {
CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
- if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
+ if (TxnAction.COMMIT_VALUE == txnAction) {
actionFuture = tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits(),
messageIdList.stream().filter(
msg -> ((MessageIdImpl) msg).getPartitionIndex() ==
TopicName.get(partition).getPartitionIndex()).collect(Collectors.toList()));
- } else if (PulsarApi.TxnAction.ABORT_VALUE == txnAction) {
+ } else if (TxnAction.ABORT_VALUE == txnAction) {
actionFuture = tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits(),
messageIdList.stream().filter(
msg -> ((MessageIdImpl) msg).getPartitionIndex() ==
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c8eae21..c5ef5fe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -37,7 +37,6 @@ import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Promise;
import java.net.SocketAddress;
-import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
@@ -129,7 +128,7 @@ import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
-import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1669,7 +1668,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
log.debug("Receive new txn request {} to transaction meta store {} from {}.", command.getRequestId(), command.getTcId(), remoteAddress);
}
TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
- service.pulsar().getTransactionMetadataStoreService().newTransaction(tcId)
+ service.pulsar().getTransactionMetadataStoreService().newTransaction(tcId, command.getTxnTtlSeconds())
.whenComplete(((txnID, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
@@ -1827,13 +1826,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
log.debug("Receive add published partition to txn request {} from {} with txnId {}",
command.getRequestId(), remoteAddress, txnID);
}
- List<TransactionSubscription> subscriptionList = command.getSubscriptionList().stream()
- .map(subscription -> TransactionSubscription.builder()
- .topic(subscription.getTopic())
- .subscription(subscription.getSubscription())
- .build())
- .collect(Collectors.toList());
- service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID, subscriptionList)
+
+ service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID,
+ MLTransactionMetadataStore.subscriptionToTxnSubscription(command.getSubscriptionList()))
.whenComplete(((v, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionMeta.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionMeta.java
index a1d00f4..f769c98 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionMeta.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionMeta.java
@@ -24,7 +24,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionStatusException;
import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
/**
* The metadata for the transaction in the transaction buffer.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java
index c81b17a..7aebe2d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.broker.transaction.buffer.exceptions;
import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
/**
* Exceptions are thrown when operations are applied to a transaction which is not in expected txn status.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 048bba4..7f707c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -31,7 +31,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.bookkeeper.mledger.Position;
-import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
@@ -41,7 +41,7 @@ import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSeal
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionSealedException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionStatusException;
import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
/**
* The in-memory implementation of {@link TransactionBuffer}.
@@ -280,7 +280,7 @@ class InMemTransactionBuffer implements TransactionBuffer {
}
@Override
- public CompletableFuture<Void> commitTxn(TxnID txnID, List<PulsarApi.MessageIdData> messageIdDataList) {
+ public CompletableFuture<Void> commitTxn(TxnID txnID, List<MessageIdData> messageIdDataList) {
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
try {
TxnBuffer txnBuffer = getTxnBufferOrThrowNotFoundException(txnID);
@@ -307,7 +307,7 @@ class InMemTransactionBuffer implements TransactionBuffer {
}
@Override
- public CompletableFuture<Void> abortTxn(TxnID txnID, List<PulsarApi.MessageIdData> sendMessageIdList) {
+ public CompletableFuture<Void> abortTxn(TxnID txnID, List<MessageIdData> sendMessageIdList) {
CompletableFuture<Void> abortFuture = new CompletableFuture<>();
try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
index 9a21f18..8b91aea 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
import org.junit.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -71,9 +71,9 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(1));
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(2));
Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 3);
- TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0)).get();
- TxnID txnID1 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1)).get();
- TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2)).get();
+ TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get();
+ TxnID txnID1 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1), 0).get();
+ TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2), 0).get();
Assert.assertEquals(0, txnID0.getMostSigBits());
Assert.assertEquals(1, txnID1.getMostSigBits());
Assert.assertEquals(2, txnID2.getMostSigBits());
@@ -88,7 +88,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService();
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1);
- TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0)).get();
+ TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get();
List<String> partitions = new ArrayList<>();
partitions.add("ptn-0");
partitions.add("ptn-1");
@@ -105,7 +105,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService();
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1);
- TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0)).get();
+ TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get();
List<TransactionSubscription> partitions = new ArrayList<>();
partitions.add(TransactionSubscription.builder().topic("ptn-1").subscription("sub-1").build());
partitions.add(TransactionSubscription.builder().topic("ptn-2").subscription("sub-1").build());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java
index 3c8798e..5025479 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java
@@ -35,7 +35,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotFoundException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionStatusException;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
diff --git a/pulsar-transaction/coordinator/generate_protobuf.sh b/pulsar-transaction/coordinator/generate_protobuf.sh
new file mode 100755
index 0000000..959b824
--- /dev/null
+++ b/pulsar-transaction/coordinator/generate_protobuf.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+PROTOC=${PROTOC:-protoc}
+${PROTOC} --java_out=pulsar-transaction/coordinator/src/main/java pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
diff --git a/pulsar-transaction/coordinator/generate_protobuf_docker.sh b/pulsar-transaction/coordinator/generate_protobuf_docker.sh
new file mode 100755
index 0000000..10b9ad8e
--- /dev/null
+++ b/pulsar-transaction/coordinator/generate_protobuf_docker.sh
@@ -0,0 +1,42 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Fail script in case of errors
+set -e
+
+ROOT_DIR=$(git rev-parse --show-toplevel)
+COMMON_DIR=$ROOT_DIR/
+cd $COMMON_DIR
+
+BUILD_IMAGE_NAME="${BUILD_IMAGE_NAME:-apachepulsar/pulsar-build}"
+BUILD_IMAGE_VERSION="${BUILD_IMAGE_VERSION:-ubuntu-16.04}"
+
+IMAGE="$BUILD_IMAGE_NAME:$BUILD_IMAGE_VERSION"
+
+echo $IMAGE
+
+# Force to pull image in case it was updated
+docker pull $IMAGE
+
+WORKDIR=/workdir
+docker run -i \
+ -v ${COMMON_DIR}:${WORKDIR} $IMAGE \
+ bash -c "cd ${WORKDIR}; PROTOC=/pulsar/protobuf/src/protoc ./pulsar-transaction/coordinator/generate_protobuf.sh"
+
diff --git a/pulsar-transaction/coordinator/pom.xml b/pulsar-transaction/coordinator/pom.xml
index a6eb2e3f..33553cd 100644
--- a/pulsar-transaction/coordinator/pom.xml
+++ b/pulsar-transaction/coordinator/pom.xml
@@ -37,7 +37,7 @@
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>pulsar-transaction-common</artifactId>
+ <artifactId>pulsar-common</artifactId>
<version>${project.version}</version>
</dependency>
@@ -47,6 +47,36 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>testmocks</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>check-style</id>
+ <phase>verify</phase>
+ <configuration>
+ <configLocation>../../buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation>
+ <suppressionsLocation>../../buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation>
+ <encoding>UTF-8</encoding>
+ <excludes>**/proto/*</excludes>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java
new file mode 100644
index 0000000..fb3b5ca
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry;
+
+/**
+ * A log interface for transaction to read and write transaction operation.
+ */
+public interface TransactionLog {
+
+
+ /**
+ * Replay transaction log to load the transaction map.
+ *
+ * @param transactionLogReplayCallback the call back for replaying the transaction log
+ */
+ void replayAsync(TransactionLogReplayCallback transactionLogReplayCallback);
+
+ /**
+ * Close the transaction log.
+ */
+ CompletableFuture<Void> closeAsync();
+
+ /**
+ * Append the transaction operation to the transaction log.
+ *
+ * @param transactionMetadataEntry {@link TransactionMetadataEntry} transaction metadata entry
+ * @return a future represents the result of this operation
+ */
+ CompletableFuture<Position> append(TransactionMetadataEntry transactionMetadataEntry);
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLogReplayCallback.java
similarity index 52%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java
copy to pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLogReplayCallback.java
index c81b17a..a61ae86 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLogReplayCallback.java
@@ -16,22 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.transaction.buffer.exceptions;
+package org.apache.pulsar.transaction.coordinator;
-import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry;
/**
- * Exceptions are thrown when operations are applied to a transaction which is not in expected txn status.
+ * The callback of transaction log replay the transaction operate.
*/
-public class TransactionStatusException extends TransactionBufferException {
+public interface TransactionLogReplayCallback {
- private static final long serialVersionUID = 0L;
+ /**
+ * Transaction log replay complete callback for transaction metadata store.
+ */
+ void replayComplete();
- public TransactionStatusException(TxnID txnId,
- TxnStatus expectedStatus,
- TxnStatus actualStatus) {
- super("Transaction `" + txnId + "` is not in an expected status `" + expectedStatus
- + "`, but is in status `" + actualStatus + "`");
- }
-}
+ /**
+ * Handle metadata entry.
+ *
+ * @param position the transaction operation position
+ * @param transactionMetadataEntry the metadata entry of transaction
+ */
+ void handleMetadataEntry(Position position, TransactionMetadataEntry transactionMetadataEntry);
+
+}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
index 8d4b69f..cc933e9 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
@@ -24,7 +24,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
/**
* A store for storing all the transaction metadata.
@@ -55,11 +55,12 @@ public interface TransactionMetadataStore {
/**
* Create a new transaction in the transaction metadata store.
*
+ * @param timeoutInMills the timeout duration of the transaction in mills
* @return a future represents the result of creating a new transaction.
* it returns {@link TxnID} as the identifier for identifying the
* transaction.
*/
- CompletableFuture<TxnID> newTransaction();
+ CompletableFuture<TxnID> newTransaction(long timeoutInMills);
/**
* Add the produced partitions to transaction identified by <tt>txnid</tt>.
@@ -87,6 +88,7 @@ public interface TransactionMetadataStore {
* <p>If the current transaction status is not <tt>expectedStatus</tt>, the
* update will be failed.
*
+ * @param txnid {@link TxnID} for update txn status
* @param newStatus the new txn status that the transaction should be updated to
* @param expectedStatus the expected status that the transaction should be
* @return a future represents the result of the operation
@@ -95,7 +97,15 @@ public interface TransactionMetadataStore {
TxnID txnid, TxnStatus newStatus, TxnStatus expectedStatus);
/**
+ * Get the transaction coordinator id.
+ * @return transaction coordinator id
+ */
+ TransactionCoordinatorID getTransactionCoordinatorID();
+
+ /**
* Close the transaction metadata store.
+ *
+ * @return a future represents the result of this operation
*/
CompletableFuture<Void> closeAsync();
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
new file mode 100644
index 0000000..5dbd9ba
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
@@ -0,0 +1,71 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * The implement of transaction metadata store state.
+ */
+public abstract class TransactionMetadataStoreState {
+
+ /**
+ * The state of the transactionMetadataStore {@link TransactionMetadataStore}.
+ */
+ public enum State {
+ None,
+ Initializing,
+ Ready,
+ Close
+ }
+
+ private static final AtomicReferenceFieldUpdater<TransactionMetadataStoreState, State> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(TransactionMetadataStoreState.class, State.class, "state");
+
+ @SuppressWarnings("unused")
+ private volatile State state = null;
+
+ public TransactionMetadataStoreState(State state) {
+ STATE_UPDATER.set(this, state);
+
+ }
+
+ protected boolean changeToReadyState() {
+ return (STATE_UPDATER.compareAndSet(this, State.Initializing, State.Ready));
+ }
+
+ protected boolean changeToInitializingState() {
+ return STATE_UPDATER.compareAndSet(this, State.None, State.Initializing);
+ }
+
+ protected boolean changeToCloseState() {
+ return (STATE_UPDATER.compareAndSet(this, State.Ready, State.Close)
+ || STATE_UPDATER.compareAndSet(this, State.None, State.Close)
+ || STATE_UPDATER.compareAndSet(this, State.Initializing, State.Close));
+ }
+
+ protected boolean checkIfReady() {
+ return STATE_UPDATER.get(this) == State.Ready;
+ }
+
+ public State getState() {
+ return STATE_UPDATER.get(this);
+ }
+}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
index cbc8dc9..509da8c 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
@@ -23,7 +23,7 @@ import java.util.List;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
/**
* An interface represents the metadata of a transaction in {@link TransactionMetadataStore}.
@@ -74,11 +74,12 @@ public interface TxnMeta {
/**
* Add the list of acked partitions to the transaction.
*
+ * @param subscriptions the ackd subscriptions add to the transaction
* @return transaction meta
* @throws InvalidTxnStatusException if the transaction is not in
* {@link TxnStatus#OPEN}
*/
- TxnMeta addAckedPartitions(List<TransactionSubscription> partitions)
+ TxnMeta addAckedPartitions(List<TransactionSubscription> subscriptions)
throws InvalidTxnStatusException;
/**
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java
index bd66876..ed1b894 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java
@@ -20,7 +20,8 @@ package org.apache.pulsar.transaction.coordinator.exceptions;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
/**
* The base exception for exceptions thrown from coordinator.
@@ -85,6 +86,10 @@ public abstract class CoordinatorException extends Exception {
super(message);
}
+ public TransactionNotFoundException(TxnID txnID) {
+ super("The transaction with this txdID `" + txnID + "`not found ");
+ }
+
public TransactionNotFoundException(String message, Throwable cause) {
super(message, cause);
}
@@ -93,4 +98,25 @@ public abstract class CoordinatorException extends Exception {
super(cause);
}
}
+
+ /**
+ * Exception is thrown when a operation of transaction is executed in a error transaction metadata store state.
+ */
+ public static class TransactionMetadataStoreStateException extends CoordinatorException {
+
+ private static final long serialVersionUID = 0L;
+
+ public TransactionMetadataStoreStateException(String message) {
+ super(message);
+ }
+
+ public TransactionMetadataStoreStateException(TransactionCoordinatorID tcID,
+ TransactionMetadataStoreState.State expectedState,
+ TransactionMetadataStoreState.State currentState,
+ String operation) {
+ super("Expect Transaction Coordinator `" + tcID + "` to be in " + expectedState
+ + " status but it is in " + currentState + " state for " + operation);
+
+ }
+ }
}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
index 15bcd41..c6d6f13 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
@@ -31,7 +31,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
/**
* An in-memory implementation of {@link TransactionMetadataStore}.
@@ -53,8 +53,7 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore {
CompletableFuture<TxnMeta> getFuture = new CompletableFuture<>();
TxnMetaImpl txn = transactions.get(txnid);
if (null == txn) {
- getFuture.completeExceptionally(
- new TransactionNotFoundException("Transaction not found :" + txnid));
+ getFuture.completeExceptionally(new TransactionNotFoundException(txnid));
} else {
getFuture.complete(txn);
}
@@ -62,12 +61,12 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore {
}
@Override
- public CompletableFuture<TxnID> newTransaction() {
+ public CompletableFuture<TxnID> newTransaction(long timeoutInMills) {
TxnID txnID = new TxnID(
tcID.getId(),
localID.getAndIncrement()
);
- TxnMetaImpl txn = new TxnMetaImpl(txnID);
+ TxnMetaImpl txn = TxnMetaImpl.create(txnID);
transactions.put(txnID, txn);
return CompletableFuture.completedFuture(txnID);
}
@@ -115,6 +114,11 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore {
}
@Override
+ public TransactionCoordinatorID getTransactionCoordinatorID() {
+ return tcID;
+ }
+
+ @Override
public CompletableFuture<Void> closeAsync() {
transactions.clear();
return CompletableFuture.completedFuture(null);
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
new file mode 100644
index 0000000..761169c
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionLog;
+import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MLTransactionLogImpl implements TransactionLog {
+
+ private static final Logger log = LoggerFactory.getLogger(MLTransactionLogImpl.class);
+
+ private final ManagedLedger managedLedger;
+
+ private final static String TRANSACTION_LOG_PREFIX = NamespaceName.SYSTEM_NAMESPACE + "/transaction-log-";
+
+ private final ManagedCursor cursor;
+
+ private final static String TRANSACTION_SUBSCRIPTION_NAME = "transaction.subscription";
+
+ private final SpscArrayQueue<Entry> entryQueue;
+
+ //this is for replay
+ private final PositionImpl lastConfirmedEntry;
+
+ private final long tcId;
+
+ private final String topicName;
+
+ public MLTransactionLogImpl(TransactionCoordinatorID tcID,
+ ManagedLedgerFactory managedLedgerFactory) throws Exception {
+ this.topicName = TRANSACTION_LOG_PREFIX + tcID;
+ this.tcId = tcID.getId();
+ this.managedLedger = managedLedgerFactory.open(topicName);
+ this.cursor = managedLedger.openCursor(TRANSACTION_SUBSCRIPTION_NAME,
+ CommandSubscribe.InitialPosition.Earliest);
+ this.entryQueue = new SpscArrayQueue<>(2000);
+ this.lastConfirmedEntry = (PositionImpl) managedLedger.getLastConfirmedEntry();
+ }
+
+ @Override
+ public void replayAsync(TransactionLogReplayCallback transactionLogReplayCallback) {
+ new TransactionLogReplayer(transactionLogReplayCallback).start();
+ }
+
+ private void readAsync(int numberOfEntriesToRead,
+ AsyncCallbacks.ReadEntriesCallback readEntriesCallback) {
+ cursor.asyncReadEntries(numberOfEntriesToRead, readEntriesCallback, System.nanoTime());
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+
+ managedLedger.asyncClose(new AsyncCallbacks.CloseCallback() {
+ @Override
+ public void closeComplete(Object ctx) {
+ log.info("Transaction log with tcId : {} close managedLedger successful!", tcId);
+ completableFuture.complete(null);
+ }
+
+ @Override
+ public void closeFailed(ManagedLedgerException exception, Object ctx) {
+ log.error("Transaction log with tcId : {} close managedLedger fail!", tcId);
+ completableFuture.completeExceptionally(exception);
+ }
+ }, null);
+
+ return completableFuture;
+ }
+
+ @Override
+ public CompletableFuture<Position> append(TransactionMetadataEntry transactionMetadataEntry) {
+ int transactionMetadataEntrySize = transactionMetadataEntry.getSerializedSize();
+ ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, transactionMetadataEntrySize);
+ ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(buf);
+ CompletableFuture<Position> completableFuture = new CompletableFuture<>();
+ try {
+ transactionMetadataEntry.writeTo(outStream);
+ managedLedger.asyncAddEntry(buf, new AsyncCallbacks.AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, Object ctx) {
+ buf.release();
+ completableFuture.complete(position);
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object ctx) {
+ log.error("Transaction log write transaction operation error", exception);
+ buf.release();
+ completableFuture.completeExceptionally(exception);
+ }
+ } , null);
+ } catch (IOException e) {
+ log.error("Transaction log write transaction operation error", e);
+ completableFuture.completeExceptionally(e);
+ } finally {
+ outStream.recycle();
+ }
+ return completableFuture;
+ }
+
+ public CompletableFuture<Void> deletePosition(List<Position> positions) {
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ this.cursor.asyncDelete(positions, new AsyncCallbacks.DeleteCallback() {
+ @Override
+ public void deleteComplete(Object position) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] Deleted message at {}", topicName,
+ TRANSACTION_SUBSCRIPTION_NAME, position);
+ }
+ completableFuture.complete(null);
+ }
+
+ @Override
+ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
+ log.warn("[{}][{}] Failed to delete message at {}", topicName,
+ TRANSACTION_SUBSCRIPTION_NAME, ctx, exception);
+ completableFuture.completeExceptionally(exception);
+ }
+ }, null);
+ return completableFuture;
+ }
+
+ class TransactionLogReplayer {
+
+ private FillEntryQueueCallback fillEntryQueueCallback;
+ private long currentLoadEntryId;
+ private TransactionLogReplayCallback transactionLogReplayCallback;
+
+ TransactionLogReplayer(TransactionLogReplayCallback transactionLogReplayCallback) {
+ this.fillEntryQueueCallback = new FillEntryQueueCallback();
+ this.transactionLogReplayCallback = transactionLogReplayCallback;
+ }
+
+ public void start() {
+ if (((PositionImpl) cursor.getMarkDeletedPosition()).compareTo(lastConfirmedEntry) == 0) {
+ this.transactionLogReplayCallback.replayComplete();
+ return;
+ }
+ while (currentLoadEntryId < lastConfirmedEntry.getEntryId()) {
+ fillEntryQueueCallback.fillQueue();
+ Entry entry = entryQueue.poll();
+ if (entry != null) {
+ ByteBuf buffer = entry.getDataBuffer();
+ currentLoadEntryId = entry.getEntryId();
+ ByteBufCodedInputStream stream = ByteBufCodedInputStream.get(buffer);
+ TransactionMetadataEntry.Builder transactionMetadataEntryBuilder =
+ TransactionMetadataEntry.newBuilder();
+ TransactionMetadataEntry transactionMetadataEntry;
+ try {
+ transactionMetadataEntry =
+ transactionMetadataEntryBuilder.mergeFrom(stream, null).build();
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ throw new RuntimeException("TransactionLog convert entry error : ", e);
+ }
+ transactionLogReplayCallback.handleMetadataEntry(entry.getPosition(), transactionMetadataEntry);
+ entry.release();
+ transactionMetadataEntry.recycle();
+ transactionMetadataEntryBuilder.recycle();
+ stream.recycle();
+ } else {
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ //no-op
+ }
+ }
+ }
+ transactionLogReplayCallback.replayComplete();
+ }
+ }
+
+ class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {
+
+ private AtomicLong outstandingReadsRequests = new AtomicLong(0);
+
+ void fillQueue() {
+ if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
+ if (cursor.hasMoreEntries()) {
+ outstandingReadsRequests.incrementAndGet();
+ readAsync(100, this);
+ }
+ }
+ }
+
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ entryQueue.fill(new MessagePassingQueue.Supplier<Entry>() {
+ private int i = 0;
+ @Override
+ public Entry get() {
+ Entry entry = entries.get(i);
+ i++;
+ return entry;
+ }
+ }, entries.size());
+
+ outstandingReadsRequests.decrementAndGet();
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ log.error("Transaction log init fail error!", exception);
+ outstandingReadsRequests.decrementAndGet();
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
new file mode 100644
index 0000000..c2e519b
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The provider that offers managed ledger implementation of {@link TransactionMetadataStore}.
+ */
+public class MLTransactionMetadataStore
+ extends TransactionMetadataStoreState implements TransactionMetadataStore {
+
+ private static final Logger log = LoggerFactory.getLogger(MLTransactionMetadataStore.class);
+
+ private final TransactionCoordinatorID tcID;
+ private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
+ private final MLTransactionLogImpl transactionLog;
+ private static final long TC_ID_NOT_USED = -1L;
+ private final ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMetaMap = new ConcurrentHashMap<>();
+
+ public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
+ MLTransactionLogImpl mlTransactionLog) {
+ super(State.None);
+ this.tcID = tcID;
+ this.transactionLog = mlTransactionLog;
+
+ if (!changeToInitializingState()) {
+ log.error("Managed ledger transaction metadata store change state error when init it");
+ return;
+ }
+ new Thread(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
+
+ @Override
+ public void replayComplete() {
+ if (!changeToReadyState()) {
+ log.error("Managed ledger transaction metadata store change state error when replay complete");
+ }
+ }
+
+ @Override
+ public void handleMetadataEntry(Position position, TransactionMetadataEntry transactionMetadataEntry) {
+
+ try {
+
+ TxnID txnID = new TxnID(transactionMetadataEntry.getTxnidMostBits(),
+ transactionMetadataEntry.getTxnidLeastBits());
+ switch (transactionMetadataEntry.getMetadataOp()) {
+ case NEW:
+ if (sequenceId.get() < transactionMetadataEntry.getTxnidLeastBits()) {
+ sequenceId.set(transactionMetadataEntry.getTxnidLeastBits());
+ }
+ if (txnMetaMap.containsKey(txnID)) {
+ txnMetaMap.get(txnID).getRight().add(position);
+ } else {
+ List<Position> positions = new ArrayList<>();
+ positions.add(position);
+ txnMetaMap.put(txnID, MutablePair.of(TxnMetaImpl.create(txnID), positions));
+ }
+ break;
+ case ADD_PARTITION:
+ if (!txnMetaMap.containsKey(txnID)) {
+ transactionLog.deletePosition(Collections.singletonList(position));
+ } else {
+ txnMetaMap.get(txnID).getLeft()
+ .addProducedPartitions(transactionMetadataEntry.getPartitionsList());
+ txnMetaMap.get(txnID).getRight().add(position);
+ }
+ break;
+ case ADD_SUBSCRIPTION:
+ if (!txnMetaMap.containsKey(txnID)) {
+ transactionLog.deletePosition(Collections.singletonList(position));
+ } else {
+ txnMetaMap.get(txnID).getLeft()
+ .addAckedPartitions(subscriptionToTxnSubscription(
+ transactionMetadataEntry.getSubscriptionsList()));
+ txnMetaMap.get(txnID).getRight().add(position);
+ }
+ break;
+ case UPDATE:
+ if (!txnMetaMap.containsKey(txnID)) {
+ transactionLog.deletePosition(Collections.singletonList(position));
+ } else {
+ TxnStatus newStatus = transactionMetadataEntry.getNewStatus();
+ if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
+ transactionLog.deletePosition(txnMetaMap.get(txnID).getRight()).thenAccept(v -> {
+ TxnMeta txnMeta = txnMetaMap.remove(txnID).getLeft();
+ ((TxnMetaImpl) txnMeta).recycle();
+ });
+ } else {
+ txnMetaMap.get(txnID).getLeft()
+ .updateTxnStatus(transactionMetadataEntry.getNewStatus(),
+ transactionMetadataEntry.getExpectedStatus());
+ }
+ txnMetaMap.get(txnID).getRight().add(position);
+ }
+ break;
+ default:
+ throw new InvalidTxnStatusException("Transaction `"
+ + txnID + "` load replay metadata operation "
+ + "from transaction log with unknown operation");
+ }
+ } catch (InvalidTxnStatusException e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+ })).start();
+ }
+
+ @Override
+ public CompletableFuture<TxnStatus> getTxnStatus(TxnID txnID) {
+ return CompletableFuture.completedFuture(txnMetaMap.get(txnID).getLeft().status());
+ }
+
+ @Override
+ public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnID) {
+ Pair<TxnMeta, List<Position>> txnMetaListPair = txnMetaMap.get(txnID);
+ CompletableFuture<TxnMeta> completableFuture = new CompletableFuture<>();
+ if (txnMetaListPair == null) {
+ completableFuture.completeExceptionally(new TransactionNotFoundException(txnID));
+ } else {
+ completableFuture.complete(txnMetaListPair.getLeft());
+ }
+ return completableFuture;
+ }
+
+ @Override
+ public CompletableFuture<TxnID> newTransaction(long timeOut) {
+ if (!checkIfReady()) {
+ return FutureUtil.failedFuture(
+ new CoordinatorException
+ .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
+ }
+
+ long mostSigBits = tcID.getId();
+ long leastSigBits = sequenceId.incrementAndGet();
+ TxnID txnID = new TxnID(mostSigBits, leastSigBits);
+ long currentTimeMillis = System.currentTimeMillis();
+ TransactionMetadataEntry transactionMetadataEntry = TransactionMetadataEntry
+ .newBuilder()
+ .setTxnidMostBits(mostSigBits)
+ .setTxnidLeastBits(leastSigBits)
+ .setStartTime(currentTimeMillis)
+ .setTimeoutMs(timeOut)
+ .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
+ .setLastModificationTime(currentTimeMillis)
+ .build();
+ return transactionLog.append(transactionMetadataEntry)
+ .thenCompose(position -> {
+ TxnMeta txn = TxnMetaImpl.create(txnID);
+ List<Position> positions = new ArrayList<>();
+ positions.add(position);
+ Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
+ txnMetaMap.put(txnID, pair);
+ transactionMetadataEntry.recycle();
+ return CompletableFuture.completedFuture(txnID);
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) {
+ if (!checkIfReady()) {
+ return FutureUtil.failedFuture(
+ new CoordinatorException.TransactionMetadataStoreStateException(tcID,
+ State.Ready, getState(), "add produced partition"));
+ }
+ return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
+ TransactionMetadataEntry transactionMetadataEntry = TransactionMetadataEntry
+ .newBuilder()
+ .setTxnidMostBits(txnID.getMostSigBits())
+ .setTxnidLeastBits(txnID.getLeastSigBits())
+ .setMetadataOp(TransactionMetadataOp.ADD_PARTITION)
+ .addAllPartitions(partitions)
+ .setLastModificationTime(System.currentTimeMillis())
+ .build();
+
+ return transactionLog.append(transactionMetadataEntry)
+ .thenCompose(position -> {
+ try {
+ txnMetaListPair.getLeft().addProducedPartitions(partitions);
+ txnMetaMap.get(txnID).getRight().add(position);
+ return CompletableFuture.completedFuture(null);
+ } catch (InvalidTxnStatusException e) {
+ txnMetaMap.get(txnID).getRight().add(position);
+ log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+ + " add produced partition error with TxnStatus : "
+ + txnMetaListPair.getLeft().status().name(), e);
+ return FutureUtil.failedFuture(e);
+ } finally {
+ transactionMetadataEntry.recycle();
+ }
+ });
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
+ List<TransactionSubscription> txnSubscriptions) {
+ if (!checkIfReady()) {
+ return FutureUtil.failedFuture(
+ new CoordinatorException.TransactionMetadataStoreStateException(tcID,
+ State.Ready, getState(), "add acked partition"));
+ }
+ return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
+ TransactionMetadataEntry transactionMetadataEntry = TransactionMetadataEntry
+ .newBuilder()
+ .setTxnidMostBits(txnID.getMostSigBits())
+ .setTxnidLeastBits(txnID.getLeastSigBits())
+ .setMetadataOp(TransactionMetadataOp.ADD_SUBSCRIPTION)
+ .addAllSubscriptions(txnSubscriptionToSubscription(txnSubscriptions))
+ .setLastModificationTime(System.currentTimeMillis())
+ .build();
+
+ return transactionLog.append(transactionMetadataEntry)
+ .thenCompose(position -> {
+ try {
+ txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions);
+ txnMetaMap.get(txnID).getRight().add(position);
+ return CompletableFuture.completedFuture(null);
+ } catch (InvalidTxnStatusException e) {
+ txnMetaMap.get(txnID).getRight().add(position);
+ log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+ + " add acked subscription error with TxnStatus : "
+ + txnMetaListPair.getLeft().status().name(), e);
+ return FutureUtil.failedFuture(e);
+ } finally {
+ transactionMetadataEntry.recycle();
+ }
+ });
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus, TxnStatus expectedStatus) {
+ if (!checkIfReady()) {
+ return FutureUtil.failedFuture(
+ new CoordinatorException.TransactionMetadataStoreStateException(tcID,
+ State.Ready, getState(), "update transaction status"));
+ }
+ return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
+
+ TransactionMetadataEntry transactionMetadataEntry = TransactionMetadataEntry
+ .newBuilder()
+ .setTxnidMostBits(txnID.getMostSigBits())
+ .setTxnidLeastBits(txnID.getLeastSigBits())
+ .setExpectedStatus(expectedStatus)
+ .setMetadataOp(TransactionMetadataOp.UPDATE)
+ .setLastModificationTime(System.currentTimeMillis())
+ .setNewStatus(newStatus)
+ .build();
+
+ return transactionLog.append(transactionMetadataEntry).thenCompose(position -> {
+ try {
+ txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus);
+ txnMetaListPair.getRight().add(position);
+ if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
+ return transactionLog.deletePosition(txnMetaListPair.getRight()).thenCompose(v -> {
+ txnMetaMap.remove(txnID);
+ ((TxnMetaImpl) txnMetaListPair.getLeft()).recycle();
+ return CompletableFuture.completedFuture(null);
+ });
+ }
+ return CompletableFuture.completedFuture(null);
+ } catch (InvalidTxnStatusException e) {
+ txnMetaListPair.getRight().add(position);
+ log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+ + " add update txn status error with TxnStatus : "
+ + txnMetaListPair.getLeft().status().name(), e);
+ return FutureUtil.failedFuture(e);
+ } finally {
+ transactionMetadataEntry.recycle();
+ }
+ });
+ });
+ }
+
+ @Override
+ public TransactionCoordinatorID getTransactionCoordinatorID() {
+ return tcID;
+ }
+
+ private CompletableFuture<Pair<TxnMeta, List<Position>>> getTxnPositionPair(TxnID txnID) {
+ CompletableFuture<Pair<TxnMeta, List<Position>>> completableFuture = new CompletableFuture<>();
+ Pair<TxnMeta, List<Position>> txnMetaListPair = txnMetaMap.get(txnID);
+ if (txnMetaListPair == null) {
+ completableFuture.completeExceptionally(new TransactionNotFoundException(txnID));
+ } else {
+ completableFuture.complete(txnMetaListPair);
+ }
+ return completableFuture;
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return transactionLog.closeAsync().thenCompose(v -> {
+ txnMetaMap.clear();
+ if (!this.changeToCloseState()) {
+ return FutureUtil.failedFuture(
+ new IllegalStateException("Managed ledger transaction metadata store state to close error!"));
+ }
+ return CompletableFuture.completedFuture(null);
+ });
+ }
+
+ public static List<Subscription> txnSubscriptionToSubscription(List<TransactionSubscription> tnxSubscriptions) {
+ List<Subscription> subscriptions = new ArrayList<>(tnxSubscriptions.size());
+ for (TransactionSubscription transactionSubscription : tnxSubscriptions) {
+ Subscription.Builder subscriptionBuilder = Subscription.newBuilder();
+ Subscription subscription = subscriptionBuilder
+ .setSubscription(transactionSubscription.getSubscription())
+ .setTopic(transactionSubscription.getTopic()).build();
+ subscriptions.add(subscription);
+ subscriptionBuilder.recycle();
+ }
+ return subscriptions;
+ }
+
+ public static List<TransactionSubscription> subscriptionToTxnSubscription(
+ List<Subscription> subscriptions) {
+ List<TransactionSubscription> transactionSubscriptions = new ArrayList<>(subscriptions.size());
+ for (Subscription subscription : subscriptions) {
+ TransactionSubscription.TransactionSubscriptionBuilder transactionSubscriptionBuilder =
+ TransactionSubscription.builder();
+ transactionSubscriptionBuilder.subscription(subscription.getSubscription());
+ transactionSubscriptionBuilder.topic(subscription.getTopic());
+ transactionSubscriptions
+ .add(transactionSubscriptionBuilder.build());
+ subscription.recycle();
+ }
+ return transactionSubscriptions;
+ }
+}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
new file mode 100644
index 0000000..5e1061d
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The provider that offers managed ledger implementation of {@link TransactionMetadataStore}.
+ */
+public class MLTransactionMetadataStoreProvider implements TransactionMetadataStoreProvider {
+
+ private static final Logger log = LoggerFactory.getLogger(MLTransactionMetadataStoreProvider.class);
+
+ @Override
+ public CompletableFuture<TransactionMetadataStore>
+ openStore(TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory) {
+ TransactionMetadataStore transactionMetadataStore;
+ try {
+ transactionMetadataStore =
+ new MLTransactionMetadataStore(transactionCoordinatorId,
+ new MLTransactionLogImpl(transactionCoordinatorId, managedLedgerFactory));
+ } catch (Exception e) {
+ log.error("MLTransactionMetadataStore init fail", e);
+ return FutureUtil.failedFuture(e);
+ }
+ return CompletableFuture.completedFuture(transactionMetadataStore);
+ }
+}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
index 0b1ae74..6d695c6 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.transaction.coordinator.impl;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -28,7 +31,8 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.util.TransactionUtil;
/**
* A class represents the metadata of a transaction stored in
@@ -36,14 +40,38 @@ import org.apache.pulsar.transaction.impl.common.TxnStatus;
*/
class TxnMetaImpl implements TxnMeta {
- private final TxnID txnID;
+ private TxnID txnID;
private final Set<String> producedPartitions = new HashSet<>();
private final Set<TransactionSubscription> ackedPartitions = new HashSet<>();
- private TxnStatus txnStatus;
+ private volatile TxnStatus txnStatus = TxnStatus.OPEN;
+ private final Handle<TxnMetaImpl> recycleHandle;
+
+ private static final Recycler<TxnMetaImpl> RECYCLER = new Recycler<TxnMetaImpl>() {
+ protected TxnMetaImpl newObject(Recycler.Handle<TxnMetaImpl> handle) {
+ return new TxnMetaImpl(handle);
+ }
+ };
+
+ TxnMetaImpl(Handle<TxnMetaImpl> handle) {
+ this.recycleHandle = handle;
+ }
- TxnMetaImpl(TxnID txnID) {
- this.txnID = txnID;
+ // Constructor for transaction metadata
+ static TxnMetaImpl create(TxnID txnID) {
+ @SuppressWarnings("unchecked")
+ TxnMetaImpl txnMeta = RECYCLER.get();
+ txnMeta.txnID = txnID;
+ return txnMeta;
+ }
+
+ public void recycle() {
+ this.producedPartitions.clear();
+ this.ackedPartitions.clear();
this.txnStatus = TxnStatus.OPEN;
+
+ if (recycleHandle != null) {
+ recycleHandle.recycle(this);
+ }
}
@Override
@@ -140,7 +168,7 @@ class TxnMetaImpl implements TxnMeta {
TxnStatus expectedStatus)
throws InvalidTxnStatusException {
checkTxnStatus(expectedStatus);
- if (!txnStatus.canTransitionTo(newStatus)) {
+ if (!TransactionUtil.canTransitionTo(txnStatus, newStatus)) {
throw new InvalidTxnStatusException(
"Transaction `" + txnID + "` CANNOT transaction from status " + txnStatus + " to " + newStatus);
}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/proto/PulsarTransactionMetadata.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/proto/PulsarTransactionMetadata.java
new file mode 100644
index 0000000..422674b
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/proto/PulsarTransactionMetadata.java
@@ -0,0 +1,1123 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
+
+package org.apache.pulsar.transaction.coordinator.proto;
+
+public final class PulsarTransactionMetadata {
+ private PulsarTransactionMetadata() {}
+ public static void registerAllExtensions(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite registry) {
+ }
+ public enum TxnStatus
+ implements org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLite {
+ OPEN(0, 0),
+ COMMITTING(1, 1),
+ COMMITTED(2, 2),
+ ABORTING(3, 3),
+ ABORTED(4, 4),
+ ;
+
+ public static final int OPEN_VALUE = 0;
+ public static final int COMMITTING_VALUE = 1;
+ public static final int COMMITTED_VALUE = 2;
+ public static final int ABORTING_VALUE = 3;
+ public static final int ABORTED_VALUE = 4;
+
+
+ public final int getNumber() { return value; }
+
+ public static TxnStatus valueOf(int value) {
+ switch (value) {
+ case 0: return OPEN;
+ case 1: return COMMITTING;
+ case 2: return COMMITTED;
+ case 3: return ABORTING;
+ case 4: return ABORTED;
+ default: return null;
+ }
+ }
+
+ public static org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap<TxnStatus>
+ internalGetValueMap() {
+ return internalValueMap;
+ }
+ private static org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap<TxnStatus>
+ internalValueMap =
+ new org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap<TxnStatus>() {
+ public TxnStatus findValueByNumber(int number) {
+ return TxnStatus.valueOf(number);
+ }
+ };
+
+ private final int value;
+
+ private TxnStatus(int index, int value) {
+ this.value = value;
+ }
+
+ // @@protoc_insertion_point(enum_scope:pulsar.proto.TxnStatus)
+ }
+
+ public interface TransactionMetadataEntryOrBuilder
+ extends org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
+
+ // optional .pulsar.proto.TransactionMetadataEntry.TransactionMetadataOp metadata_op = 1;
+ boolean hasMetadataOp();
+ org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp getMetadataOp();
+
+ // optional uint64 txnid_least_bits = 2 [default = 0];
+ boolean hasTxnidLeastBits();
+ long getTxnidLeastBits();
+
+ // optional uint64 txnid_most_bits = 3 [default = 0];
+ boolean hasTxnidMostBits();
+ long getTxnidMostBits();
+
+ // optional .pulsar.proto.TxnStatus expected_status = 4;
+ boolean hasExpectedStatus();
+ org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus getExpectedStatus();
+
+ // optional .pulsar.proto.TxnStatus new_status = 5;
+ boolean hasNewStatus();
+ org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus getNewStatus();
+
+ // repeated string partitions = 6;
+ java.util.List<String> getPartitionsList();
+ int getPartitionsCount();
+ String getPartitions(int index);
+
+ // repeated .pulsar.proto.Subscription subscriptions = 7;
+ java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.Subscription>
+ getSubscriptionsList();
+ org.apache.pulsar.common.api.proto.PulsarApi.Subscription getSubscriptions(int index);
+ int getSubscriptionsCount();
+
+ // optional uint64 timeout_ms = 8;
+ boolean hasTimeoutMs();
+ long getTimeoutMs();
+
+ // optional uint64 start_time = 9;
+ boolean hasStartTime();
+ long getStartTime();
+
+ // optional uint64 last_modification_time = 10;
+ boolean hasLastModificationTime();
+ long getLastModificationTime();
+ }
+ public static final class TransactionMetadataEntry extends
+ org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
+ implements TransactionMetadataEntryOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage {
+ // Use TransactionMetadataEntry.newBuilder() to construct.
+ private io.netty.util.Recycler.Handle handle;
+ private TransactionMetadataEntry(io.netty.util.Recycler.Handle handle) {
+ this.handle = handle;
+ }
+
+ private static final io.netty.util.Recycler<TransactionMetadataEntry> RECYCLER = new io.netty.util.Recycler<TransactionMetadataEntry>() {
+ protected TransactionMetadataEntry newObject(Handle handle) {
+ return new TransactionMetadataEntry(handle);
+ }
+ };
+
+ public void recycle() {
+ this.initFields();
+ this.memoizedIsInitialized = -1;
+ this.bitField0_ = 0;
+ this.memoizedSerializedSize = -1;
+ if (handle != null) { RECYCLER.recycle(this, handle); }
+ }
+
+ private TransactionMetadataEntry(boolean noInit) {}
+
+ private static final TransactionMetadataEntry defaultInstance;
+ public static TransactionMetadataEntry getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public TransactionMetadataEntry getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ public enum TransactionMetadataOp
+ implements org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLite {
+ NEW(0, 0),
+ ADD_PARTITION(1, 1),
+ ADD_SUBSCRIPTION(2, 2),
+ UPDATE(3, 3),
+ ;
+
+ public static final int NEW_VALUE = 0;
+ public static final int ADD_PARTITION_VALUE = 1;
+ public static final int ADD_SUBSCRIPTION_VALUE = 2;
+ public static final int UPDATE_VALUE = 3;
+
+
+ public final int getNumber() { return value; }
+
+ public static TransactionMetadataOp valueOf(int value) {
+ switch (value) {
+ case 0: return NEW;
+ case 1: return ADD_PARTITION;
+ case 2: return ADD_SUBSCRIPTION;
+ case 3: return UPDATE;
+ default: return null;
+ }
+ }
+
+ public static org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap<TransactionMetadataOp>
+ internalGetValueMap() {
+ return internalValueMap;
+ }
+ private static org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap<TransactionMetadataOp>
+ internalValueMap =
+ new org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap<TransactionMetadataOp>() {
+ public TransactionMetadataOp findValueByNumber(int number) {
+ return TransactionMetadataOp.valueOf(number);
+ }
+ };
+
+ private final int value;
+
+ private TransactionMetadataOp(int index, int value) {
+ this.value = value;
+ }
+
+ // @@protoc_insertion_point(enum_scope:pulsar.proto.TransactionMetadataEntry.TransactionMetadataOp)
+ }
+
+ private int bitField0_;
+ // optional .pulsar.proto.TransactionMetadataEntry.TransactionMetadataOp metadata_op = 1;
+ public static final int METADATA_OP_FIELD_NUMBER = 1;
+ private org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp metadataOp_;
+ public boolean hasMetadataOp() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp getMetadataOp() {
+ return metadataOp_;
+ }
+
+ // optional uint64 txnid_least_bits = 2 [default = 0];
+ public static final int TXNID_LEAST_BITS_FIELD_NUMBER = 2;
+ private long txnidLeastBits_;
+ public boolean hasTxnidLeastBits() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public long getTxnidLeastBits() {
+ return txnidLeastBits_;
+ }
+
+ // optional uint64 txnid_most_bits = 3 [default = 0];
+ public static final int TXNID_MOST_BITS_FIELD_NUMBER = 3;
+ private long txnidMostBits_;
+ public boolean hasTxnidMostBits() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public long getTxnidMostBits() {
+ return txnidMostBits_;
+ }
+
+ // optional .pulsar.proto.TxnStatus expected_status = 4;
+ public static final int EXPECTED_STATUS_FIELD_NUMBER = 4;
+ private org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus expectedStatus_;
+ public boolean hasExpectedStatus() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus getExpectedStatus() {
+ return expectedStatus_;
+ }
+
+ // optional .pulsar.proto.TxnStatus new_status = 5;
+ public static final int NEW_STATUS_FIELD_NUMBER = 5;
+ private org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus newStatus_;
+ public boolean hasNewStatus() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus getNewStatus() {
+ return newStatus_;
+ }
+
+ // repeated string partitions = 6;
+ public static final int PARTITIONS_FIELD_NUMBER = 6;
+ private org.apache.pulsar.shaded.com.google.protobuf.v241.LazyStringList partitions_;
+ public java.util.List<String>
+ getPartitionsList() {
+ return partitions_;
+ }
+ public int getPartitionsCount() {
+ return partitions_.size();
+ }
+ public String getPartitions(int index) {
+ return partitions_.get(index);
+ }
+
+ // repeated .pulsar.proto.Subscription subscriptions = 7;
+ public static final int SUBSCRIPTIONS_FIELD_NUMBER = 7;
+ private java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.Subscription> subscriptions_;
+ public java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.Subscription> getSubscriptionsList() {
+ return subscriptions_;
+ }
+ public java.util.List<? extends org.apache.pulsar.common.api.proto.PulsarApi.SubscriptionOrBuilder>
+ getSubscriptionsOrBuilderList() {
+ return subscriptions_;
+ }
+ public int getSubscriptionsCount() {
+ return subscriptions_.size();
+ }
+ public org.apache.pulsar.common.api.proto.PulsarApi.Subscription getSubscriptions(int index) {
+ return subscriptions_.get(index);
+ }
+ public org.apache.pulsar.common.api.proto.PulsarApi.SubscriptionOrBuilder getSubscriptionsOrBuilder(
+ int index) {
+ return subscriptions_.get(index);
+ }
+
+ // optional uint64 timeout_ms = 8;
+ public static final int TIMEOUT_MS_FIELD_NUMBER = 8;
+ private long timeoutMs_;
+ public boolean hasTimeoutMs() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public long getTimeoutMs() {
+ return timeoutMs_;
+ }
+
+ // optional uint64 start_time = 9;
+ public static final int START_TIME_FIELD_NUMBER = 9;
+ private long startTime_;
+ public boolean hasStartTime() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ public long getStartTime() {
+ return startTime_;
+ }
+
+ // optional uint64 last_modification_time = 10;
+ public static final int LAST_MODIFICATION_TIME_FIELD_NUMBER = 10;
+ private long lastModificationTime_;
+ public boolean hasLastModificationTime() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ public long getLastModificationTime() {
+ return lastModificationTime_;
+ }
+
+ private void initFields() {
+ metadataOp_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp.NEW;
+ txnidLeastBits_ = 0L;
+ txnidMostBits_ = 0L;
+ expectedStatus_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.OPEN;
+ newStatus_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.OPEN;
+ partitions_ = org.apache.pulsar.shaded.com.google.protobuf.v241.LazyStringArrayList.EMPTY;
+ subscriptions_ = java.util.Collections.emptyList();
+ timeoutMs_ = 0L;
+ startTime_ = 0L;
+ lastModificationTime_ = 0L;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ for (int i = 0; i < getSubscriptionsCount(); i++) {
+ if (!getSubscriptions(i).isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream output)
+ throws java.io.IOException {
+ throw new RuntimeException("Cannot use CodedOutputStream");
+ }
+
+ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeEnum(1, metadataOp_.getNumber());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeUInt64(2, txnidLeastBits_);
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeUInt64(3, txnidMostBits_);
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeEnum(4, expectedStatus_.getNumber());
+ }
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeEnum(5, newStatus_.getNumber());
+ }
+ for (int i = 0; i < partitions_.size(); i++) {
+ output.writeBytes(6, partitions_.getByteString(i));
+ }
+ for (int i = 0; i < subscriptions_.size(); i++) {
+ output.writeMessage(7, subscriptions_.get(i));
+ }
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeUInt64(8, timeoutMs_);
+ }
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ output.writeUInt64(9, startTime_);
+ }
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ output.writeUInt64(10, lastModificationTime_);
+ }
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeEnumSize(1, metadataOp_.getNumber());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeUInt64Size(2, txnidLeastBits_);
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeUInt64Size(3, txnidMostBits_);
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeEnumSize(4, expectedStatus_.getNumber());
+ }
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeEnumSize(5, newStatus_.getNumber());
+ }
+ {
+ int dataSize = 0;
+ for (int i = 0; i < partitions_.size(); i++) {
+ dataSize += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeBytesSizeNoTag(partitions_.getByteString(i));
+ }
+ size += dataSize;
+ size += 1 * getPartitionsList().size();
+ }
+ for (int i = 0; i < subscriptions_.size(); i++) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeMessageSize(7, subscriptions_.get(i));
+ }
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeUInt64Size(8, timeoutMs_);
+ }
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeUInt64Size(9, startTime_);
+ }
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeUInt64Size(10, lastModificationTime_);
+ }
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data)
+ throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+ throw new RuntimeException("Disabled");
+ }
+ public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+ throw new RuntimeException("Disabled");
+ }
+ public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseFrom(byte[] data)
+ throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseFrom(
+ byte[] data,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseFrom(
+ java.io.InputStream input,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseDelimitedFrom(
+ java.io.InputStream input,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseFrom(
+ org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ public static final class Builder extends
+ org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder<
+ org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry, Builder>
+ implements org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntryOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder {
+ // Construct using org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.newBuilder()
+ private final io.netty.util.Recycler.Handle handle;
+ private Builder(io.netty.util.Recycler.Handle handle) {
+ this.handle = handle;
+ maybeForceBuilderInitialization();
+ }
+ private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
+ protected Builder newObject(io.netty.util.Recycler.Handle handle) {
+ return new Builder(handle);
+ }
+ };
+
+ public void recycle() {
+ clear();
+ if (handle != null) {RECYCLER.recycle(this, handle);}
+ }
+
+ private void maybeForceBuilderInitialization() {
+ }
+ private static Builder create() {
+ return RECYCLER.get();
+ }
+
+ public Builder clear() {
+ super.clear();
+ metadataOp_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp.NEW;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ txnidLeastBits_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ txnidMostBits_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000004);
+ expectedStatus_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.OPEN;
+ bitField0_ = (bitField0_ & ~0x00000008);
+ newStatus_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.OPEN;
+ bitField0_ = (bitField0_ & ~0x00000010);
+ partitions_ = org.apache.pulsar.shaded.com.google.protobuf.v241.LazyStringArrayList.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000020);
+ subscriptions_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000040);
+ timeoutMs_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000080);
+ startTime_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000100);
+ lastModificationTime_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000200);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry getDefaultInstanceForType() {
+ return org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.getDefaultInstance();
+ }
+
+ public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry build() {
+ org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry buildParsed()
+ throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+ org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+
+ public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry buildPartial() {
+ org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry result = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.RECYCLER.get();
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.metadataOp_ = metadataOp_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.txnidLeastBits_ = txnidLeastBits_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.txnidMostBits_ = txnidMostBits_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.expectedStatus_ = expectedStatus_;
+ if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.newStatus_ = newStatus_;
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ partitions_ = new org.apache.pulsar.shaded.com.google.protobuf.v241.UnmodifiableLazyStringList(
+ partitions_);
+ bitField0_ = (bitField0_ & ~0x00000020);
+ }
+ result.partitions_ = partitions_;
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ subscriptions_ = java.util.Collections.unmodifiableList(subscriptions_);
+ bitField0_ = (bitField0_ & ~0x00000040);
+ }
+ result.subscriptions_ = subscriptions_;
+ if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.timeoutMs_ = timeoutMs_;
+ if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
+ to_bitField0_ |= 0x00000040;
+ }
+ result.startTime_ = startTime_;
+ if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
+ to_bitField0_ |= 0x00000080;
+ }
+ result.lastModificationTime_ = lastModificationTime_;
+ result.bitField0_ = to_bitField0_;
+ return result;
+ }
+
+ public Builder mergeFrom(org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry other) {
+ if (other == org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.getDefaultInstance()) return this;
+ if (other.hasMetadataOp()) {
+ setMetadataOp(other.getMetadataOp());
+ }
+ if (other.hasTxnidLeastBits()) {
+ setTxnidLeastBits(other.getTxnidLeastBits());
+ }
+ if (other.hasTxnidMostBits()) {
+ setTxnidMostBits(other.getTxnidMostBits());
+ }
+ if (other.hasExpectedStatus()) {
+ setExpectedStatus(other.getExpectedStatus());
+ }
+ if (other.hasNewStatus()) {
+ setNewStatus(other.getNewStatus());
+ }
+ if (!other.partitions_.isEmpty()) {
+ if (partitions_.isEmpty()) {
+ partitions_ = other.partitions_;
+ bitField0_ = (bitField0_ & ~0x00000020);
+ } else {
+ ensurePartitionsIsMutable();
+ partitions_.addAll(other.partitions_);
+ }
+
+ }
+ if (!other.subscriptions_.isEmpty()) {
+ if (subscriptions_.isEmpty()) {
+ subscriptions_ = other.subscriptions_;
+ bitField0_ = (bitField0_ & ~0x00000040);
+ } else {
+ ensureSubscriptionsIsMutable();
+ subscriptions_.addAll(other.subscriptions_);
+ }
+
+ }
+ if (other.hasTimeoutMs()) {
+ setTimeoutMs(other.getTimeoutMs());
+ }
+ if (other.hasStartTime()) {
+ setStartTime(other.getStartTime());
+ }
+ if (other.hasLastModificationTime()) {
+ setLastModificationTime(other.getLastModificationTime());
+ }
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ for (int i = 0; i < getSubscriptionsCount(); i++) {
+ if (!getSubscriptions(i).isInitialized()) {
+
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ throw new java.io.IOException("Merge from CodedInputStream is disabled");
+ }
+ public Builder mergeFrom(
+ org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ while (true) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+
+ return this;
+ default: {
+ if (!input.skipField(tag)) {
+
+ return this;
+ }
+ break;
+ }
+ case 8: {
+ int rawValue = input.readEnum();
+ org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp value = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp.valueOf(rawValue);
+ if (value != null) {
+ bitField0_ |= 0x00000001;
+ metadataOp_ = value;
+ }
+ break;
+ }
+ case 16: {
+ bitField0_ |= 0x00000002;
+ txnidLeastBits_ = input.readUInt64();
+ break;
+ }
+ case 24: {
+ bitField0_ |= 0x00000004;
+ txnidMostBits_ = input.readUInt64();
+ break;
+ }
+ case 32: {
+ int rawValue = input.readEnum();
+ org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus value = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.valueOf(rawValue);
+ if (value != null) {
+ bitField0_ |= 0x00000008;
+ expectedStatus_ = value;
+ }
+ break;
+ }
+ case 40: {
+ int rawValue = input.readEnum();
+ org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus value = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.valueOf(rawValue);
+ if (value != null) {
+ bitField0_ |= 0x00000010;
+ newStatus_ = value;
+ }
+ break;
+ }
+ case 50: {
+ ensurePartitionsIsMutable();
+ partitions_.add(input.readBytes());
+ break;
+ }
+ case 58: {
+ org.apache.pulsar.common.api.proto.PulsarApi.Subscription.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.Subscription.newBuilder();
+ input.readMessage(subBuilder, extensionRegistry);
+ addSubscriptions(subBuilder.buildPartial());
+ break;
+ }
+ case 64: {
+ bitField0_ |= 0x00000080;
+ timeoutMs_ = input.readUInt64();
+ break;
+ }
+ case 72: {
+ bitField0_ |= 0x00000100;
+ startTime_ = input.readUInt64();
+ break;
+ }
+ case 80: {
+ bitField0_ |= 0x00000200;
+ lastModificationTime_ = input.readUInt64();
+ break;
+ }
+ }
+ }
+ }
+
+ private int bitField0_;
+
+ // optional .pulsar.proto.TransactionMetadataEntry.TransactionMetadataOp metadata_op = 1;
+ private org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp metadataOp_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp.NEW;
+ public boolean hasMetadataOp() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp getMetadataOp() {
+ return metadataOp_;
+ }
+ public Builder setMetadataOp(org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ metadataOp_ = value;
+
+ return this;
+ }
+ public Builder clearMetadataOp() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ metadataOp_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp.NEW;
+
+ return this;
+ }
+
+ // optional uint64 txnid_least_bits = 2 [default = 0];
+ private long txnidLeastBits_ ;
+ public boolean hasTxnidLeastBits() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public long getTxnidLeastBits() {
+ return txnidLeastBits_;
+ }
+ public Builder setTxnidLeastBits(long value) {
+ bitField0_ |= 0x00000002;
+ txnidLeastBits_ = value;
+
+ return this;
+ }
+ public Builder clearTxnidLeastBits() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ txnidLeastBits_ = 0L;
+
+ return this;
+ }
+
+ // optional uint64 txnid_most_bits = 3 [default = 0];
+ private long txnidMostBits_ ;
+ public boolean hasTxnidMostBits() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public long getTxnidMostBits() {
+ return txnidMostBits_;
+ }
+ public Builder setTxnidMostBits(long value) {
+ bitField0_ |= 0x00000004;
+ txnidMostBits_ = value;
+
+ return this;
+ }
+ public Builder clearTxnidMostBits() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ txnidMostBits_ = 0L;
+
+ return this;
+ }
+
+ // optional .pulsar.proto.TxnStatus expected_status = 4;
+ private org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus expectedStatus_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.OPEN;
+ public boolean hasExpectedStatus() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus getExpectedStatus() {
+ return expectedStatus_;
+ }
+ public Builder setExpectedStatus(org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000008;
+ expectedStatus_ = value;
+
+ return this;
+ }
+ public Builder clearExpectedStatus() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ expectedStatus_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.OPEN;
+
+ return this;
+ }
+
+ // optional .pulsar.proto.TxnStatus new_status = 5;
+ private org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus newStatus_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.OPEN;
+ public boolean hasNewStatus() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus getNewStatus() {
+ return newStatus_;
+ }
+ public Builder setNewStatus(org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000010;
+ newStatus_ = value;
+
+ return this;
+ }
+ public Builder clearNewStatus() {
+ bitField0_ = (bitField0_ & ~0x00000010);
+ newStatus_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.OPEN;
+
+ return this;
+ }
+
+ // repeated string partitions = 6;
+ private org.apache.pulsar.shaded.com.google.protobuf.v241.LazyStringList partitions_ = org.apache.pulsar.shaded.com.google.protobuf.v241.LazyStringArrayList.EMPTY;
+ private void ensurePartitionsIsMutable() {
+ if (!((bitField0_ & 0x00000020) == 0x00000020)) {
+ partitions_ = new org.apache.pulsar.shaded.com.google.protobuf.v241.LazyStringArrayList(partitions_);
+ bitField0_ |= 0x00000020;
+ }
+ }
+ public java.util.List<String>
+ getPartitionsList() {
+ return java.util.Collections.unmodifiableList(partitions_);
+ }
+ public int getPartitionsCount() {
+ return partitions_.size();
+ }
+ public String getPartitions(int index) {
+ return partitions_.get(index);
+ }
+ public Builder setPartitions(
+ int index, String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensurePartitionsIsMutable();
+ partitions_.set(index, value);
+
+ return this;
+ }
+ public Builder addPartitions(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensurePartitionsIsMutable();
+ partitions_.add(value);
+
+ return this;
+ }
+ public Builder addAllPartitions(
+ java.lang.Iterable<String> values) {
+ ensurePartitionsIsMutable();
+ super.addAll(values, partitions_);
+
+ return this;
+ }
+ public Builder clearPartitions() {
+ partitions_ = org.apache.pulsar.shaded.com.google.protobuf.v241.LazyStringArrayList.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000020);
+
+ return this;
+ }
+ void addPartitions(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString value) {
+ ensurePartitionsIsMutable();
+ partitions_.add(value);
+
+ }
+
+ // repeated .pulsar.proto.Subscription subscriptions = 7;
+ private java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.Subscription> subscriptions_ =
+ java.util.Collections.emptyList();
+ private void ensureSubscriptionsIsMutable() {
+ if (!((bitField0_ & 0x00000040) == 0x00000040)) {
+ subscriptions_ = new java.util.ArrayList<org.apache.pulsar.common.api.proto.PulsarApi.Subscription>(subscriptions_);
+ bitField0_ |= 0x00000040;
+ }
+ }
+
+ public java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.Subscription> getSubscriptionsList() {
+ return java.util.Collections.unmodifiableList(subscriptions_);
+ }
+ public int getSubscriptionsCount() {
+ return subscriptions_.size();
+ }
+ public org.apache.pulsar.common.api.proto.PulsarApi.Subscription getSubscriptions(int index) {
+ return subscriptions_.get(index);
+ }
+ public Builder setSubscriptions(
+ int index, org.apache.pulsar.common.api.proto.PulsarApi.Subscription value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureSubscriptionsIsMutable();
+ subscriptions_.set(index, value);
+
+ return this;
+ }
+ public Builder setSubscriptions(
+ int index, org.apache.pulsar.common.api.proto.PulsarApi.Subscription.Builder builderForValue) {
+ ensureSubscriptionsIsMutable();
+ subscriptions_.set(index, builderForValue.build());
+
+ return this;
+ }
+ public Builder addSubscriptions(org.apache.pulsar.common.api.proto.PulsarApi.Subscription value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureSubscriptionsIsMutable();
+ subscriptions_.add(value);
+
+ return this;
+ }
+ public Builder addSubscriptions(
+ int index, org.apache.pulsar.common.api.proto.PulsarApi.Subscription value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureSubscriptionsIsMutable();
+ subscriptions_.add(index, value);
+
+ return this;
+ }
+ public Builder addSubscriptions(
+ org.apache.pulsar.common.api.proto.PulsarApi.Subscription.Builder builderForValue) {
+ ensureSubscriptionsIsMutable();
+ subscriptions_.add(builderForValue.build());
+
+ return this;
+ }
+ public Builder addSubscriptions(
+ int index, org.apache.pulsar.common.api.proto.PulsarApi.Subscription.Builder builderForValue) {
+ ensureSubscriptionsIsMutable();
+ subscriptions_.add(index, builderForValue.build());
+
+ return this;
+ }
+ public Builder addAllSubscriptions(
+ java.lang.Iterable<? extends org.apache.pulsar.common.api.proto.PulsarApi.Subscription> values) {
+ ensureSubscriptionsIsMutable();
+ super.addAll(values, subscriptions_);
+
+ return this;
+ }
+ public Builder clearSubscriptions() {
+ subscriptions_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000040);
+
+ return this;
+ }
+ public Builder removeSubscriptions(int index) {
+ ensureSubscriptionsIsMutable();
+ subscriptions_.remove(index);
+
+ return this;
+ }
+
+ // optional uint64 timeout_ms = 8;
+ private long timeoutMs_ ;
+ public boolean hasTimeoutMs() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ public long getTimeoutMs() {
+ return timeoutMs_;
+ }
+ public Builder setTimeoutMs(long value) {
+ bitField0_ |= 0x00000080;
+ timeoutMs_ = value;
+
+ return this;
+ }
+ public Builder clearTimeoutMs() {
+ bitField0_ = (bitField0_ & ~0x00000080);
+ timeoutMs_ = 0L;
+
+ return this;
+ }
+
+ // optional uint64 start_time = 9;
+ private long startTime_ ;
+ public boolean hasStartTime() {
+ return ((bitField0_ & 0x00000100) == 0x00000100);
+ }
+ public long getStartTime() {
+ return startTime_;
+ }
+ public Builder setStartTime(long value) {
+ bitField0_ |= 0x00000100;
+ startTime_ = value;
+
+ return this;
+ }
+ public Builder clearStartTime() {
+ bitField0_ = (bitField0_ & ~0x00000100);
+ startTime_ = 0L;
+
+ return this;
+ }
+
+ // optional uint64 last_modification_time = 10;
+ private long lastModificationTime_ ;
+ public boolean hasLastModificationTime() {
+ return ((bitField0_ & 0x00000200) == 0x00000200);
+ }
+ public long getLastModificationTime() {
+ return lastModificationTime_;
+ }
+ public Builder setLastModificationTime(long value) {
+ bitField0_ |= 0x00000200;
+ lastModificationTime_ = value;
+
+ return this;
+ }
+ public Builder clearLastModificationTime() {
+ bitField0_ = (bitField0_ & ~0x00000200);
+ lastModificationTime_ = 0L;
+
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:pulsar.proto.TransactionMetadataEntry)
+ }
+
+ static {
+ defaultInstance = new TransactionMetadataEntry(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:pulsar.proto.TransactionMetadataEntry)
+ }
+
+
+ static {
+ }
+
+ // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnStatus.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/util/TransactionUtil.java
similarity index 68%
rename from pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnStatus.java
rename to pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/util/TransactionUtil.java
index dc5b8d9..ecff44d 100644
--- a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnStatus.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/util/TransactionUtil.java
@@ -16,26 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.transaction.impl.common;
+package org.apache.pulsar.transaction.coordinator.util;
-import com.google.common.annotations.Beta;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
+
+import static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.ABORTED;
+import static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.ABORTING;
+import static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.COMMITTED;
+import static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.COMMITTING;
/**
- * A enum represents the status of a transaction.
+ * An transaction util of {@link TransactionUtil}.
*/
-@Beta
-public enum TxnStatus {
-
- // A new transaction is open.
- OPEN,
- // A transaction is in the progress of committing.
- COMMITTING,
- // A transaction is already committed.
- COMMITTED,
- // A transaction is in the progress of aborting.
- ABORTING,
- // A transaction is already aborted.
- ABORTED;
+public class TransactionUtil {
/**
* Check if the a status can be transaction to a new status.
@@ -43,8 +36,7 @@ public enum TxnStatus {
* @param newStatus the new status
* @return true if the current status can be transitioning to.
*/
- public boolean canTransitionTo(TxnStatus newStatus) {
- TxnStatus currentStatus = this;
+ public static boolean canTransitionTo(TxnStatus currentStatus, TxnStatus newStatus) {
switch (currentStatus) {
case OPEN:
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/util/package-info.java
similarity index 51%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java
copy to pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/util/package-info.java
index c81b17a..4501e50 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/util/package-info.java
@@ -16,22 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.transaction.buffer.exceptions;
-
-import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
-
/**
- * Exceptions are thrown when operations are applied to a transaction which is not in expected txn status.
+ * Implementations of the transaction coordinator.
*/
-public class TransactionStatusException extends TransactionBufferException {
-
- private static final long serialVersionUID = 0L;
-
- public TransactionStatusException(TxnID txnId,
- TxnStatus expectedStatus,
- TxnStatus actualStatus) {
- super("Transaction `" + txnId + "` is not in an expected status `" + expectedStatus
- + "`, but is in status `" + actualStatus + "`");
- }
-}
+package org.apache.pulsar.transaction.coordinator.util;
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto b/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
new file mode 100644
index 0000000..5391919
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto2";
+
+import "pulsar-common/src/main/proto/PulsarApi.proto";
+package pulsar.proto;
+option java_package = "org.apache.pulsar.transaction.coordinator.proto";
+option optimize_for = LITE_RUNTIME;
+
+enum TxnStatus {
+ OPEN = 0;
+ COMMITTING = 1;
+ COMMITTED = 2;
+ ABORTING = 3;
+ ABORTED = 4;
+}
+
+message TransactionMetadataEntry {
+
+ enum TransactionMetadataOp {
+ NEW = 0;
+ ADD_PARTITION = 1;
+ ADD_SUBSCRIPTION = 2;
+ UPDATE = 3;
+ }
+
+ optional TransactionMetadataOp metadata_op = 1;
+ optional uint64 txnid_least_bits = 2 [default = 0];
+ optional uint64 txnid_most_bits = 3 [default = 0];
+ optional TxnStatus expected_status = 4;
+ optional TxnStatus new_status = 5;
+ repeated string partitions = 6;
+ repeated pulsar.proto.Subscription subscriptions = 7;
+ optional uint64 timeout_ms = 8;
+ optional uint64 start_time = 9;
+ optional uint64 last_modification_time = 10;
+}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
new file mode 100644
index 0000000..84e68c7
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
+
+ public MLTransactionMetadataStoreTest() {
+ super(3);
+ }
+
+ @Test
+ public void testTransactionOperation() throws Exception {
+ ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
+ factoryConf.setMaxCacheSize(0);
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf);
+ TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
+ MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory);
+ MLTransactionMetadataStore transactionMetadataStore =
+ new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog);
+ int checkReplayRetryCount = 0;
+ while (true) {
+ checkReplayRetryCount++;
+ if (checkReplayRetryCount > 3) {
+ Assert.fail();
+ break;
+ }
+ if (transactionMetadataStore.checkIfReady()) {
+ TxnID txnID = transactionMetadataStore.newTransaction(5000).get();
+ Assert.assertEquals(transactionMetadataStore.getTxnStatus(txnID).get(), TxnStatus.OPEN);
+
+ List<String> partitions = new ArrayList<>();
+ partitions.add("pt-1");
+ partitions.add("pt-2");
+ transactionMetadataStore.addProducedPartitionToTxn(txnID, partitions).get();
+ Assert.assertEquals(transactionMetadataStore.getTxnMeta(txnID).get().producedPartitions(), partitions);
+
+ partitions.add("pt-3");
+ transactionMetadataStore.addProducedPartitionToTxn(txnID, partitions).get();
+ Assert.assertEquals(transactionMetadataStore.getTxnMeta(txnID).get().producedPartitions(),
+ partitions);
+
+ List<TransactionSubscription> subscriptions = new ArrayList<>();
+ subscriptions.add(new TransactionSubscription("topic1", "sub1"));
+ subscriptions.add(new TransactionSubscription("topic2", "sub2"));
+ transactionMetadataStore.addAckedPartitionToTxn(txnID, subscriptions).get();
+ Assert.assertTrue(transactionMetadataStore.getTxnMeta(txnID).get().ackedPartitions().containsAll(subscriptions));
+
+ transactionMetadataStore.addAckedPartitionToTxn(txnID, subscriptions).get();
+ Assert.assertEquals(transactionMetadataStore.getTxnMeta(txnID).get().producedPartitions(),
+ partitions);
+
+ transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN).get();
+ Assert.assertEquals(transactionMetadataStore.getTxnStatus(txnID).get(), TxnStatus.COMMITTING);
+
+ transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING).get();
+
+ try {
+ transactionMetadataStore.getTxnMeta(txnID).get();
+ Assert.fail();
+ } catch (ExecutionException e) {
+ Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException);
+ }
+ break;
+ } else {
+ checkReplayRetryCount++;
+ Thread.sleep(100);
+ }
+ }
+ }
+
+ @Test
+ public void testInitTransactionReader() throws Exception {
+ ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
+ factoryConf.setMaxCacheSize(0);
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf);
+ TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
+ MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory);
+ MLTransactionMetadataStore transactionMetadataStore =
+ new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog);
+ int checkReplayRetryCount = 0;
+ while (true) {
+ if (checkReplayRetryCount > 3) {
+ Assert.fail();
+ break;
+ }
+ if (transactionMetadataStore.checkIfReady()) {
+ TxnID txnID1 = transactionMetadataStore.newTransaction(1000).get();
+ TxnID txnID2 = transactionMetadataStore.newTransaction(1000).get();
+ Assert.assertEquals(transactionMetadataStore.getTxnStatus(txnID1).get(), TxnStatus.OPEN);
+ Assert.assertEquals(transactionMetadataStore.getTxnStatus(txnID2).get(), TxnStatus.OPEN);
+
+ List<String> partitions = new ArrayList<>();
+ partitions.add("pt-1");
+ partitions.add("pt-2");
+ transactionMetadataStore.addProducedPartitionToTxn(txnID1, partitions).get();
+ transactionMetadataStore.addProducedPartitionToTxn(txnID2, partitions).get();
+
+ List<TransactionSubscription> subscriptions = new ArrayList<>();
+ subscriptions.add(new TransactionSubscription("topic1", "sub1"));
+ subscriptions.add(new TransactionSubscription("topic2", "sub2"));
+
+ transactionMetadataStore.addAckedPartitionToTxn(txnID1, subscriptions).get();
+ transactionMetadataStore.addAckedPartitionToTxn(txnID2, subscriptions).get();
+ List<TransactionSubscription> subscriptions1 = new ArrayList<>();
+ subscriptions1.add(new TransactionSubscription("topic1", "sub1"));
+ subscriptions1.add(new TransactionSubscription("topic3", "sub3"));
+ subscriptions1.add(new TransactionSubscription("topic3", "sub3"));
+ transactionMetadataStore.addAckedPartitionToTxn(txnID1, subscriptions1).get();
+ transactionMetadataStore.addAckedPartitionToTxn(txnID2, subscriptions1).get();
+
+ transactionMetadataStore.updateTxnStatus(txnID1, TxnStatus.COMMITTING, TxnStatus.OPEN).get();
+ transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.COMMITTING, TxnStatus.OPEN).get();
+
+ transactionMetadataStore.closeAsync();
+
+ MLTransactionMetadataStore transactionMetadataStoreTest =
+ new MLTransactionMetadataStore(transactionCoordinatorID,
+
+ new MLTransactionLogImpl(transactionCoordinatorID, factory));
+
+ while (true) {
+ if (checkReplayRetryCount > 6) {
+ Assert.fail();
+ break;
+ }
+ if (transactionMetadataStoreTest.checkIfReady()) {
+ subscriptions.add(new TransactionSubscription("topic3", "sub3"));
+ TxnMeta txnMeta1 = transactionMetadataStoreTest.getTxnMeta(txnID1).get();
+ TxnMeta txnMeta2 = transactionMetadataStoreTest.getTxnMeta(txnID2).get();
+ Assert.assertEquals(txnMeta1.producedPartitions(), partitions);
+ Assert.assertEquals(txnMeta2.producedPartitions(), partitions);
+ Assert.assertEquals(txnMeta1.ackedPartitions().size(), subscriptions.size());
+ Assert.assertEquals(txnMeta2.ackedPartitions().size(), subscriptions.size());
+ Assert.assertTrue(subscriptions.containsAll(txnMeta1.ackedPartitions()));
+ Assert.assertTrue(subscriptions.containsAll(txnMeta2.ackedPartitions()));
+ Assert.assertEquals(txnMeta1.status(), TxnStatus.COMMITTING);
+ Assert.assertEquals(txnMeta2.status(), TxnStatus.COMMITTING);
+ transactionMetadataStoreTest
+ .updateTxnStatus(txnID1, TxnStatus.COMMITTED, TxnStatus.COMMITTING).get();
+ transactionMetadataStoreTest
+ .updateTxnStatus(txnID2, TxnStatus.COMMITTED, TxnStatus.COMMITTING).get();
+ try {
+ transactionMetadataStoreTest.getTxnMeta(txnID1).get();
+ Assert.fail();
+ } catch (ExecutionException e) {
+ Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException);
+ }
+
+ try {
+ transactionMetadataStoreTest.getTxnMeta(txnID2).get();
+ Assert.fail();
+ } catch (ExecutionException e) {
+ Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException);
+ }
+ TxnID txnID = transactionMetadataStoreTest.newTransaction(1000).get();
+ Assert.assertEquals(txnID.getLeastSigBits(), 2L);
+ break;
+ } else {
+ checkReplayRetryCount++;
+ Thread.sleep(100);
+ }
+ }
+ break;
+ } else {
+ checkReplayRetryCount++;
+ Thread.sleep(100);
+ }
+ }
+ }
+
+ @Test
+ public void testDeleteLog() throws Exception {
+ ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
+ factoryConf.setMaxCacheSize(0);
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf);
+ TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
+ MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory);
+ MLTransactionMetadataStore transactionMetadataStore =
+ new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog);
+ int checkReplayRetryCount = 0;
+ while (true) {
+ if (checkReplayRetryCount > 3) {
+ Assert.fail();
+ break;
+ }
+ if (transactionMetadataStore.checkIfReady()) {
+ TxnID txnID1 = transactionMetadataStore.newTransaction(1000).get();
+ TxnID txnID2 = transactionMetadataStore.newTransaction(1000).get();
+ Assert.assertEquals(transactionMetadataStore.getTxnStatus(txnID1).get(), TxnStatus.OPEN);
+ Assert.assertEquals(transactionMetadataStore.getTxnStatus(txnID2).get(), TxnStatus.OPEN);
+
+ List<String> partitions = new ArrayList<>();
+ partitions.add("pt-1");
+ partitions.add("pt-2");
+ transactionMetadataStore.addProducedPartitionToTxn(txnID1, partitions).get();
+ transactionMetadataStore.addProducedPartitionToTxn(txnID2, partitions).get();
+
+ List<TransactionSubscription> subscriptions = new ArrayList<>();
+ subscriptions.add(new TransactionSubscription("topic1", "sub1"));
+ subscriptions.add(new TransactionSubscription("topic2", "sub2"));
+
+ transactionMetadataStore.addAckedPartitionToTxn(txnID1, subscriptions).get();
+ transactionMetadataStore.addAckedPartitionToTxn(txnID2, subscriptions).get();
+ List<TransactionSubscription> subscriptions1 = new ArrayList<>();
+ subscriptions1.add(new TransactionSubscription("topic1", "sub1"));
+ subscriptions1.add(new TransactionSubscription("topic3", "sub3"));
+ subscriptions1.add(new TransactionSubscription("topic3", "sub3"));
+ transactionMetadataStore.addAckedPartitionToTxn(txnID1, subscriptions1).get();
+ transactionMetadataStore.addAckedPartitionToTxn(txnID2, subscriptions1).get();
+
+ transactionMetadataStore.updateTxnStatus(txnID1, TxnStatus.COMMITTING, TxnStatus.OPEN).get();
+ transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTING, TxnStatus.OPEN).get();
+
+ transactionMetadataStore.updateTxnStatus(txnID1, TxnStatus.COMMITTED, TxnStatus.COMMITTING).get();
+ transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTED, TxnStatus.ABORTING).get();
+ Field field = mlTransactionLog.getClass().getDeclaredField("cursor");
+ field.setAccessible(true);
+ ManagedCursor cursor = (ManagedCursor) field.get(mlTransactionLog);
+ Assert.assertEquals(cursor.getMarkDeletedPosition(), cursor.getManagedLedger().getLastConfirmedEntry());
+
+ break;
+ } else {
+ checkReplayRetryCount++;
+ Thread.sleep(100);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
index ff4bd31..9a79b37 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -29,8 +29,8 @@ import java.util.concurrent.ExecutionException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
import org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
@@ -78,14 +78,14 @@ public class TransactionMetadataStoreProviderTest {
@Test
public void testGetTxnStatusSuccess() throws Exception {
- TxnID txnID = this.store.newTransaction().get();
+ TxnID txnID = this.store.newTransaction(0L).get();
TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
assertEquals(txnStatus, TxnStatus.OPEN);
}
@Test
public void testUpdateTxnStatusSuccess() throws Exception {
- TxnID txnID = this.store.newTransaction().get();
+ TxnID txnID = this.store.newTransaction(0L).get();
TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
assertEquals(txnStatus, TxnStatus.OPEN);
@@ -99,7 +99,7 @@ public class TransactionMetadataStoreProviderTest {
@Test
public void testUpdateTxnStatusNotExpectedStatus() throws Exception {
- TxnID txnID = this.store.newTransaction().get();
+ TxnID txnID = this.store.newTransaction(0L).get();
TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
assertEquals(txnStatus, TxnStatus.OPEN);
@@ -118,7 +118,7 @@ public class TransactionMetadataStoreProviderTest {
@Test
public void testUpdateTxnStatusCannotTransition() throws Exception {
- TxnID txnID = this.store.newTransaction().get();
+ TxnID txnID = this.store.newTransaction(0L).get();
TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
assertEquals(txnStatus, TxnStatus.OPEN);
@@ -137,7 +137,7 @@ public class TransactionMetadataStoreProviderTest {
@Test
public void testAddProducedPartition() throws Exception {
- TxnID txnID = this.store.newTransaction().get();
+ TxnID txnID = this.store.newTransaction(0L).get();
TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
assertEquals(txnStatus, TxnStatus.OPEN);
@@ -191,7 +191,7 @@ public class TransactionMetadataStoreProviderTest {
@Test
public void testAddAckedPartition() throws Exception {
- TxnID txnID = this.store.newTransaction().get();
+ TxnID txnID = this.store.newTransaction(0L).get();
TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
assertEquals(txnStatus, TxnStatus.OPEN);
diff --git a/pulsar-transaction/common/src/test/java/org/apache/pulsar/transaction/impl/common/TxnStatusTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TxnStatusTest.java
similarity index 91%
rename from pulsar-transaction/common/src/test/java/org/apache/pulsar/transaction/impl/common/TxnStatusTest.java
rename to pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TxnStatusTest.java
index 7a8c8b9..5f606dc 100644
--- a/pulsar-transaction/common/src/test/java/org/apache/pulsar/transaction/impl/common/TxnStatusTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TxnStatusTest.java
@@ -16,16 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.transaction.impl.common;
-
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
+package org.apache.pulsar.transaction.coordinator;
import com.google.common.collect.Sets;
-import java.util.Set;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.util.TransactionUtil;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import java.util.Set;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
/**
* Unit test {@link TxnStatus}.
*/
@@ -103,13 +106,13 @@ public class TxnStatusTest {
Set<TxnStatus> statusesCanNotTransactionTo) {
statusesCanTransitionTo.forEach(newStatus -> {
assertTrue(
- status.canTransitionTo(newStatus),
+ TransactionUtil.canTransitionTo(status, newStatus),
"Status `" + status + "` should be able to transition to `" + newStatus + "`"
);
});
statusesCanNotTransactionTo.forEach(newStatus -> {
assertFalse(
- status.canTransitionTo(newStatus),
+ TransactionUtil.canTransitionTo(status, newStatus),
"Status `" + status + "` should NOT be able to transition to `" + newStatus + "`"
);
});
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
new file mode 100644
index 0000000..c6df6d5
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.test;
+
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A class runs several bookie servers for testing.
+ */
+public abstract class MockedBookKeeperTestCase {
+
+ static final Logger LOG = LoggerFactory.getLogger(MockedBookKeeperTestCase.class);
+
+ // ZooKeeper related variables
+ protected MockZooKeeper zkc;
+
+ // BookKeeper related variables
+ protected PulsarMockBookKeeper bkc;
+ protected int numBookies;
+
+ protected ManagedLedgerFactoryImpl factory;
+
+ protected ClientConfiguration baseClientConf = new ClientConfiguration();
+
+ protected OrderedScheduler executor;
+ protected ExecutorService cachedExecutor;
+
+ public MockedBookKeeperTestCase() {
+ // By default start a 3 bookies cluster
+ this(3);
+ }
+
+ public MockedBookKeeperTestCase(int numBookies) {
+ this.numBookies = numBookies;
+ }
+
+ @BeforeMethod
+ public void setUp(Method method) throws Exception {
+ LOG.info(">>>>>> starting {}", method);
+ try {
+ // start bookkeeper service
+ startBookKeeper();
+ } catch (Exception e) {
+ LOG.error("Error setting up", e);
+ throw e;
+ }
+
+ ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
+ factory = new ManagedLedgerFactoryImpl(bkc, zkc, conf);
+
+ zkc.create("/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown(Method method) {
+ try {
+ LOG.info("@@@@@@@@@ stopping " + method);
+ factory.shutdown();
+ factory = null;
+ stopBookKeeper();
+ stopZooKeeper();
+ LOG.info("--------- stopped {}", method);
+ } catch (Exception e) {
+ LOG.error("tearDown Error", e);
+ }
+ }
+
+ @BeforeClass
+ public void setUpClass() {
+ executor = OrderedScheduler.newSchedulerBuilder().numThreads(2).name("test").build();
+ cachedExecutor = Executors.newCachedThreadPool();
+ }
+
+ @AfterClass
+ public void tearDownClass() {
+ executor.shutdown();
+ cachedExecutor.shutdown();
+ }
+
+ /**
+ * Start cluster
+ *
+ * @throws Exception
+ */
+ protected void startBookKeeper() throws Exception {
+ zkc = MockZooKeeper.newInstance();
+ for (int i = 0; i < numBookies; i++) {
+ ZkUtils.createFullPathOptimistic(zkc, "/ledgers/available/192.168.1.1:" + (5000 + i), "".getBytes(), null,
+ null);
+ }
+
+ zkc.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(), null, null);
+
+ bkc = new PulsarMockBookKeeper(zkc, executor.chooseThread(this));
+ }
+
+ protected void stopBookKeeper() throws Exception {
+ bkc.shutdown();
+ }
+
+ protected void stopZooKeeper() throws Exception {
+ zkc.shutdown();
+ }
+
+}
\ No newline at end of file