You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2021/03/23 17:50:48 UTC

[kudu] branch master updated: KUDU-2612: allow aborting after beginning to commit

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 058ec9a  KUDU-2612: allow aborting after beginning to commit
058ec9a is described below

commit 058ec9a2baa9edb75904e8aa83716dd2734f3bbd
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Tue Mar 2 17:07:39 2021 -0800

    KUDU-2612: allow aborting after beginning to commit
    
    This patch adjusts the TxnStatusManager state machine to include a new
    FINALIZE_IN_PROGRESS state to serve as an intermediate step between
    COMMIT_IN_PROGRESS and COMMITTED.
    
    The goal is to allow for aborts to occur in the event that we've begun
    committing, but not yet called FINALIZE_COMMIT on any of the
    transaction's participants, which may be desirable in cases where
    anything surprising has happened on the participants (e.g. if they're
    deleted).
    
    Change-Id: If1b6596df2db5601f7e17e528ad6dc68057b67f8
    Reviewed-on: http://gerrit.cloudera.org:8080/17022
    Tested-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 .../org/apache/kudu/client/KuduTransaction.java    |   1 +
 src/kudu/client/transaction-internal.cc            |   1 +
 src/kudu/integration-tests/txn_commit-itest.cc     | 219 ++++++++++++---
 src/kudu/master/txn_manager-test.cc                |   2 +-
 src/kudu/tablet/txn_participant-test.cc            |   4 +-
 src/kudu/transactions/transactions.proto           |  33 +++
 src/kudu/transactions/txn_status_entry.cc          |   6 +
 src/kudu/transactions/txn_status_entry.h           |   6 +
 src/kudu/transactions/txn_status_manager-test.cc   | 160 ++++++-----
 src/kudu/transactions/txn_status_manager.cc        | 305 +++++++++++++++++----
 src/kudu/transactions/txn_status_manager.h         | 110 +++++++-
 11 files changed, 681 insertions(+), 166 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
