You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2020/12/14 15:05:38 UTC

[kudu] 01/03: KUDU-2612: allow participant ops to detect idempotent calls

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit a05254904eff894261134674c246a6311c0adce4
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Mon Dec 7 20:51:31 2020 -0800

    KUDU-2612: allow participant ops to detect idempotent calls
    
    Previously, we would return a TXN_ILLEGAL_STATE error if we tried to
    start a transaction that was already present, tried to begin committing
    a transaction whose commit was already in progress, etc. However, it
    seems more robust to be able to detect such cases, where the requested
    mutation has already been applied. This patch does this by sending back
    a new TXN_OP_ALREADY_APPLIED error code for such calls.
    
    While it would be nice to return an OK response from the participant op,
    it seems non-trivial to exit an in-flight op with a success.
    
    Change-Id: If82d3d1aad6737dd4f9f234b122b8c15d65a6604
    Reviewed-on: http://gerrit.cloudera.org:8080/16860
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 .../integration-tests/txn_participant-itest.cc     |  79 ++++++------
 src/kudu/tablet/ops/participant_op.cc              |  11 +-
 src/kudu/tablet/ops/write_op.cc                    |   4 +-
 src/kudu/tablet/ops/write_op.h                     |   5 +-
 src/kudu/tablet/tablet_metadata.cc                 |  20 ++-
 src/kudu/tablet/tablet_metadata.h                  |   6 +-
 src/kudu/tablet/txn_participant-test.cc            |  50 ++++----
 src/kudu/tablet/txn_participant.cc                 |  30 +++--
 src/kudu/tablet/txn_participant.h                  | 142 +++++++++++++--------
 src/kudu/tserver/tserver.proto                     |   3 +
 10 files changed, 217 insertions(+), 133 deletions(-)

diff --git a/src/kudu/integration-tests/txn_participant-itest.cc b/src/kudu/integration-tests/txn_participant-itest.cc
index b962571..63f884a 100644
--- a/src/kudu/integration-tests/txn_participant-itest.cc
+++ b/src/kudu/integration-tests/txn_participant-itest.cc
@@ -87,8 +87,13 @@ using kudu::cluster::InternalMiniCluster;
 using kudu::cluster::InternalMiniClusterOptions;
 using kudu::tablet::kCommitSequence;
 using kudu::tablet::kDummyCommitTimestamp;
+using kudu::tablet::kCommitted;
+using kudu::tablet::kCommitInProgress;
+using kudu::tablet::kInitializing;
+using kudu::tablet::kOpen;
 using kudu::tablet::TabletReplica;
 using kudu::tablet::Txn;
+using kudu::tablet::TxnState;
 using kudu::tablet::TxnParticipant;
 using kudu::tserver::ParticipantOpPB;
 using kudu::tserver::ParticipantRequestPB;
@@ -132,9 +137,13 @@ Status ParticipateInTransaction(TabletServerAdminServiceProxy* admin_proxy,
 Status ParticipateInTransactionCheckResp(TabletServerAdminServiceProxy* admin_proxy,
                                          const string& tablet_id, int64_t txn_id,
                                          ParticipantOpPB::ParticipantOpType type,
-                                         TabletServerErrorPB::Code* code = nullptr) {
+                                         TabletServerErrorPB::Code* code = nullptr,
+                                         Timestamp* begin_commit_ts = nullptr) {
   ParticipantResponsePB resp;
   RETURN_NOT_OK(ParticipateInTransaction(admin_proxy, tablet_id, txn_id, type, &resp));
+  if (begin_commit_ts) {
+    *begin_commit_ts = Timestamp(resp.timestamp());
+  }
   if (resp.has_error()) {
     if (code) {
       *code = resp.error().code();
@@ -170,7 +179,7 @@ string TxnsAsString(const vector<TxnParticipant::TxnEntry>& txns) {
   return JoinMapped(txns,
       [](const TxnParticipant::TxnEntry& txn) {
         return Substitute("(txn_id=$0: $1, $2)",
-            txn.txn_id, Txn::StateToString(txn.state), txn.commit_timestamp);
+            txn.txn_id, TxnStateToString(txn.state), txn.commit_timestamp);
       },
       ",");
 }
@@ -344,7 +353,7 @@ TEST_P(ParticipantCopyITest, TestCopyParticipantOps) {
       }
     }
     expected_txns.emplace_back(
-        TxnParticipant::TxnEntry({ i, Txn::kCommitted, kDummyCommitTimestamp }));
+        TxnParticipant::TxnEntry({ i, kCommitted, kDummyCommitTimestamp }));
   }
   for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
     ASSERT_EVENTUALLY([&] {
@@ -527,15 +536,12 @@ TEST_F(TxnParticipantITest, TestProxyIllegalStatesInCommitSequence) {
         admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_TXN, &code));
     ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
   }
-  // TODO(awong): make repeated, idempotent return OK instead of an error? Or
-  // return a different error code so callers can distinguish between a benign
-  // error.
   {
     TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
     Status s = ParticipateInTransactionCheckResp(
         admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_TXN, &code);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
-    ASSERT_EQ(TabletServerErrorPB::TXN_ILLEGAL_STATE, code);
+    ASSERT_EQ(TabletServerErrorPB::TXN_OP_ALREADY_APPLIED, code);
   }
 
   // We can't finalize the commit without beginning to commit first.
@@ -548,18 +554,24 @@ TEST_F(TxnParticipantITest, TestProxyIllegalStatesInCommitSequence) {
   }
 
   // Start commititng and ensure we can't start another transaction.
