You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/05/07 05:03:17 UTC

[pulsar] branch master updated: [transaction][coordinator] add the interfaces for transaction metadata store and an in-memory implementation (#4161)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 277bcee  [transaction][coordinator] add the interfaces for transaction metadata store and an in-memory implementation (#4161)
277bcee is described below

commit 277bceef0366a2ef17ff6e30f62f77af0872fbca
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue May 7 13:03:12 2019 +0800

    [transaction][coordinator] add the interfaces for transaction metadata store and an in-memory implementation (#4161)
    
    * [transaction][coordinator] add the interfaces for transaction metadata store and an in-memory implementation
    
    Master Issue: #2664
    
    *Motivation*
    
    Setup the project structure for developing transaction coordinator.
    
    *Changes*
    
    - setup the project structure
    - introduce common classes: `TxnID`, `TxnStatus`
    - introduce `TransactionMetadataStore` for TC to store and manage the transaction status
    - enable checkstyle for `pulsar-transaction`
    
    *NOTES*
    
    All the changes are isolated into `pulsar-transaction` module. Hence it will not block any other changes or releases.
---
 pom.xml                                            |   3 +
 pulsar-transaction/common/pom.xml                  |  72 ++++++
 .../pulsar/transaction/impl/common/TxnID.java      |  48 ++++
 .../pulsar/transaction/impl/common/TxnStatus.java  |  65 ++++++
 .../transaction/impl/common/package-info.java      |  22 ++
 .../transaction/impl/common/TxnStatusTest.java     | 118 ++++++++++
 pulsar-transaction/coordinator/pom.xml             |  46 ++++
 .../coordinator/TransactionCoordinatorID.java      |  36 +++
 .../coordinator/TransactionMetadataStore.java      |  96 ++++++++
 .../TransactionMetadataStoreProvider.java          |  64 ++++++
 .../pulsar/transaction/coordinator/TxnMeta.java    |  95 ++++++++
 .../exceptions/CoordinatorException.java           |  38 ++++
 .../exceptions/InvalidTxnStatusException.java      |  44 ++++
 .../exceptions/TransactionNotFoundException.java   |  39 ++++
 .../coordinator/exceptions/package-info.java       |  22 ++
 .../impl/InMemTransactionMetadataStore.java        | 114 ++++++++++
 .../InMemTransactionMetadataStoreProvider.java     |  38 ++++
 .../transaction/coordinator/impl/TxnMetaImpl.java  | 150 +++++++++++++
 .../transaction/coordinator/impl/package-info.java |  22 ++
 .../transaction/coordinator/package-info.java      |  22 ++
 .../TransactionMetadataStoreProviderTest.java      | 245 +++++++++++++++++++++
 pulsar-transaction/pom.xml                         |  62 ++++++
 22 files changed, 1461 insertions(+)

diff --git a/pom.xml b/pom.xml
index e0f35e7..b116b03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,6 +108,9 @@ flexible messaging model and an intuitive client API.</description>
     <module>pulsar-broker-auth-sasl</module>
     <module>pulsar-client-auth-sasl</module>
 
+    <!-- transaction related modules -->
+    <module>pulsar-transaction</module>
+
     <!-- jclouds shaded for gson conflict: https://issues.apache.org/jira/browse/JCLOUDS-1166 -->
     <module>jclouds-shaded</module>
 
diff --git a/pulsar-transaction/common/pom.xml b/pulsar-transaction/common/pom.xml
new file mode 100644
index 0000000..b4fa95d
--- /dev/null
+++ b/pulsar-transaction/common/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+        xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.pulsar</groupId>
+        <artifactId>pulsar-transaction-parent</artifactId>
+        <version>2.4.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>pulsar-transaction-common</artifactId>
+    <name>Pulsar Transaction :: Common </name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf3.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java-util</artifactId>
+            <version>${protobuf3.version}</version>
+        </dependency>
+        
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>${protobuf-maven-plugin.version}</version>
+                <configuration>
+                    <protocArtifact>com.google.protobuf:protoc:${protoc3.version}:exe:${os.detected.classifier}</protocArtifact>
+                    <checkStaleness>true</checkStaleness>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
new file mode 100644
index 0000000..16ce4e4
--- /dev/null
+++ b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.impl.common;
+
+import com.google.common.annotations.Beta;
+import java.io.Serializable;
+import lombok.Data;
+
+/**
+ * An identifier for representing a transaction.
+ */
+@Beta
+@Data
+public class TxnID implements Serializable {
+
+    private static final long serialVersionUID = 0L;
+
+    /*
+     * The most significant 64 bits of this TxnID.
+     *
+     * @serial
+     */
+    private final long mostSigBits;
+
+    /*
+     * The least significant 64 bits of this TxnID.
+     *
+     * @serial
+     */
+    private final long leastSigBits;
+
+}
diff --git a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnStatus.java b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnStatus.java
new file mode 100644
index 0000000..dc5b8d9
--- /dev/null
+++ b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnStatus.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.impl.common;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * A enum represents the status of a transaction.
+ */
+@Beta
+public enum TxnStatus {
+
+    // A new transaction is open.
+    OPEN,
+    // A transaction is in the progress of committing.
+    COMMITTING,
+    // A transaction is already committed.
+    COMMITTED,
+    // A transaction is in the progress of aborting.
+    ABORTING,
+    // A transaction is already aborted.
+    ABORTED;
+
+    /**
+     * Check if the a status can be transaction to a new status.
+     *
+     * @param newStatus the new status
+     * @return true if the current status can be transitioning to.
+     */
+    public boolean canTransitionTo(TxnStatus newStatus) {
+        TxnStatus currentStatus = this;
+
+        switch (currentStatus) {
+            case OPEN:
+                return newStatus != COMMITTED && newStatus != ABORTED;
+            case COMMITTING:
+                return newStatus == COMMITTING || newStatus == COMMITTED;
+            case COMMITTED:
+                return newStatus == COMMITTED;
+            case ABORTING:
+                return newStatus == ABORTING || newStatus == ABORTED;
+            case ABORTED:
+                return newStatus == ABORTED;
+            default:
+                throw new IllegalArgumentException("Unknown txn status : " + newStatus);
+        }
+    }
+
+}
diff --git a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/package-info.java b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/package-info.java
new file mode 100644
index 0000000..caa4ce9
--- /dev/null
+++ b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Common classes used by pulsar transaction related modules.
+ */
+package org.apache.pulsar.transaction.impl.common;
\ No newline at end of file
diff --git a/pulsar-transaction/common/src/test/java/org/apache/pulsar/transaction/impl/common/TxnStatusTest.java b/pulsar-transaction/common/src/test/java/org/apache/pulsar/transaction/impl/common/TxnStatusTest.java
new file mode 100644
index 0000000..7a8c8b9
--- /dev/null
+++ b/pulsar-transaction/common/src/test/java/org/apache/pulsar/transaction/impl/common/TxnStatusTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.impl.common;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.collect.Sets;
+import java.util.Set;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link TxnStatus}.
+ */
+public class TxnStatusTest {
+
+    @DataProvider(name = "statuses")
+    public static Object[][] statuses() {
+        return new Object[][] {
+            {
+                TxnStatus.OPEN,
+                Sets.newHashSet(
+                    TxnStatus.OPEN,
+                    TxnStatus.COMMITTING,
+                    TxnStatus.ABORTING
+                ),
+                Sets.newHashSet(
+                    TxnStatus.COMMITTED,
+                    TxnStatus.ABORTED
+                )
+            },
+            {
+                TxnStatus.COMMITTING,
+                Sets.newHashSet(
+                    TxnStatus.COMMITTING,
+                    TxnStatus.COMMITTED
+                ),
+                Sets.newHashSet(
+                    TxnStatus.OPEN,
+                    TxnStatus.ABORTING,
+                    TxnStatus.ABORTED
+                )
+            },
+            {
+                TxnStatus.COMMITTED,
+                Sets.newHashSet(
+                    TxnStatus.COMMITTED
+                ),
+                Sets.newHashSet(
+                    TxnStatus.OPEN,
+                    TxnStatus.COMMITTING,
+                    TxnStatus.ABORTING,
+                    TxnStatus.ABORTED
+                )
+            },
+            {
+                TxnStatus.ABORTING,
+                Sets.newHashSet(
+                    TxnStatus.ABORTING,
+                    TxnStatus.ABORTED
+                ),
+                Sets.newHashSet(
+                    TxnStatus.OPEN,
+                    TxnStatus.COMMITTING,
+                    TxnStatus.COMMITTED
+                )
+            },
+            {
+                TxnStatus.ABORTED,
+                Sets.newHashSet(
+                    TxnStatus.ABORTED
+                ),
+                Sets.newHashSet(
+                    TxnStatus.OPEN,
+                    TxnStatus.COMMITTING,
+                    TxnStatus.COMMITTED,
+                    TxnStatus.ABORTING
+                )
+            },
+        };
+    }
+
+    @Test(dataProvider = "statuses")
+    public void testTxnStatusTransition(TxnStatus status,
+                                        Set<TxnStatus> statusesCanTransitionTo,
+                                        Set<TxnStatus> statusesCanNotTransactionTo) {
+        statusesCanTransitionTo.forEach(newStatus -> {
+            assertTrue(
+                status.canTransitionTo(newStatus),
+                "Status `" + status + "` should be able to transition to `" + newStatus + "`"
+            );
+        });
+        statusesCanNotTransactionTo.forEach(newStatus -> {
+            assertFalse(
+                status.canTransitionTo(newStatus),
+                "Status `" + status + "` should NOT be able to transition to `" + newStatus + "`"
+            );
+        });
+    }
+
+}
diff --git a/pulsar-transaction/coordinator/pom.xml b/pulsar-transaction/coordinator/pom.xml
new file mode 100644
index 0000000..a95eeb8
--- /dev/null
+++ b/pulsar-transaction/coordinator/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+        xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.pulsar</groupId>
+        <artifactId>pulsar-transaction-parent</artifactId>
+        <version>2.4.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>pulsar-transaction-coordinator</artifactId>
+    <name>Pulsar Transaction :: Coordinator</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-transaction-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionCoordinatorID.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionCoordinatorID.java
new file mode 100644
index 0000000..3e249b3
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionCoordinatorID.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import com.google.common.annotations.Beta;
+import lombok.Data;
+
+/**
+ * A class for representing the identifier for a transaction coordinator.
+ */
+@Beta
+@Data
+public class TransactionCoordinatorID {
+
+    /**
+     * The identifier of the transaction coordinator.
+     */
+    private final long id;
+
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
new file mode 100644
index 0000000..e3a9d7d
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import com.google.common.annotations.Beta;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+
+/**
+ * A store for storing all the transaction metadata.
+ */
+@Beta
+public interface TransactionMetadataStore {
+
+    /**
+     * Query the {@link TxnStatus} of a given transaction <tt>txnid</tt>.
+     *
+     * @param txnid transaction id
+     * @return a future represents the result of this operation.
+     *         it returns {@link TxnStatus} of the given transaction.
+     */
+    default CompletableFuture<TxnStatus> getTxnStatus(TxnID txnid) {
+        return getTxnMeta(txnid).thenApply(txnMeta -> txnMeta.status());
+    }
+
+    /**
+     * Query the {@link TxnMeta} of a given transaction <tt>txnid</tt>.
+     *
+     * @param txnid transaction id
+     * @return a future represents the result of this operation.
+     *         it returns {@link TxnMeta} of the given transaction.
+     */
+    CompletableFuture<TxnMeta> getTxnMeta(TxnID txnid);
+
+    /**
+     * Create a new transaction in the transaction metadata store.
+     *
+     * @return a future represents the result of creating a new transaction.
+     *         it returns {@link TxnID} as the identifier for identifying the
+     *         transaction.
+     */
+    CompletableFuture<TxnID> newTransaction();
+
+    /**
+     * Add the produced partitions to transaction identified by <tt>txnid</tt>.
+     *
+     * @param txnid transaction id
+     * @param partitions the list of partitions that a transaction produces to
+     * @return a future represents the result of this operation
+     */
+    CompletableFuture<Void> addProducedPartitionToTxn(
+        TxnID txnid, List<String> partitions);
+
+    /**
+     * Add the acked partitions to transaction identified by <tt>txnid</tt>.
+     *
+     * @param txnid transaction id
+     * @param partitions the list of partitions that a transaction acknowledge to
+     * @return a future represents the result of the operation
+     */
+    CompletableFuture<Void> addAckedPartitionToTxn(
+        TxnID txnid, List<String> partitions);
+
+    /**
+     * Update the transaction from <tt>expectedStatus</tt> to <tt>newStatus</tt>.
+     *
+     * <p>If the current transaction status is not <tt>expectedStatus</tt>, the
+     * update will be failed.
+     *
+     * @param newStatus the new txn status that the transaction should be updated to
+     * @param expectedStatus the expected status that the transaction should be
+     * @return a future represents the result of the operation
+     */
+    CompletableFuture<Void> updateTxnStatus(
+        TxnID txnid, TxnStatus newStatus, TxnStatus expectedStatus);
+
+
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
new file mode 100644
index 0000000..46c66bc
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.Beta;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A provider that provides {@link TransactionMetadataStore}.
+ */
+@Beta
+public interface TransactionMetadataStoreProvider {
+
+    /**
+     * Construct a provider from the provided class.
+     *
+     * @param providerClassName the provider class name.
+     * @return an instance of transaction metadata store provider.
+     */
+    static TransactionMetadataStoreProvider newProvider(String providerClassName) throws IOException {
+        Class<?> providerClass;
+        try {
+            providerClass = Class.forName(providerClassName);
+            Object obj = providerClass.newInstance();
+            checkArgument(obj instanceof TransactionMetadataStoreProvider,
+                "The factory has to be an instance of "
+                    + TransactionMetadataStoreProvider.class.getName());
+
+            return (TransactionMetadataStoreProvider) obj;
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Open the transaction metadata store for transaction coordinator
+     * identified by <tt>transactionCoordinatorId</tt>.
+     *
+     * @return a future represents the result of the operation.
+     *         an instance of {@link TransactionMetadataStore} is returned
+     *         if the operation succeeds.
+     */
+    CompletableFuture<TransactionMetadataStore> openStore(
+        TransactionCoordinatorID transactionCoordinatorId);
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
new file mode 100644
index 0000000..39d92aa
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import com.google.common.annotations.Beta;
+import java.util.List;
+import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+
+/**
+ * An interface represents the metadata of a transaction in {@link TransactionMetadataStore}.
+ */
+@Beta
+public interface TxnMeta {
+
+    /**
+     * Return the transaction id.
+     *
+     * @return transaction id.
+     */
+    TxnID id();
+
+    /**
+     * Return the transaction status.
+     *
+     * @return transaction status.
+     */
+    TxnStatus status();
+
+    /**
+     * Return the the list of partitions that this transaction produces to.
+     *
+     * @return the list of partitions that this transaction produced to.
+     *         the returned list is sorted by partition name.
+     */
+    List<String> producedPartitions();
+
+    /**
+     * Return the the list of partitions that this transaction acknowledges to.
+     *
+     * @return the list of partitions that this transaction acknowledges to.
+     *         the returned list is sorted by partition name.
+     */
+    List<String> ackedPartitions();
+
+    /**
+     * Add the list of produced partitions to the transaction.
+     *
+     * @return transaction meta
+     * @throws InvalidTxnStatusException if the transaction is not in
+     *         {@link TxnStatus#OPEN}
+     */
+    TxnMeta addProducedPartitions(List<String> partitions)
+        throws InvalidTxnStatusException;
+
+    /**
+     * Add the list of acked partitions to the transaction.
+     *
+     * @return transaction meta
+     * @throws InvalidTxnStatusException if the transaction is not in
+     *         {@link TxnStatus#OPEN}
+     */
+    TxnMeta addAckedPartitions(List<String> partitions)
+        throws InvalidTxnStatusException;
+
+    /**
+     * Update the transaction stats from the <tt>newStatus</tt> only when
+     * the current status is the expected <tt>expectedStatus</tt>.
+     *
+     * @param newStatus the new transaction status
+     * @param expectedStatus the expected transaction status
+     * @return the transaction itself.
+     * @throws InvalidTxnStatusException if the transaction is not in the expected
+     *         status, or it can not be transitioned to the new status
+     */
+    TxnMeta updateTxnStatus(TxnStatus newStatus,
+                            TxnStatus expectedStatus) throws InvalidTxnStatusException;
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java
new file mode 100644
index 0000000..3863b2f
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.exceptions;
+
+/**
+ * The base exception for exceptions thrown from coordinator.
+ */
+public abstract class CoordinatorException extends Exception {
+    private static final long serialVersionUID = 0L;
+
+    public CoordinatorException(String message) {
+        super(message);
+    }
+
+    public CoordinatorException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public CoordinatorException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/InvalidTxnStatusException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/InvalidTxnStatusException.java
new file mode 100644
index 0000000..489b550
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/InvalidTxnStatusException.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.exceptions;
+
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+
+/**
+ * Exception is thrown when transaction is not in the right status.
+ */
+public class InvalidTxnStatusException extends CoordinatorException {
+
+    private static final long serialVersionUID = 0L;
+
+    public InvalidTxnStatusException(String message) {
+        super(message);
+    }
+
+    public InvalidTxnStatusException(TxnID txnID,
+                                     TxnStatus expectedStatus,
+                                     TxnStatus actualStatus) {
+        super(
+            "Expect Txn `" + txnID + "` to be in " + expectedStatus
+                + " status but it is in " + actualStatus + " status");
+
+    }
+
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/TransactionNotFoundException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/TransactionNotFoundException.java
new file mode 100644
index 0000000..86f0321
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/TransactionNotFoundException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.exceptions;
+
+/**
+ * Exception is thrown when a transaction is not found in coordinator.
+ */
+public class TransactionNotFoundException extends CoordinatorException {
+
+    private static final long serialVersionUID = 0L;
+
+    public TransactionNotFoundException(String message) {
+        super(message);
+    }
+
+    public TransactionNotFoundException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public TransactionNotFoundException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/package-info.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/package-info.java
new file mode 100644
index 0000000..7ed25d4
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Internal exceptions for transaction coordinator.
+ */
+package org.apache.pulsar.transaction.coordinator.exceptions;
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
new file mode 100644
index 0000000..260935a
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+
+/**
+ * An in-memory implementation of {@link TransactionMetadataStore}.
+ */
+class InMemTransactionMetadataStore implements TransactionMetadataStore {
+
+    private final TransactionCoordinatorID tcID;
+    private final AtomicLong localID;
+    private final ConcurrentMap<TxnID, TxnMetaImpl> transactions;
+
+    InMemTransactionMetadataStore(TransactionCoordinatorID tcID) {
+        this.tcID = tcID;
+        this.localID = new AtomicLong(0L);
+        this.transactions = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnid) {
+        CompletableFuture<TxnMeta> getFuture = new CompletableFuture<>();
+        TxnMetaImpl txn = transactions.get(txnid);
+        if (null == txn) {
+            getFuture.completeExceptionally(
+                new TransactionNotFoundException("Transaction not found :" + txnid));
+        } else {
+            getFuture.complete(txn);
+        }
+        return getFuture;
+    }
+
+    @Override
+    public CompletableFuture<TxnID> newTransaction() {
+        TxnID txnID = new TxnID(
+            tcID.getId(),
+            localID.getAndIncrement()
+        );
+        TxnMetaImpl txn = new TxnMetaImpl(txnID);
+        transactions.put(txnID, txn);
+        return CompletableFuture.completedFuture(txnID);
+    }
+
+    @Override
+    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnid, List<String> partitions) {
+        return getTxnMeta(txnid).thenCompose(txn -> {
+            try {
+                txn.addProducedPartitions(partitions);
+                return CompletableFuture.completedFuture(null);
+            } catch (InvalidTxnStatusException e) {
+                CompletableFuture<Void> error = new CompletableFuture<>();
+                error.completeExceptionally(e);
+                return error;
+            }
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnid, List<String> partitions) {
+        return getTxnMeta(txnid).thenCompose(txn -> {
+            try {
+                txn.addAckedPartitions(partitions);
+                return CompletableFuture.completedFuture(null);
+            } catch (InvalidTxnStatusException e) {
+                CompletableFuture<Void> error = new CompletableFuture<>();
+                error.completeExceptionally(e);
+                return error;
+            }
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> updateTxnStatus(TxnID txnid, TxnStatus newStatus, TxnStatus expectedStatus) {
+        return getTxnMeta(txnid).thenCompose(txn -> {
+            try {
+                txn.updateTxnStatus(newStatus, expectedStatus);
+                return CompletableFuture.completedFuture(null);
+            } catch (InvalidTxnStatusException e) {
+                CompletableFuture<Void> error = new CompletableFuture<>();
+                error.completeExceptionally(e);
+                return error;
+            }
+        });
+    }
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
new file mode 100644
index 0000000..127106d
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+
+/**
+ * The provider that offers in-memory implementation of {@link TransactionMetadataStore}.
+ */
+public class InMemTransactionMetadataStoreProvider implements TransactionMetadataStoreProvider {
+
+    @Override
+    public CompletableFuture<TransactionMetadataStore>
+        openStore(TransactionCoordinatorID transactionCoordinatorId) {
+        return CompletableFuture.completedFuture(
+            new InMemTransactionMetadataStore(transactionCoordinatorId));
+    }
+
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
new file mode 100644
index 0000000..f05af362
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
+import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+
+/**
+ * A class represents the metadata of a transaction stored in
+ * the {@link org.apache.pulsar.transaction.coordinator.TransactionMetadataStore}.
+ */
+class TxnMetaImpl implements TxnMeta {
+
+    private final TxnID txnID;
+    private final Set<String> producedPartitions = new HashSet<>();
+    private final Set<String> ackedPartitions = new HashSet<>();
+    private TxnStatus txnStatus;
+
+    TxnMetaImpl(TxnID txnID) {
+        this.txnID = txnID;
+        this.txnStatus = TxnStatus.OPEN;
+    }
+
+    @Override
+    public TxnID id() {
+        return txnID;
+    }
+
+    /**
+     * Return the current status of the transaction.
+     *
+     * @return current status of the transaction.
+     */
+    @Override
+    public synchronized TxnStatus status() {
+        return txnStatus;
+    }
+
+    @Override
+    public List<String> producedPartitions() {
+        List<String> returnedPartitions;
+        synchronized (this) {
+            returnedPartitions = new ArrayList<>(producedPartitions.size());
+            returnedPartitions.addAll(producedPartitions);
+        }
+        Collections.sort(returnedPartitions);
+        return returnedPartitions;
+    }
+
+    @Override
+    public List<String> ackedPartitions() {
+        List<String> returnedPartitions;
+        synchronized (this) {
+            returnedPartitions = new ArrayList<>(ackedPartitions.size());
+            returnedPartitions.addAll(ackedPartitions);
+        }
+        Collections.sort(returnedPartitions);
+        return returnedPartitions;
+    }
+
+    /**
+     * Check if the transaction is in an expected status.
+     *
+     * @param expectedStatus
+     */
+    private synchronized void checkTxnStatus(TxnStatus expectedStatus) throws InvalidTxnStatusException {
+        if (this.txnStatus != expectedStatus) {
+            throw new InvalidTxnStatusException(
+                txnID, expectedStatus, txnStatus
+            );
+        }
+    }
+
+    /**
+     * Add the list partitions that the transaction produces to.
+     *
+     * @param partitions the list of partitions that the txn produces to
+     * @return the transaction itself.
+     * @throws CoordinatorException
+     */
+    @Override
+    public synchronized TxnMetaImpl addProducedPartitions(List<String> partitions) throws InvalidTxnStatusException {
+        checkTxnStatus(TxnStatus.OPEN);
+
+        this.producedPartitions.addAll(partitions);
+        return this;
+    }
+
+    /**
+     * Remove the list partitions that the transaction acknowledges to.
+     *
+     * @param partitions the list of partitions that the txn acknowledges to
+     * @return the transaction itself.
+     * @throws CoordinatorException
+     */
+    @Override
+    public synchronized TxnMetaImpl addAckedPartitions(List<String> partitions) throws InvalidTxnStatusException {
+        checkTxnStatus(TxnStatus.OPEN);
+
+        this.ackedPartitions.addAll(partitions);
+        return this;
+    }
+
+    /**
+     * Update the transaction stats from the <tt>newStatus</tt> only when
+     * the current status is the expected <tt>expectedStatus</tt>.
+     *
+     * @param newStatus the new transaction status
+     * @param expectedStatus the expected transaction status
+     * @return the transaction itself.
+     * @throws InvalidTxnStatusException
+     */
+    @Override
+    public synchronized TxnMetaImpl updateTxnStatus(TxnStatus newStatus,
+                                                    TxnStatus expectedStatus)
+        throws InvalidTxnStatusException {
+        checkTxnStatus(expectedStatus);
+        if (!txnStatus.canTransitionTo(newStatus)) {
+            throw new InvalidTxnStatusException(
+                "Transaction `" + txnID + "` CANNOT transaction from status " + txnStatus + " to " + newStatus);
+        }
+        this.txnStatus = newStatus;
+        return this;
+    }
+
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/package-info.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/package-info.java
new file mode 100644
index 0000000..59ce99b
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementations of the transaction coordinator.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/package-info.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/package-info.java
new file mode 100644
index 0000000..4416ff9
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Classes for implementing pulsar transaction coordinator.
+ */
+package org.apache.pulsar.transaction.coordinator;
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
new file mode 100644
index 0000000..4037f9d
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.exceptions.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.coordinator.exceptions.TransactionNotFoundException;
+import org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test different transaction metadata store provider.
+ */
+public class TransactionMetadataStoreProviderTest {
+
+    @DataProvider(name = "providers")
+    public static Object[][] providers() {
+        return new Object[][] {
+            { InMemTransactionMetadataStoreProvider.class.getName() }
+        };
+    }
+
+    private final String providerClassName;
+    private TransactionMetadataStoreProvider provider;
+    private TransactionCoordinatorID tcId;
+    private TransactionMetadataStore store;
+
+    @Factory(dataProvider = "providers")
+    public TransactionMetadataStoreProviderTest(String providerClassName) throws Exception {
+        this.providerClassName = providerClassName;
+        this.provider = TransactionMetadataStoreProvider.newProvider(providerClassName);
+    }
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        this.tcId = new TransactionCoordinatorID(1L);
+        this.store = this.provider.openStore(tcId).get();
+    }
+
+    @Test
+    public void testGetTxnStatusNotFound() throws Exception {
+        try {
+            this.store.getTxnStatus(
+                new TxnID(tcId.getId(), 12345L)).get();
+            fail("Should fail to get txn status of a non-existent transaction");
+        } catch (ExecutionException ee) {
+            assertTrue(ee.getCause() instanceof TransactionNotFoundException);
+        }
+    }
+
+    @Test
+    public void testGetTxnStatusSuccess() throws Exception {
+        TxnID txnID = this.store.newTransaction().get();
+        TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
+        assertEquals(txnStatus, TxnStatus.OPEN);
+    }
+
+    @Test
+    public void testUpdateTxnStatusSuccess() throws Exception {
+        TxnID txnID = this.store.newTransaction().get();
+        TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
+        assertEquals(txnStatus, TxnStatus.OPEN);
+
+        // update the status
+        this.store.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN).get();
+
+        // get the new status
+        TxnStatus newTxnStatus = this.store.getTxnStatus(txnID).get();
+        assertEquals(newTxnStatus, TxnStatus.COMMITTING);
+    }
+
+    @Test
+    public void testUpdateTxnStatusNotExpectedStatus() throws Exception {
+        TxnID txnID = this.store.newTransaction().get();
+        TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
+        assertEquals(txnStatus, TxnStatus.OPEN);
+
+        // update the status
+        try {
+            this.store.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.COMMITTING).get();
+            fail("Should fail to update txn status if it is not in expected status");
+        } catch (ExecutionException ee) {
+            assertTrue(ee.getCause() instanceof InvalidTxnStatusException);
+        }
+
+        // get the txn status, it should be changed.
+        TxnStatus newTxnStatus = this.store.getTxnStatus(txnID).get();
+        assertEquals(newTxnStatus, TxnStatus.OPEN);
+    }
+
+    @Test
+    public void testUpdateTxnStatusCannotTransition() throws Exception {
+        TxnID txnID = this.store.newTransaction().get();
+        TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
+        assertEquals(txnStatus, TxnStatus.OPEN);
+
+        // update the status
+        try {
+            this.store.updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.OPEN).get();
+            fail("Should fail to update txn status if it can not transition to the new status");
+        } catch (ExecutionException ee) {
+            assertTrue(ee.getCause() instanceof InvalidTxnStatusException);
+        }
+
+        // get the txn status, it should be changed.
+        TxnStatus newTxnStatus = this.store.getTxnStatus(txnID).get();
+        assertEquals(newTxnStatus, TxnStatus.OPEN);
+    }
+
+    @Test
+    public void testAddProducedPartition() throws Exception {
+        TxnID txnID = this.store.newTransaction().get();
+        TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
+        assertEquals(txnStatus, TxnStatus.OPEN);
+
+        List<String> partitions = new ArrayList<>();
+        partitions.add("ptn-0");
+        partitions.add("ptn-1");
+        partitions.add("ptn-2");
+
+        // add the list of partitions to the transaction
+        this.store.addProducedPartitionToTxn(txnID, partitions).get();
+
+        TxnMeta txn = this.store.getTxnMeta(txnID).get();
+        assertEquals(txn.status(), TxnStatus.OPEN);
+        assertEquals(txn.producedPartitions(), partitions);
+
+        // add another list of partition. duplicated partitions should be removed
+        List<String> newPartitions = new ArrayList<>();
+        newPartitions.add("ptn-2");
+        newPartitions.add("ptn-3");
+        newPartitions.add("ptn-4");
+        this.store.addProducedPartitionToTxn(txnID, newPartitions);
+
+        txn = this.store.getTxnMeta(txnID).get();
+        assertEquals(txn.status(), TxnStatus.OPEN);
+        List<String> finalPartitions = new ArrayList<>();
+        finalPartitions.add("ptn-0");
+        finalPartitions.add("ptn-1");
+        finalPartitions.add("ptn-2");
+        finalPartitions.add("ptn-3");
+        finalPartitions.add("ptn-4");
+        assertEquals(txn.producedPartitions(), finalPartitions);
+
+        // change the transaction to `COMMITTING`
+        this.store.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN).get();
+
+        // add partitions should fail if it is already committing.
+        List<String> newPartitions2 = new ArrayList<>();
+        newPartitions2.add("ptn-5");
+        newPartitions2.add("ptn-6");
+        try {
+            this.store.addProducedPartitionToTxn(txnID, newPartitions2).get();
+            fail("Should fail to add produced partitions if the transaction is not in OPEN status");
+        } catch (ExecutionException ee) {
+            assertTrue(ee.getCause() instanceof InvalidTxnStatusException);
+        }
+
+        txn = this.store.getTxnMeta(txnID).get();
+        assertEquals(txn.status(), TxnStatus.COMMITTING);
+        assertEquals(txn.producedPartitions(), finalPartitions);
+    }
+
+    @Test
+    public void testAddAckedPartition() throws Exception {
+        TxnID txnID = this.store.newTransaction().get();
+        TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
+        assertEquals(txnStatus, TxnStatus.OPEN);
+
+        List<String> partitions = new ArrayList<>();
+        partitions.add("ptn-0");
+        partitions.add("ptn-1");
+        partitions.add("ptn-2");
+
+        // add the list of partitions to the transaction
+        this.store.addAckedPartitionToTxn(txnID, partitions).get();
+
+        TxnMeta txn = this.store.getTxnMeta(txnID).get();
+        assertEquals(txn.status(), TxnStatus.OPEN);
+        assertEquals(txn.ackedPartitions(), partitions);
+
+        // add another list of partition. duplicated partitions should be removed
+        List<String> newPartitions = new ArrayList<>();
+        newPartitions.add("ptn-2");
+        newPartitions.add("ptn-3");
+        newPartitions.add("ptn-4");
+        this.store.addAckedPartitionToTxn(txnID, newPartitions);
+
+        txn = this.store.getTxnMeta(txnID).get();
+        assertEquals(txn.status(), TxnStatus.OPEN);
+        List<String> finalPartitions = new ArrayList<>();
+        finalPartitions.add("ptn-0");
+        finalPartitions.add("ptn-1");
+        finalPartitions.add("ptn-2");
+        finalPartitions.add("ptn-3");
+        finalPartitions.add("ptn-4");
+        assertEquals(txn.ackedPartitions(), finalPartitions);
+
+        // change the transaction to `COMMITTING`
+        this.store.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN).get();
+
+        // add partitions should fail if it is already committing.
+        List<String> newPartitions2 = new ArrayList<>();
+        newPartitions2.add("ptn-5");
+        newPartitions2.add("ptn-6");
+        try {
+            this.store.addAckedPartitionToTxn(txnID, newPartitions2).get();
+            fail("Should fail to add acked partitions if the transaction is not in OPEN status");
+        } catch (ExecutionException ee) {
+            assertTrue(ee.getCause() instanceof InvalidTxnStatusException);
+        }
+
+        txn = this.store.getTxnMeta(txnID).get();
+        assertEquals(txn.status(), TxnStatus.COMMITTING);
+        assertEquals(txn.ackedPartitions(), finalPartitions);
+    }
+
+}
diff --git a/pulsar-transaction/pom.xml b/pulsar-transaction/pom.xml
new file mode 100644
index 0000000..4af255a
--- /dev/null
+++ b/pulsar-transaction/pom.xml
@@ -0,0 +1,62 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <packaging>pom</packaging>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar</artifactId>
+    <version>2.4.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-transaction-parent</artifactId>
+  <name>Pulsar Transaction :: Parent</name>
+
+  <modules>
+    <module>common</module>
+    <module>coordinator</module>
+  </modules>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>check-style</id>
+            <phase>verify</phase>
+            <configuration>
+              <configLocation>../buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation>
+              <suppressionsLocation>../buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation>
+              <encoding>UTF-8</encoding>
+            </configuration>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>