index 19286f2..05a5616 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
@@ -317,6 +317,7 @@ public class KuduTransaction implements AutoCloseable {
         throw new NonRecoverableException(Status.IllegalState("transaction is still open"));
       case COMMITTED:
         return true;
+      case FINALIZE_IN_PROGRESS:
       case COMMIT_IN_PROGRESS:
         return false;
       default:
diff --git a/src/kudu/client/transaction-internal.cc b/src/kudu/client/transaction-internal.cc
index ada717a..1436e24 100644
--- a/src/kudu/client/transaction-internal.cc
+++ b/src/kudu/client/transaction-internal.cc
@@ -314,6 +314,7 @@ Status KuduTransaction::Data::IsCommitCompleteImpl(
       *is_complete = true;
       *completion_status = Status::Aborted("transaction has been aborted");
       break;
+    case TxnStatePB::FINALIZE_IN_PROGRESS:
     case TxnStatePB::COMMIT_IN_PROGRESS:
       *is_complete = false;
       *completion_status = Status::Incomplete("commit is still in progress");
diff --git a/src/kudu/integration-tests/txn_commit-itest.cc b/src/kudu/integration-tests/txn_commit-itest.cc
index 4f2ea2b..f125f54 100644
--- a/src/kudu/integration-tests/txn_commit-itest.cc
+++ b/src/kudu/integration-tests/txn_commit-itest.cc
@@ -17,9 +17,11 @@
 
 #include <atomic>
 #include <cstdint>
+#include <cstdlib>
 #include <functional>
 #include <map>
 #include <memory>
+#include <ostream>
 #include <string>
 #include <thread>
 #include <unordered_map>
@@ -40,8 +42,10 @@
 #include "kudu/common/partial_row.h"
 #include "kudu/common/txn_id.h"
 #include "kudu/common/wire_protocol-test-util.h"
+#include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/test_workload.h"
 #include "kudu/master/mini_master.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
@@ -94,6 +98,7 @@ using std::unique_ptr;
 using std::unordered_map;
 using std::unordered_set;
 using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 namespace itest {
@@ -190,18 +195,25 @@ class TxnCommitITest : public KuduTest {
     return Status::OK();
   }
 
-  // Insert 'num_rows' rows to the given session, starting with 'start_row'.
+  // Insert 'num_rows' rows via the given session, starting with 'start_row',
+  // into every table in 'table_names', or into 'table_name_' if none are given.
   Status InsertToSession(
-      const shared_ptr<KuduSession>& txn_session, int start_row, int num_rows) {
-    shared_ptr<KuduTable> table;
-    RETURN_NOT_OK(client_->OpenTable(table_name_, &table));
-    const int target_row_id = start_row + num_rows;
-    for (int i = start_row; i < target_row_id; i++) {
-      auto* insert = table->NewInsert();
-      RETURN_NOT_OK(insert->mutable_row()->SetInt32(0, i));
-      RETURN_NOT_OK(insert->mutable_row()->SetInt32(1, i));
-      RETURN_NOT_OK(txn_session->Apply(insert));
-      RETURN_NOT_OK(txn_session->Flush());
+      const shared_ptr<KuduSession>& txn_session, int start_row, int num_rows,
+      vector<string> table_names = {}) {
+    if (table_names.empty()) {
+      table_names = { table_name_ };
+    }
+    for (const auto& table_name : table_names) {
+      shared_ptr<KuduTable> table;
+      RETURN_NOT_OK(client_->OpenTable(table_name, &table));
+      const int target_row_id = start_row + num_rows;
+      for (int i = start_row; i < target_row_id; i++) {
+        auto* insert = table->NewInsertIgnore();
+        RETURN_NOT_OK(insert->mutable_row()->SetInt32(0, i));
+        RETURN_NOT_OK(insert->mutable_row()->SetInt32(1, i));
+        RETURN_NOT_OK(txn_session->Apply(insert));
+        RETURN_NOT_OK(txn_session->Flush());
+      }
     }
     return Status::OK();
   }
@@ -418,13 +430,13 @@ TEST_F(TxnCommitITest, TestCommitAfterDeletingParticipant) {
   ASSERT_OK(client_->DeleteTable(table_name_));
   ASSERT_OK(txn->Commit(/*wait*/false));
 
-  // The transaction should eventually succeed, treating the deleted
-  // participant as committed.
+  // The transaction should eventually fail, treating the deleted participant
+  // as a fatal error.
   ASSERT_EVENTUALLY([&] {
     Status completion_status;
     bool is_complete = false;
     ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
-    ASSERT_OK(completion_status);
+    ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
     ASSERT_TRUE(is_complete);
   });
 }
@@ -442,22 +454,23 @@ TEST_F(TxnCommitITest, TestCommitAfterDroppingRangeParticipant) {
 
   ASSERT_OK(txn->Commit(/*wait*/false));
 
-  // The transaction should eventually succeed, treating the deleted
-  // participant as committed.
+  // The transaction should eventually abort, treating the deleted participant
+  // as a fatal error.
   ASSERT_EVENTUALLY([&] {
     Status completion_status;
     bool is_complete = false;
     ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
-    ASSERT_OK(completion_status);
+    ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
     ASSERT_TRUE(is_complete);
   });
 }
 
 TEST_F(TxnCommitITest, TestRestartingWhileCommitting) {
+  FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 10000;
   shared_ptr<KuduTransaction> txn;
   shared_ptr<KuduSession> txn_session;
   ASSERT_OK(BeginTransaction(&txn, &txn_session));
-  FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 2000;
+  ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
   ASSERT_OK(txn->Commit(/*wait*/false));
   // Stop the tserver without allowing the finalize commit to complete.
   cluster_->mini_tablet_server(0)->Shutdown();
@@ -487,9 +500,72 @@ TEST_F(TxnCommitITest, TestRestartingWhileCommitting) {
   });
 }
 
+// Test aborting a botched commit mid-way by deleting some of its participants
+// while committing. The result should be that the transaction gets aborted and
+// all participants abort the local transactions.
+TEST_F(TxnCommitITest, TestAbortRacingWithBotchedCommit) {
+  // First, create another table that we'll delete later on.
+  const string kSecondTableName = "default.second_table";
+  TestWorkload w(cluster_.get(), TestWorkload::PartitioningType::HASH);
+  w.set_num_replicas(1);
+  w.set_num_tablets(2);
+  w.set_table_name(kSecondTableName);
+  w.Setup();
+  w.Start();
+  while (w.rows_inserted() < 1) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+  w.StopAndJoin();
+  unordered_set<string> participant_ids;
+  auto* mts = cluster_->mini_tablet_server(0);
+  for (const auto& tablet_id : mts->ListTablets()) {
+    if (tablet_id != tsm_id_) {
+      participant_ids.emplace(tablet_id);
+    }
+  }
+  ASSERT_EQ(4, participant_ids.size());
+  vector<string> both_tables_participant_ids(participant_ids.begin(), participant_ids.end());
+  shared_ptr<KuduTransaction> txn;
+  shared_ptr<KuduSession> txn_session;
+  ASSERT_OK(BeginTransaction(&txn, &txn_session));
+  ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn,
+                            { table_name_, kSecondTableName }));
+  ASSERT_OK(client_->DeleteTable(kSecondTableName));
+  FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 2000;
+  ASSERT_OK(txn->Commit(/*wait*/false));
+  ASSERT_OK(txn->Rollback());
+  ASSERT_EVENTUALLY([&] {
+    Status completion_status;
+    bool is_complete = false;
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
+    ASSERT_TRUE(is_complete);
+  });
+
+  // Let's confirm that all remaining participants see the same transaction
+  // metadata.
+  vector<scoped_refptr<TabletReplica>> replicas;
+  mts->server()->tablet_manager()->GetTabletReplicas(&replicas);
+  vector<vector<TxnParticipant::TxnEntry>> txn_entries_per_replica;
+  for (const auto& r : replicas) {
+    if (r->tablet_id() != tsm_id_ && r->tablet_metadata()->table_name() != kSecondTableName) {
+      txn_entries_per_replica.emplace_back(r->tablet()->txn_participant()->GetTxnsForTests());
+    }
+  }
+  ASSERT_GT(txn_entries_per_replica.size(), 1);
+  for (int i = 1; i < txn_entries_per_replica.size(); i++) {
+    const auto& txns = txn_entries_per_replica[i];
+    ASSERT_FALSE(txns.empty());
+    for (const auto& txn_entry : txns) {
+      ASSERT_EQ(tablet::kAborted, txn_entry.state);
+    }
+    EXPECT_EQ(txn_entries_per_replica[0], txns);
+  }
+}
+
 // Test restarting while commit tasks are on-going, while at the same time,
-// some participants are deleted. There should be no inconsistencies in
-// assigned commit timestamps across participants.
+// some participants are deleted. The transaction should be aborted on all
+// participants.
 TEST_F(TxnCommitITest, TestRestartingWhileCommittingAndDeleting) {
   // First, create another table that we'll delete later on.
   const string kSecondTableName = "default.second_table";
@@ -507,27 +583,25 @@ TEST_F(TxnCommitITest, TestRestartingWhileCommittingAndDeleting) {
   shared_ptr<KuduTransaction> txn;
   shared_ptr<KuduSession> txn_session;
   ASSERT_OK(BeginTransaction(&txn, &txn_session));
-  ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
-  FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 2000;
+  ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn,
+                            { table_name_, kSecondTableName }));
+  ASSERT_OK(client_->DeleteTable(kSecondTableName));
+  FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 10000;
   ASSERT_OK(txn->Commit(/*wait*/false));
 
-  // Wait a bit to let the commit tasks start.
-  SleepFor(MonoDelta::FromMilliseconds(1000));
-
   // Shut down without giving time for the commit to complete.
   auto* mts = cluster_->mini_tablet_server(0);
   mts->Shutdown();
   ASSERT_OK(mts->Restart());
-  ASSERT_OK(mts->server()->tablet_manager()->WaitForAllBootstrapsToFinish());
 
-  // Delete some of the participants. Despite this, the commit process should
-  // complete.
-  ASSERT_OK(client_->DeleteTable(kSecondTableName));
+  // Some of the participants were deleted before the commit began. Upon
+  // completion, the commit process should result in an abort.
+  ASSERT_OK(mts->server()->tablet_manager()->WaitForAllBootstrapsToFinish());
   ASSERT_EVENTUALLY([&] {
     Status completion_status;
     bool is_complete = false;
     ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
-    ASSERT_OK(completion_status);
+    ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
     ASSERT_TRUE(is_complete);
   });
 
@@ -546,7 +620,7 @@ TEST_F(TxnCommitITest, TestRestartingWhileCommittingAndDeleting) {
     const auto& txns = txn_entries_per_replica[i];
     ASSERT_FALSE(txns.empty());
     for (const auto& txn_entry : txns) {
-      ASSERT_NE(-1, txn_entry.commit_timestamp);
+      ASSERT_EQ(tablet::kAborted, txn_entry.state);
     }
     EXPECT_EQ(txn_entries_per_replica[0], txns);
   }
@@ -610,7 +684,7 @@ TEST_F(TxnCommitITest, TestCommitAfterParticipantAbort) {
 
 // Try concurrently beginning to commit a bunch of different transactions.
 TEST_F(TxnCommitITest, TestConcurrentCommitCalls) {
-  const int kNumTxns = 4;
+  constexpr const int kNumTxns = 4;
   vector<shared_ptr<KuduTransaction>> txns(kNumTxns);
   int row_start = initial_row_count_;
   for (int i = 0; i < kNumTxns; i++) {
@@ -649,6 +723,65 @@ TEST_F(TxnCommitITest, TestConcurrentCommitCalls) {
   ASSERT_EQ(initial_row_count_ + kNumRowsPerTxn * kNumTxns, num_rows);
 }
 
+TEST_F(TxnCommitITest, TestConcurrentAbortsAndCommits) {
+  constexpr const int kNumTxns = 10;
+  vector<shared_ptr<KuduTransaction>> txns(kNumTxns);
+  int row_start = initial_row_count_;
+  for (int i = 0; i < kNumTxns; i++) {
+    shared_ptr<KuduSession> txn_session;
+    ASSERT_OK(BeginTransaction(&txns[i], &txn_session));
+    ASSERT_OK(InsertToSession(txn_session, row_start, kNumRowsPerTxn));
+    row_start += kNumRowsPerTxn;
+  }
+  int num_rows = 0;
+  ASSERT_OK(CountRows(&num_rows));
+  ASSERT_EQ(initial_row_count_, num_rows);
+  // To encourage races between concurrent aborts and commits, inject a random
+  // sleep before each call.
+  constexpr const int kMaxSleepMs = 1000;
+  std::atomic<int> num_committed_txns = 0;
+  vector<thread> threads;
+  for (int i = 0; i < kNumTxns; i++) {
+    threads.emplace_back([&, i] {
+      SleepFor(MonoDelta::FromMilliseconds(rand() % kMaxSleepMs));
+      Status s = txns[i]->Commit(/*wait*/true);
+      if (s.ok()) {
+        num_committed_txns++;
+      }
+    });
+    threads.emplace_back([&, i] {
+      SleepFor(MonoDelta::FromMilliseconds(rand() % kMaxSleepMs));
+      ignore_result(txns[i]->Rollback());
+    });
+  }
+  for (auto& t : threads) {
+    t.join();
+  }
+  ASSERT_OK(CountRows(&num_rows));
+  // NOTE: we can compare an exact count here and not worry about whether we
+  // completed aborting rows because even if we didn't complete the abort, they
+  // shouldn't be visible to clients anyway.
+  const int expected_rows = initial_row_count_ + kNumRowsPerTxn * num_committed_txns;
+  VLOG(1) << Substitute("Expecting $0 rows from $1 committed transactions",
+                        expected_rows, num_committed_txns.load());
+  ASSERT_EQ(expected_rows, num_rows);
+
+  // Ensure all transactions are either committed or aborted.
+  vector<scoped_refptr<TabletReplica>> replicas;
+  cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletReplicas(&replicas);
+  for (const auto& r : replicas) {
+    ASSERT_EVENTUALLY([&] {
+      if (r->tablet_metadata()->table_name() == table_name_) {
+        const auto txns = r->tablet()->txn_participant()->GetTxnsForTests();
+        for (const auto& txn : txns) {
+          ASSERT_TRUE(txn.state == tablet::kAborted || txn.state == tablet::kCommitted)
+              << Substitute("Txn in unexpected state: $0", txn.state);;
+        }
+      }
+    });
+  }
+}
+
 // Test that committing the same transaction concurrently doesn't lead to any
 // issues.
 TEST_F(TxnCommitITest, TestConcurrentRepeatedCommitCalls) {
@@ -660,7 +793,7 @@ TEST_F(TxnCommitITest, TestConcurrentRepeatedCommitCalls) {
   ASSERT_OK(CountRows(&num_rows));
   ASSERT_EQ(initial_row_count_, num_rows);
 
-  const int kNumThreads = 4;
+  constexpr const int kNumThreads = 4;
   vector<thread> threads;
   vector<Status> results(kNumThreads);
   for (int i = 0; i < kNumThreads; i++) {
@@ -685,7 +818,7 @@ TEST_F(TxnCommitITest, TestConcurrentRepeatedCommitCalls) {
   ASSERT_EQ(initial_row_count_ + kNumRowsPerTxn, num_rows);
 }
 
-TEST_F(TxnCommitITest, TestDontAbortIfCommitInProgress) {
+TEST_F(TxnCommitITest, TestDontBackgroundAbortIfCommitInProgress) {
   FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 1000;
   string serialized_txn;
   {
@@ -714,6 +847,24 @@ TEST_F(TxnCommitITest, TestDontAbortIfCommitInProgress) {
   });
 }
 
+// Test that we can abort if a transaction hasn't finalized its commit yet.
+TEST_F(TxnCommitITest, TestAbortIfCommitInProgress) {
+  FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 1000;
+  shared_ptr<KuduTransaction> txn;
+  shared_ptr<KuduSession> txn_session;
+  ASSERT_OK(BeginTransaction(&txn, &txn_session));
+  ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
+  ASSERT_OK(txn->Commit(/*wait*/false));
+  ASSERT_OK(txn->Rollback());
+  ASSERT_EVENTUALLY([&] {
+    Status completion_status;
+    bool is_complete;
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_TRUE(completion_status.IsAborted());
+    ASSERT_TRUE(is_complete);
+  });
+}
+
 // Test that has two nodes so we can place the TxnStatusManager and transaction
 // participant on separate nodes. This can be useful for testing when some
 // nodes are down.
diff --git a/src/kudu/master/txn_manager-test.cc b/src/kudu/master/txn_manager-test.cc
index d58f1f9..1409697 100644
--- a/src/kudu/master/txn_manager-test.cc
+++ b/src/kudu/master/txn_manager-test.cc
@@ -392,7 +392,7 @@ TEST_F(TxnManagerTest, AbortedTransactionLifecycle) {
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
     ASSERT_STR_CONTAINS(
         s.ToString(),
-        Substitute("transaction ID $0 is already in terminal state", txn_id));
+        Substitute("transaction ID $0 is not available for further commits", txn_id));
     // The transaction should stay in ABORTED state, of course.
     TxnStatePB txn_state;
     NO_FATALS(fetch_txn_status(txn_id, &txn_state));
diff --git a/src/kudu/tablet/txn_participant-test.cc b/src/kudu/tablet/txn_participant-test.cc
index ca8af95..c1e5437 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -245,9 +245,9 @@ TEST_F(TxnParticipantTest, TestSuccessfulSequences) {
   }), txn_participant()->GetTxnsForTests());
 }
 
-TEST_F(TxnParticipantTest, TestTransactionNotFound) {
+TEST_F(TxnParticipantTest, TestParticipantOpsWhenNotBegun) {
   const auto check_bad_ops = [&] (const vector<ParticipantOpPB::ParticipantOpType>& ops,
-                                       int64_t txn_id) {
+                                  int64_t txn_id) {
     for (const auto& type : ops) {
       ParticipantResponsePB resp;
       ASSERT_OK(CallParticipantOp(
diff --git a/src/kudu/transactions/transactions.proto b/src/kudu/transactions/transactions.proto
index 4d7b8f9..a58d8bd 100644
--- a/src/kudu/transactions/transactions.proto
+++ b/src/kudu/transactions/transactions.proto
@@ -19,12 +19,45 @@ package kudu.transactions;
 
 option java_package = "org.apache.kudu.transactions";
 
+// The following state changes are expected:
+//
+//     BeginCommit           FinalizeCommit        CompleteCommit
+//   OPEN --> COMMIT_IN_PROGRESS --> FINALIZE_IN_PROGRESS --> COMMITTED
+//
+//     BeginCommit           BeginAbort            FinalizeAbort
+//   OPEN --> COMMIT_IN_PROGRESS --> ABORT_IN_PROGRESS --> ABORTED
+//
+//     AbortTxn              FinalizeAbort
+//   OPEN --> ABORT_IN_PROGRESS --> ABORTED
 enum TxnStatePB {
   UNKNOWN = 0;
+  // The transaction is open. Users can write to participants, and register new
+  // participants.
   OPEN = 1;
+
+  // A user or Kudu's transaction staleness tracker has signaled that the
+  // transaction should be aborted. No further participants can be registered,
+  // and the transaction can only be moved to the ABORTED state after sending
+  // ABORT_TXN ops to all participants.
   ABORT_IN_PROGRESS = 5;
+
+  // The transaction has been fully aborted -- all participants have
+  // successfully replicated ABORT_TXN ops and cleared the transaction. No
+  // further tasks are required to abort the transaction.
   ABORTED = 2;
+
+  // The user has signaled that the transaction should be committed. No further
+  // participants can be registered. The transaction may still be aborted if
+  // prompted by a user or if sending any BEGIN_COMMIT op fails.
   COMMIT_IN_PROGRESS = 3;
+
+  // Kudu has successfully sent BEGIN_COMMIT ops to all participants, and has
+  // started sending FINALIZE_COMMIT ops to participants. The transaction can
+  // only be moved to the COMMITTED state.
+  FINALIZE_IN_PROGRESS = 6;
+
+  // All FINALIZE_COMMIT ops have succeeded. No further tasks are required to
+  // commit the transaction.
   COMMITTED = 4;
 }
 
diff --git a/src/kudu/transactions/txn_status_entry.cc b/src/kudu/transactions/txn_status_entry.cc
index 9d77d81..3560fa6 100644
--- a/src/kudu/transactions/txn_status_entry.cc
+++ b/src/kudu/transactions/txn_status_entry.cc
@@ -65,5 +65,11 @@ TxnStatePB TransactionEntry::state() const {
   return l.data().pb.state();
 }
 
+int64_t TransactionEntry::commit_timestamp() const {
+  CowLock<PersistentTransactionEntry> l(&metadata_, LockMode::READ);
+  DCHECK(l.data().pb.has_commit_timestamp());
+  return l.data().pb.commit_timestamp();
+}
+
 } // namespace transactions
 } // namespace kudu
diff --git a/src/kudu/transactions/txn_status_entry.h b/src/kudu/transactions/txn_status_entry.h
index fe25409..b51ac1a 100644
--- a/src/kudu/transactions/txn_status_entry.h
+++ b/src/kudu/transactions/txn_status_entry.h
@@ -103,6 +103,12 @@ class TransactionEntry : public RefCountedThreadSafe<TransactionEntry> {
   // via the locking provided by the underlying copy-on-write metadata_ object.
   TxnStatePB state() const;
 
+  // An accessor to the transaction's commit timestamp. Concurrent access is
+  // controlled via the locking provided by the underlying copy-on-write
+  // metadata_ object. Should only be used if the transaction is expected to
+  // have a commit timestamp already set.
+  int64_t commit_timestamp() const;
+
  private:
   friend class RefCountedThreadSafe<TransactionEntry>;
   ~TransactionEntry() = default;
diff --git a/src/kudu/transactions/txn_status_manager-test.cc b/src/kudu/transactions/txn_status_manager-test.cc
index 61f38c6..aaa7e4a 100644
--- a/src/kudu/transactions/txn_status_manager-test.cc
+++ b/src/kudu/transactions/txn_status_manager-test.cc
@@ -461,31 +461,47 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
     ASSERT_FALSE(txn_status.has_state());
     ASSERT_FALSE(txn_status.has_user());
   }
+  constexpr const int64_t kNoTxnId = 0;
+  constexpr const int64_t kCommittedTxnId = 1;
+  constexpr const int64_t kAbortInProgressTxnId = 2;
+  constexpr const int64_t kCommitInProgressTxnId = 3;
+  constexpr const int64_t kOpenTxnId = 4;
   {
     TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
-    ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, nullptr, &ts_error));
+    ASSERT_OK(txn_manager_->BeginTransaction(kCommittedTxnId, kOwner, nullptr, &ts_error));
 
     TxnStatusEntryPB txn_status;
     ASSERT_OK(txn_manager_->GetTransactionStatus(
-        1, kOwner, &txn_status, &ts_error));
+        kCommittedTxnId, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
     ASSERT_EQ(TxnStatePB::OPEN, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
     ASSERT_EQ(kOwner, txn_status.user());
 
-    ASSERT_OK(txn_manager_->BeginCommitTransaction(1, kOwner, &ts_error));
+    ASSERT_OK(txn_manager_->BeginCommitTransaction(kCommittedTxnId, kOwner, &ts_error));
     ASSERT_OK(txn_manager_->GetTransactionStatus(
-        1, kOwner, &txn_status, &ts_error));
+        kCommittedTxnId, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
     ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
     ASSERT_EQ(kOwner, txn_status.user());
 
-    ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1, Timestamp::kInitialTimestamp, &ts_error));
+    ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kCommittedTxnId,
+                                                      Timestamp::kInitialTimestamp, &ts_error));
     ASSERT_OK(txn_manager_->GetTransactionStatus(
-        1, kOwner, &txn_status, &ts_error));
+        kCommittedTxnId, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
+    ASSERT_TRUE(txn_status.has_commit_timestamp());
+    ASSERT_EQ(TxnStatePB::FINALIZE_IN_PROGRESS, txn_status.state());
+    ASSERT_TRUE(txn_status.has_user());
+    ASSERT_EQ(kOwner, txn_status.user());
+
+    ASSERT_OK(txn_manager_->CompleteCommitTransaction(kCommittedTxnId));
+    ASSERT_OK(txn_manager_->GetTransactionStatus(
+        kCommittedTxnId, kOwner, &txn_status, &ts_error));
+    ASSERT_TRUE(txn_status.has_state());
+    ASSERT_TRUE(txn_status.has_commit_timestamp());
     ASSERT_EQ(TxnStatePB::COMMITTED, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
     ASSERT_EQ(kOwner, txn_status.user());
@@ -494,12 +510,12 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
   {
     TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
-    ASSERT_OK(txn_manager_->BeginTransaction(2, kOwner, nullptr, &ts_error));
-    ASSERT_OK(txn_manager_->AbortTransaction(2, kOwner, &ts_error));
+    ASSERT_OK(txn_manager_->BeginTransaction(kAbortInProgressTxnId, kOwner, nullptr, &ts_error));
+    ASSERT_OK(txn_manager_->AbortTransaction(kAbortInProgressTxnId, kOwner, &ts_error));
 
     TxnStatusEntryPB txn_status;
     ASSERT_OK(txn_manager_->GetTransactionStatus(
-        2, kOwner, &txn_status, &ts_error));
+        kAbortInProgressTxnId, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
     ASSERT_EQ(TxnStatePB::ABORT_IN_PROGRESS, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
@@ -510,11 +526,11 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
     TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     // Start another transaction and start its commit phase.
     TabletServerErrorPB ts_error;
-    ASSERT_OK(txn_manager_->BeginTransaction(3, kOwner, nullptr, &ts_error));
-    ASSERT_OK(txn_manager_->BeginCommitTransaction(3, kOwner, &ts_error));
+    ASSERT_OK(txn_manager_->BeginTransaction(kCommitInProgressTxnId, kOwner, nullptr, &ts_error));
+    ASSERT_OK(txn_manager_->BeginCommitTransaction(kCommitInProgressTxnId, kOwner, &ts_error));
 
     // Start just another transaction.
-    ASSERT_OK(txn_manager_->BeginTransaction(4, kOwner, nullptr, &ts_error));
+    ASSERT_OK(txn_manager_->BeginTransaction(kOpenTxnId, kOwner, nullptr, &ts_error));
   }
 
   // Make the TxnStatusManager start from scratch.
@@ -529,28 +545,28 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
     TxnStatusEntryPB txn_status;
     TabletServerErrorPB ts_error;
     ASSERT_OK(txn_manager_->GetTransactionStatus(
-        1, kOwner, &txn_status, &ts_error));
+        kCommittedTxnId, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
     ASSERT_EQ(TxnStatePB::COMMITTED, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
     ASSERT_EQ(kOwner, txn_status.user());
 
     ASSERT_OK(txn_manager_->GetTransactionStatus(
-        2, kOwner, &txn_status, &ts_error));
+        kAbortInProgressTxnId, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
     ASSERT_EQ(TxnStatePB::ABORT_IN_PROGRESS, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
     ASSERT_EQ(kOwner, txn_status.user());
 
     ASSERT_OK(txn_manager_->GetTransactionStatus(
-        3, kOwner, &txn_status, &ts_error));
+        kCommitInProgressTxnId, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
     ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
     ASSERT_EQ(kOwner, txn_status.user());
 
     ASSERT_OK(txn_manager_->GetTransactionStatus(
-        4, kOwner, &txn_status, &ts_error));
+        kOpenTxnId, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
     ASSERT_EQ(TxnStatePB::OPEN, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
@@ -563,7 +579,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
     TxnStatusEntryPB txn_status;
     TabletServerErrorPB ts_error;
     auto s = txn_manager_->GetTransactionStatus(
-        1, "stranger", &txn_status, &ts_error);
+        kCommittedTxnId, "stranger", &txn_status, &ts_error);
     ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
   }
 
@@ -573,7 +589,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
     TxnStatusEntryPB txn_status;
     TabletServerErrorPB ts_error;
     auto s = txn_manager_->GetTransactionStatus(
-        0, kOwner, &txn_status, &ts_error);
+        kNoTxnId, kOwner, &txn_status, &ts_error);
     ASSERT_TRUE(s.IsNotFound()) << s.ToString();
   }
 
@@ -583,7 +599,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
     TxnStatusEntryPB txn_status;
     TabletServerErrorPB ts_error;
     auto s = txn_manager_->GetTransactionStatus(
-        0, "stranger", &txn_status, &ts_error);
+        kNoTxnId, "stranger", &txn_status, &ts_error);
     ASSERT_TRUE(s.IsNotFound()) << s.ToString();
   }
 }
@@ -601,19 +617,22 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
     ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 1 not found");
   }
 
-  // OPEN --> COMMIT_IN_PROGRESS --> COMMITTED
+  const auto not_authorized_keep_alive = [&] (const int64_t txn_id) {
+    TabletServerErrorPB ts_error;
+    auto s = txn_manager_->KeepTransactionAlive(txn_id, "stranger", &ts_error);
+    ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        Substitute("transaction ID $0 not owned by stranger", txn_id));
+  };
+
+  // OPEN --> COMMIT_IN_PROGRESS --> FINALIZE_IN_PROGRESS --> COMMITTED
   {
     TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
     ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, nullptr, &ts_error));
     ASSERT_OK(txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error));
     // Supplying wrong user for transaction in OPEN state.