+  Timestamp begin_commit_ts;
   {
     TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
     ASSERT_OK(ParticipateInTransactionCheckResp(
-        admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_COMMIT, &code));
+        admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_COMMIT,
+        &code, &begin_commit_ts));
     ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+    ASSERT_NE(Timestamp::kInvalidTimestamp, begin_commit_ts);
   }
   {
     TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+    Timestamp refetched_begin_commit_ts;
     Status s = ParticipateInTransactionCheckResp(
-        admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_COMMIT, &code);
+        admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_COMMIT,
+        &code, &refetched_begin_commit_ts);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
-    ASSERT_EQ(TabletServerErrorPB::TXN_ILLEGAL_STATE, code);
+    ASSERT_EQ(TabletServerErrorPB::TXN_OP_ALREADY_APPLIED, code);
+    ASSERT_EQ(begin_commit_ts, refetched_begin_commit_ts);
   }
   {
     TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
@@ -581,26 +593,14 @@ TEST_F(TxnParticipantITest, TestProxyIllegalStatesInCommitSequence) {
     Status s = ParticipateInTransactionCheckResp(
         admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::FINALIZE_COMMIT, &code);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
-    ASSERT_EQ(TabletServerErrorPB::TXN_ILLEGAL_STATE, code);
-  }
-  {
-    TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
-    Status s = ParticipateInTransactionCheckResp(
-        admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_TXN, &code);
-    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
-    ASSERT_EQ(TabletServerErrorPB::TXN_ILLEGAL_STATE, code);
+    ASSERT_EQ(TabletServerErrorPB::TXN_OP_ALREADY_APPLIED, code);
   }
-  {
+  for (auto type : { ParticipantOpPB::BEGIN_TXN,
+                     ParticipantOpPB::BEGIN_COMMIT,
+                     ParticipantOpPB::ABORT_TXN }) {
     TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
     Status s = ParticipateInTransactionCheckResp(
-        admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_COMMIT, &code);
-    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
-    ASSERT_EQ(TabletServerErrorPB::TXN_ILLEGAL_STATE, code);
-  }
-  {
-    TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
-    Status s = ParticipateInTransactionCheckResp(
-        admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::ABORT_TXN, &code);
+        admin_proxy.get(), tablet_id, kTxnId, type, &code);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
     ASSERT_EQ(TabletServerErrorPB::TXN_ILLEGAL_STATE, code);
   }
@@ -617,13 +617,12 @@ TEST_F(TxnParticipantITest, TestProxyIllegalStatesInAbortSequence) {
   const auto tablet_id = replicas[kLeaderIdx]->tablet_id();
   ASSERT_OK(ParticipateInTransactionCheckResp(
       admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_TXN));
-  // TODO(awong): make repeated, idempotent return OK instead of an error.
   {
     TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
     Status s = ParticipateInTransactionCheckResp(
         admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_TXN, &code);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
-    ASSERT_EQ(TabletServerErrorPB::TXN_ILLEGAL_STATE, code);
+    ASSERT_EQ(TabletServerErrorPB::TXN_OP_ALREADY_APPLIED, code);
   }
   {
     TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
@@ -640,8 +639,14 @@ TEST_F(TxnParticipantITest, TestProxyIllegalStatesInAbortSequence) {
         admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::ABORT_TXN, &code));
     ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
   }
-  for (auto type : { ParticipantOpPB::ABORT_TXN,
-                     ParticipantOpPB::FINALIZE_COMMIT,
+  {
+    TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+    Status s = ParticipateInTransactionCheckResp(
+        admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::ABORT_TXN, &code);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_EQ(TabletServerErrorPB::TXN_OP_ALREADY_APPLIED, code);
+  }
+  for (auto type : { ParticipantOpPB::FINALIZE_COMMIT,
                      ParticipantOpPB::BEGIN_TXN,
                      ParticipantOpPB::BEGIN_COMMIT}) {
     TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
@@ -838,23 +843,23 @@ TEST_F(TxnParticipantElectionStormITest, TestFrequentElections) {
     if (last_successful_op == ParticipantOpPB::UNKNOWN) {
       continue;
     }
-    Txn::State expected_state;
+    TxnState expected_state;
     switch (last_successful_op) {
       case ParticipantOpPB::BEGIN_TXN:
-        expected_state = Txn::kOpen;
+        expected_state = kOpen;
         break;
       case ParticipantOpPB::BEGIN_COMMIT:
-        expected_state = Txn::kCommitInProgress;
+        expected_state = kCommitInProgress;
         break;
       case ParticipantOpPB::FINALIZE_COMMIT:
         expected_rows += num_successful_writes_per_txn[txn_id];
-        expected_state = Txn::kCommitted;
+        expected_state = kCommitted;
         break;
       default:
         FAIL() << "Unexpected successful op " << last_successful_op;
     }
     expected_txns.emplace_back(TxnParticipant::TxnEntry({
-        txn_id, expected_state, expected_state == Txn::kCommitted ? kDummyCommitTimestamp : -1}));
+        txn_id, expected_state, expected_state == kCommitted ? kDummyCommitTimestamp : -1}));
   }
   for (int i = 0; i < replicas.size(); i++) {
     // NOTE: We ASSERT_EVENTUALLY here because having completed the participant
@@ -891,7 +896,7 @@ TEST_F(TxnParticipantElectionStormITest, TestFrequentElections) {
     // successfully initialized.
     vector<TxnParticipant::TxnEntry> actual_txns_not_initting;
     for (const auto& txn : actual_txns) {
-      if (txn.state != Txn::kInitializing) {
+      if (txn.state != kInitializing) {
         actual_txns_not_initting.emplace_back(txn);
       }
     }
diff --git a/src/kudu/tablet/ops/participant_op.cc b/src/kudu/tablet/ops/participant_op.cc
index 621f24e..4764b41 100644
--- a/src/kudu/tablet/ops/participant_op.cc
+++ b/src/kudu/tablet/ops/participant_op.cc
@@ -18,6 +18,7 @@
 #include "kudu/tablet/ops/participant_op.h"
 
 #include <memory>
+#include <ostream>
 
 #include <glog/logging.h>
 #include <google/protobuf/arena.h>
@@ -102,9 +103,15 @@ Status ParticipantOpState::ValidateOp() {
     case ParticipantOpPB::BEGIN_TXN:
       s = txn_->ValidateBeginTransaction(&code);
       break;
-    case ParticipantOpPB::BEGIN_COMMIT:
-      s = txn_->ValidateBeginCommit(&code);
+    case ParticipantOpPB::BEGIN_COMMIT: {
+      Timestamp begin_commit_ts;
+      s = txn_->ValidateBeginCommit(&code, &begin_commit_ts);
+      if (PREDICT_FALSE(begin_commit_ts != Timestamp::kInvalidTimestamp)) {
+        DCHECK(s.IsIllegalState()) << s.ToString();
+        response_->set_timestamp(begin_commit_ts.value());
+      }
       break;
+    }
     case ParticipantOpPB::FINALIZE_COMMIT:
       s = txn_->ValidateFinalize(&code);
       break;
diff --git a/src/kudu/tablet/ops/write_op.cc b/src/kudu/tablet/ops/write_op.cc
index e62e3cf..7f8bc92 100644
--- a/src/kudu/tablet/ops/write_op.cc
+++ b/src/kudu/tablet/ops/write_op.cc
@@ -380,9 +380,9 @@ Status WriteOpState::AcquireTxnLockCheckOpen(scoped_refptr<Txn> txn) {
   shared_lock<rw_semaphore> temp;
   txn->AcquireReadLock(&temp);
   const auto txn_state = txn->state();
-  if (PREDICT_FALSE(txn_state != Txn::kOpen)) {
+  if (PREDICT_FALSE(txn_state != kOpen)) {
     return Status::InvalidArgument(Substitute("txn $0 is not open: $1",
-        txn->txn_id(), Txn::StateToString(txn_state)));
+        txn->txn_id(), TxnStateToString(txn_state)));
   }
   txn_lock_.swap(temp);
   txn_ = std::move(txn);
diff --git a/src/kudu/tablet/ops/write_op.h b/src/kudu/tablet/ops/write_op.h
index 3616b18..b1b1c9c 100644
--- a/src/kudu/tablet/ops/write_op.h
+++ b/src/kudu/tablet/ops/write_op.h
@@ -26,6 +26,7 @@
 
 #include <boost/optional/optional.hpp>
 #include <glog/logging.h>
+#include <google/protobuf/stubs/port.h>
 
 #include "kudu/common/row_operations.h"
 #include "kudu/common/wire_protocol.pb.h"
@@ -54,11 +55,11 @@ namespace tablet {
 
 class ScopedOp;
 class TabletReplica;
-class Txn;
 class TxResultPB;
-struct TxnRowSets;
+class Txn;
 struct RowOp;
 struct TabletComponents;
+struct TxnRowSets;
 
 // Privileges required for write operations.
 enum WritePrivilegeType {
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index 579fb1b..ddd15a2 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -27,7 +27,6 @@
 #include <utility>
 
 #include <boost/optional/optional.hpp>
-#include <boost/type_traits/decay.hpp>
 #include <gflags/gflags.h>
 #include <google/protobuf/stubs/port.h>
 
@@ -50,6 +49,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/rowset_metadata.h"
 #include "kudu/tablet/txn_metadata.h"
+#include "kudu/tablet/txn_participant.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/env.h"
 #include "kudu/util/flag_tags.h"
@@ -850,9 +850,23 @@ void TabletMetadata::AbortTransaction(int64_t txn_id, unique_ptr<MinLogIndexAnch
   anchors_needing_flush_.emplace_back(std::move(log_anchor));
 }
 
-bool TabletMetadata::HasTxnMetadata(int64_t txn_id) {
+bool TabletMetadata::HasTxnMetadata(int64_t txn_id, TxnState* state) {
   std::lock_guard<LockType> l(data_lock_);
-  return ContainsKey(txn_metadata_by_txn_id_, txn_id);
+  auto txn_meta = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id);
+  if (txn_meta) {
+    if (!state) return true;
+    if (txn_meta->commit_timestamp()) {
+      *state = kCommitted;
+    } else if (txn_meta->aborted()) {
+      *state = kAborted;
+    } else if (txn_meta->commit_mvcc_op_timestamp()) {
+      *state = kCommitInProgress;
+    } else {
+      *state = kOpen;
+    }
+    return true;
+  }
+  return false;
 }
 
 void TabletMetadata::GetTxnIds(unordered_set<int64_t>* in_flight_txn_ids,
diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h
index 25c96fe..e4ea591 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -58,6 +58,7 @@ namespace tablet {
 
 class RowSetMetadata;
 class TxnMetadata;
+enum TxnState : int8_t;
 
 typedef std::vector<std::shared_ptr<RowSetMetadata> > RowSetMetadataVector;
 typedef std::unordered_set<int64_t> RowSetMetadataIds;
@@ -270,8 +271,9 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
   // associated with it.
   void AbortTransaction(int64_t txn_id, std::unique_ptr<log::MinLogIndexAnchorer> log_anchor);
 
-  // Returns whether a given transaction has metadata.
-  bool HasTxnMetadata(int64_t txn_id);
+  // Returns whether a given transaction has metadata, and if requested, what
+  // state the transaction is in.
+  bool HasTxnMetadata(int64_t txn_id, TxnState* state = nullptr);
 
   // Returns the transaction IDs that were persisted as being in-flight,
   // terminal (committed or aborted), and having un-flushed MRSs.
diff --git a/src/kudu/tablet/txn_participant-test.cc b/src/kudu/tablet/txn_participant-test.cc
index 3e24d07..5507892 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -239,9 +239,9 @@ TEST_F(TxnParticipantTest, TestSuccessfulSequences) {
       ParticipantOpPB::ABORT_TXN,
   }, 2));
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-      { 0, Txn::kCommitted, kDummyCommitTimestamp },
-      { 1, Txn::kAborted, -1 },
-      { 2, Txn::kAborted, -1 },
+      { 0, kCommitted, kDummyCommitTimestamp },
+      { 1, kAborted, -1 },
+      { 2, kAborted, -1 },
   }), txn_participant()->GetTxnsForTests());
 }
 
@@ -296,14 +296,14 @@ TEST_F(TxnParticipantTest, TestIllegalTransitions) {
   NO_FATALS(check_valid_op(ParticipantOpPB::BEGIN_TXN, kTxnId));
   NO_FATALS(check_bad_ops({ ParticipantOpPB::FINALIZE_COMMIT }, kTxnId));
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-      { kTxnId, Txn::kOpen, -1 },
+      { kTxnId, kOpen, -1 },
   }), txn_participant()->GetTxnsForTests());
 
   // Once we begin committing, we can't start the transaction again.
   NO_FATALS(check_valid_op(ParticipantOpPB::BEGIN_COMMIT, kTxnId));
   NO_FATALS(check_bad_ops({ ParticipantOpPB::BEGIN_TXN }, kTxnId));
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-      { kTxnId, Txn::kCommitInProgress, -1 },
+      { kTxnId, kCommitInProgress, -1 },
   }), txn_participant()->GetTxnsForTests());
 
   // Once we've begun finalizing, we can't do anything.
