You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/06/25 19:57:03 UTC

[kudu] branch master updated: KUDU-2612 p1: add initial transaction status storage

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 396b70b  KUDU-2612 p1: add initial transaction status storage
396b70b is described below

commit 396b70b24b80b1b0b362910e56e4da3f09c441e0
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Tue Jun 2 16:35:04 2020 -0700

    KUDU-2612 p1: add initial transaction status storage
    
    This adds a system tablet storage API for storing the status of
    transactions, in the form of the newly added TxnStatusTablet which is a
    wrapper around a TabletReplica with a schema tailored for storing
    transaction metadata.
    
    The abstraction is comparable to the SysCatalogTable abstraction used by
    the master to store metadata about the Kudu catalog, but rather than
    storing metadata about tables and tablets, the TxnStatusTablet stores
    metadata about transactions and transaction participants.
    
    Partitioning isn't addressed in this patch, but I'm expecting later
    patches to allow for the creation of partitioned transaction status
    tables, and having the individual tablet replicas of that table be
    accessed via this TxnStatusTablet API.
    
    This patch only introduces the schema, basic write calls, and scan calls
    to be used by a transaction management entity to be added in a later
    patch. There is currently no way to create or define partitions for
    TxnStatusTablets on tablet servers.
    
    Change-Id: I94ddbd37c65932120835d6e138307f819935173c
    Reviewed-on: http://gerrit.cloudera.org:8080/16043
    Reviewed-by: Attila Bukor <ab...@apache.org>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 CMakeLists.txt                                  |   1 +
 src/kudu/tablet/tablet.cc                       |  10 +
 src/kudu/tablet/tablet.h                        |   5 +-
 src/kudu/transactions/CMakeLists.txt            |  45 +++
 src/kudu/transactions/transactions.proto        |  42 +++
 src/kudu/transactions/txn_status_tablet-test.cc | 280 ++++++++++++++++++
 src/kudu/transactions/txn_status_tablet.cc      | 372 ++++++++++++++++++++++++
 src/kudu/transactions/txn_status_tablet.h       | 117 ++++++++
 8 files changed, 871 insertions(+), 1 deletion(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 7b0b3f1..b34c8b2 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1458,5 +1458,6 @@ add_subdirectory(src/kudu/subprocess)
 add_subdirectory(src/kudu/tablet)
 add_subdirectory(src/kudu/thrift)
 add_subdirectory(src/kudu/tools)
+add_subdirectory(src/kudu/transactions)
 add_subdirectory(src/kudu/tserver)
 add_subdirectory(src/kudu/util)
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index cd992c9..bc474cc 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -425,6 +425,16 @@ Status Tablet::NewRowIterator(const Schema& projection,
   return NewRowIterator(std::move(opts), iter);
 }
 
+Status Tablet::NewOrderedRowIterator(const Schema& projection,
+                                     unique_ptr<RowwiseIterator>* iter) const {
+  RowIteratorOptions opts;
+  // Yield current rows.
+  opts.snap_to_include = MvccSnapshot(mvcc_);
+  opts.projection = &projection;
+  opts.order = ORDERED;
+  return NewRowIterator(std::move(opts), iter);
+}
+
 Status Tablet::NewRowIterator(RowIteratorOptions opts,
                               unique_ptr<RowwiseIterator>* iter) const {
   RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 48f3b3e..d9cbf85 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -83,7 +83,6 @@ class HistoryGcOpts;
 class MemRowSet;
 class RowSetTree;
 class RowSetsInCompaction;
-class TabletReplicaTestBase;
 class WriteOpState;
 struct RowOp;
 struct TabletComponents;
@@ -198,6 +197,10 @@ class Tablet {
   Status NewRowIterator(const Schema& projection,
                         std::unique_ptr<RowwiseIterator>* iter) const;
 
+  // Like above, but returns an ordered iterator.
+  Status NewOrderedRowIterator(const Schema& projection,
+                               std::unique_ptr<RowwiseIterator>* iter) const;
+
   // Create a new row iterator using specific iterator options.
   //
   // 'opts' contains the options desired from the iterator.
diff --git a/src/kudu/transactions/CMakeLists.txt b/src/kudu/transactions/CMakeLists.txt
new file mode 100644
index 0000000..4f35408
--- /dev/null
+++ b/src/kudu/transactions/CMakeLists.txt
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+PROTOBUF_GENERATE_CPP(
+  TRANSACTIONS_PROTO_SRCS TRANSACTIONS_PROTO_HDRS TRANSACTIONS_PROTO_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+  BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+  PROTO_FILES transactions.proto)
+
+add_library(transactions_proto
+  ${TRANSACTIONS_PROTO_SRCS}
+  ${TRANSACTIONS_PROTO_HDRS})
+target_link_libraries(transactions_proto
+  protobuf
+  wire_protocol_proto
+)
+
+set(TRANSACTIONS_SRCS
+  txn_status_tablet.cc)
+
+add_library(transactions ${TRANSACTIONS_SRCS})
+target_link_libraries(transactions
+  kudu_common
+  tablet
+  transactions_proto
+  tserver
+  ${KUDU_BASE_LIBS}
+)
+
+SET_KUDU_TEST_LINK_LIBS(transactions tablet_test_util)
+ADD_KUDU_TEST(txn_status_tablet-test)
diff --git a/src/kudu/transactions/transactions.proto b/src/kudu/transactions/transactions.proto
new file mode 100644
index 0000000..0d669e5
--- /dev/null
+++ b/src/kudu/transactions/transactions.proto
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu.transactions;
+
+enum TxnStatePB {
+  UNKNOWN = 0;
+  OPEN = 1;
+  ABORTED = 2;
+  COMMIT_IN_PROGRESS = 3;
+  COMMITTED = 4;
+}
+
+// TODO(awong): this is a bare-bones implementation so far. We'll certainly
+// need more fields as we build out the rest of the transaction workflow (e.g.
+// commit timestamps, etc).
+//
+// Metadata encapsulating the status of a transaction.
+message TxnStatusEntryPB {
+  optional TxnStatePB state = 1;
+  optional string user = 2;
+}
+
+// Metadata encapsulating the existence of a transaction participant.
+message TxnParticipantEntryPB {
+  optional TxnStatePB state = 1;
+}
+
diff --git a/src/kudu/transactions/txn_status_tablet-test.cc b/src/kudu/transactions/txn_status_tablet-test.cc
new file mode 100644
index 0000000..e0378b8
--- /dev/null
+++ b/src/kudu/transactions/txn_status_tablet-test.cc
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/transactions/txn_status_tablet.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/consensus/raft_consensus.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/tablet-test-util.h"
+#include "kudu/tablet/tablet_replica-test-base.h"
+#include "kudu/transactions/transactions.pb.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using kudu::consensus::ConsensusBootstrapInfo;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::tablet::TabletReplicaTestBase;
+using std::ostream;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace transactions {
+
+namespace {
+
+const char kOwner[] = "bojack";
+const char kParticipant[] = "peanutbutter";
+string ParticipantId(int i) {
+  return Substitute("$0$1", kParticipant, i);
+}
+
+// Simple representation of an entry in the transaction status tablet.
+struct SimpleEntry {
+  int64_t txn_id;
+  TxnStatusEntryPB txn_pb;
+  vector<ParticipantIdAndPB> prt_pbs;
+
+  // Convenience method to create a SimpleEntry.
+  static SimpleEntry Create(int64_t txn_id, const string& user, TxnStatePB txn_state_pb,
+                            vector<std::pair<string, TxnStatePB>> participants) {
+    TxnStatusEntryPB txn_pb;
+    txn_pb.set_state(txn_state_pb);
+    txn_pb.set_user(user);
+    vector<ParticipantIdAndPB> prt_pbs;
+    for (auto& id_and_state : participants) {
+      TxnParticipantEntryPB prt_pb;
+      prt_pb.set_state(id_and_state.second);
+      prt_pbs.emplace_back(std::make_pair(std::move(id_and_state.first), std::move(prt_pb)));
+    }
+    return { txn_id, std::move(txn_pb), std::move(prt_pbs) };
+  }
+
+  bool operator==(const SimpleEntry& other) const {
+    return ToString() == other.ToString();
+  }
+
+  friend ostream& operator<<(ostream& out, const SimpleEntry& e) {
+    out << e.ToString();
+    return out;
+  }
+
+  string ToString() const {
+    vector<string> prt_strs;
+    for (const auto& id_and_prt : prt_pbs) {
+      prt_strs.emplace_back(Substitute("($0, {$1})", id_and_prt.first,
+                            SecureShortDebugString(id_and_prt.second)));
+    }
+    return Substitute("($0, {$1}, [$2])", txn_id,
+        SecureShortDebugString(txn_pb), JoinStrings(prt_strs, ","));
+  }
+};
+
+class SimpleTransactionsVisitor : public TransactionsVisitor {
+ public:
+  void VisitTransactionEntries(int64_t txn_id, TxnStatusEntryPB status_entry_pb,
+                                 vector<ParticipantIdAndPB> participants) override {
+    entries_.emplace_back(SimpleEntry{ txn_id, std::move(status_entry_pb),
+                                       std::move(participants) });
+  }
+  vector<SimpleEntry> ReleaseEntries() {
+    return std::move(entries_);
+  }
+ private:
+  vector<SimpleEntry> entries_;
+};
+
+} // anonymous namespace
+
+class TxnStatusTabletTest : public TabletReplicaTestBase {
+ public:
+  TxnStatusTabletTest()
+      : TabletReplicaTestBase(TxnStatusTablet::GetSchemaWithoutIds()) {}
+
+  void SetUp() override {
+    NO_FATALS(TabletReplicaTestBase::SetUp());
+    ConsensusBootstrapInfo info;
+    ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
+    status_tablet_.reset(new TxnStatusTablet(tablet_replica_.get()));
+  }
+
+ protected:
+  unique_ptr<TxnStatusTablet> status_tablet_;
+};
+
+TEST_F(TxnStatusTabletTest, TestWriteTransactions) {
+  // We can make multiple calls to add a single transaction. This will only
+  // insert a single row to the table.
+  ASSERT_OK(status_tablet_->AddNewTransaction(1, kOwner));
+  ASSERT_OK(status_tablet_->AddNewTransaction(1, kOwner));
+
+  // The storage abstraction doesn't prevent us from writing a new transaction
+  // entry for a lower transaction ID.
+  ASSERT_OK(status_tablet_->AddNewTransaction(5, kOwner));
+  ASSERT_OK(status_tablet_->AddNewTransaction(2, kOwner));
+
+  // Also try updating the status of one of our transaction entries.
+  TxnStatusEntryPB status_entry_pb;
+  status_entry_pb.set_user(kOwner);
+  status_entry_pb.set_state(TxnStatePB::ABORTED);
+  ASSERT_OK(status_tablet_->UpdateTransaction(2, status_entry_pb));
+  status_entry_pb.set_state(TxnStatePB::COMMITTED);
+  ASSERT_OK(status_tablet_->UpdateTransaction(2, status_entry_pb));
+
+  // The stored entries should be sorted, de-duplicated, and have the latest
+  // values.
+  const vector<SimpleEntry> kExpectedEntries({
+      SimpleEntry::Create(1, kOwner, TxnStatePB::OPEN, {}),
+      SimpleEntry::Create(2, kOwner, TxnStatePB::COMMITTED, {}),
+      SimpleEntry::Create(5, kOwner, TxnStatePB::OPEN, {}),
+  });
+
+  // Now iterate through the entries.
+  SimpleTransactionsVisitor visitor;
+  ASSERT_OK(status_tablet_->VisitTransactions(&visitor));
+  vector<SimpleEntry> entries = visitor.ReleaseEntries();
+  EXPECT_EQ(kExpectedEntries, entries);
+}
+
+TEST_F(TxnStatusTabletTest, TestWriteParticipants) {
+  ASSERT_OK(status_tablet_->AddNewTransaction(1, kOwner));
+
+  // Participants will be de-duplicated.
+  ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(1)));
+  ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(1)));
+
+  // There aren't ordering constraints for registering participant IDs.
+  ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(5)));
+  ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(2)));
+
+  // Try updating the status of one of our participant entries.
+  TxnParticipantEntryPB prt_entry_pb;
+  prt_entry_pb.set_state(TxnStatePB::ABORTED);
+  ASSERT_OK(status_tablet_->UpdateParticipant(1, ParticipantId(2), prt_entry_pb));
+  prt_entry_pb.set_state(TxnStatePB::COMMITTED);
+  ASSERT_OK(status_tablet_->UpdateParticipant(1, ParticipantId(2), prt_entry_pb));
+
+  const vector<SimpleEntry> kExpectedEntries({
+      SimpleEntry::Create(1, kOwner, TxnStatePB::OPEN, {
+          { ParticipantId(1), TxnStatePB::OPEN },
+          { ParticipantId(2), TxnStatePB::COMMITTED },
+          { ParticipantId(5), TxnStatePB::OPEN },
+      }),
+  });
+  SimpleTransactionsVisitor visitor;
+  ASSERT_OK(status_tablet_->VisitTransactions(&visitor));
+  vector<SimpleEntry> entries = visitor.ReleaseEntries();
+  EXPECT_EQ(kExpectedEntries, entries);
+}
+
+// Test that a participant entry can't be visited without a corresponding
+// status entry.
+TEST_F(TxnStatusTabletTest, TestFailedVisitor) {
+  ASSERT_OK(status_tablet_->AddNewParticipant(1, ParticipantId(1)));
+  SimpleTransactionsVisitor visitor;
+  Status s = status_tablet_->VisitTransactions(&visitor);
+  ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "missing transaction status entry");
+
+  // Now try again but with the transaction ID written.
+  ASSERT_OK(status_tablet_->AddNewTransaction(1, kOwner));
+  ASSERT_OK(status_tablet_->VisitTransactions(&visitor));
+
+  // And again with a new transaction ID.
+  ASSERT_OK(status_tablet_->AddNewParticipant(2, ParticipantId(2)));
+  s = status_tablet_->VisitTransactions(&visitor);
+  ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "missing transaction status entry");
+}
+
+// Test that we can write in parallel and read in parallel from the transaction
+// storage tablet.
+TEST_F(TxnStatusTabletTest, TestMultithreadedAccess) {
+  const int kNumThreads = 10;
+  const int kNumParticipantsPerTransaction = 5;
+  vector<thread> threads;
+  vector<Status> statuses(kNumThreads);
+#define RET_IF_NOT_OK(s) do { \
+    Status _s = (s); \
+    if (!_s.ok()) { \
+      statuses[i] = _s; \
+      return; \
+    } \
+  } while (0)
+
+  // Start multiple threads that add a transaction and a bunch of participants,
+  // storing any errors we see.
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.emplace_back([&, i] {
+      RET_IF_NOT_OK(status_tablet_->AddNewTransaction(i, kOwner));
+      for (int p = 0; p < kNumParticipantsPerTransaction; p++) {
+        RET_IF_NOT_OK(status_tablet_->AddNewParticipant(i, Substitute("prt-$0", p)));
+      }
+    });
+  }
+  std::for_each(threads.begin(), threads.end(), [] (thread& t) { t.join(); });
+  // There should have been no issues inserting.
+  for (const auto& s : statuses) {
+    EXPECT_OK(s);
+  }
+  threads.clear();
+
+  // Now try visiting the transaction status tablet from multiple threads,
+  // verifying we get back the correct number of transactions and participants.
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.emplace_back([&, i] {
+      SimpleTransactionsVisitor visitor;
+      RET_IF_NOT_OK(status_tablet_->VisitTransactions(&visitor));
+      Status s;
+      const auto entries = visitor.ReleaseEntries();
+      if (entries.size() != kNumThreads) {
+        RET_IF_NOT_OK(Status::IllegalState(Substitute("got $0 transactions", entries.size())));
+      }
+      for (const auto& e : entries) {
+        if (e.prt_pbs.size() != kNumParticipantsPerTransaction) {
+          RET_IF_NOT_OK(Status::IllegalState(Substitute("txn $0 had $1 participants",
+                                             e.txn_id, e.prt_pbs.size())));
+        }
+      }
+    });
+  }
+  std::for_each(threads.begin(), threads.end(), [] (thread& t) { t.join(); });
+  for (const auto& s : statuses) {
+    EXPECT_OK(s);
+  }
+}
+
+} // namespace transactions
+} // namespace kudu
diff --git a/src/kudu/transactions/txn_status_tablet.cc b/src/kudu/transactions/txn_status_tablet.cc
new file mode 100644
index 0000000..968454b
--- /dev/null
+++ b/src/kudu/transactions/txn_status_tablet.cc
@@ -0,0 +1,372 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/transactions/txn_status_tablet.h"
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+
+#include "kudu/common/column_predicate.h"
+#include "kudu/common/common.pb.h"
+#include "kudu/common/iterator.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/common/row_operations.h"
+#include "kudu/common/rowblock.h"
+#include "kudu/common/scan_spec.h"
+#include "kudu/common/schema.h"
+#include "kudu/common/types.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/ops/op.h"
+#include "kudu/tablet/ops/write_op.h"
+#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/transactions/transactions.pb.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/memory/arena.h"
+#include "kudu/util/once.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/slice.h"
+
+using kudu::tablet::LatchOpCompletionCallback;
+using kudu::tablet::OpCompletionCallback;
+using kudu::tablet::WriteOpState;
+using kudu::tserver::WriteRequestPB;
+using kudu::tserver::WriteResponsePB;;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace transactions {
+
+namespace {
+
+const char* kTxnIdColName = "txn_id";
+const char* kEntryTypeColName = "entry_type";
+const char* kIdentifierColName = "identifier";
+const char* kMetadataColName = "metadata";
+
+int kTxnIdColIdx = -1;
+int kEntryTypeColIdx = -1;
+int kIdentifierColIdx = -1;
+int kMetadataColIdx = -1;
+// Initializes the column indices of the transaction status tablet.
+Status InitTxnStatusColIdxs() {
+  static KuduOnceLambda col_idx_initializer;
+  return col_idx_initializer.Init([] {
+    const auto& schema = TxnStatusTablet::GetSchemaWithoutIds();
+    kTxnIdColIdx = schema.find_column(kTxnIdColName);
+    kEntryTypeColIdx = schema.find_column(kEntryTypeColName);
+    kIdentifierColIdx = schema.find_column(kIdentifierColName);
+    kMetadataColIdx = schema.find_column(kMetadataColName);
+    return Status::OK();
+  });
+}
+// NOTE: these column index getters should only be used once a TxnStatusTablet
+// has been constructed.
+int TxnIdColIdx() {
+  DCHECK_NE(-1, kTxnIdColIdx);
+  return kTxnIdColIdx;
+}
+int EntryTypeColIdx() {
+  DCHECK_NE(-1, kEntryTypeColIdx);
+  return kEntryTypeColIdx;
+}
+int IdentifierColIdx() {
+  DCHECK_NE(-1, kIdentifierColIdx);
+  return kIdentifierColIdx;
+}
+int MetadataColIdx() {
+  DCHECK_NE(-1, kMetadataColIdx);
+  return kMetadataColIdx;
+}
+
+Schema kTxnStatusSchemaNoIds;
+// Populates the schema of the transaction status table.
+Status PopulateTxnStatusSchema(SchemaBuilder* builder) {
+  RETURN_NOT_OK(builder->AddKeyColumn(kTxnIdColName, INT64));
+  RETURN_NOT_OK(builder->AddKeyColumn(kEntryTypeColName, INT8));
+  RETURN_NOT_OK(builder->AddKeyColumn(kIdentifierColName, STRING));
+  return builder->AddColumn(kMetadataColName, STRING);
+}
+// Initializes the static transaction status schema.
+Status InitTxnStatusSchemaWithNoIdsOnce() {
+  static KuduOnceLambda schema_initializer;
+  return schema_initializer.Init([] {
+    SchemaBuilder builder;
+    RETURN_NOT_OK(PopulateTxnStatusSchema(&builder));
+    kTxnStatusSchemaNoIds = builder.BuildWithoutIds();
+    return Status::OK();
+  });
+}
+
+WriteRequestPB kTxnStatusWriteReqPB;
+// Initializes the static transaction status tablet write request.
+Status InitWriteRequestPBOnce() {
+  static KuduOnceLambda write_initializer;
+  return write_initializer.Init([] {
+    return SchemaToPB(TxnStatusTablet::GetSchemaWithoutIds(),
+                      kTxnStatusWriteReqPB.mutable_schema());
+  });
+}
+
+// Returns a write request for the transaction status tablet of the given ID.
+WriteRequestPB BuildWriteReqPB(const string& tablet_id) {
+  CHECK_OK(InitWriteRequestPBOnce());
+  WriteRequestPB req = kTxnStatusWriteReqPB;
+  req.set_tablet_id(tablet_id);
+  return req;
+}
+
+// Return the values of the keys of the given transaction status tablet row.
+void ExtractKeys(const RowBlockRow& row, int64_t* txn_id, int8_t* entry_type, Slice* identifier) {
+  const auto& schema = TxnStatusTablet::GetSchemaWithoutIds();
+  *txn_id = *schema.ExtractColumnFromRow<INT64>(row, TxnIdColIdx());
+  *entry_type = *schema.ExtractColumnFromRow<INT8>(row, EntryTypeColIdx());
+  *identifier = *schema.ExtractColumnFromRow<STRING>(row, IdentifierColIdx());
+}
+// Parses the metadata column of 'row' into 'pb'; errors identify the offending row.
+template <typename T>
+Status ExtractMetadataEntry(const RowBlockRow& row, T* pb) {
+  const auto& schema = TxnStatusTablet::GetSchemaWithoutIds();
+  const Slice* entry = schema.ExtractColumnFromRow<STRING>(row, MetadataColIdx());
+  Status s = pb_util::ParseFromArray(pb, entry->data(), entry->size());
+  if (PREDICT_FALSE(!s.ok())) {
+    int64_t txn_id;
+    int8_t entry_type;
+    Slice identifier;
+    ExtractKeys(row, &txn_id, &entry_type, &identifier);
+    VLOG(2) << Substitute("bad entry: $0", entry->ToString());
+    return s.CloneAndPrepend(  // $0 is the entry type, $1 the transaction ID.
+        Substitute("unable to parse entry for $0 record of transaction ID $1 ($2)",
+                   entry_type, txn_id, identifier.ToString()));
+  }
+  return Status::OK();
+}
+
+Status PopulateTransactionEntryRow(int64_t txn_id, const faststring& entry, KuduPartialRow* row) {
+  RETURN_NOT_OK(row->SetInt64(kTxnIdColName, txn_id));
+  RETURN_NOT_OK(row->SetInt8(kEntryTypeColName, TxnStatusTablet::TRANSACTION));
+  RETURN_NOT_OK(row->SetString(kIdentifierColName, ""));
+  return row->SetString(kMetadataColName, entry);
+}
+
+Status PopulateParticipantEntryRow(int64_t txn_id, const string& tablet_id, const faststring& entry,
+                                   KuduPartialRow* row) {
+  RETURN_NOT_OK(row->SetInt64(kTxnIdColName, txn_id));
+  RETURN_NOT_OK(row->SetInt8(kEntryTypeColName, TxnStatusTablet::PARTICIPANT));
+  RETURN_NOT_OK(row->SetString(kIdentifierColName, tablet_id));
+  return row->SetString(kMetadataColName, entry);
+}
+
+} // anonymous namespace
+
+TxnStatusTablet::TxnStatusTablet(tablet::TabletReplica* tablet_replica)
+    : tablet_replica_(DCHECK_NOTNULL(tablet_replica)) {
+  CHECK_OK(InitTxnStatusColIdxs());
+}
+
+const Schema& TxnStatusTablet::GetSchemaWithoutIds() {
+  CHECK_OK(InitTxnStatusSchemaWithNoIdsOnce());
+  return kTxnStatusSchemaNoIds;
+}
+
+Status TxnStatusTablet::VisitTransactions(TransactionsVisitor* visitor) {
+  const auto& schema = GetSchemaWithoutIds();
+  // There are only TRANSACTION and PARTICIPANT entries today, but this filter
+  // is conservative in case we add more entry types in the future.
+  faststring record_types;
+  record_types.push_back(TRANSACTION);
+  record_types.push_back(PARTICIPANT);
+  vector<const void*> pred_record_types = { &record_types.at(0), &record_types.at(1) };
+  auto pred = ColumnPredicate::InList(schema.column(EntryTypeColIdx()), &pred_record_types);
+
+  ScanSpec spec;
+  spec.AddPredicate(pred);
+  unique_ptr<RowwiseIterator> iter;
+  RETURN_NOT_OK(tablet_replica_->tablet()->NewOrderedRowIterator(schema, &iter));
+  RETURN_NOT_OK(iter->Init(&spec));
+
+  // Keep track of the current transaction ID so we know when to start a new
+  // transaction.
+  boost::optional<int64_t> prev_txn_id = boost::none;
+  TxnStatusEntryPB prev_status_entry_pb;
+  vector<ParticipantIdAndPB> prev_participants;
+  Arena arena(32 * 1024);
+  RowBlock block(&iter->schema(), 512, &arena);
+  // Iterate over the transaction and participant entries, notifying the
+  // visitor once a transaction and all its participants have been found.
+  while (iter->HasNext()) {
+    RETURN_NOT_OK(iter->NextBlock(&block));
+    const size_t nrows = block.nrows();
+    for (size_t i = 0; i < nrows; ++i) {
+      if (!block.selection_vector()->IsRowSelected(i)) {
+        continue;
+      }
+      const auto& row = block.row(i);
+      int64_t txn_id;
+      int8_t entry_type;
+      Slice identifier;
+      ExtractKeys(row, &txn_id, &entry_type, &identifier);
+      switch (entry_type) {
+        case TRANSACTION: {
+          if (PREDICT_FALSE(prev_txn_id && *prev_txn_id == txn_id)) {
+            return Status::Corruption(
+                Substitute("duplicate transaction entry: $0", txn_id));
+          }
+          if (prev_txn_id) {
+            // We've previously collected the state for a transaction. Signal
+            // to the visitor what the state of the previous transaction was.
+            visitor->VisitTransactionEntries(*prev_txn_id, std::move(prev_status_entry_pb),
+                                             std::move(prev_participants));
+
+            // Sanity check: we're iterating in increasing txn_id order.
+            DCHECK_GT(txn_id, *prev_txn_id);
+          }
+          prev_txn_id = txn_id;
+          prev_participants.clear();
+          RETURN_NOT_OK(ExtractMetadataEntry(row, &prev_status_entry_pb));
+          continue;
+        }
+        case PARTICIPANT: {
+          if (PREDICT_FALSE(!prev_txn_id || *prev_txn_id != txn_id)) {
+            return Status::Corruption(
+                Substitute("missing transaction status entry for $0$1", txn_id,
+                           prev_txn_id ? Substitute(", currently on ID $0", *prev_txn_id) : ""));
+          }
+          TxnParticipantEntryPB pb;
+          RETURN_NOT_OK(ExtractMetadataEntry(row, &pb));
+          prev_participants.emplace_back(
+              schema.ExtractColumnFromRow<STRING>(row, IdentifierColIdx())->ToString(),
+              std::move(pb));
+          continue;
+        }
+        default:  // Stream as int: an int8_t would be logged as a raw character.
+          LOG(DFATAL) << "Unknown entry type: " << static_cast<int>(entry_type);
+          continue;
+      }
+    }
+  }
+  if (prev_txn_id) {
+    visitor->VisitTransactionEntries(*prev_txn_id, std::move(prev_status_entry_pb),
+                                     std::move(prev_participants));
+  }
+  return Status::OK();
+}
+
+Status TxnStatusTablet::AddNewTransaction(int64_t txn_id, const string& user) {
+  WriteRequestPB req = BuildWriteReqPB(tablet_replica_->tablet_id());
+
+  TxnStatusEntryPB entry;
+  entry.set_state(OPEN);
+  entry.set_user(user);
+  faststring metadata_buf;
+  pb_util::SerializeToString(entry, &metadata_buf);
+
+  KuduPartialRow row(&GetSchemaWithoutIds());
+  RETURN_NOT_OK(PopulateTransactionEntryRow(txn_id, metadata_buf, &row));
+  RowOperationsPBEncoder enc(req.mutable_row_operations());
+  enc.Add(RowOperationsPB::INSERT_IGNORE, row);
+  return SyncWrite(req);
+}
+
+Status TxnStatusTablet::UpdateTransaction(int64_t txn_id, const TxnStatusEntryPB& pb) {
+  WriteRequestPB req = BuildWriteReqPB(tablet_replica_->tablet_id());
+
+  faststring metadata_buf;
+  pb_util::SerializeToString(pb, &metadata_buf);
+
+  KuduPartialRow row(&GetSchemaWithoutIds());
+  RETURN_NOT_OK(PopulateTransactionEntryRow(txn_id, metadata_buf, &row));
+  RowOperationsPBEncoder enc(req.mutable_row_operations());
+  enc.Add(RowOperationsPB::UPDATE, row);
+  return SyncWrite(req);
+}
+
+Status TxnStatusTablet::AddNewParticipant(int64_t txn_id, const string& tablet_id) {
+  WriteRequestPB req = BuildWriteReqPB(tablet_replica_->tablet_id());
+
+  TxnParticipantEntryPB entry;
+  entry.set_state(OPEN);
+  faststring metadata_buf;
+  pb_util::SerializeToString(entry, &metadata_buf);
+  // Propagate row-population failures instead of dropping the returned Status.
+  KuduPartialRow row(&GetSchemaWithoutIds());
+  RETURN_NOT_OK(PopulateParticipantEntryRow(txn_id, tablet_id, metadata_buf, &row));
+  RowOperationsPBEncoder enc(req.mutable_row_operations());
+  enc.Add(RowOperationsPB::INSERT_IGNORE, row);
+  return SyncWrite(req);
+}
+
+Status TxnStatusTablet::UpdateParticipant(int64_t txn_id, const string& tablet_id,
+                                          const TxnParticipantEntryPB& pb) {
+  WriteRequestPB req = BuildWriteReqPB(tablet_replica_->tablet_id());
+
+  faststring metadata_buf;
+  pb_util::SerializeToString(pb, &metadata_buf);
+
+  KuduPartialRow row(&GetSchemaWithoutIds());
+  RETURN_NOT_OK(PopulateParticipantEntryRow(txn_id, tablet_id, metadata_buf, &row));
+  RowOperationsPBEncoder enc(req.mutable_row_operations());
+  enc.Add(RowOperationsPB::UPDATE, row);
+  return SyncWrite(req);
+}
+
+Status TxnStatusTablet::SyncWrite(const WriteRequestPB& req) {
+  DCHECK(req.has_tablet_id());
+  DCHECK(req.has_schema());
+  CountDownLatch latch(1);
+  WriteResponsePB resp;
+  unique_ptr<OpCompletionCallback> op_cb(
+      new LatchOpCompletionCallback<WriteResponsePB>(&latch, &resp));
+  unique_ptr<WriteOpState> op_state(
+      new WriteOpState(tablet_replica_,
+                       &req,
+                       nullptr, // RequestIdPB
+                       &resp));
+  op_state->set_completion_callback(std::move(op_cb));
+  RETURN_NOT_OK(tablet_replica_->SubmitWrite(std::move(op_state)));
+  latch.Wait();
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+  if (resp.per_row_errors_size() > 0) {
+    for (const auto& error : resp.per_row_errors()) {
+      LOG(ERROR) << Substitute(
+          "row $0: $1", error.row_index(), StatusFromPB(error.error()).ToString());
+    }
+    return Status::Incomplete(
+        Substitute("failed to write $0 rows to transaction status tablet $1",
+                   resp.per_row_errors_size(), tablet_replica_->tablet_id()));
+  }
+  return Status::OK();
+}
+
+} // namespace transactions
+} // namespace kudu
diff --git a/src/kudu/transactions/txn_status_tablet.h b/src/kudu/transactions/txn_status_tablet.h
new file mode 100644
index 0000000..1b9378f
--- /dev/null
+++ b/src/kudu/transactions/txn_status_tablet.h
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "kudu/transactions/transactions.pb.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+class Schema;
+
+namespace tablet {
+class TabletReplica;
+}  // namespace tablet
+
+namespace tserver {
+class WriteRequestPB;
+} // namespace tserver
+
+namespace transactions {
+
+// Pairs a participant's ID with its TxnParticipantEntryPB metadata.
+using ParticipantIdAndPB = std::pair<std::string, TxnParticipantEntryPB>;
+
+// Encapsulates the reading of state of the transaction status tablet.
+// Different implementations of visitors may store the contents of the
+// transactions differently in-memory.
+class TransactionsVisitor {
+ public:
+  // Virtual so that an implementation can be safely destroyed through a
+  // pointer to this interface.
+  virtual ~TransactionsVisitor() = default;
+
+  // Signal to the visitor that a transaction exists with the given transaction
+  // ID and status, with the given group of participants. 'status_entry_pb' and
+  // 'participants' are passed by value, so an implementation may move from
+  // them to avoid further copies.
+  virtual void VisitTransactionEntries(int64_t txn_id, TxnStatusEntryPB status_entry_pb,
+                                       std::vector<ParticipantIdAndPB> participants) = 0;
+};
+
+// TxnStatusTablet is a partition of a Kudu table that keeps track of
+// transaction-related system information like the statuses and IDs of
+// transactions and participants.
+//
+// It has the schema:
+//
+//   (txn_id INT64, entry_type INT8, identifier STRING) -> metadata STRING
+//
+// - txn_id: the transaction ID associated with the entry. It is the first part
+//   of the compound key so iteration over the table entries yields all state
+//   relevant to a given transaction ID at once.
+// - entry_type: a TxnStatusEntryType that indicates whether the entry is a
+//   transaction entry, participant entry, etc.
+// - identifier: an extra identifier record that is used differently by
+//   different kinds of entries. For participant entries, this is the
+//   participant's tablet ID.
+// - metadata: a protobuf message whose type depends on the entry_type.
+//
+// While the methods of this class are thread-safe, it is up to the caller
+// to enforce any desired consistency constraints. E.g. if requested, the
+// TxnStatusTablet will happily write entries indicating the presence of
+// participants without there being a corresponding transaction status entry.
+// If that behavior is not desirable, callers should coordinate calls to the
+// TxnStatusTablet to avoid it.
+//
+// Expected usage of this class is to have a management layer that reads and
+// writes to the underlying replica only if it is leader.
+//
+// TODO(awong): ensure that only the leader TxnStatusManager can call these.
+// TODO(awong): delete transactions that are entirely aborted or committed.
+// TODO(awong): consider batching writes.
+class TxnStatusTablet {
+ public:
+  // The kinds of entries stored in the tablet. Per the schema description
+  // above, this is the value that the 'entry_type' column holds to tell
+  // transaction entries apart from participant entries.
+  enum TxnStatusEntryType {
+    TRANSACTION = 1,
+    PARTICIPANT = 2,
+  };
+  // Wraps 'tablet_replica' as a transaction status tablet.
+  // NOTE(review): the replica is held as a raw pointer, so presumably it is
+  // not owned and must outlive this object -- confirm against the callers.
+  explicit TxnStatusTablet(tablet::TabletReplica* tablet_replica);
+
+  // Returns the schema of the transactions status table.
+  static const Schema& GetSchemaWithoutIds();
+
+  // Uses the given visitor to iterate over the entries in the rows of the
+  // underlying tablet replica. This allows the visitor to load the on-disk
+  // contents of the tablet into a more usable memory representation.
+  Status VisitTransactions(TransactionsVisitor* visitor);
+
+  // Writes to the underlying storage. Returns an error if there was an error
+  // writing the new entry.
+  //
+  // UpdateParticipant() serializes 'pb' and issues it as an UPDATE row
+  // operation for the row of 'txn_id' and 'tablet_id', then waits for the
+  // write to finish.
+  // NOTE(review): the other three are presumed analogous (an insert for the
+  // AddNew* calls, an update for UpdateTransaction) -- confirm in the .cc.
+  Status AddNewTransaction(int64_t txn_id, const std::string& user);
+  Status UpdateTransaction(int64_t txn_id, const TxnStatusEntryPB& pb);
+  Status AddNewParticipant(int64_t txn_id, const std::string& tablet_id);
+  Status UpdateParticipant(int64_t txn_id, const std::string& tablet_id,
+                           const TxnParticipantEntryPB& pb);
+
+ private:
+  // Writes 'req' to the underlying tablet replica, returning an error if there
+  // was a problem replicating the request, or if there were any row errors.
+  // Blocks until the write has completed. Row errors are logged individually
+  // and reported to the caller as a single Status::Incomplete.
+  Status SyncWrite(const tserver::WriteRequestPB& req);
+
+  // The tablet replica that backs this transaction status tablet.
+  tablet::TabletReplica* tablet_replica_;
+};
+
+} // namespace transactions
+} // namespace kudu