-    {
-      auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
-      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-      ASSERT_STR_CONTAINS(s.ToString(),
-                          "transaction ID 1 not owned by stranger");
-    }
+    NO_FATALS(not_authorized_keep_alive(1));
 
     ASSERT_OK(txn_manager_->BeginCommitTransaction(1, kOwner, &ts_error));
     auto s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
@@ -621,28 +640,26 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
     ASSERT_STR_CONTAINS(s.ToString(),
                         "transaction ID 1 is in commit phase");
     // Supplying wrong user for transaction in COMMIT_IN_PROGRESS state.
-    {
-      auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
-      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-      ASSERT_STR_CONTAINS(s.ToString(),
-                          "transaction ID 1 not owned by stranger");
-    }
+    NO_FATALS(not_authorized_keep_alive(1));
 
     ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1, Timestamp::kInitialTimestamp, &ts_error));
     s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(),
-                        "transaction ID 1 is already in terminal state");
+                        "transaction ID 1 is in commit phase");
+    // Supplying wrong user for transaction in FINALIZE_IN_PROGRESS state.
+    NO_FATALS(not_authorized_keep_alive(1));
+
+    ASSERT_OK(txn_manager_->CompleteCommitTransaction(1));
+    s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "transaction ID 1 is not available for further commits");
     // Supplying wrong user for transaction in COMMITTED state.
