You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/05/07 05:03:17 UTC
[pulsar] branch master updated: [transaction][coordinator] add the
interfaces for transaction metadata store and an in-memory implementation
(#4161)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 277bcee [transaction][coordinator] add the interfaces for transaction metadata store and an in-memory implementation (#4161)
277bcee is described below
commit 277bceef0366a2ef17ff6e30f62f77af0872fbca
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue May 7 13:03:12 2019 +0800
[transaction][coordinator] add the interfaces for transaction metadata store and an in-memory implementation (#4161)
* [transaction][coordinator] add the interfaces for transaction metadata store and an in-memory implementation
Master Issue: #2664
*Motivation*
Set up the project structure for developing the transaction coordinator.
*Changes*
- setup the project structure
- introduce common classes: `TxnID`, `TxnStatus`
- introduce `TransactionMetadataStore` for TC to store and manage the transaction status
- enable checkstyle for `pulsar-transaction`
*NOTES*
All the changes are isolated into `pulsar-transaction` module. Hence it will not block any other changes or releases.
---
pom.xml | 3 +
pulsar-transaction/common/pom.xml | 72 ++++++
.../pulsar/transaction/impl/common/TxnID.java | 48 ++++
.../pulsar/transaction/impl/common/TxnStatus.java | 65 ++++++
.../transaction/impl/common/package-info.java | 22 ++
.../transaction/impl/common/TxnStatusTest.java | 118 ++++++++++
pulsar-transaction/coordinator/pom.xml | 46 ++++
.../coordinator/TransactionCoordinatorID.java | 36 +++
.../coordinator/TransactionMetadataStore.java | 96 ++++++++
.../TransactionMetadataStoreProvider.java | 64 ++++++
.../pulsar/transaction/coordinator/TxnMeta.java | 95 ++++++++
.../exceptions/CoordinatorException.java | 38 ++++
.../exceptions/InvalidTxnStatusException.java | 44 ++++
.../exceptions/TransactionNotFoundException.java | 39 ++++
.../coordinator/exceptions/package-info.java | 22 ++
.../impl/InMemTransactionMetadataStore.java | 114 ++++++++++
.../InMemTransactionMetadataStoreProvider.java | 38 ++++
.../transaction/coordinator/impl/TxnMetaImpl.java | 150 +++++++++++++
.../transaction/coordinator/impl/package-info.java | 22 ++
.../transaction/coordinator/package-info.java | 22 ++
.../TransactionMetadataStoreProviderTest.java | 245 +++++++++++++++++++++
pulsar-transaction/pom.xml | 62 ++++++
22 files changed, 1461 insertions(+)
diff --git a/pom.xml b/pom.xml
index e0f35e7..b116b03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,6 +108,9 @@ flexible messaging model and an intuitive client API.</description>
<module>pulsar-broker-auth-sasl</module>
<module>pulsar-client-auth-sasl</module>
+ <!-- transaction related modules -->
+ <module>pulsar-transaction</module>
+
<!-- jclouds shaded for gson conflict: https://issues.apache.org/jira/browse/JCLOUDS-1166 -->
<module>jclouds-shaded</module>
diff --git a/pulsar-transaction/common/pom.xml b/pulsar-transaction/common/pom.xml
new file mode 100644
index 0000000..b4fa95d
--- /dev/null
+++ b/pulsar-transaction/common/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-transaction-parent</artifactId>
+    <version>2.4.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-transaction-common</artifactId>
+  <!-- No trailing whitespace: the name is printed verbatim in the build output. -->
+  <name>Pulsar Transaction :: Common</name>
+
+  <dependencies>
+
+    <!-- Protobuf runtime libraries, pinned to the protobuf3 version property. -->
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf3.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java-util</artifactId>
+      <version>${protobuf3.version}</version>
+    </dependency>
+
+  </dependencies>
+  <build>
+    <plugins>
+      <!-- Runs the protobuf plugin's compile goal in the generate-sources phase. -->
+      <plugin>
+        <groupId>org.xolstice.maven.plugins</groupId>
+        <artifactId>protobuf-maven-plugin</artifactId>
+        <version>${protobuf-maven-plugin.version}</version>
+        <configuration>
+          <!-- protoc binary is resolved per platform through os.detected.classifier. -->
+          <protocArtifact>com.google.protobuf:protoc:${protoc3.version}:exe:${os.detected.classifier}</protocArtifact>
+          <checkStaleness>true</checkStaleness>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
new file mode 100644
index 0000000..16ce4e4
--- /dev/null
+++ b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.impl.common;
+
+import com.google.common.annotations.Beta;
+import java.io.Serializable;
+import lombok.Data;
+
+/**
+ * An identifier for representing a transaction.
+ *
+ * <p>The identifier is held as two {@code long} halves, each described as
+ * 64 bits of the id. Lombok's {@code @Data} generates the constructor,
+ * accessors, {@code equals}/{@code hashCode} and {@code toString}.
+ */
+@Beta
+@Data
+public class TxnID implements Serializable {
+
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * The most significant 64 bits of this TxnID.
+ *
+ * @serial
+ */
+ private final long mostSigBits;
+
+ /**
+ * The least significant 64 bits of this TxnID.
+ *
+ * @serial
+ */
+ private final long leastSigBits;
+
+}
diff --git a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnStatus.java b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnStatus.java
new file mode 100644
index 0000000..dc5b8d9
--- /dev/null
+++ b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnStatus.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.impl.common;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * An enum that represents the status of a transaction.
+ */
+@Beta
+public enum TxnStatus {
+
+    /** A new transaction is open. */
+    OPEN,
+    /** A transaction is in the progress of committing. */
+    COMMITTING,
+    /** A transaction is already committed. */
+    COMMITTED,
+    /** A transaction is in the progress of aborting. */
+    ABORTING,
+    /** A transaction is already aborted. */
+    ABORTED;
+
+    /**
+     * Check if this status can be transitioned to a new status.
+     *
+     * <p>Every status may transition to itself. {@code OPEN} may additionally move to
+     * {@code COMMITTING} or {@code ABORTING}; {@code COMMITTING} to {@code COMMITTED};
+     * {@code ABORTING} to {@code ABORTED}. {@code COMMITTED} and {@code ABORTED} accept
+     * no other target.
+     *
+     * @param newStatus the new status
+     * @return true if the current status can transition to {@code newStatus}.
+     * @throws IllegalArgumentException if the current status is not handled by this method
+     */
+    public boolean canTransitionTo(TxnStatus newStatus) {
+        TxnStatus currentStatus = this;
+
+        switch (currentStatus) {
+            case OPEN:
+                return newStatus != COMMITTED && newStatus != ABORTED;
+            case COMMITTING:
+                return newStatus == COMMITTING || newStatus == COMMITTED;
+            case COMMITTED:
+                return newStatus == COMMITTED;
+            case ABORTING:
+                return newStatus == ABORTING || newStatus == ABORTED;
+            case ABORTED:
+                return newStatus == ABORTED;
+            default:
+                // Only reachable when a constant is added without a matching case above,
+                // so the unknown value is the current status, not the requested one.
+                throw new IllegalArgumentException("Unknown txn status : " + currentStatus);
+        }
+    }
+
+}
diff --git a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/package-info.java b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/package-info.java
new file mode 100644
index 0000000..caa4ce9
--- /dev/null
+++ b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Common classes used by pulsar transaction related modules.
+ */
+package org.apache.pulsar.transaction.impl.common;
\ No newline at end of file
diff --git a/pulsar-transaction/common/src/test/java/org/apache/pulsar/transaction/impl/common/TxnStatusTest.java b/pulsar-transaction/common/src/test/java/org/apache/pulsar/transaction/impl/common/TxnStatusTest.java
new file mode 100644
index 0000000..7a8c8b9
--- /dev/null
+++ b/pulsar-transaction/common/src/test/java/org/apache/pulsar/transaction/impl/common/TxnStatusTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.impl.common;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.collect.Sets;
+import java.util.Set;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link TxnStatus}.
+ */
+public class TxnStatusTest {
+
+    /**
+     * Transition table used by {@link #testTxnStatusTransition}.
+     *
+     * <p>Each row is {@code {current status, statuses it can transition to,
+     * statuses it can not transition to}}.
+     */
+    @DataProvider(name = "statuses")
+    public static Object[][] statuses() {
+        return new Object[][] {
+            {
+                TxnStatus.OPEN,
+                Sets.newHashSet(
+                    TxnStatus.OPEN,
+                    TxnStatus.COMMITTING,
+                    TxnStatus.ABORTING
+                ),
+                Sets.newHashSet(
+                    TxnStatus.COMMITTED,
+                    TxnStatus.ABORTED
+                )
+            },
+            {
+                TxnStatus.COMMITTING,
+                Sets.newHashSet(
+                    TxnStatus.COMMITTING,
+                    TxnStatus.COMMITTED
+                ),
+                Sets.newHashSet(
+                    TxnStatus.OPEN,
+                    TxnStatus.ABORTING,
+                    TxnStatus.ABORTED
+                )
+            },
+            {
+                TxnStatus.COMMITTED,
+                Sets.newHashSet(
+                    TxnStatus.COMMITTED
+                ),
+                Sets.newHashSet(
+                    TxnStatus.OPEN,
+                    TxnStatus.COMMITTING,
+                    TxnStatus.ABORTING,
+                    TxnStatus.ABORTED
+                )
+            },
+            {
+                TxnStatus.ABORTING,
+                Sets.newHashSet(
+                    TxnStatus.ABORTING,
+                    TxnStatus.ABORTED
+                ),
+                Sets.newHashSet(
+                    TxnStatus.OPEN,
+                    TxnStatus.COMMITTING,
+                    TxnStatus.COMMITTED
+                )
+            },
+            {
+                TxnStatus.ABORTED,
+                Sets.newHashSet(
+                    TxnStatus.ABORTED
+                ),
+                Sets.newHashSet(
+                    TxnStatus.OPEN,
+                    TxnStatus.COMMITTING,
+                    TxnStatus.COMMITTED,
+                    TxnStatus.ABORTING
+                )
+            },
+        };
+    }
+
+    /**
+     * Verify {@link TxnStatus#canTransitionTo(TxnStatus)} for one row of the table.
+     *
+     * @param status the current status
+     * @param statusesCanTransitionTo targets that must be accepted
+     * @param statusesCanNotTransitionTo targets that must be rejected
+     */
+    @Test(dataProvider = "statuses")
+    public void testTxnStatusTransition(TxnStatus status,
+                                        Set<TxnStatus> statusesCanTransitionTo,
+                                        Set<TxnStatus> statusesCanNotTransitionTo) {
+        statusesCanTransitionTo.forEach(newStatus -> {
+            assertTrue(
+                status.canTransitionTo(newStatus),
+                "Status `" + status + "` should be able to transition to `" + newStatus + "`"
+            );
+        });
+        statusesCanNotTransitionTo.forEach(newStatus -> {
+            assertFalse(
+                status.canTransitionTo(newStatus),
+                "Status `" + status + "` should NOT be able to transition to `" + newStatus + "`"
+            );
+        });
+    }
+
+}
diff --git a/pulsar-transaction/coordinator/pom.xml b/pulsar-transaction/coordinator/pom.xml
new file mode 100644
index 0000000..a95eeb8
--- /dev/null
+++ b/pulsar-transaction/coordinator/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-transaction-parent</artifactId>
+ <version>2.4.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pulsar-transaction-coordinator</artifactId>
+ <name>Pulsar Transaction :: Coordinator</name>
+
+ <dependencies>
+
+ <!-- Shared transaction classes from the pulsar-transaction-common module, kept at this project's version. -->
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-transaction-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ </dependencies>
+
+</project>
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionCoordinatorID.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionCoordinatorID.java
new file mode 100644
index 0000000..3e249b3
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionCoordinatorID.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import com.google.common.annotations.Beta;
+import lombok.Data;
+
+/**
+ * A class for representing the identifier for a transaction coordinator.
+ *
+ * <p>Wraps a single {@code long}; Lombok's {@code @Data} generates the
+ * constructor, accessor, {@code equals}/{@code hashCode} and {@code toString}.
+ */
+@Beta
+@Data
+public class TransactionCoordinatorID {
+
+ /**
+ * The identifier of the transaction coordinator.
+ */
+ private final long id;
+
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
new file mode 100644
index 0000000..e3a9d7d
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import com.google.common.annotations.Beta;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+
+/**
+ * Storage abstraction holding the metadata of every transaction.
+ */
+@Beta
+public interface TransactionMetadataStore {
+
+    /**
+     * Look up the {@link TxnStatus} of the transaction {@code txnid}.
+     *
+     * <p>The default implementation derives the status from {@link #getTxnMeta(TxnID)}.
+     *
+     * @param txnid transaction id
+     * @return a future that completes with the {@link TxnStatus} of the given transaction.
+     */
+    default CompletableFuture<TxnStatus> getTxnStatus(TxnID txnid) {
+        return getTxnMeta(txnid).thenApply(TxnMeta::status);
+    }
+
+    /**
+     * Look up the {@link TxnMeta} of the transaction {@code txnid}.
+     *
+     * @param txnid transaction id
+     * @return a future that completes with the {@link TxnMeta} of the given transaction.
+     */
+    CompletableFuture<TxnMeta> getTxnMeta(TxnID txnid);
+
+    /**
+     * Create a new transaction in the transaction metadata store.
+     *
+     * @return a future that completes with the {@link TxnID} identifying the
+     *         newly created transaction.
+     */
+    CompletableFuture<TxnID> newTransaction();
+
+    /**
+     * Record the produced partitions on the transaction {@code txnid}.
+     *
+     * @param txnid transaction id
+     * @param partitions the list of partitions that the transaction produces to
+     * @return a future that represents the result of this operation
+     */
+    CompletableFuture<Void> addProducedPartitionToTxn(
+        TxnID txnid, List<String> partitions);
+
+    /**
+     * Record the acked partitions on the transaction {@code txnid}.
+     *
+     * @param txnid transaction id
+     * @param partitions the list of partitions that the transaction acknowledges to
+     * @return a future that represents the result of this operation
+     */
+    CompletableFuture<Void> addAckedPartitionToTxn(
+        TxnID txnid, List<String> partitions);
+
+    /**
+     * Move the transaction from {@code expectedStatus} to {@code newStatus}.
+     *
+     * <p>If the current transaction status is not {@code expectedStatus}, the
+     * update fails.
+     *
+     * @param txnid transaction id
+     * @param newStatus the status the transaction should be updated to
+     * @param expectedStatus the status the transaction is expected to be in before the update
+     * @return a future that represents the result of this operation
+     */
+    CompletableFuture<Void> updateTxnStatus(
+        TxnID txnid, TxnStatus newStatus, TxnStatus expectedStatus);
+
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
new file mode 100644
index 0000000..46c66bc
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.Beta;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A provider that provides {@link TransactionMetadataStore}.
+ */
+@Beta
+public interface TransactionMetadataStoreProvider {
+
+    /**
+     * Construct a provider from the provided class.
+     *
+     * <p>The class is loaded by name and instantiated through its no-argument
+     * constructor.
+     *
+     * @param providerClassName the provider class name.
+     * @return an instance of transaction metadata store provider.
+     * @throws IOException if the class cannot be loaded or instantiated, or if the
+     *         instance is not a {@link TransactionMetadataStoreProvider}; the
+     *         underlying failure is attached as the cause.
+     */
+    static TransactionMetadataStoreProvider newProvider(String providerClassName) throws IOException {
+        try {
+            Class<?> providerClass = Class.forName(providerClassName);
+            // Class#newInstance is deprecated and rethrows checked constructor exceptions
+            // undeclared; going through the Constructor wraps them in InvocationTargetException.
+            Object obj = providerClass.getDeclaredConstructor().newInstance();
+            checkArgument(obj instanceof TransactionMetadataStoreProvider,
+                "The factory has to be an instance of "
+                    + TransactionMetadataStoreProvider.class.getName());
+
+            return (TransactionMetadataStoreProvider) obj;
+        } catch (Exception e) {
+            // Every failure, including the checkArgument above, is reported as IOException.
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Open the transaction metadata store for transaction coordinator
+     * identified by {@code transactionCoordinatorId}.
+     *
+     * @param transactionCoordinatorId the coordinator whose store should be opened
+     * @return a future represents the result of the operation.
+     *         an instance of {@link TransactionMetadataStore} is returned
+     *         if the operation succeeds.
+     */
+    CompletableFuture<TransactionMetadataStore> openStore(
+        TransactionCoordinatorID transactionCoordinatorId);
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
new file mode 100644
index 0000000..39d92aa
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import com.google.common.annotations.Beta;
+import java.util.List;
+import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+
+/**
+ * An interface that represents the metadata of a transaction in {@link TransactionMetadataStore}.
+ */
+@Beta
+public interface TxnMeta {
+
+ /**
+ * Return the transaction id.
+ *
+ * @return transaction id.
+ */
+ TxnID id();
+
+ /**
+ * Return the transaction status.
+ *
+ * @return transaction status.
+ */
+ TxnStatus status();
+
+ /**
+ * Return the list of partitions that this transaction produces to.
+ *
+ * @return the list of partitions that this transaction produced to.
+ * the returned list is sorted by partition name.
+ */
+ List<String> producedPartitions();
+
+ /**
+ * Return the list of partitions that this transaction acknowledges to.
+ *
+ * @return the list of partitions that this transaction acknowledges to.
+ * the returned list is sorted by partition name.
+ */
+ List<String> ackedPartitions();
+
+ /**
+ * Add the list of produced partitions to the transaction.
+ *
+ * @param partitions the produced partitions to add
+ * @return transaction meta
+ * @throws InvalidTxnStatusException if the transaction is not in
+ * {@link TxnStatus#OPEN}
+ */
+ TxnMeta addProducedPartitions(List<String> partitions)
+ throws InvalidTxnStatusException;
+
+ /**
+ * Add the list of acked partitions to the transaction.
+ *
+ * @param partitions the acked partitions to add
+ * @return transaction meta
+ * @throws InvalidTxnStatusException if the transaction is not in
+ * {@link TxnStatus#OPEN}
+ */
+ TxnMeta addAckedPartitions(List<String> partitions)
+ throws InvalidTxnStatusException;
+
+ /**
+ * Update the transaction status to <tt>newStatus</tt> only when
+ * the current status is the expected <tt>expectedStatus</tt>.
+ *
+ * @param newStatus the new transaction status
+ * @param expectedStatus the expected transaction status
+ * @return the transaction itself.
+ * @throws InvalidTxnStatusException if the transaction is not in the expected
+ * status, or it can not be transitioned to the new status
+ */
+ TxnMeta updateTxnStatus(TxnStatus newStatus,
+ TxnStatus expectedStatus) throws InvalidTxnStatusException;
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java
new file mode 100644
index 0000000..3863b2f
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.exceptions;
+
+/**
+ * The base exception for exceptions thrown from coordinator.
+ *
+ * <p>It is a checked exception and is abstract, so only concrete subclasses
+ * can be thrown.
+ */
+public abstract class CoordinatorException extends Exception {
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * @param message the detail message
+ */
+ public CoordinatorException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param message the detail message
+ * @param cause the underlying cause
+ */
+ public CoordinatorException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * @param cause the underlying cause
+ */
+ public CoordinatorException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/InvalidTxnStatusException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/InvalidTxnStatusException.java
new file mode 100644
index 0000000..489b550
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/InvalidTxnStatusException.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.exceptions;
+
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+
+/**
+ * Signals that a transaction is not in the status an operation requires.
+ */
+public class InvalidTxnStatusException extends CoordinatorException {
+
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * @param message the detail message
+     */
+    public InvalidTxnStatusException(String message) {
+        super(message);
+    }
+
+    /**
+     * Build the exception with a message naming the transaction, the status
+     * it was expected to be in, and the status it is actually in.
+     *
+     * @param txnID the transaction whose status did not match
+     * @param expectedStatus the status the caller required
+     * @param actualStatus the status the transaction is currently in
+     */
+    public InvalidTxnStatusException(TxnID txnID,
+                                     TxnStatus expectedStatus,
+                                     TxnStatus actualStatus) {
+        super(describeMismatch(txnID, expectedStatus, actualStatus));
+    }
+
+    /** Render the expected-versus-actual status message for a transaction. */
+    private static String describeMismatch(TxnID id, TxnStatus wanted, TxnStatus found) {
+        return "Expect Txn `" + id + "` to be in " + wanted
+            + " status but it is in " + found + " status";
+    }
+
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/TransactionNotFoundException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/TransactionNotFoundException.java
new file mode 100644
index 0000000..86f0321
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/TransactionNotFoundException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.exceptions;
+
+/**
+ * Exception thrown when a transaction is not found in the coordinator.
+ */
+public class TransactionNotFoundException extends CoordinatorException {
+
+ private static final long serialVersionUID = 0L;
+
+ public TransactionNotFoundException(String message) {
+ super(message);
+ }
+
+ public TransactionNotFoundException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public TransactionNotFoundException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/package-info.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/package-info.java
new file mode 100644
index 0000000..7ed25d4
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Internal exceptions for transaction coordinator.
+ */
+package org.apache.pulsar.transaction.coordinator.exceptions;
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
new file mode 100644
index 0000000..260935a
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+
+/**
+ * An in-memory implementation of {@link TransactionMetadataStore}.
+ */
+class InMemTransactionMetadataStore implements TransactionMetadataStore {
+
+ private final TransactionCoordinatorID tcID;
+ private final AtomicLong localID;
+ private final ConcurrentMap<TxnID, TxnMetaImpl> transactions;
+
+ InMemTransactionMetadataStore(TransactionCoordinatorID tcID) {
+ this.tcID = tcID;
+ this.localID = new AtomicLong(0L);
+ this.transactions = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnid) {
+ CompletableFuture<TxnMeta> getFuture = new CompletableFuture<>();
+ TxnMetaImpl txn = transactions.get(txnid);
+ if (null == txn) {
+ getFuture.completeExceptionally(
+ new TransactionNotFoundException("Transaction not found :" + txnid));
+ } else {
+ getFuture.complete(txn);
+ }
+ return getFuture;
+ }
+
+ @Override
+ public CompletableFuture<TxnID> newTransaction() {
+ TxnID txnID = new TxnID(
+ tcID.getId(),
+ localID.getAndIncrement()
+ );
+ TxnMetaImpl txn = new TxnMetaImpl(txnID);
+ transactions.put(txnID, txn);
+ return CompletableFuture.completedFuture(txnID);
+ }
+
+ @Override
+ public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnid, List<String> partitions) {
+ return getTxnMeta(txnid).thenCompose(txn -> {
+ try {
+ txn.addProducedPartitions(partitions);
+ return CompletableFuture.completedFuture(null);
+ } catch (InvalidTxnStatusException e) {
+ CompletableFuture<Void> error = new CompletableFuture<>();
+ error.completeExceptionally(e);
+ return error;
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnid, List<String> partitions) {
+ return getTxnMeta(txnid).thenCompose(txn -> {
+ try {
+ txn.addAckedPartitions(partitions);
+ return CompletableFuture.completedFuture(null);
+ } catch (InvalidTxnStatusException e) {
+ CompletableFuture<Void> error = new CompletableFuture<>();
+ error.completeExceptionally(e);
+ return error;
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> updateTxnStatus(TxnID txnid, TxnStatus newStatus, TxnStatus expectedStatus) {
+ return getTxnMeta(txnid).thenCompose(txn -> {
+ try {
+ txn.updateTxnStatus(newStatus, expectedStatus);
+ return CompletableFuture.completedFuture(null);
+ } catch (InvalidTxnStatusException e) {
+ CompletableFuture<Void> error = new CompletableFuture<>();
+ error.completeExceptionally(e);
+ return error;
+ }
+ });
+ }
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
new file mode 100644
index 0000000..127106d
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+
+/**
+ * The provider that offers an in-memory implementation of {@link TransactionMetadataStore}.
+ */
+public class InMemTransactionMetadataStoreProvider implements TransactionMetadataStoreProvider {
+
+ @Override
+ public CompletableFuture<TransactionMetadataStore>
+ openStore(TransactionCoordinatorID transactionCoordinatorId) {
+ return CompletableFuture.completedFuture(
+ new InMemTransactionMetadataStore(transactionCoordinatorId));
+ }
+
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
new file mode 100644
index 0000000..f05af362
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
+import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+
+/**
+ * A class that represents the metadata of a transaction stored in
+ * the {@link org.apache.pulsar.transaction.coordinator.TransactionMetadataStore}.
+ */
+class TxnMetaImpl implements TxnMeta {
+
+ private final TxnID txnID;
+ private final Set<String> producedPartitions = new HashSet<>();
+ private final Set<String> ackedPartitions = new HashSet<>();
+ private TxnStatus txnStatus;
+
+ TxnMetaImpl(TxnID txnID) {
+ this.txnID = txnID;
+ this.txnStatus = TxnStatus.OPEN;
+ }
+
+ @Override
+ public TxnID id() {
+ return txnID;
+ }
+
+ /**
+ * Return the current status of the transaction.
+ *
+ * @return current status of the transaction.
+ */
+ @Override
+ public synchronized TxnStatus status() {
+ return txnStatus;
+ }
+
+ @Override
+ public List<String> producedPartitions() {
+ List<String> returnedPartitions;
+ synchronized (this) {
+ returnedPartitions = new ArrayList<>(producedPartitions.size());
+ returnedPartitions.addAll(producedPartitions);
+ }
+ Collections.sort(returnedPartitions);
+ return returnedPartitions;
+ }
+
+ @Override
+ public List<String> ackedPartitions() {
+ List<String> returnedPartitions;
+ synchronized (this) {
+ returnedPartitions = new ArrayList<>(ackedPartitions.size());
+ returnedPartitions.addAll(ackedPartitions);
+ }
+ Collections.sort(returnedPartitions);
+ return returnedPartitions;
+ }
+
+ /**
+ * Check that the transaction is in the expected status; throw {@link InvalidTxnStatusException} otherwise.
+ *
+ * @param expectedStatus the status the transaction must currently be in
+ */
+ private synchronized void checkTxnStatus(TxnStatus expectedStatus) throws InvalidTxnStatusException {
+ if (this.txnStatus != expectedStatus) {
+ throw new InvalidTxnStatusException(
+ txnID, expectedStatus, txnStatus
+ );
+ }
+ }
+
+ /**
+ * Add the list of partitions that the transaction produces to.
+ *
+ * @param partitions the list of partitions that the txn produces to
+ * @return the transaction itself.
+ * @throws InvalidTxnStatusException if the transaction is not in {@link TxnStatus#OPEN} status
+ */
+ @Override
+ public synchronized TxnMetaImpl addProducedPartitions(List<String> partitions) throws InvalidTxnStatusException {
+ checkTxnStatus(TxnStatus.OPEN);
+
+ this.producedPartitions.addAll(partitions);
+ return this;
+ }
+
+ /**
+ * Add the list of partitions that the transaction acknowledges to.
+ *
+ * @param partitions the list of partitions that the txn acknowledges to
+ * @return the transaction itself.
+ * @throws InvalidTxnStatusException if the transaction is not in {@link TxnStatus#OPEN} status
+ */
+ @Override
+ public synchronized TxnMetaImpl addAckedPartitions(List<String> partitions) throws InvalidTxnStatusException {
+ checkTxnStatus(TxnStatus.OPEN);
+
+ this.ackedPartitions.addAll(partitions);
+ return this;
+ }
+
+ /**
+ * Update the transaction status to <tt>newStatus</tt> only when
+ * the current status is the expected <tt>expectedStatus</tt>.
+ *
+ * @param newStatus the new transaction status
+ * @param expectedStatus the expected transaction status
+ * @return the transaction itself.
+ * @throws InvalidTxnStatusException if the status is not as expected or cannot transition to the new one
+ */
+ @Override
+ public synchronized TxnMetaImpl updateTxnStatus(TxnStatus newStatus,
+ TxnStatus expectedStatus)
+ throws InvalidTxnStatusException {
+ checkTxnStatus(expectedStatus);
+ if (!txnStatus.canTransitionTo(newStatus)) {
+ throw new InvalidTxnStatusException(
+ "Transaction `" + txnID + "` CANNOT transition from status " + txnStatus + " to " + newStatus);
+ }
+ this.txnStatus = newStatus;
+ return this;
+ }
+
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/package-info.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/package-info.java
new file mode 100644
index 0000000..59ce99b
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementations of the transaction coordinator.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/package-info.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/package-info.java
new file mode 100644
index 0000000..4416ff9
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Classes for implementing pulsar transaction coordinator.
+ */
+package org.apache.pulsar.transaction.coordinator;
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
new file mode 100644
index 0000000..4037f9d
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException;
+import org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test different transaction metadata store provider.
+ */
+public class TransactionMetadataStoreProviderTest {
+
+ @DataProvider(name = "providers")
+ public static Object[][] providers() {
+ return new Object[][] {
+ { InMemTransactionMetadataStoreProvider.class.getName() }
+ };
+ }
+
+ private final String providerClassName;
+ private TransactionMetadataStoreProvider provider;
+ private TransactionCoordinatorID tcId;
+ private TransactionMetadataStore store;
+
+ @Factory(dataProvider = "providers")
+ public TransactionMetadataStoreProviderTest(String providerClassName) throws Exception {
+ this.providerClassName = providerClassName;
+ this.provider = TransactionMetadataStoreProvider.newProvider(providerClassName);
+ }
+
+ @BeforeMethod
+ public void setup() throws Exception {
+ this.tcId = new TransactionCoordinatorID(1L);
+ this.store = this.provider.openStore(tcId).get();
+ }
+
+ @Test
+ public void testGetTxnStatusNotFound() throws Exception {
+ try {
+ this.store.getTxnStatus(
+ new TxnID(tcId.getId(), 12345L)).get();
+ fail("Should fail to get txn status of a non-existent transaction");
+ } catch (ExecutionException ee) {
+ assertTrue(ee.getCause() instanceof TransactionNotFoundException);
+ }
+ }
+
+ @Test
+ public void testGetTxnStatusSuccess() throws Exception {
+ TxnID txnID = this.store.newTransaction().get();
+ TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
+ assertEquals(txnStatus, TxnStatus.OPEN);
+ }
+
+ @Test
+ public void testUpdateTxnStatusSuccess() throws Exception {
+ TxnID txnID = this.store.newTransaction().get();
+ TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
+ assertEquals(txnStatus, TxnStatus.OPEN);
+
+ // update the status
+ this.store.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN).get();
+
+ // get the new status
+ TxnStatus newTxnStatus = this.store.getTxnStatus(txnID).get();
+ assertEquals(newTxnStatus, TxnStatus.COMMITTING);
+ }
+
+ @Test
+ public void testUpdateTxnStatusNotExpectedStatus() throws Exception {
+ TxnID txnID = this.store.newTransaction().get();
+ TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
+ assertEquals(txnStatus, TxnStatus.OPEN);
+
+ // try to update the status while passing an expected status (COMMITTING) that differs from the current one
+ try {
+ this.store.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.COMMITTING).get();
+ fail("Should fail to update txn status if it is not in expected status");
+ } catch (ExecutionException ee) {
+ assertTrue(ee.getCause() instanceof InvalidTxnStatusException);
+ }
+
+ // get the txn status, it should NOT be changed.
+ TxnStatus newTxnStatus = this.store.getTxnStatus(txnID).get();
+ assertEquals(newTxnStatus, TxnStatus.OPEN);
+ }
+
+ @Test
+ public void testUpdateTxnStatusCannotTransition() throws Exception {
+ TxnID txnID = this.store.newTransaction().get();
+ TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
+ assertEquals(txnStatus, TxnStatus.OPEN);
+
+ // try to move the txn from OPEN directly to COMMITTED; the store is expected to reject it
+ try {
+ this.store.updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.OPEN).get();
+ fail("Should fail to update txn status if it can not transition to the new status");
+ } catch (ExecutionException ee) {
+ assertTrue(ee.getCause() instanceof InvalidTxnStatusException);
+ }
+
+ // get the txn status, it should NOT be changed.
+ TxnStatus newTxnStatus = this.store.getTxnStatus(txnID).get();
+ assertEquals(newTxnStatus, TxnStatus.OPEN);
+ }
+
+ @Test
+ public void testAddProducedPartition() throws Exception {
+ TxnID txnID = this.store.newTransaction().get();
+ TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
+ assertEquals(txnStatus, TxnStatus.OPEN);
+
+ List<String> partitions = new ArrayList<>();
+ partitions.add("ptn-0");
+ partitions.add("ptn-1");
+ partitions.add("ptn-2");
+
+ // add the list of partitions to the transaction
+ this.store.addProducedPartitionToTxn(txnID, partitions).get();
+
+ TxnMeta txn = this.store.getTxnMeta(txnID).get();
+ assertEquals(txn.status(), TxnStatus.OPEN);
+ assertEquals(txn.producedPartitions(), partitions);
+
+ // add another list of partitions and wait for it to complete. duplicated partitions should be removed
+ List<String> newPartitions = new ArrayList<>();
+ newPartitions.add("ptn-2");
+ newPartitions.add("ptn-3");
+ newPartitions.add("ptn-4");
+ this.store.addProducedPartitionToTxn(txnID, newPartitions).get();
+
+ txn = this.store.getTxnMeta(txnID).get();
+ assertEquals(txn.status(), TxnStatus.OPEN);
+ List<String> finalPartitions = new ArrayList<>();
+ finalPartitions.add("ptn-0");
+ finalPartitions.add("ptn-1");
+ finalPartitions.add("ptn-2");
+ finalPartitions.add("ptn-3");
+ finalPartitions.add("ptn-4");
+ assertEquals(txn.producedPartitions(), finalPartitions);
+
+ // change the transaction to `COMMITTING`
+ this.store.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN).get();
+
+ // add partitions should fail if it is already committing.
+ List<String> newPartitions2 = new ArrayList<>();
+ newPartitions2.add("ptn-5");
+ newPartitions2.add("ptn-6");
+ try {
+ this.store.addProducedPartitionToTxn(txnID, newPartitions2).get();
+ fail("Should fail to add produced partitions if the transaction is not in OPEN status");
+ } catch (ExecutionException ee) {
+ assertTrue(ee.getCause() instanceof InvalidTxnStatusException);
+ }
+
+ txn = this.store.getTxnMeta(txnID).get();
+ assertEquals(txn.status(), TxnStatus.COMMITTING);
+ assertEquals(txn.producedPartitions(), finalPartitions);
+ }
+
+ @Test
+ public void testAddAckedPartition() throws Exception {
+ TxnID txnID = this.store.newTransaction().get();
+ TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
+ assertEquals(txnStatus, TxnStatus.OPEN);
+
+ List<String> partitions = new ArrayList<>();
+ partitions.add("ptn-0");
+ partitions.add("ptn-1");
+ partitions.add("ptn-2");
+
+ // add the list of partitions to the transaction
+ this.store.addAckedPartitionToTxn(txnID, partitions).get();
+
+ TxnMeta txn = this.store.getTxnMeta(txnID).get();
+ assertEquals(txn.status(), TxnStatus.OPEN);
+ assertEquals(txn.ackedPartitions(), partitions);
+
+ // add another list of partitions and wait for it to complete. duplicated partitions should be removed
+ List<String> newPartitions = new ArrayList<>();
+ newPartitions.add("ptn-2");
+ newPartitions.add("ptn-3");
+ newPartitions.add("ptn-4");
+ this.store.addAckedPartitionToTxn(txnID, newPartitions).get();
+
+ txn = this.store.getTxnMeta(txnID).get();
+ assertEquals(txn.status(), TxnStatus.OPEN);
+ List<String> finalPartitions = new ArrayList<>();
+ finalPartitions.add("ptn-0");
+ finalPartitions.add("ptn-1");
+ finalPartitions.add("ptn-2");
+ finalPartitions.add("ptn-3");
+ finalPartitions.add("ptn-4");
+ assertEquals(txn.ackedPartitions(), finalPartitions);
+
+ // change the transaction to `COMMITTING`
+ this.store.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN).get();
+
+ // add partitions should fail if it is already committing.
+ List<String> newPartitions2 = new ArrayList<>();
+ newPartitions2.add("ptn-5");
+ newPartitions2.add("ptn-6");
+ try {
+ this.store.addAckedPartitionToTxn(txnID, newPartitions2).get();
+ fail("Should fail to add acked partitions if the transaction is not in OPEN status");
+ } catch (ExecutionException ee) {
+ assertTrue(ee.getCause() instanceof InvalidTxnStatusException);
+ }
+
+ txn = this.store.getTxnMeta(txnID).get();
+ assertEquals(txn.status(), TxnStatus.COMMITTING);
+ assertEquals(txn.ackedPartitions(), finalPartitions);
+ }
+
+}
diff --git a/pulsar-transaction/pom.xml b/pulsar-transaction/pom.xml
new file mode 100644
index 0000000..4af255a
--- /dev/null
+++ b/pulsar-transaction/pom.xml
@@ -0,0 +1,62 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>pom</packaging>
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar</artifactId>
+ <version>2.4.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pulsar-transaction-parent</artifactId>
+ <name>Pulsar Transaction :: Parent</name>
+
+ <modules>
+ <module>common</module>
+ <module>coordinator</module>
+ </modules>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>check-style</id>
+ <phase>verify</phase>
+ <configuration>
+ <configLocation>../buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation>
+ <suppressionsLocation>../buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation>
+ <encoding>UTF-8</encoding>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>