@@ -312,7 +312,7 @@ TEST_F(TxnParticipantTest, TestIllegalTransitions) {
                             ParticipantOpPB::BEGIN_COMMIT,
                             ParticipantOpPB::ABORT_TXN }, kTxnId));
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-      { kTxnId, Txn::kCommitted, kDummyCommitTimestamp },
+      { kTxnId, kCommitted, kDummyCommitTimestamp },
   }), txn_participant()->GetTxnsForTests());
 
   // Once we've aborted, we can't do anything.
@@ -323,8 +323,8 @@ TEST_F(TxnParticipantTest, TestIllegalTransitions) {
                             ParticipantOpPB::BEGIN_COMMIT,
                             ParticipantOpPB::FINALIZE_COMMIT }, kAbortedTxnId));
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-      { kTxnId, Txn::kCommitted, kDummyCommitTimestamp },
-      { kAbortedTxnId, Txn::kAborted, -1 },
+      { kTxnId, kCommitted, kDummyCommitTimestamp },
+      { kAbortedTxnId, kAborted, -1 },
   }), txn_participant()->GetTxnsForTests());
 }
 
@@ -352,7 +352,7 @@ TEST_F(TxnParticipantTest, TestConcurrentTransactions) {
   }
   const auto& txns = txn_participant()->GetTxnsForTests();
   for (int i = 0; i < kNumTxns; i++) {
-    ASSERT_EQ(TxnParticipant::TxnEntry({ i, Txn::kCommitted, kDummyCommitTimestamp }), txns[i]);
+    ASSERT_EQ(TxnParticipant::TxnEntry({ i, kCommitted, kDummyCommitTimestamp }), txns[i]);
   }
 }
 