-    {
-      auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
-      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-      ASSERT_STR_CONTAINS(s.ToString(),
-                          "transaction ID 1 not owned by stranger");
-    }
+    NO_FATALS(not_authorized_keep_alive(1));
   }
 
-  // OPEN --> COMMIT_IN_PROGRESS --> ABORTED
+  // OPEN --> COMMIT_IN_PROGRESS --> ABORT_IN_PROGRESS
   {
     TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
@@ -659,16 +676,12 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
     s = txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(),
-                        "transaction ID 2 is already in terminal state");
+                        "transaction ID 2 is not available for further commits");
     // Supplying wrong user for transaction in ABORTED state.
-    {
-      auto s = txn_manager_->KeepTransactionAlive(2, "stranger", &ts_error);
-      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-      ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not owned by stranger");
-    }
+    NO_FATALS(not_authorized_keep_alive(2));
   }
 
-  // OPEN --> ABORTED
+  // OPEN --> ABORT_IN_PROGRESS
   {
     TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
@@ -679,7 +692,7 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
     auto s = txn_manager_->KeepTransactionAlive(3, kOwner, &ts_error);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(),
-                        "transaction ID 3 is already in terminal state");
+                        "transaction ID 3 is not available for further commits");
   }
 
   // Open a new transaction just before restarting the TxnStatusManager.
@@ -703,14 +716,9 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
     auto s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(),
-                        "transaction ID 1 is already in terminal state");
-    // Supplying wrong user for transaction in COMMITTED state.
-    {
-      auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
-      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-      ASSERT_STR_CONTAINS(s.ToString(),
-                          "transaction ID 1 not owned by stranger");
-    }
+                        "transaction ID 1 is not available for further commits");
+    // Supplying wrong user for transaction in COMMITTED state.
+    NO_FATALS(not_authorized_keep_alive(1));
   }
   {
     TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
@@ -718,25 +726,16 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
     auto s = txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(),
-                        "transaction ID 2 is already in terminal state");
-    // Supplying wrong user for transaction in ABORTED state.
-    {
-      auto s = txn_manager_->KeepTransactionAlive(2, "stranger", &ts_error);
-      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-      ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not owned by stranger");
-    }
+                        "transaction ID 2 is not available for further commits");
+    // Supplying wrong user for transaction in ABORT_IN_PROGRESS state.
+    NO_FATALS(not_authorized_keep_alive(2));
   }
   {
     TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
     ASSERT_OK(txn_manager_->KeepTransactionAlive(4, kOwner, &ts_error));
     // Supplying wrong user for transaction in OPEN state.
-    {
-      auto s = txn_manager_->KeepTransactionAlive(4, "stranger", &ts_error);
-      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-      ASSERT_STR_CONTAINS(s.ToString(),
-                          "transaction ID 4 not owned by stranger");
-    }
+    NO_FATALS(not_authorized_keep_alive(4));
   }
 }
 
@@ -792,12 +791,16 @@ TEST_F(TxnStatusManagerTest, TestUpdateTransactionState) {
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
   s = txn_manager_->FinalizeCommitTransaction(kTxnId1, Timestamp::kInitialTimestamp, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+  s = txn_manager_->CompleteCommitTransaction(kTxnId1);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
   // We can't finalize a commit that hasn't begun committing.
   const int64_t kTxnId2 = 2;
   ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner, nullptr, &ts_error));
   s = txn_manager_->FinalizeCommitTransaction(kTxnId2, Timestamp::kInitialTimestamp, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+  s = txn_manager_->CompleteCommitTransaction(kTxnId2);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
   // We can't abort a transaction that has finished committing.
   ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId2, kOwner, &ts_error));
@@ -810,6 +813,18 @@ TEST_F(TxnStatusManagerTest, TestUpdateTransactionState) {
   ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId2, kOwner, &ts_error));
   ASSERT_OK(txn_manager_->FinalizeCommitTransaction(
       kTxnId2, Timestamp::kInitialTimestamp, &ts_error));