@@ -391,7 +391,7 @@ TEST_F(TxnParticipantTest, TestConcurrentOps) {
   // not have been able to abort.
   if (status_for_op(ParticipantOpPB::FINALIZE_COMMIT).ok()) {
     ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-        { kTxnId, Txn::kCommitted, kDummyCommitTimestamp },
+        { kTxnId, kCommitted, kDummyCommitTimestamp },
     }), txn_participant()->GetTxnsForTests());
     ASSERT_OK(statuses[FindOrDie(kIndexByOps, ParticipantOpPB::BEGIN_COMMIT)]);
     ASSERT_FALSE(statuses[FindOrDie(kIndexByOps, ParticipantOpPB::ABORT_TXN)].ok());
@@ -399,7 +399,7 @@ TEST_F(TxnParticipantTest, TestConcurrentOps) {
   // If we aborted the commit, we could not have finalized the commit.
   } else if (status_for_op(ParticipantOpPB::ABORT_TXN).ok()) {
     ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-        { kTxnId, Txn::kAborted, -1 },
+        { kTxnId, kAborted, -1 },
     }), txn_participant()->GetTxnsForTests());
     ASSERT_FALSE(statuses[FindOrDie(kIndexByOps, ParticipantOpPB::FINALIZE_COMMIT)].ok());
 