+
+  // We can't abort a transaction that has been completely committed.
+  ASSERT_OK(txn_manager_->CompleteCommitTransaction(kTxnId2));
+  s = txn_manager_->AbortTransaction(kTxnId2, kOwner, &ts_error);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+  // Redundant begin commit, finalize calls, and complete calls are also
+  // benign.
+  ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId2, kOwner, &ts_error));
+  ASSERT_OK(txn_manager_->FinalizeCommitTransaction(
+      kTxnId2, Timestamp::kInitialTimestamp, &ts_error));
+  ASSERT_OK(txn_manager_->CompleteCommitTransaction(kTxnId2));
 }
 
 // Test that we can only add participants to a transaction when it's in an
@@ -834,12 +849,17 @@ TEST_F(TxnStatusManagerTest, TestRegisterParticipantsWithStates) {
   s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(2), kOwner, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
-  // We can't register participants when we've finished committnig.
+  // We can't register participants when we've finalized the commit.
   ASSERT_OK(txn_manager_->FinalizeCommitTransaction(
       kTxnId1, Timestamp::kInitialTimestamp, &ts_error));
   s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(2), kOwner, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
+  // We can't register participants when we've completely committed.
+  ASSERT_OK(txn_manager_->CompleteCommitTransaction(kTxnId1));
+  s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(2), kOwner, &ts_error);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
   // We can't register participants when we've aborted the transaction.
   const int64_t kTxnId2 = 2;
   ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner, nullptr, &ts_error));
diff --git a/src/kudu/transactions/txn_status_manager.cc b/src/kudu/transactions/txn_status_manager.cc
index 8981c5a..ce8e24d 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -194,6 +194,24 @@ bool CommitTasks::IsShuttingDownCleanup() const {
   return false;
 }
 
+bool CommitTasks::ScheduleAbortIfNecessary() {
+  switch (abort_txn_) {
+    case ABORT_IN_PROGRESS:
+      LOG(INFO) << Substitute("Scheduling write for ABORT_IN_PROGRESS for txn $0",
+                              txn_id_.ToString());
+      ScheduleBeginAbortTxnWrite();
+      return true;
+    case ABORTED:
+      LOG(INFO) << Substitute("Scheduling ABORT_TXNs on participants for txn $0",
+                              txn_id_.ToString());
+      AbortTxnAsync();
+      return true;
+    default:
+      break;
+  }
+  return false;
+}
+
 void CommitTasks::BeginCommitAsyncTask(int participant_idx) {
   DCHECK_LT(participant_idx, participant_ids_.size());
   // Status callback called with the result from the participant op. This is
@@ -218,12 +236,16 @@ void CommitTasks::BeginCommitAsyncTask(int participant_idx) {
       // been committed, rather than attempting to abort or something. This is
       // important to ensure retries of the commit tasks reliably result in the
       // same operations being performed.
-      LOG(INFO) << Substitute("Participant $0 was not found: $1",
+      LOG(INFO) << Substitute("Participant $0 was not found for BEGIN_COMMIT, aborting: $1",
                               participant_ids_[participant_idx], s.ToString());
+      SetNeedsBeginAbort();
     } else if (PREDICT_FALSE(!s.ok())) {
       // For any other kind of error, just exit without completing.
       // TODO(awong): we're presuming that such errors wouldn't benefit from
       // just retrying.
+      // TODO(awong): we don't expect them, but if we ever somehow find
+      // ourselves with an aborted transaction on the participant, we should
+      // probably abort here.
       LOG(WARNING) << Substitute("Participant $0 BEGIN_COMMIT op returned $1",
                                  participant_ids_[participant_idx], s.ToString());
       stop_task_ = true;
@@ -235,12 +257,16 @@ void CommitTasks::BeginCommitAsyncTask(int participant_idx) {
       if (IsShuttingDownCleanup()) {
         return;
       }
+      if (ScheduleAbortIfNecessary()) {
+        return;
+      }
       Timestamp max_timestamp(Timestamp::kInitialTimestamp);
       for (const auto& ts : begin_commit_timestamps_) {
         max_timestamp = std::max(ts, max_timestamp);
       }
       DCHECK_NE(Timestamp::kInitialTimestamp, max_timestamp);
-      FinalizeCommitAsync(max_timestamp);
+      set_commit_timestamp(max_timestamp);
+      ScheduleFinalizeCommitWrite();
     }
   };
   ParticipantOpPB op_pb;
@@ -254,27 +280,26 @@ void CommitTasks::BeginCommitAsyncTask(int participant_idx) {
       &begin_commit_timestamps_[participant_idx]);
 }
 
-void CommitTasks::FinalizeCommitAsyncTask(int participant_idx, const Timestamp& commit_timestamp) {
+void CommitTasks::FinalizeCommitAsyncTask(int participant_idx) {
+  DCHECK_EQ(TxnStatePB::UNKNOWN, abort_txn_);
   DCHECK_LT(participant_idx, participant_ids_.size());
   // Status callback called with the result from the participant op.
   scoped_refptr<CommitTasks> scoped_this(this);
   auto participated_cb = [this, scoped_this = std::move(scoped_this),
-                          participant_idx, commit_timestamp] (const Status& s) {
+                          participant_idx] (const Status& s) {
     if (IsShuttingDownCleanupIfLastOp()) {
       return;
     }
     if (PREDICT_FALSE(s.IsTimedOut())) {
       LOG(WARNING) << Substitute("Retrying FINALIZE_COMMIT op for txn $0: $1",
                                  txn_id_.ToString(), s.ToString());
-      FinalizeCommitAsyncTask(participant_idx, commit_timestamp);
+      FinalizeCommitAsyncTask(participant_idx);
       return;
-    } else if (PREDICT_FALSE(!s.ok())) {
+    }
+    if (PREDICT_FALSE(!s.ok())) {
       // Presumably the error is not transient (e.g. not found) so retrying
       // won't help. But we've already begun sending out FINALIZE_TXN ops, so
       // we must complete the transaction.
-      // TODO(awong): revisit this; if we include an intermediate state between
-      // kCommitInProgress and kCommitted, that might be an opportune moment to
-      // abort.
       LOG(WARNING) << Substitute("Participant $0 FINALIZE_COMMIT op returned $1",
                                  participant_ids_[participant_idx], s.ToString());
     }
@@ -284,13 +309,13 @@ void CommitTasks::FinalizeCommitAsyncTask(int participant_idx, const Timestamp&
       if (IsShuttingDownCleanup()) {
         return;
       }
-      ScheduleFinalizeCommitWrite(commit_timestamp);
+      ScheduleCompleteCommitWrite();
     }
   };
   ParticipantOpPB op_pb;
   op_pb.set_txn_id(txn_id_.value());
   op_pb.set_type(ParticipantOpPB::FINALIZE_COMMIT);
-  op_pb.set_finalized_commit_timestamp(commit_timestamp.value());
+  op_pb.set_finalized_commit_timestamp(commit_timestamp_.value());
   txn_client_->ParticipateInTransactionAsync(
       participant_ids_[participant_idx],
       std::move(op_pb),
@@ -313,7 +338,8 @@ void CommitTasks::AbortTxnAsyncTask(int participant_idx) {
     if (PREDICT_FALSE(s.IsNotFound())) {
       // If the participant has been deleted, treat it as though it's already
       // been aborted. The participant's data can't be read anyway.
-      LOG(INFO) << Substitute("Participant $0 was not found: $1",
+      LOG(INFO) << Substitute("Participant $0 was not found for ABORT_TXN, proceeding "
+                              "as if op succeeded: $1",
                               participant_ids_[participant_idx], s.ToString());
     } else if (PREDICT_FALSE(!s.ok())) {
       LOG(WARNING) << Substitute("Participant $0 ABORT_TXN op returned $1",
@@ -326,7 +352,7 @@ void CommitTasks::AbortTxnAsyncTask(int participant_idx) {
       if (IsShuttingDownCleanup()) {
         return;
       }
-      ScheduleAbortTxnWrite();
+      ScheduleFinalizeAbortTxnWrite();
     }
   };
   ParticipantOpPB op_pb;
@@ -343,7 +369,7 @@ void CommitTasks::AbortTxnAsync() {
   // Reset the in-flight counter to indicate we're waiting for this new set of
   // tasks to complete.
   if (participant_ids_.empty()) {
-    ScheduleAbortTxnWrite();
+    ScheduleFinalizeAbortTxnWrite();
   } else {
     ops_in_flight_ = participant_ids_.size();
     for (int i = 0; i < participant_ids_.size(); i++) {
@@ -352,7 +378,41 @@ void CommitTasks::AbortTxnAsync() {
   }
 }
 
-void CommitTasks::ScheduleAbortTxnWrite() {
+void CommitTasks::ScheduleBeginAbortTxnWrite() {
+  DCHECK_NE(TxnStatePB::UNKNOWN, abort_txn_);
+  DCHECK_EQ(0, ops_in_flight_);
+  // Submit the task to a threadpool.
+  // NOTE: This is called by the reactor thread that catches the BeginCommit
+  // response, so we can't do IO in this thread.
+  scoped_refptr<CommitTasks> scoped_this(this);
+  CHECK_OK(commit_pool_->Submit([this, scoped_this = std::move(scoped_this),
+                                 tsm = txn_status_manager_,
+                                 txn_id = txn_id_] {
+    if (stop_task_ || tsm->shutting_down()) {
+      tsm->RemoveCommitTask(txn_id, this);
+      return;
+    }
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_status_manager_);
+    if (PREDICT_TRUE(l.first_failed_status().ok())) {
+      if (abort_txn_ == ABORT_IN_PROGRESS) {
+        TabletServerErrorPB ts_error;
+        // Clear out these commit tasks so we can start new ones focused on
+        // aborting.
+        tsm->RemoveCommitTask(txn_id, this);
+        WARN_NOT_OK(tsm->BeginAbortTransaction(txn_id.value(), boost::none, &ts_error),
+                    "Error writing to transaction status table");
+      } else {
+        // It's possible that while we were waiting to be scheduled, a client
+        // already called BeginAbortTransaction(). If so, we just need to go
+        // about aborting the transaction.
+        DCHECK_EQ(ABORTED, abort_txn_);
+        AbortTxnAsync();
+      }
+    }
+  }));
+}
+
+void CommitTasks::ScheduleFinalizeAbortTxnWrite() {
   // Submit the task to a threadpool.
   // NOTE: This is called by the reactor thread that catches the BeginCommit
   // response, so we can't do IO in this thread.
@@ -378,36 +438,39 @@ void CommitTasks::ScheduleAbortTxnWrite() {
   }));
 }
 
-void CommitTasks::FinalizeCommitAsync(Timestamp commit_timestamp) {
+void CommitTasks::FinalizeCommitAsync() {
+  DCHECK_NE(Timestamp::kInitialTimestamp, commit_timestamp_);
   // Reset the in-flight counter to indicate we're waiting for this new set of
   // tasks to complete.
   auto old_val = ops_in_flight_.exchange(participant_ids_.size());
   DCHECK_EQ(0, old_val);
   ops_in_flight_ = participant_ids_.size();
   for (int i = 0; i < participant_ids_.size(); i++) {
-    FinalizeCommitAsyncTask(i, commit_timestamp);
+    FinalizeCommitAsyncTask(i);
   }
 }
 
-void CommitTasks::ScheduleFinalizeCommitWrite(Timestamp commit_timestamp) {
+void CommitTasks::ScheduleCompleteCommitWrite() {
   // Submit the task to a threadpool.
-  // NOTE: This is called by the reactor thread that catches the BeginCommit
+  // NOTE: This is called by the reactor thread that catches the FinalizeCommit
   // response, so we can't do IO in this thread.
+  DCHECK_EQ(TxnStatePB::UNKNOWN, abort_txn_);
   DCHECK_EQ(0, ops_in_flight_);
   scoped_refptr<CommitTasks> scoped_this(this);
   CHECK_OK(commit_pool_->Submit([this, scoped_this = std::move(scoped_this),
                                  tsm = this->txn_status_manager_,
-                                 txn_id = this->txn_id_, commit_timestamp] {
+                                 txn_id = this->txn_id_] {
     MAYBE_INJECT_RANDOM_LATENCY(
         FLAGS_txn_status_manager_inject_latency_finalize_commit_ms);
 
-    if (IsShuttingDownCleanup()) {
+    if (stop_task_ || tsm->shutting_down()) {
+      tsm->RemoveCommitTask(txn_id, this);
       return;
     }
     TxnStatusManager::ScopedLeaderSharedLock l(txn_status_manager_);
     if (PREDICT_TRUE(l.first_failed_status().ok())) {
       TabletServerErrorPB error_pb;
-      WARN_NOT_OK(tsm->FinalizeCommitTransaction(txn_id.value(), commit_timestamp, &error_pb),
+      WARN_NOT_OK(tsm->CompleteCommitTransaction(txn_id.value()),
                   "Error writing to transaction status table");
     }
 
@@ -419,6 +482,53 @@ void CommitTasks::ScheduleFinalizeCommitWrite(Timestamp commit_timestamp) {
   }));
 }
 
+void CommitTasks::ScheduleFinalizeCommitWrite() {
+  // Submit the task to a threadpool.
+  // NOTE: This is called by the reactor thread that catches the BeginCommit
+  // response, so we can't do IO in this thread.
+  DCHECK_EQ(0, ops_in_flight_);
+  scoped_refptr<CommitTasks> scoped_this(this);
+  CHECK_OK(commit_pool_->Submit([this, scoped_this = std::move(scoped_this),
+                                 tsm = this->txn_status_manager_,
+                                 txn_id = this->txn_id_] {
+    MAYBE_INJECT_RANDOM_LATENCY(
+        FLAGS_txn_status_manager_inject_latency_finalize_commit_ms);
+
+    if (IsShuttingDownCleanup()) {
+      return;
+    }
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_status_manager_);
+    // TODO(awong): the race handling here specific to concurrent aborts is
+    // messy. Consider a less ad-hoc way to define on-going tasks.
+    // NOTE: the special handling of aborts is only critical in this scheduling
+    // to finalize the commit because the only non-OPEN state an abort can
+    // occur with is COMMIT_IN_PROGRESS, which is the expected state of the
+    // transaction when this is called.
+    if (PREDICT_TRUE(l.first_failed_status().ok())) {
+      // It's possible that a user called BeginAbortTransaction while we were
+      // waiting, so before attempting to commit, with the leader lock held,
+      // check if that's the case.
+      if (ScheduleAbortIfNecessary()) {
+        return;
+      }
+      TabletServerErrorPB error_pb;
+      Status s = tsm->FinalizeCommitTransaction(txn_id.value(), commit_timestamp_, &error_pb);
+      if (PREDICT_TRUE(s.ok())) {
+        return;
+      }
+      // It's again possible the FinalizeCommitTransaction call raced with
+      // BeginAbortTransaction, resulting in an aborted transaction. If so,
+      // schedule an abort.
+      if (ScheduleAbortIfNecessary()) {
+        return;
+      }
+      LOG(WARNING) << Substitute("Error writing to transaction status table: $0",
+                                 s.ToString());
+    }
+    tsm->RemoveCommitTask(txn_id.value(), this);
+  }));
+}
+
 TxnStatusManagerBuildingVisitor::TxnStatusManagerBuildingVisitor()
     : highest_txn_id_(kIdStatusDataReady) {
 }
@@ -560,23 +670,37 @@ Status TxnStatusManager::LoadFromTabletUnlocked() {
                 "Unable to initialize TxnSystemClient");
   }
 
-  unordered_map<int64_t, scoped_refptr<CommitTasks>> commits_in_flight;
   unordered_map<int64_t, scoped_refptr<CommitTasks>> new_commits;
+  unordered_map<int64_t, scoped_refptr<CommitTasks>> new_finalizes;
   unordered_map<int64_t, scoped_refptr<CommitTasks>> new_aborts;
   if (txn_client) {
     for (const auto& [txn_id, txn_entry] : txns_by_id) {
       const auto& state = txn_entry->state();
-      if (state == TxnStatePB::COMMIT_IN_PROGRESS) {
-        new_commits.emplace(txn_id,
-            new CommitTasks(txn_id, txn_entry->GetParticipantIds(),
-                            txn_client, commit_pool_, this));
-      } else if (state == TxnStatePB::ABORT_IN_PROGRESS) {
-        new_aborts.emplace(txn_id,
-            new CommitTasks(txn_id, txn_entry->GetParticipantIds(),
-                            txn_client, commit_pool_, this));
+      switch (state) {
+        case TxnStatePB::COMMIT_IN_PROGRESS:
+          new_commits.emplace(txn_id,
+              new CommitTasks(txn_id, txn_entry->GetParticipantIds(),
+                              txn_client, commit_pool_, this));
+          break;
+        case TxnStatePB::FINALIZE_IN_PROGRESS: {
+          scoped_refptr<CommitTasks> tasks(
+              new CommitTasks(txn_id, txn_entry->GetParticipantIds(),
+                              txn_client, commit_pool_, this));
+          tasks->set_commit_timestamp(Timestamp(txn_entry->commit_timestamp()));
+          new_finalizes.emplace(txn_id, std::move(tasks));
+          break;
+        }
+        case TxnStatePB::ABORT_IN_PROGRESS:
+          new_aborts.emplace(txn_id,
+              new CommitTasks(txn_id, txn_entry->GetParticipantIds(),
+                              txn_client, commit_pool_, this));
+          break;
+        default:
+          break;
       }
     }
   }
+  unordered_map<int64_t, scoped_refptr<CommitTasks>> commits_in_flight;
   {
     std::lock_guard<simple_spinlock> l(lock_);
     highest_txn_id_ = std::max(highest_txn_id, highest_txn_id_);
@@ -592,6 +716,12 @@ Status TxnStatusManager::LoadFromTabletUnlocked() {
         tasks->BeginCommitAsync();
       }
     }
+    if (!new_finalizes.empty()) {
+      LOG(INFO) << Substitute("Starting $0 finalize tasks", new_finalizes.size());
+      for (const auto& [_, tasks] : new_finalizes) {
+        tasks->FinalizeCommitAsync();
+      }
+    }
     if (!new_aborts.empty()) {
       LOG(INFO) << Substitute("Starting $0 aborts task", new_aborts.size());
       for (const auto& [_, tasks] : new_aborts) {
@@ -883,9 +1013,9 @@ Status TxnStatusManager::BeginTransaction(int64_t txn_id,
 
 void CommitTasks::BeginCommitAsync() {
   if (participant_ids_.empty()) {
-    // If there are no participants for this transaction; just write an invalid
-    // timestamp.
-    ScheduleFinalizeCommitWrite(Timestamp::kInvalidTimestamp);
+    // If there are no participants for this transaction, just change its state
+    // to committed.
+    ScheduleCompleteCommitWrite();
   } else {
     // If there are some participants, schedule beginning commit tasks so
     // we can determine a finalized commit timestamp.
@@ -914,6 +1044,7 @@ Status TxnStatusManager::BeginCommitTransaction(int64_t txn_id, const string& us
   const auto& pb = txn_lock.data().pb;
   const auto& state = pb.state();
   if (state == TxnStatePB::COMMIT_IN_PROGRESS ||
+      state == TxnStatePB::FINALIZE_IN_PROGRESS ||
       state == TxnStatePB::COMMITTED) {
     return Status::OK();
   }
@@ -953,7 +1084,8 @@ Status TxnStatusManager::FinalizeCommitTransaction(
   TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE);
   const auto& pb = txn_lock.data().pb;
   const auto& state = pb.state();
-  if (state == TxnStatePB::COMMITTED) {
+  if (state == TxnStatePB::FINALIZE_IN_PROGRESS ||
+      state == TxnStatePB::COMMITTED) {
     return Status::OK();
   }
   if (PREDICT_FALSE(state != TxnStatePB::COMMIT_IN_PROGRESS)) {
@@ -963,9 +1095,50 @@ Status TxnStatusManager::FinalizeCommitTransaction(
         ts_error);
   }
   auto* mutable_data = txn_lock.mutable_data();
-  mutable_data->pb.set_state(TxnStatePB::COMMITTED);
+  mutable_data->pb.set_state(TxnStatePB::FINALIZE_IN_PROGRESS);
+  mutable_data->pb.set_commit_timestamp(commit_timestamp.value());
   RETURN_NOT_OK(status_tablet_.UpdateTransaction(
       txn_id, mutable_data->pb, ts_error));
+
+  if (PREDICT_TRUE(FLAGS_txn_schedule_background_tasks)) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    auto& task = FindOrDie(commits_in_flight_, txn_id);
+    task->FinalizeCommitAsync();
+  }
+
+  txn_lock.Commit();
+  return Status::OK();
+}
+
+Status TxnStatusManager::CompleteCommitTransaction(int64_t txn_id) {
+  leader_lock_.AssertAcquiredForReading();
+  scoped_refptr<TransactionEntry> txn;
+  TabletServerErrorPB ts_error;
+  RETURN_NOT_OK(GetTransaction(txn_id, boost::none, &txn, &ts_error));
+
+  TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE);
+  const auto& pb = txn_lock.data().pb;
+  const auto& state = pb.state();
+  if (state == TxnStatePB::COMMITTED) {
+    return Status::OK();
+  }
+  if (PREDICT_FALSE(state != TxnStatePB::COMMIT_IN_PROGRESS &&
+                    state != TxnStatePB::FINALIZE_IN_PROGRESS)) {
+    return ReportIllegalTxnState(
+        Substitute("transaction ID $0 is not finalizing its commit: $1",
+                   txn_id, SecureShortDebugString(pb)),
+        &ts_error);
+  }
+  auto* mutable_data = txn_lock.mutable_data();
+  mutable_data->pb.set_state(TxnStatePB::COMMITTED);
+  RETURN_NOT_OK(status_tablet_.UpdateTransaction(
+      txn_id, mutable_data->pb, &ts_error));
+
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    commits_in_flight_.erase(txn_id);
+  }
+
   txn_lock.Commit();
   return Status::OK();
 }
@@ -995,9 +1168,9 @@ Status TxnStatusManager::FinalizeAbortTransaction(int64_t txn_id) {
   return Status::OK();
 }
 
-Status TxnStatusManager::AbortTransaction(int64_t txn_id,
-                                          const string& user,
-                                          TabletServerErrorPB* ts_error) {
+Status TxnStatusManager::BeginAbortTransaction(int64_t txn_id,
+                                               const boost::optional<string>& user,
+                                               TabletServerErrorPB* ts_error) {
 
   leader_lock_.AssertAcquiredForReading();
   TxnSystemClient* txn_client;
@@ -1010,12 +1183,28 @@ Status TxnStatusManager::AbortTransaction(int64_t txn_id,
   TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE);
   const auto& pb = txn_lock.data().pb;
   const auto& state = pb.state();
-  if (state == TxnStatePB::ABORTED ||
-      state == TxnStatePB::ABORT_IN_PROGRESS) {
+  if (state == TxnStatePB::ABORT_IN_PROGRESS) {
+    if (PREDICT_TRUE(FLAGS_txn_schedule_background_tasks)) {
+      // It's possible we don't have any tasks running even though we're in
+      // ABORT_IN_PROGRESS, e.g. if we're in the middle of aborting a commit
+      // (and have removed the commit tasks), while at the same time, we've
+      // just served a client-initiated abort and so the state is already
+      // ABORT_IN_PROGRESS. If so, we should start abort tasks.
+      std::unique_lock<simple_spinlock> l(lock_);
+      if (PREDICT_FALSE(!ContainsKey(commits_in_flight_, txn_id))) {
+        auto participant_ids = txn->GetParticipantIds();
+        auto tasks = EmplaceOrDie(&commits_in_flight_, txn_id,
+          new CommitTasks(txn_id, std::move(participant_ids),
+                          txn_client, commit_pool_, this));
+        l.unlock();
+        tasks->AbortTxnAsync();
+      }
+    }
+    return Status::OK();
+  }
+  if (state == TxnStatePB::ABORTED) {
     return Status::OK();
   }
-  // TODO(awong): if we're in COMMIT_IN_PROGRESS, we should attempt to abort
-  // any in-flight commit tasks.
   if (PREDICT_FALSE(state != TxnStatePB::OPEN &&
                     state != TxnStatePB::COMMIT_IN_PROGRESS)) {
     return ReportIllegalTxnState(
@@ -1035,13 +1224,24 @@ Status TxnStatusManager::AbortTransaction(int64_t txn_id,
                         txn_client, commit_pool_, this));
     l.unlock();
     if (emplaced) {
+      // If we didn't have commit tasks on-going, finalize the abort on
+      // participants.
       map_iter->second->AbortTxnAsync();
+    } else {
+      // If we did have commit tasks, set them up so they finalize the abort.
+      map_iter->second->SetNeedsFinalizeAbort();
     }
   }
   txn_lock.Commit();
   return Status::OK();
 }
 
+Status TxnStatusManager::AbortTransaction(int64_t txn_id,
+                                          const string& user,
+                                          TabletServerErrorPB* ts_error) {
+  return BeginAbortTransaction(txn_id, user, ts_error);
+}
+
 Status TxnStatusManager::GetTransactionStatus(
     int64_t txn_id,
     const string& user,
@@ -1058,6 +1258,9 @@ Status TxnStatusManager::GetTransactionStatus(
   txn_status->set_user(pb.user());
   DCHECK(pb.has_state());
   txn_status->set_state(pb.state());
+  if (pb.has_commit_timestamp()) {
+    txn_status->set_commit_timestamp(pb.commit_timestamp());
+  }
 
   return Status::OK();
 }
@@ -1076,22 +1279,22 @@ Status TxnStatusManager::KeepTransactionAlive(int64_t txn_id,
 
   const auto& pb = txn_lock.data().pb;
   const auto& state = pb.state();
-  if (state != TxnStatePB::OPEN &&
-      state != TxnStatePB::COMMIT_IN_PROGRESS) {
-    return ReportIllegalTxnState(
-        Substitute("transaction ID $0 is already in terminal state: $1",
-                   txn_id, SecureShortDebugString(pb)),
-        ts_error);
-  }
   // Keepalive updates are not required for a transaction in COMMIT_IN_PROGRESS
   // state. The system takes care of a transaction once the client side
   // initiates the commit phase.
-  if (state == TxnStatePB::COMMIT_IN_PROGRESS) {
+  if (state == TxnStatePB::COMMIT_IN_PROGRESS ||
+      state == TxnStatePB::FINALIZE_IN_PROGRESS) {
     return ReportIllegalTxnState(
         Substitute("transaction ID $0 is in commit phase: $1",
                    txn_id, SecureShortDebugString(pb)),
         ts_error);
   }
+  if (state != TxnStatePB::OPEN) {
+    return ReportIllegalTxnState(
+        Substitute("transaction ID $0 is not available for further commits: $1",
+                   txn_id, SecureShortDebugString(pb)),
+        ts_error);
+  }
   DCHECK_EQ(TxnStatePB::OPEN, state);
   txn->SetLastHeartbeatTime(MonoTime::Now());
 
diff --git a/src/kudu/transactions/txn_status_manager.h b/src/kudu/transactions/txn_status_manager.h
index 48c4d87..17a2521 100644
--- a/src/kudu/transactions/txn_status_manager.h
+++ b/src/kudu/transactions/txn_status_manager.h
@@ -26,6 +26,7 @@
 #include <vector>
 
 #include <boost/optional/optional.hpp>
+#include <glog/logging.h>
 #include <gtest/gtest_prod.h>
 
 #include "kudu/common/timestamp.h"
@@ -33,6 +34,7 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/tablet/txn_coordinator.h"
+#include "kudu/transactions/transactions.pb.h"
 #include "kudu/transactions/txn_status_entry.h"
 #include "kudu/transactions/txn_status_tablet.h"
 #include "kudu/util/locks.h"
@@ -40,9 +42,9 @@
 #include "kudu/util/status.h"
 
 namespace kudu {
+class MonoDelta;
 class ThreadPool;
 
-class MonoDelta;
 namespace rpc {
 class RpcContext;
 }  // namespace rpc
@@ -56,12 +58,25 @@ class TabletServerErrorPB;
 } // namespace tserver
 
 namespace transactions {
-class TxnStatusEntryPB;
 class TxnStatusManager;
 class TxnSystemClient;
 class TxnSystemClientInitializer;
 
-// A handle for background tasks associated with a single transaction.
+// A handle for background tasks associated with a single transaction. The
+// following state changes are expected:
+//
+//     BeginCommit           FinalizeCommit        CompleteCommit
+//   OPEN --> COMMIT_IN_PROGRESS --> FINALIZE_IN_PROGRESS --> COMMITTED
+//
+//     BeginCommit           BeginAbort            FinalizeAbort
+//   OPEN --> COMMIT_IN_PROGRESS --> ABORT_IN_PROGRESS --> ABORTED
+//
+//     AbortTxn              FinalizeAbort
+//   OPEN --> ABORT_IN_PROGRESS --> ABORTED
+//
+// In each of these sequences, only the first state change is initiated by
+// clients -- subsequent changes are scheduled automatically by the reactor
+// threads.
 class CommitTasks : public RefCountedThreadSafe<CommitTasks> {
  public:
   CommitTasks(TxnId txn_id,
@@ -76,6 +91,8 @@ class CommitTasks : public RefCountedThreadSafe<CommitTasks> {
        txn_status_manager_(txn_status_manager),
        ops_in_flight_(participant_ids_.size()),
        begin_commit_timestamps_(participant_ids_.size(), Timestamp::kInvalidTimestamp),
+       commit_timestamp_(Timestamp::kInitialTimestamp),
+       abort_txn_(TxnStatePB::UNKNOWN),
        stop_task_(false) {
   }
 
@@ -94,12 +111,12 @@ class CommitTasks : public RefCountedThreadSafe<CommitTasks> {
   // Asynchronously sends a FINALIZE_COMMIT participant op to the each
   // participant in the transaction, and upon completion, schedules a commit
   // record to be written to the tablet.
-  void FinalizeCommitAsyncTask(int participant_idx, const Timestamp& commit_timestamp);
+  void FinalizeCommitAsyncTask(int participant_idx);
 
   // Asynchronously sends a FINALIZE_COMMIT participant op to the participant
   // at the given index. If this was the last one to complete, schedules a
-  // commit record with the given timestamp to be written to the tablet.
-  void FinalizeCommitAsync(Timestamp commit_timestamp);
+  // commit record with 'commit_timestamp_' to be written to the tablet.
+  void FinalizeCommitAsync();
 
   // Asynchronously sends a ABORT_TXN participant op to each participant in the
   // transaction, and upon completion, schedules an abort record to be written
@@ -114,8 +131,10 @@ class CommitTasks : public RefCountedThreadSafe<CommitTasks> {
   // Schedule calls to the TxnStatusManager to be made on the commit pool.
   // NOTE: these may be called on reactor threads and thus must not
   // synchronously do any IO.
-  void ScheduleFinalizeCommitWrite(Timestamp commit_timestamp);
-  void ScheduleAbortTxnWrite();
+  void ScheduleFinalizeCommitWrite();
+  void ScheduleCompleteCommitWrite();
+  void ScheduleBeginAbortTxnWrite();
+  void ScheduleFinalizeAbortTxnWrite();
 
   // Stops further tasks from being run. Once called calls to the above methods
   // should effectively no-op.
@@ -123,6 +142,36 @@ class CommitTasks : public RefCountedThreadSafe<CommitTasks> {
     stop_task_ = true;
   }
 
+  // Indicates to the on-going tasks that these tasks should focus on changing
+  // the state to ABORT_IN_PROGRESS and driving the rest of the abort.
+  //
+  // NOTE: this can be called from multiple tasks, and thus, abort_txn_ may
+  // already be set to ABORT_IN_PROGRESS, or even ABORTED if there has been a
+  // client-initiated call to BeginAbortTransaction() while this commit was
+  // already in progress. In the latter case, this should no-op, since the
+  // transaction's state has already been set to ABORT_IN_PROGRESS.
+  void SetNeedsBeginAbort() {
+    auto expected_unknown_state = TxnStatePB::UNKNOWN;
+    abort_txn_.compare_exchange_strong(expected_unknown_state, TxnStatePB::ABORT_IN_PROGRESS);
+  }
+
+  // Indicates to the on-going tasks that these tasks should focus on sending
+  // out ABORT_TXN ops and changing the transaction state to ABORTED. Expected
+  // to be run after an ABORT_IN_PROGRESS record has been persisted to disk.
+  //
+  // NOTE: this can be called while the 'abort_txn_' is already
+  // ABORT_IN_PROGRESS if we're racing with a botched commit.
+  void SetNeedsFinalizeAbort() {
+    abort_txn_ = TxnStatePB::ABORTED;
+  }
+
+  // Sets the timestamp that this commit task should finalize across
+  // transaction participants.
+  void set_commit_timestamp(Timestamp commit_timestamp) {
+    DCHECK_EQ(Timestamp::kInitialTimestamp, commit_timestamp_);
+    commit_timestamp_ = commit_timestamp;
+  }
+
  private:
   friend class RefCountedThreadSafe<CommitTasks>;
   ~CommitTasks() = default;
@@ -144,6 +193,10 @@ class CommitTasks : public RefCountedThreadSafe<CommitTasks> {
   // shutdown.
   bool IsShuttingDownCleanup() const;
 
+  // Returns true if the transaction has been aborted, and schedules abort
+  // mechanics to proceed.
+  bool ScheduleAbortIfNecessary();
+
   // The ID of the transaction being committed.
   const TxnId txn_id_;
 
@@ -168,6 +221,36 @@ class CommitTasks : public RefCountedThreadSafe<CommitTasks> {
   // The commit timestamp for this transaction.
   Timestamp commit_timestamp_;
 
+  // Whether an on-going commit should abort instead.
+  // If ABORT_IN_PROGRESS, an on-going commit should call
+  // BeginAbortTransaction, indicating the commit failed and needs to be cleaned
+  // up. If ABORTED, an on-going commit should call FinalizeAbortTransaction,
+  // indicating the commit was interrupted by the user, and an
+  // ABORT_IN_PROGRESS record has already been written to the table.
+  //
+  // The following state changes are expected for 'abort_txn_':
+  //         SetNeedsBeginAbort
+  // UNKNOWN --> ABORT_IN_PROGRESS
+  // - The commit tasks fail (e.g. because a participant was deleted) and one
+  //   of the tasks calls SetNeedsBeginAbort(). The commit task should
+  //   transition to writing an ABORT_IN_PROGRESS record and then driving
+  //   aborts on participants.
+  //
+  //         SetNeedsBeginAbort    SetNeedsFinalizeAbort
+  // UNKNOWN --> ABORT_IN_PROGRESS --> ABORTED
+  // - The commit tasks fail and one of the tasks calls SetNeedsBeginAbort().
+  //   Concurrently, a user calls AbortTransaction(), persisting an
+  //   ABORT_IN_PROGRESS record and then calling SetNeedsFinalizeAbort() on the
+  //   in-flight commit task. The tasks should then focus on driving aborts on
+  //   participants.
+  //
+  //         SetNeedsFinalizeAbort
+  // UNKNOWN --> ABORTED
+  // - A user calls AbortTransaction() without having attempted to abort
+  //   otherwise. This will have written an ABORT_IN_PROGRESS record, and the
+  //   tasks should focus on driving aborts on participants.
+  std::atomic<TxnStatePB> abort_txn_;
+
   // Whether the task should stop executing, e.g. since an IllegalState error
   // was observed on a participant, or because the TxnStatusManager changed
   // leadership.
@@ -288,12 +371,23 @@ class TxnStatusManager final : public tablet::TxnCoordinator {
   Status FinalizeCommitTransaction(int64_t txn_id, Timestamp commit_timestamp,
                                    tserver::TabletServerErrorPB* ts_error) override;
 
+  // Updates the state of the transaction to COMMITTED, returning an error if
+  // the transaction isn't in an appropriate state.
+  Status CompleteCommitTransaction(int64_t txn_id);
+
   // Begins aborting the given transaction, returning an error if the
   // transaction doesn't exist, is committed or not yet opened, or isn't owned
   // by the given user.
   Status AbortTransaction(int64_t txn_id, const std::string& user,
                           tserver::TabletServerErrorPB* ts_error) override;
 
+  // Sets the state to ABORT_IN_PROGRESS and asynchronously sends ABORT_TXN ops
+  // to each participant in the transaction. This may not be directly called by
+  // a user -- as such, the 'user' field is optional.
+  Status BeginAbortTransaction(int64_t txn_id,
+                               const boost::optional<std::string>& user,
+                               tserver::TabletServerErrorPB* ts_error);
+
   // Writes a record to the TxnStatusManager indicating the given transaction
   // has been successfully aborted.
   Status FinalizeAbortTransaction(int64_t txn_id);