@@ -407,14 +407,14 @@ TEST_F(TxnParticipantTest, TestConcurrentOps) {
   // left with the commit in progress.
   } else if (status_for_op(ParticipantOpPB::BEGIN_COMMIT).ok()) {
     ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-        { kTxnId, Txn::kCommitInProgress, -1 },
+        { kTxnId, kCommitInProgress, -1 },
     }), txn_participant()->GetTxnsForTests());
 
   // Finally, if nothing else succeeded, at least we should have been able to
   // start the transaction.
   } else {
     ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-        { kTxnId, Txn::kOpen, -1 },
+        { kTxnId, kOpen, -1 },
     }), txn_participant()->GetTxnsForTests());
   }
 }
@@ -429,11 +429,11 @@ TEST_F(TxnParticipantTest, TestReplayParticipantOps) {
     ASSERT_TRUE(resp.has_timestamp());
   }
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-      { kTxnId, Txn::kCommitted, kDummyCommitTimestamp }
+      { kTxnId, kCommitted, kDummyCommitTimestamp }
   }), txn_participant()->GetTxnsForTests());
   ASSERT_OK(RestartReplica(/*reset_tablet*/true));
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-      { kTxnId, Txn::kCommitted, kDummyCommitTimestamp }
+      { kTxnId, kCommitted, kDummyCommitTimestamp }
   }), txn_participant()->GetTxnsForTests());
 }
 
@@ -532,7 +532,7 @@ TEST_F(TxnParticipantTest, TestTxnMetadataSurvivesRestart) {
   ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
   ASSERT_OK(RestartReplica(/*reset_tablet*/true));
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-      { kTxnId, Txn::kCommitInProgress, -1 }
+      { kTxnId, kCommitInProgress, -1 }
   }), txn_participant()->GetTxnsForTests());
 
   // Once we finalize the commit, the BEGIN_COMMIT anchor should be released.
@@ -558,7 +558,7 @@ TEST_F(TxnParticipantTest, TestTxnMetadataSurvivesRestart) {
   // Ensure the transaction bootstraps to the expected state.
   ASSERT_OK(RestartReplica(/*reset_tablet*/true));
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-      { kTxnId, Txn::kCommitted, kDummyCommitTimestamp }
+      { kTxnId, kCommitted, kDummyCommitTimestamp }
   }), txn_participant()->GetTxnsForTests());
 }
 
@@ -661,7 +661,7 @@ TEST_P(MetadataFlushTxnParticipantTest, TestRebuildTxnMetadata) {
 
   ASSERT_OK(RestartReplica(/*reset_tablet*/true));
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-      { kTxnId, Txn::kOpen, -1 }
+      { kTxnId, kOpen, -1 }
   }), txn_participant()->GetTxnsForTests());
   ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT,
                                        kDummyCommitTimestamp));
@@ -671,7 +671,7 @@ TEST_P(MetadataFlushTxnParticipantTest, TestRebuildTxnMetadata) {
 
   ASSERT_OK(RestartReplica(/*reset_tablet*/true));
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-      { kTxnId, Txn::kCommitInProgress, -1 }
+      { kTxnId, kCommitInProgress, -1 }
   }), txn_participant()->GetTxnsForTests());
   ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT,
                                        kDummyCommitTimestamp));
@@ -681,7 +681,7 @@ TEST_P(MetadataFlushTxnParticipantTest, TestRebuildTxnMetadata) {
 
   ASSERT_OK(RestartReplica(/*reset_tablet*/true));
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-      { kTxnId, Txn::kCommitted, kDummyCommitTimestamp }
+      { kTxnId, kCommitted, kDummyCommitTimestamp }
   }), txn_participant()->GetTxnsForTests());
 
   // Now perform the same validation but for a transaction that gets aborted.
@@ -693,8 +693,8 @@ TEST_P(MetadataFlushTxnParticipantTest, TestRebuildTxnMetadata) {
   }
   ASSERT_OK(RestartReplica(/*reset_tablet*/true));
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-      { kTxnId, Txn::kCommitted, kDummyCommitTimestamp },
-      { kAbortedTxnId, Txn::kOpen, -1 }
+      { kTxnId, kCommitted, kDummyCommitTimestamp },
+      { kAbortedTxnId, kOpen, -1 }
   }), txn_participant()->GetTxnsForTests());
   ASSERT_OK(CallParticipantOpCheckResp(kAbortedTxnId, ParticipantOpPB::ABORT_TXN,
                                        kDummyCommitTimestamp));
@@ -703,8 +703,8 @@ TEST_P(MetadataFlushTxnParticipantTest, TestRebuildTxnMetadata) {
   }
   ASSERT_OK(RestartReplica(/*reset_tablet*/true));
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-      { kTxnId, Txn::kCommitted, kDummyCommitTimestamp },
-      { kAbortedTxnId, Txn::kAborted, -1 }
+      { kTxnId, kCommitted, kDummyCommitTimestamp },
+      { kAbortedTxnId, kAborted, -1 }
   }), txn_participant()->GetTxnsForTests());
 }
 
@@ -842,7 +842,7 @@ TEST_F(TxnParticipantTest, TestActiveParticipantOpsAnchorWALs) {
   // As a sanity check, ensure we get to the expected state if we reboot.
   ASSERT_OK(RestartReplica(/*reset_tablet*/true));
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
-      { kTxnId, Txn::kOpen, -1 }
+      { kTxnId, kOpen, -1 }
   }), txn_participant()->GetTxnsForTests());
 }
 
diff --git a/src/kudu/tablet/txn_participant.cc b/src/kudu/tablet/txn_participant.cc
index 9e86797..5cbef6f 100644
--- a/src/kudu/tablet/txn_participant.cc
+++ b/src/kudu/tablet/txn_participant.cc
@@ -39,6 +39,18 @@ using strings::Substitute;
 namespace kudu {
 namespace tablet {
 
+const char* TxnStateToString(TxnState s) {
+  switch (s) {
+    case kNone: return "<none>";
+    case kInitializing: return "INITIALIZING";
+    case kOpen: return "OPEN";
+    case kCommitInProgress: return "COMMIT_IN_PROGRESS";
+    case kCommitted: return "COMMITTED";
+    case kAborted: return "ABORTED";
+  }
+  __builtin_unreachable();
+}
+
 Txn::~Txn() {
   CHECK_OK(log_anchor_registry_->UnregisterIfAnchored(&begin_commit_anchor_));
   // As a sanity check, make sure our state makes sense: if we have an MVCC op
@@ -46,7 +58,7 @@ Txn::~Txn() {
   if (commit_op_) {
     DCHECK(state_ == kCommitInProgress ||
            state_ == kCommitted ||
-           state_ == kAborted) << StateToString(state_);
+           state_ == kAborted) << TxnStateToString(state_);
   }
 }
 
@@ -64,7 +76,7 @@ void TxnParticipant::CreateOpenTransaction(int64_t txn_id,
                                            LogAnchorRegistry* log_anchor_registry) {
   std::lock_guard<simple_spinlock> l(lock_);
   EmplaceOrDie(&txns_, txn_id, new Txn(txn_id, log_anchor_registry,
-                                       tablet_metadata_, Txn::kOpen));
+                                       tablet_metadata_, kOpen));
 }
 
 scoped_refptr<Txn> TxnParticipant::GetOrCreateTransaction(int64_t txn_id,
@@ -85,7 +97,7 @@ void TxnParticipant::ClearIfInitFailed(int64_t txn_id) {
   Txn* txn = FindPointeeOrNull(txns_, txn_id);
   // NOTE: If this is the only reference to the transaction, we can forego
   // locking the state.
-  if (txn && txn->HasOneRef() && txn->state() == Txn::kInitializing) {
+  if (txn && txn->HasOneRef() && txn->state() == kInitializing) {
     txns_.erase(txn_id);
   }
 }
@@ -96,8 +108,8 @@ bool TxnParticipant::ClearIfComplete(int64_t txn_id) {
   // NOTE: If this is the only reference to the transaction, we can forego
   // locking the state.
   if (txn && txn->HasOneRef() &&
-      (txn->state() == Txn::kAborted ||
-       txn->state() == Txn::kCommitted)) {
+      (txn->state() == kAborted ||
+       txn->state() == kCommitted)) {
     txns_.erase(txn_id);
     return true;
   }
@@ -125,13 +137,13 @@ vector<TxnParticipant::TxnEntry> TxnParticipant::GetTxnsForTests() const {
     if (txn_meta) {
       txn_metas.erase(txn_entry.txn_id);
       if (txn_meta->aborted()) {
-        txn_entry.state = Txn::kAborted;
+        txn_entry.state = kAborted;
         txn_entry.commit_timestamp = -1;
         continue;
       }
       const auto& commit_ts = txn_meta->commit_timestamp();
       if (commit_ts) {
-        txn_entry.state = Txn::kCommitted;
+        txn_entry.state = kCommitted;
         txn_entry.commit_timestamp = commit_ts->value();
         continue;
       }
@@ -144,14 +156,14 @@ vector<TxnParticipant::TxnEntry> TxnParticipant::GetTxnsForTests() const {
     TxnEntry txn_entry;
     txn_entry.txn_id = txn_id;
     if (txn_meta->aborted()) {
-      txn_entry.state = Txn::kAborted;
+      txn_entry.state = kAborted;
       txn_entry.commit_timestamp = -1;
       txns.emplace_back(std::move(txn_entry));
       continue;
     }
     const auto& commit_ts = txn_meta->commit_timestamp();
     if (commit_ts) {
-      txn_entry.state = Txn::kCommitted;
+      txn_entry.state = kCommitted;
       txn_entry.commit_timestamp = commit_ts->value();
       txns.emplace_back(txn_entry);
       continue;
diff --git a/src/kudu/tablet/txn_participant.h b/src/kudu/tablet/txn_participant.h
index 480b6c0..bb0c356 100644
--- a/src/kudu/tablet/txn_participant.h
+++ b/src/kudu/tablet/txn_participant.h
@@ -26,6 +26,7 @@
 
 #include <glog/logging.h>
 
+#include "kudu/common/timestamp.h"
 #include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/gutil/macros.h"
@@ -42,6 +43,40 @@
 namespace kudu {
 namespace tablet {
 
+// NOTE: we define the type explicitly so we can forward declare this enum.
+enum TxnState : int8_t {
+
+  // Not a real state; useful for representing optionality.
+  kNone,
+
+  // Each transaction starts in this state. While in this state, the
+  // transaction is not yet ready to be used, e.g. the initial op to begin
+  // the transaction may not have successfully replicated yet.
+  kInitializing,
+
+  // Each transaction is moved into this state once they are ready to begin
+  // accepting ops.
+  kOpen,
+
+  // A transaction is moved into this state when a client has signified the
+  // intent to begin committing it. While in this state, the transaction may
+  // not accept new ops.
+  kCommitInProgress,
+
+  // A transaction is moved into this state when it becomes finalized -- all
+  // participants have acknowledged the intent to commit and have guaranteed
+  // that all ops in the transaction will succeed. While in this state, the
+  // transaction may not accept new ops and may not be aborted.
+  kCommitted,
+
+  // A transaction is moved into this state when a client has signified
+  // intent to cancel the transaction. While in this state, the transaction
+  // may not accept new ops, begin committing, or finalize a commit.
+  kAborted,
+};
+
+const char* TxnStateToString(TxnState s);
+
 // Tracks the state associated with a transaction.
 //
 // This class will primarily be accessed via op drivers. As such, locking
@@ -49,42 +84,6 @@ namespace tablet {
 // replication.
 class Txn : public RefCountedThreadSafe<Txn> {
  public:
-  enum State {
-    // Each transaction starts in this state. While in this state, the
-    // transaction is not yet ready to be used, e.g. the initial op to begin
-    // the transaction may not have successfully replicated yet.
-    kInitializing,
-
-    // Each transaction is moved into this state once they are ready to begin
-    // accepting ops.
-    kOpen,
-
-    // A transaction is moved into this state when a client has signified the
-    // intent to begin committing it. While in this state, the transaction may
-    // not accept new ops.
-    kCommitInProgress,
-
-    // A transaction is moved into this state when it becomes finalized -- all
-    // participants have acknowledged the intent to commit and have guaranteed
-    // that all ops in the transaction will succeed. While in this state, the
-    // transaction may not accept new ops and may not be aborted.
-    kCommitted,
-
-    // A transaction is moved into this state when a client has signified
-    // intent to cancel the transaction. While in this state, the transaction
-    // may not accept new ops, begin committing, or finalize a commit.
-    kAborted,
-  };
-  static const char* StateToString(State s) {
-    switch (s) {
-      case kInitializing: return "INITIALIZING";
-      case kOpen: return "OPEN";
-      case kCommitInProgress: return "COMMIT_IN_PROGRESS";
-      case kCommitted: return "COMMITTED";
-      case kAborted: return "ABORTED";
-    }
-    __builtin_unreachable();
-  }
 
   // Constructs a transaction instance with the given transaction ID and WAL
   // anchor registry.
@@ -94,7 +93,7 @@ class Txn : public RefCountedThreadSafe<Txn> {
   // in-memory state is not GCed, allowing us to rebuild this transaction's
   // in-memory state upon rebooting a server.
   Txn(int64_t txn_id, log::LogAnchorRegistry* log_anchor_registry,
-      scoped_refptr<TabletMetadata> tablet_metadata, State init_state = kInitializing)
+      scoped_refptr<TabletMetadata> tablet_metadata, TxnState init_state = kInitializing)
       : txn_id_(txn_id),
         log_anchor_registry_(log_anchor_registry),
         tablet_metadata_(std::move(tablet_metadata)),
@@ -113,6 +112,11 @@ class Txn : public RefCountedThreadSafe<Txn> {
   // replicating a participant op.
   Status ValidateBeginTransaction(tserver::TabletServerErrorPB::Code* code) const {
     DCHECK(state_lock_.is_locked());
+    if (PREDICT_FALSE(state_ == kOpen)) {
+      *code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
+      return Status::IllegalState(
+          strings::Substitute("Transaction $0 already open", txn_id_));
+    }
     if (PREDICT_FALSE(tablet_metadata_->HasTxnMetadata(txn_id_))) {
       *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
       return Status::IllegalState(
@@ -123,41 +127,75 @@ class Txn : public RefCountedThreadSafe<Txn> {
       *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
       return Status::IllegalState(
           strings::Substitute("Cannot begin transaction in state: $0",
-                              StateToString(state_)));
+                              TxnStateToString(state_)));
     }
     return Status::OK();
   }
+  // Returns OK if the transaction is open and may begin committing. If a
+  // commit is already in progress, sets '*begin_commit_ts' to the timestamp
+  // of the in-flight commit op, sets '*code' to TXN_OP_ALREADY_APPLIED, and
+  // returns IllegalState. Any other non-open state returns an error with
+  // '*code' set.
+  // NOTE(review): DCHECK_NOTNULL() does not check in release builds, so this
+  // relies on 'commit_op_' being set whenever the state is kCommitInProgress;
+  // confirm this holds for a Txn constructed with a non-default 'init_state'.
-  Status ValidateBeginCommit(tserver::TabletServerErrorPB::Code* code) const {
+  Status ValidateBeginCommit(tserver::TabletServerErrorPB::Code* code,
+                             Timestamp* begin_commit_ts) const {
     DCHECK(state_lock_.is_locked());
     RETURN_NOT_OK(CheckFinishedInitializing(code));
+    if (PREDICT_FALSE(state_ == kCommitInProgress)) {
+      *begin_commit_ts = DCHECK_NOTNULL(commit_op_)->timestamp();
+      *code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
+      return Status::IllegalState(
+          strings::Substitute("Transaction $0 commit already in progress", txn_id_));
+    }
     if (PREDICT_FALSE(state_ != kOpen)) {
       *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
       return Status::IllegalState(
           strings::Substitute("Cannot begin committing transaction in state: $0",
-                              StateToString(state_)));
+                              TxnStateToString(state_)));
     }
     return Status::OK();
   }
+  // Returns OK if a commit is in progress and may be finalized. If the
+  // transaction has already been committed -- or, for a Txn that is still
+  // initializing, its tablet metadata records it as committed -- sets '*code'
+  // to TXN_OP_ALREADY_APPLIED and returns IllegalState. Any other state
+  // returns an error with '*code' set.
   Status ValidateFinalize(tserver::TabletServerErrorPB::Code* code) const {
     DCHECK(state_lock_.is_locked());
-    RETURN_NOT_OK(CheckFinishedInitializing(code));
+    RETURN_NOT_OK(CheckFinishedInitializing(code, kCommitted));
+    if (PREDICT_FALSE(state_ == kCommitted)) {
+      *code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
+      return Status::IllegalState(
+          strings::Substitute("Transaction $0 has already been committed", txn_id_));
+    }
     if (PREDICT_FALSE(state_ != kCommitInProgress)) {
       *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
       return Status::IllegalState(
           strings::Substitute("Cannot finalize transaction in state: $0",
-                              StateToString(state_)));
+                              TxnStateToString(state_)));
     }
     return Status::OK();
   }
+  // Returns OK if the transaction is open or has a commit in progress, i.e.
+  // it may be aborted. If the transaction has already been aborted -- or, for
+  // a Txn that is still initializing, its tablet metadata records it as
+  // aborted -- sets '*code' to TXN_OP_ALREADY_APPLIED and returns
+  // IllegalState. Any other state returns an error with '*code' set.
   Status ValidateAbort(tserver::TabletServerErrorPB::Code* code) const {
     DCHECK(state_lock_.is_locked());
-    RETURN_NOT_OK(CheckFinishedInitializing(code));
+    RETURN_NOT_OK(CheckFinishedInitializing(code, kAborted));
+    if (PREDICT_FALSE(state_ == kAborted)) {
+      *code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
+      return Status::IllegalState(
+          strings::Substitute("Transaction $0 has already been aborted", txn_id_));
+    }
     if (PREDICT_FALSE(state_ != kOpen &&
                       state_ != kCommitInProgress)) {
       *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
       return Status::IllegalState(
           strings::Substitute("Cannot abort transaction in state: $0",
-                              StateToString(state_)));
+                              TxnStateToString(state_)));
     }
     return Status::OK();
   }
@@ -184,7 +205,9 @@ class Txn : public RefCountedThreadSafe<Txn> {
   }
 
   // Simple accessors for state. No locks are required to call these.
+  // Returns the current state. Callers that rely on the state remaining
+  // constant must hold 'state_lock_' in read mode.
-  State state() const {
+  TxnState state() const {
     return state_;
   }
   int64_t commit_timestamp() const {
@@ -207,15 +228,39 @@ class Txn : public RefCountedThreadSafe<Txn> {
   friend class RefCountedThreadSafe<Txn>;
 
   // Sets the transaction state.
-  void SetState(State s) {
+  void SetState(TxnState s) {
     DCHECK(state_lock_.is_write_locked());
     state_ = s;
   }
 
-  // Returns an error if the transaction has not finished initializing.
-  Status CheckFinishedInitializing(tserver::TabletServerErrorPB::Code* code) const {
+  // Returns an error if the transaction has not finished initializing. A Txn
+  // in kInitializing may have been created by the current op because no
+  // in-flight transaction existed, in which case existing tablet metadata for
+  // it is checked.
+  //
+  // If 'expected_metadata_state' is not kNone and matches the state recorded
+  // in the tablet metadata, sets '*code' to TXN_OP_ALREADY_APPLIED and
+  // returns IllegalState, since the requested op has already been applied.
+  //
+  // NOTE: the expected metadata state only needs to be checked when aborting
+  // or finalizing a commit, since these end the in-flight transaction. In
+  // other cases, the state of the in-flight transaction can be checked
+  // directly.
+  Status CheckFinishedInitializing(
+      tserver::TabletServerErrorPB::Code* code,
+      TxnState expected_metadata_state = kNone) const {
+    DCHECK(expected_metadata_state == kNone ||
+           expected_metadata_state == kAborted ||
+           expected_metadata_state == kCommitted);
     if (PREDICT_FALSE(state_ == kInitializing)) {
-      if (tablet_metadata_->HasTxnMetadata(txn_id_)) {
+      TxnState meta_state;
+      if (tablet_metadata_->HasTxnMetadata(txn_id_, &meta_state)) {
+        if (expected_metadata_state != kNone && expected_metadata_state == meta_state) {
+          *code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
+          return Status::IllegalState(
+              strings::Substitute("Transaction metadata for transaction $0 already set",
+                                  txn_id_));
+        }
         // We created this transaction as a part of this op (i.e. it was not
         // already in flight), and there is existing metadata for it.
         *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
@@ -247,7 +287,7 @@ class Txn : public RefCountedThreadSafe<Txn> {
   // reading 'state_' and relying on it remaining constant must take this lock
   // in read mode.
   mutable rw_semaphore state_lock_;
-  std::atomic<State> state_;
+  std::atomic<TxnState> state_;
 
   // If this transaction was successfully committed, the timestamp at which the
   // transaction should be applied, and -1 otherwise.
@@ -273,7 +313,9 @@ class TxnParticipant {
   // for testing, as it easy to construct.
   struct TxnEntry {
     int64_t txn_id;
-    Txn::State state;
+    TxnState state;
+    // NOTE(review): presumably -1 for transactions that have not committed,
+    // mirroring Txn's commit timestamp -- TODO confirm.
     int64_t commit_timestamp;
   };
 
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index e51d190..4bd6c6b 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -106,6 +106,9 @@ message TabletServerErrorPB {
 
     // The requested transaction is not in the appropriate state.
     TXN_ILLEGAL_STATE = 22;
+
+    // The requested transaction participant op was already applied.
+    TXN_OP_ALREADY_APPLIED = 23;
   }
 
   // The error code.