You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2021/03/23 17:50:48 UTC
[kudu] branch master updated: KUDU-2612: allow aborting after
beginning to commit
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 058ec9a KUDU-2612: allow aborting after beginning to commit
058ec9a is described below
commit 058ec9a2baa9edb75904e8aa83716dd2734f3bbd
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Tue Mar 2 17:07:39 2021 -0800
KUDU-2612: allow aborting after beginning to commit
This patch adjusts the TxnStatusManager state machine to include a new
FINALIZE_IN_PROGRESS state to serve as an intermediate step between
COMMIT_IN_PROGRESS and COMMITTED.
The goal is to allow a transaction to be aborted after it has begun
committing but before FINALIZE_COMMIT has been sent to any of its
participants. This may be desirable if something unexpected has happened
to the participants (e.g. if they've been deleted).
Change-Id: If1b6596df2db5601f7e17e528ad6dc68057b67f8
Reviewed-on: http://gerrit.cloudera.org:8080/17022
Tested-by: Andrew Wong <aw...@cloudera.com>
Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
.../org/apache/kudu/client/KuduTransaction.java | 1 +
src/kudu/client/transaction-internal.cc | 1 +
src/kudu/integration-tests/txn_commit-itest.cc | 219 ++++++++++++---
src/kudu/master/txn_manager-test.cc | 2 +-
src/kudu/tablet/txn_participant-test.cc | 4 +-
src/kudu/transactions/transactions.proto | 33 +++
src/kudu/transactions/txn_status_entry.cc | 6 +
src/kudu/transactions/txn_status_entry.h | 6 +
src/kudu/transactions/txn_status_manager-test.cc | 160 ++++++-----
src/kudu/transactions/txn_status_manager.cc | 305 +++++++++++++++++----
src/kudu/transactions/txn_status_manager.h | 110 +++++++-
11 files changed, 681 insertions(+), 166 deletions(-)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
index 19286f2..05a5616 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
@@ -317,6 +317,7 @@ public class KuduTransaction implements AutoCloseable {
throw new NonRecoverableException(Status.IllegalState("transaction is still open"));
case COMMITTED:
return true;
+ case FINALIZE_IN_PROGRESS:
case COMMIT_IN_PROGRESS:
return false;
default:
diff --git a/src/kudu/client/transaction-internal.cc b/src/kudu/client/transaction-internal.cc
index ada717a..1436e24 100644
--- a/src/kudu/client/transaction-internal.cc
+++ b/src/kudu/client/transaction-internal.cc
@@ -314,6 +314,7 @@ Status KuduTransaction::Data::IsCommitCompleteImpl(
*is_complete = true;
*completion_status = Status::Aborted("transaction has been aborted");
break;
+ case TxnStatePB::FINALIZE_IN_PROGRESS:
case TxnStatePB::COMMIT_IN_PROGRESS:
*is_complete = false;
*completion_status = Status::Incomplete("commit is still in progress");
diff --git a/src/kudu/integration-tests/txn_commit-itest.cc b/src/kudu/integration-tests/txn_commit-itest.cc
index 4f2ea2b..f125f54 100644
--- a/src/kudu/integration-tests/txn_commit-itest.cc
+++ b/src/kudu/integration-tests/txn_commit-itest.cc
@@ -17,9 +17,11 @@
#include <atomic>
#include <cstdint>
+#include <cstdlib>
#include <functional>
#include <map>
#include <memory>
+#include <ostream>
#include <string>
#include <thread>
#include <unordered_map>
@@ -40,8 +42,10 @@
#include "kudu/common/partial_row.h"
#include "kudu/common/txn_id.h"
#include "kudu/common/wire_protocol-test-util.h"
+#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/master/mini_master.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
@@ -94,6 +98,7 @@ using std::unique_ptr;
using std::unordered_map;
using std::unordered_set;
using std::vector;
+using strings::Substitute;
namespace kudu {
namespace itest {
@@ -190,18 +195,25 @@ class TxnCommitITest : public KuduTest {
return Status::OK();
}
- // Insert 'num_rows' rows to the given session, starting with 'start_row'.
+ // Insert 'num_rows' rows to the given session, starting with 'start_row', to
+ // every table in 'table_names' or 'table_name_' if not set.
Status InsertToSession(
- const shared_ptr<KuduSession>& txn_session, int start_row, int num_rows) {
- shared_ptr<KuduTable> table;
- RETURN_NOT_OK(client_->OpenTable(table_name_, &table));
- const int target_row_id = start_row + num_rows;
- for (int i = start_row; i < target_row_id; i++) {
- auto* insert = table->NewInsert();
- RETURN_NOT_OK(insert->mutable_row()->SetInt32(0, i));
- RETURN_NOT_OK(insert->mutable_row()->SetInt32(1, i));
- RETURN_NOT_OK(txn_session->Apply(insert));
- RETURN_NOT_OK(txn_session->Flush());
+ const shared_ptr<KuduSession>& txn_session, int start_row, int num_rows,
+ vector<string> table_names = {}) {
+ if (table_names.empty()) {
+ table_names = { table_name_ };
+ }
+ for (const auto& table_name : table_names) {
+ shared_ptr<KuduTable> table;
+ RETURN_NOT_OK(client_->OpenTable(table_name, &table));
+ const int target_row_id = start_row + num_rows;
+ for (int i = start_row; i < target_row_id; i++) {
+ auto* insert = table->NewInsertIgnore();
+ RETURN_NOT_OK(insert->mutable_row()->SetInt32(0, i));
+ RETURN_NOT_OK(insert->mutable_row()->SetInt32(1, i));
+ RETURN_NOT_OK(txn_session->Apply(insert));
+ RETURN_NOT_OK(txn_session->Flush());
+ }
}
return Status::OK();
}
@@ -418,13 +430,13 @@ TEST_F(TxnCommitITest, TestCommitAfterDeletingParticipant) {
ASSERT_OK(client_->DeleteTable(table_name_));
ASSERT_OK(txn->Commit(/*wait*/false));
- // The transaction should eventually succeed, treating the deleted
- // participant as committed.
+ // The transaction should eventually fail, treating the deleted participant
+ // as a fatal error.
ASSERT_EVENTUALLY([&] {
Status completion_status;
bool is_complete = false;
ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
- ASSERT_OK(completion_status);
+ ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
ASSERT_TRUE(is_complete);
});
}
@@ -442,22 +454,23 @@ TEST_F(TxnCommitITest, TestCommitAfterDroppingRangeParticipant) {
ASSERT_OK(txn->Commit(/*wait*/false));
- // The transaction should eventually succeed, treating the deleted
- // participant as committed.
+ // The transaction should eventually abort, treating the deleted participant
+ // as fatal, resulting in an aborted transaction.
ASSERT_EVENTUALLY([&] {
Status completion_status;
bool is_complete = false;
ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
- ASSERT_OK(completion_status);
+ ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
ASSERT_TRUE(is_complete);
});
}
TEST_F(TxnCommitITest, TestRestartingWhileCommitting) {
+ FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 10000;
shared_ptr<KuduTransaction> txn;
shared_ptr<KuduSession> txn_session;
ASSERT_OK(BeginTransaction(&txn, &txn_session));
- FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 2000;
+ ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
ASSERT_OK(txn->Commit(/*wait*/false));
// Stop the tserver without allowing the finalize commit to complete.
cluster_->mini_tablet_server(0)->Shutdown();
@@ -487,9 +500,72 @@ TEST_F(TxnCommitITest, TestRestartingWhileCommitting) {
});
}
+// Test aborting a botched commit mid-way by deleting some of its participants
+// while committing. The result should be that the transaction gets aborted and
+// all participants abort the local transactions.
+TEST_F(TxnCommitITest, TestAbortRacingWithBotchedCommit) {
+ // First, create another table that we'll delete later on.
+ const string kSecondTableName = "default.second_table";
+ TestWorkload w(cluster_.get(), TestWorkload::PartitioningType::HASH);
+ w.set_num_replicas(1);
+ w.set_num_tablets(2);
+ w.set_table_name(kSecondTableName);
+ w.Setup();
+ w.Start();
+ while (w.rows_inserted() < 1) {
+ SleepFor(MonoDelta::FromMilliseconds(50));
+ }
+ w.StopAndJoin();
+ unordered_set<string> participant_ids;
+ auto* mts = cluster_->mini_tablet_server(0);
+ for (const auto& tablet_id : mts->ListTablets()) {
+ if (tablet_id != tsm_id_) {
+ participant_ids.emplace(tablet_id);
+ }
+ }
+ ASSERT_EQ(4, participant_ids.size());
+ vector<string> both_tables_participant_ids(participant_ids.begin(), participant_ids.end());
+ shared_ptr<KuduTransaction> txn;
+ shared_ptr<KuduSession> txn_session;
+ ASSERT_OK(BeginTransaction(&txn, &txn_session));
+ ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn,
+ { table_name_, kSecondTableName }));
+ ASSERT_OK(client_->DeleteTable(kSecondTableName));
+ FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 2000;
+ ASSERT_OK(txn->Commit(/*wait*/false));
+ ASSERT_OK(txn->Rollback());
+ ASSERT_EVENTUALLY([&] {
+ Status completion_status;
+ bool is_complete = false;
+ ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+ ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
+ ASSERT_TRUE(is_complete);
+ });
+
+ // Let's confirm that all remaining participants see the same transaction
+ // metadata.
+ vector<scoped_refptr<TabletReplica>> replicas;
+ mts->server()->tablet_manager()->GetTabletReplicas(&replicas);
+ vector<vector<TxnParticipant::TxnEntry>> txn_entries_per_replica;
+ for (const auto& r : replicas) {
+ if (r->tablet_id() != tsm_id_ && r->tablet_metadata()->table_name() != kSecondTableName) {
+ txn_entries_per_replica.emplace_back(r->tablet()->txn_participant()->GetTxnsForTests());
+ }
+ }
+ ASSERT_GT(txn_entries_per_replica.size(), 1);
+ for (int i = 1; i < txn_entries_per_replica.size(); i++) {
+ const auto& txns = txn_entries_per_replica[i];
+ ASSERT_FALSE(txns.empty());
+ for (const auto& txn_entry : txns) {
+ ASSERT_EQ(tablet::kAborted, txn_entry.state);
+ }
+ EXPECT_EQ(txn_entries_per_replica[0], txns);
+ }
+}
+
// Test restarting while commit tasks are on-going, while at the same time,
-// some participants are deleted. There should be no inconsistencies in
-// assigned commit timestamps across participants.
+// some participants are deleted. The transaction should be aborted on all
+// participants.
TEST_F(TxnCommitITest, TestRestartingWhileCommittingAndDeleting) {
// First, create another table that we'll delete later on.
const string kSecondTableName = "default.second_table";
@@ -507,27 +583,25 @@ TEST_F(TxnCommitITest, TestRestartingWhileCommittingAndDeleting) {
shared_ptr<KuduTransaction> txn;
shared_ptr<KuduSession> txn_session;
ASSERT_OK(BeginTransaction(&txn, &txn_session));
- ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
- FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 2000;
+ ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn,
+ { table_name_, kSecondTableName }));
+ ASSERT_OK(client_->DeleteTable(kSecondTableName));
+ FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 10000;
ASSERT_OK(txn->Commit(/*wait*/false));
- // Wait a bit to let the commit tasks start.
- SleepFor(MonoDelta::FromMilliseconds(1000));
-
// Shut down without giving time for the commit to complete.
auto* mts = cluster_->mini_tablet_server(0);
mts->Shutdown();
ASSERT_OK(mts->Restart());
- ASSERT_OK(mts->server()->tablet_manager()->WaitForAllBootstrapsToFinish());
- // Delete some of the participants. Despite this, the commit process should
- // complete.
- ASSERT_OK(client_->DeleteTable(kSecondTableName));
+ // Delete some of the participants. Upon completion, the commit process
+ // should result in an abort.
+ ASSERT_OK(mts->server()->tablet_manager()->WaitForAllBootstrapsToFinish());
ASSERT_EVENTUALLY([&] {
Status completion_status;
bool is_complete = false;
ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
- ASSERT_OK(completion_status);
+ ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
ASSERT_TRUE(is_complete);
});
@@ -546,7 +620,7 @@ TEST_F(TxnCommitITest, TestRestartingWhileCommittingAndDeleting) {
const auto& txns = txn_entries_per_replica[i];
ASSERT_FALSE(txns.empty());
for (const auto& txn_entry : txns) {
- ASSERT_NE(-1, txn_entry.commit_timestamp);
+ ASSERT_EQ(tablet::kAborted, txn_entry.state);
}
EXPECT_EQ(txn_entries_per_replica[0], txns);
}
@@ -610,7 +684,7 @@ TEST_F(TxnCommitITest, TestCommitAfterParticipantAbort) {
// Try concurrently beginning to commit a bunch of different transactions.
TEST_F(TxnCommitITest, TestConcurrentCommitCalls) {
- const int kNumTxns = 4;
+ constexpr const int kNumTxns = 4;
vector<shared_ptr<KuduTransaction>> txns(kNumTxns);
int row_start = initial_row_count_;
for (int i = 0; i < kNumTxns; i++) {
@@ -649,6 +723,65 @@ TEST_F(TxnCommitITest, TestConcurrentCommitCalls) {
ASSERT_EQ(initial_row_count_ + kNumRowsPerTxn * kNumTxns, num_rows);
}
+TEST_F(TxnCommitITest, TestConcurrentAbortsAndCommits) {
+ constexpr const int kNumTxns = 10;
+ vector<shared_ptr<KuduTransaction>> txns(kNumTxns);
+ int row_start = initial_row_count_;
+ for (int i = 0; i < kNumTxns; i++) {
+ shared_ptr<KuduSession> txn_session;
+ ASSERT_OK(BeginTransaction(&txns[i], &txn_session));
+ ASSERT_OK(InsertToSession(txn_session, row_start, kNumRowsPerTxn));
+ row_start += kNumRowsPerTxn;
+ }
+ int num_rows = 0;
+ ASSERT_OK(CountRows(&num_rows));
+ ASSERT_EQ(initial_row_count_, num_rows);
+ // To encourage races between concurrent aborts and commits, inject a random
+ // sleep before each call. It's drawn on this thread: rand() may not be thread-safe.
+ constexpr const int kMaxSleepMs = 1000;
+ std::atomic<int> num_committed_txns = 0;
+ vector<thread> threads;
+ for (int i = 0; i < kNumTxns; i++) {
+ threads.emplace_back([&, i, sleep_ms = rand() % kMaxSleepMs] {
+ SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+ Status s = txns[i]->Commit(/*wait*/true);
+ if (s.ok()) {
+ num_committed_txns++;
+ }
+ });
+ threads.emplace_back([&, i, sleep_ms = rand() % kMaxSleepMs] {
+ SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+ ignore_result(txns[i]->Rollback());
+ });
+ }
+ for (auto& t : threads) {
+ t.join();
+ }
+ ASSERT_OK(CountRows(&num_rows));
+ // NOTE: we can compare an exact count here and not worry about whether we
+ // completed aborting rows because even if we didn't complete the abort, they
+ // shouldn't be visible to clients anyway.
+ const int expected_rows = initial_row_count_ + kNumRowsPerTxn * num_committed_txns;
+ VLOG(1) << Substitute("Expecting $0 rows from $1 committed transactions",
+ expected_rows, num_committed_txns.load());
+ ASSERT_EQ(expected_rows, num_rows);
+
+ // Ensure all transactions are either committed or aborted.
+ vector<scoped_refptr<TabletReplica>> replicas;
+ cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletReplicas(&replicas);
+ for (const auto& r : replicas) {
+ ASSERT_EVENTUALLY([&] {
+ if (r->tablet_metadata()->table_name() == table_name_) {
+ const auto txns = r->tablet()->txn_participant()->GetTxnsForTests();
+ for (const auto& txn : txns) {
+ ASSERT_TRUE(txn.state == tablet::kAborted || txn.state == tablet::kCommitted)
+ << Substitute("Txn in unexpected state: $0", txn.state);
+ }
+ }
+ });
+ }
+}
+
// Test that committing the same transaction concurrently doesn't lead to any
// issues.
TEST_F(TxnCommitITest, TestConcurrentRepeatedCommitCalls) {
@@ -660,7 +793,7 @@ TEST_F(TxnCommitITest, TestConcurrentRepeatedCommitCalls) {
ASSERT_OK(CountRows(&num_rows));
ASSERT_EQ(initial_row_count_, num_rows);
- const int kNumThreads = 4;
+ constexpr const int kNumThreads = 4;
vector<thread> threads;
vector<Status> results(kNumThreads);
for (int i = 0; i < kNumThreads; i++) {
@@ -685,7 +818,7 @@ TEST_F(TxnCommitITest, TestConcurrentRepeatedCommitCalls) {
ASSERT_EQ(initial_row_count_ + kNumRowsPerTxn, num_rows);
}
-TEST_F(TxnCommitITest, TestDontAbortIfCommitInProgress) {
+TEST_F(TxnCommitITest, TestDontBackgroundAbortIfCommitInProgress) {
FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 1000;
string serialized_txn;
{
@@ -714,6 +847,24 @@ TEST_F(TxnCommitITest, TestDontAbortIfCommitInProgress) {
});
}
+// Test that a transaction can be aborted after beginning to commit, before it's finalized.
+TEST_F(TxnCommitITest, TestAbortIfCommitInProgress) {
+ FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 1000;
+ shared_ptr<KuduTransaction> txn;
+ shared_ptr<KuduSession> txn_session;
+ ASSERT_OK(BeginTransaction(&txn, &txn_session));
+ ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
+ ASSERT_OK(txn->Commit(/*wait*/false));
+ ASSERT_OK(txn->Rollback());
+ ASSERT_EVENTUALLY([&] {
+ Status completion_status;
+ bool is_complete = false;
+ ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+ ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
+ ASSERT_TRUE(is_complete);
+ });
+}
+
// Test that has two nodes so we can place the TxnStatusManager and transaction
// participant on separate nodes. This can be useful for testing when some
// nodes are down.
diff --git a/src/kudu/master/txn_manager-test.cc b/src/kudu/master/txn_manager-test.cc
index d58f1f9..1409697 100644
--- a/src/kudu/master/txn_manager-test.cc
+++ b/src/kudu/master/txn_manager-test.cc
@@ -392,7 +392,7 @@ TEST_F(TxnManagerTest, AbortedTransactionLifecycle) {
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
ASSERT_STR_CONTAINS(
s.ToString(),
- Substitute("transaction ID $0 is already in terminal state", txn_id));
+ Substitute("transaction ID $0 is not available for further commits", txn_id));
// The transaction should stay in ABORTED state, of course.
TxnStatePB txn_state;
NO_FATALS(fetch_txn_status(txn_id, &txn_state));
diff --git a/src/kudu/tablet/txn_participant-test.cc b/src/kudu/tablet/txn_participant-test.cc
index ca8af95..c1e5437 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -245,9 +245,9 @@ TEST_F(TxnParticipantTest, TestSuccessfulSequences) {
}), txn_participant()->GetTxnsForTests());
}
-TEST_F(TxnParticipantTest, TestTransactionNotFound) {
+TEST_F(TxnParticipantTest, TestParticipantOpsWhenNotBegun) {
const auto check_bad_ops = [&] (const vector<ParticipantOpPB::ParticipantOpType>& ops,
- int64_t txn_id) {
+ int64_t txn_id) {
for (const auto& type : ops) {
ParticipantResponsePB resp;
ASSERT_OK(CallParticipantOp(
diff --git a/src/kudu/transactions/transactions.proto b/src/kudu/transactions/transactions.proto
index 4d7b8f9..a58d8bd 100644
--- a/src/kudu/transactions/transactions.proto
+++ b/src/kudu/transactions/transactions.proto
@@ -19,12 +19,45 @@ package kudu.transactions;
option java_package = "org.apache.kudu.transactions";
+// The following state changes are expected:
+//
+//      BeginCommit            FinalizeCommit           CompleteCommit
+// OPEN --> COMMIT_IN_PROGRESS --> FINALIZE_IN_PROGRESS --> COMMITTED
+//
+//      BeginCommit            BeginAbort            FinalizeAbort
+// OPEN --> COMMIT_IN_PROGRESS --> ABORT_IN_PROGRESS --> ABORTED
+//
+//      AbortTxn              FinalizeAbort
+// OPEN --> ABORT_IN_PROGRESS --> ABORTED
enum TxnStatePB {
UNKNOWN = 0;
+ // The transaction is open. Users can write to participants, and register new
+ // participants.
OPEN = 1;
+
+ // A user or Kudu's transaction staleness tracker has signaled that the
+ // transaction should be aborted. No further participants can be registered,
+ // and the transaction can only be moved to the ABORTED state after sending
+ // ABORT_TXN ops to all participants.
ABORT_IN_PROGRESS = 5;
+
+ // The transaction has been fully aborted -- all participants have
+ // successfully replicated ABORT_TXN ops and cleared the transaction. No
+ // further tasks are required to abort the transaction.
ABORTED = 2;
+
+ // The user has signaled that the transaction should be committed. No further
+ // participants can be registered. The transaction may still be aborted if
+ // prompted by a user or if sending any BEGIN_COMMIT op fails.
COMMIT_IN_PROGRESS = 3;
+
+ // Kudu has successfully sent BEGIN_COMMIT ops to all participants, and has
+ // started sending FINALIZE_COMMIT ops to participants. The transaction can
+ // only be moved to the COMMITTED state.
+ FINALIZE_IN_PROGRESS = 6;
+
+ // All FINALIZE_COMMIT ops have succeeded. No further tasks are required to
+ // commit the transaction.
COMMITTED = 4;
}
diff --git a/src/kudu/transactions/txn_status_entry.cc b/src/kudu/transactions/txn_status_entry.cc
index 9d77d81..3560fa6 100644
--- a/src/kudu/transactions/txn_status_entry.cc
+++ b/src/kudu/transactions/txn_status_entry.cc
@@ -65,5 +65,11 @@ TxnStatePB TransactionEntry::state() const {
return l.data().pb.state();
}
+int64_t TransactionEntry::commit_timestamp() const {
+ CowLock<PersistentTransactionEntry> l(&metadata_, LockMode::READ);
+ DCHECK(l.data().pb.has_commit_timestamp());
+ return l.data().pb.commit_timestamp();
+}
+
} // namespace transactions
} // namespace kudu
diff --git a/src/kudu/transactions/txn_status_entry.h b/src/kudu/transactions/txn_status_entry.h
index fe25409..b51ac1a 100644
--- a/src/kudu/transactions/txn_status_entry.h
+++ b/src/kudu/transactions/txn_status_entry.h
@@ -103,6 +103,12 @@ class TransactionEntry : public RefCountedThreadSafe<TransactionEntry> {
// via the locking provided by the underlying copy-on-write metadata_ object.
TxnStatePB state() const;
+ // An accessor to the transaction's commit timestamp. Concurrent access is
+ // controlled via the locking provided by the underlying copy-on-write
+ // metadata_ object. Should only be used if the transaction is expected to
+ // have a commit timestamp already set.
+ int64_t commit_timestamp() const;
+
private:
friend class RefCountedThreadSafe<TransactionEntry>;
~TransactionEntry() = default;
diff --git a/src/kudu/transactions/txn_status_manager-test.cc b/src/kudu/transactions/txn_status_manager-test.cc
index 61f38c6..aaa7e4a 100644
--- a/src/kudu/transactions/txn_status_manager-test.cc
+++ b/src/kudu/transactions/txn_status_manager-test.cc
@@ -461,31 +461,47 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
ASSERT_FALSE(txn_status.has_state());
ASSERT_FALSE(txn_status.has_user());
}
+ constexpr const int64_t kNoTxnId = 0;
+ constexpr const int64_t kCommittedTxnId = 1;
+ constexpr const int64_t kAbortInProgressTxnId = 2;
+ constexpr const int64_t kCommitInProgressTxnId = 3;
+ constexpr const int64_t kOpenTxnId = 4;
{
TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
TabletServerErrorPB ts_error;
- ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, nullptr, &ts_error));
+ ASSERT_OK(txn_manager_->BeginTransaction(kCommittedTxnId, kOwner, nullptr, &ts_error));
TxnStatusEntryPB txn_status;
ASSERT_OK(txn_manager_->GetTransactionStatus(
- 1, kOwner, &txn_status, &ts_error));
+ kCommittedTxnId, kOwner, &txn_status, &ts_error));
ASSERT_TRUE(txn_status.has_state());
ASSERT_EQ(TxnStatePB::OPEN, txn_status.state());
ASSERT_TRUE(txn_status.has_user());
ASSERT_EQ(kOwner, txn_status.user());
- ASSERT_OK(txn_manager_->BeginCommitTransaction(1, kOwner, &ts_error));
+ ASSERT_OK(txn_manager_->BeginCommitTransaction(kCommittedTxnId, kOwner, &ts_error));
ASSERT_OK(txn_manager_->GetTransactionStatus(
- 1, kOwner, &txn_status, &ts_error));
+ kCommittedTxnId, kOwner, &txn_status, &ts_error));
ASSERT_TRUE(txn_status.has_state());
ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_status.state());
ASSERT_TRUE(txn_status.has_user());
ASSERT_EQ(kOwner, txn_status.user());
- ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1, Timestamp::kInitialTimestamp, &ts_error));
+ ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kCommittedTxnId,
+ Timestamp::kInitialTimestamp, &ts_error));
ASSERT_OK(txn_manager_->GetTransactionStatus(
- 1, kOwner, &txn_status, &ts_error));
+ kCommittedTxnId, kOwner, &txn_status, &ts_error));
ASSERT_TRUE(txn_status.has_state());
+ ASSERT_TRUE(txn_status.has_commit_timestamp());
+ ASSERT_EQ(TxnStatePB::FINALIZE_IN_PROGRESS, txn_status.state());
+ ASSERT_TRUE(txn_status.has_user());
+ ASSERT_EQ(kOwner, txn_status.user());
+
+ ASSERT_OK(txn_manager_->CompleteCommitTransaction(kCommittedTxnId));
+ ASSERT_OK(txn_manager_->GetTransactionStatus(
+ kCommittedTxnId, kOwner, &txn_status, &ts_error));
+ ASSERT_TRUE(txn_status.has_state());
+ ASSERT_TRUE(txn_status.has_commit_timestamp());
ASSERT_EQ(TxnStatePB::COMMITTED, txn_status.state());
ASSERT_TRUE(txn_status.has_user());
ASSERT_EQ(kOwner, txn_status.user());
@@ -494,12 +510,12 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
{
TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
TabletServerErrorPB ts_error;
- ASSERT_OK(txn_manager_->BeginTransaction(2, kOwner, nullptr, &ts_error));
- ASSERT_OK(txn_manager_->AbortTransaction(2, kOwner, &ts_error));
+ ASSERT_OK(txn_manager_->BeginTransaction(kAbortInProgressTxnId, kOwner, nullptr, &ts_error));
+ ASSERT_OK(txn_manager_->AbortTransaction(kAbortInProgressTxnId, kOwner, &ts_error));
TxnStatusEntryPB txn_status;
ASSERT_OK(txn_manager_->GetTransactionStatus(
- 2, kOwner, &txn_status, &ts_error));
+ kAbortInProgressTxnId, kOwner, &txn_status, &ts_error));
ASSERT_TRUE(txn_status.has_state());
ASSERT_EQ(TxnStatePB::ABORT_IN_PROGRESS, txn_status.state());
ASSERT_TRUE(txn_status.has_user());
@@ -510,11 +526,11 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
// Start another transaction and start its commit phase.
TabletServerErrorPB ts_error;
- ASSERT_OK(txn_manager_->BeginTransaction(3, kOwner, nullptr, &ts_error));
- ASSERT_OK(txn_manager_->BeginCommitTransaction(3, kOwner, &ts_error));
+ ASSERT_OK(txn_manager_->BeginTransaction(kCommitInProgressTxnId, kOwner, nullptr, &ts_error));
+ ASSERT_OK(txn_manager_->BeginCommitTransaction(kCommitInProgressTxnId, kOwner, &ts_error));
// Start just another transaction.
- ASSERT_OK(txn_manager_->BeginTransaction(4, kOwner, nullptr, &ts_error));
+ ASSERT_OK(txn_manager_->BeginTransaction(kOpenTxnId, kOwner, nullptr, &ts_error));
}
// Make the TxnStatusManager start from scratch.
@@ -529,28 +545,28 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
TxnStatusEntryPB txn_status;
TabletServerErrorPB ts_error;
ASSERT_OK(txn_manager_->GetTransactionStatus(
- 1, kOwner, &txn_status, &ts_error));
+ kCommittedTxnId, kOwner, &txn_status, &ts_error));
ASSERT_TRUE(txn_status.has_state());
ASSERT_EQ(TxnStatePB::COMMITTED, txn_status.state());
ASSERT_TRUE(txn_status.has_user());
ASSERT_EQ(kOwner, txn_status.user());
ASSERT_OK(txn_manager_->GetTransactionStatus(
- 2, kOwner, &txn_status, &ts_error));
+ kAbortInProgressTxnId, kOwner, &txn_status, &ts_error));
ASSERT_TRUE(txn_status.has_state());
ASSERT_EQ(TxnStatePB::ABORT_IN_PROGRESS, txn_status.state());
ASSERT_TRUE(txn_status.has_user());
ASSERT_EQ(kOwner, txn_status.user());
ASSERT_OK(txn_manager_->GetTransactionStatus(
- 3, kOwner, &txn_status, &ts_error));
+ kCommitInProgressTxnId, kOwner, &txn_status, &ts_error));
ASSERT_TRUE(txn_status.has_state());
ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_status.state());
ASSERT_TRUE(txn_status.has_user());
ASSERT_EQ(kOwner, txn_status.user());
ASSERT_OK(txn_manager_->GetTransactionStatus(
- 4, kOwner, &txn_status, &ts_error));
+ kOpenTxnId, kOwner, &txn_status, &ts_error));
ASSERT_TRUE(txn_status.has_state());
ASSERT_EQ(TxnStatePB::OPEN, txn_status.state());
ASSERT_TRUE(txn_status.has_user());
@@ -563,7 +579,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
TxnStatusEntryPB txn_status;
TabletServerErrorPB ts_error;
auto s = txn_manager_->GetTransactionStatus(
- 1, "stranger", &txn_status, &ts_error);
+ kCommittedTxnId, "stranger", &txn_status, &ts_error);
ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
}
@@ -573,7 +589,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
TxnStatusEntryPB txn_status;
TabletServerErrorPB ts_error;
auto s = txn_manager_->GetTransactionStatus(
- 0, kOwner, &txn_status, &ts_error);
+ kNoTxnId, kOwner, &txn_status, &ts_error);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
}
@@ -583,7 +599,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
TxnStatusEntryPB txn_status;
TabletServerErrorPB ts_error;
auto s = txn_manager_->GetTransactionStatus(
- 0, "stranger", &txn_status, &ts_error);
+ kNoTxnId, "stranger", &txn_status, &ts_error);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
}
}
@@ -601,19 +617,22 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 1 not found");
}
- // OPEN --> COMMIT_IN_PROGRESS --> COMMITTED
+ const auto not_authorized_keep_alive = [&] (const int64_t txn_id) {
+ TabletServerErrorPB ts_error;
+ auto s = txn_manager_->KeepTransactionAlive(txn_id, "stranger", &ts_error);
+ ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ Substitute("transaction ID $0 not owned by stranger", txn_id));
+ };
+
+ // OPEN --> COMMIT_IN_PROGRESS --> FINALIZE_IN_PROGRESS
{
TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
TabletServerErrorPB ts_error;
ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, nullptr, &ts_error));
ASSERT_OK(txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error));
// Supplying wrong user for transaction in OPEN state.
- {
- auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
- ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(),
- "transaction ID 1 not owned by stranger");
- }
+ NO_FATALS(not_authorized_keep_alive(1));
ASSERT_OK(txn_manager_->BeginCommitTransaction(1, kOwner, &ts_error));
auto s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
@@ -621,28 +640,26 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
ASSERT_STR_CONTAINS(s.ToString(),
"transaction ID 1 is in commit phase");
// Supplying wrong user for transaction in COMMIT_IN_PROGRESS state.
- {
- auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
- ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(),
- "transaction ID 1 not owned by stranger");
- }
+ NO_FATALS(not_authorized_keep_alive(1));
ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1, Timestamp::kInitialTimestamp, &ts_error));
s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(),
- "transaction ID 1 is already in terminal state");
+ "transaction ID 1 is in commit phase");
+ // Supplying wrong user for transaction in FINALIZE_IN_PROGRESS state.
+ NO_FATALS(not_authorized_keep_alive(1));
+
+ ASSERT_OK(txn_manager_->CompleteCommitTransaction(1));
+ s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "transaction ID 1 is not available for further commits");
// Supplying wrong user for transaction in COMMITTED state.
- {
- auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
- ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(),
- "transaction ID 1 not owned by stranger");
- }
+ NO_FATALS(not_authorized_keep_alive(1));
}
- // OPEN --> COMMIT_IN_PROGRESS --> ABORTED
+ // OPEN --> COMMIT_IN_PROGRESS --> ABORT_IN_PROGRESS
{
TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
TabletServerErrorPB ts_error;
@@ -659,16 +676,12 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
s = txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(),
- "transaction ID 2 is already in terminal state");
+ "transaction ID 2 is not available for further commits");
// Supplying wrong user for transaction in ABORTED state.
- {
- auto s = txn_manager_->KeepTransactionAlive(2, "stranger", &ts_error);
- ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not owned by stranger");
- }
+ NO_FATALS(not_authorized_keep_alive(2));
}
- // OPEN --> ABORTED
+ // OPEN --> ABORT_IN_PROGRESS
{
TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
TabletServerErrorPB ts_error;
@@ -679,7 +692,7 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
auto s = txn_manager_->KeepTransactionAlive(3, kOwner, &ts_error);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(),
- "transaction ID 3 is already in terminal state");
+ "transaction ID 3 is not available for further commits");
}
// Open a new transaction just before restarting the TxnStatusManager.
@@ -703,14 +716,9 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
auto s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(),
- "transaction ID 1 is already in terminal state");
- // Supplying wrong user for transaction in COMMITTED state.
- {
- auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
- ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(),
- "transaction ID 1 not owned by stranger");
- }
+ "transaction ID 1 is not available for further commits");
+ // Supplying wrong user for transaction in FINALIZE_IN_PROGRESS state.
+ NO_FATALS(not_authorized_keep_alive(1));
}
{
TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
@@ -718,25 +726,16 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
auto s = txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(),
- "transaction ID 2 is already in terminal state");
- // Supplying wrong user for transaction in ABORTED state.
- {
- auto s = txn_manager_->KeepTransactionAlive(2, "stranger", &ts_error);
- ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not owned by stranger");
- }
+ "transaction ID 2 is not available for further commits");
+ // Supplying wrong user for transaction in ABORT_IN_PROGRESS state.
+ NO_FATALS(not_authorized_keep_alive(2));
}
{
TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
TabletServerErrorPB ts_error;
ASSERT_OK(txn_manager_->KeepTransactionAlive(4, kOwner, &ts_error));
// Supplying wrong user for transaction in OPEN state.
- {
- auto s = txn_manager_->KeepTransactionAlive(4, "stranger", &ts_error);
- ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(),
- "transaction ID 4 not owned by stranger");
- }
+ NO_FATALS(not_authorized_keep_alive(4));
}
}
@@ -792,12 +791,16 @@ TEST_F(TxnStatusManagerTest, TestUpdateTransactionState) {
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
s = txn_manager_->FinalizeCommitTransaction(kTxnId1, Timestamp::kInitialTimestamp, &ts_error);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ s = txn_manager_->CompleteCommitTransaction(kTxnId1);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
// We can't finalize a commit that hasn't begun committing.
const int64_t kTxnId2 = 2;
ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner, nullptr, &ts_error));
s = txn_manager_->FinalizeCommitTransaction(kTxnId2, Timestamp::kInitialTimestamp, &ts_error);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ s = txn_manager_->CompleteCommitTransaction(kTxnId2);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
// We can't abort a transaction that has finished committing.
ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId2, kOwner, &ts_error));
@@ -810,6 +813,18 @@ TEST_F(TxnStatusManagerTest, TestUpdateTransactionState) {
ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId2, kOwner, &ts_error));
ASSERT_OK(txn_manager_->FinalizeCommitTransaction(
kTxnId2, Timestamp::kInitialTimestamp, &ts_error));
+
+ // We can't abort a transaction that has been completely committed.
+ ASSERT_OK(txn_manager_->CompleteCommitTransaction(kTxnId2));
+ s = txn_manager_->AbortTransaction(kTxnId2, kOwner, &ts_error);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+ // Redundant begin commit, finalize calls, and complete calls are also
+ // benign.
+ ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId2, kOwner, &ts_error));
+ ASSERT_OK(txn_manager_->FinalizeCommitTransaction(
+ kTxnId2, Timestamp::kInitialTimestamp, &ts_error));
+ ASSERT_OK(txn_manager_->CompleteCommitTransaction(kTxnId2));
}
// Test that we can only add participants to a transaction when it's in an
@@ -834,12 +849,17 @@ TEST_F(TxnStatusManagerTest, TestRegisterParticipantsWithStates) {
s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(2), kOwner, &ts_error);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
- // We can't register participants when we've finished committnig.
+ // We can't register participants when we've finalized the commit.
ASSERT_OK(txn_manager_->FinalizeCommitTransaction(
kTxnId1, Timestamp::kInitialTimestamp, &ts_error));
s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(2), kOwner, &ts_error);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ // We can't register participants when we've completely committed.
+ ASSERT_OK(txn_manager_->CompleteCommitTransaction(kTxnId1));
+ s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(2), kOwner, &ts_error);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
// We can't register participants when we've aborted the transaction.
const int64_t kTxnId2 = 2;
ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner, nullptr, &ts_error));
diff --git a/src/kudu/transactions/txn_status_manager.cc b/src/kudu/transactions/txn_status_manager.cc
index 8981c5a..ce8e24d 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -194,6 +194,24 @@ bool CommitTasks::IsShuttingDownCleanup() const {
return false;
}
+bool CommitTasks::ScheduleAbortIfNecessary() {
+  // Read the requested abort state exactly once so the branch taken and the
+  // work scheduled are based on the same value.
+  const TxnStatePB requested = abort_txn_;
+  if (requested == ABORT_IN_PROGRESS) {
+    // The commit tasks themselves asked to abort: persist ABORT_IN_PROGRESS
+    // before anything is sent to participants.
+    LOG(INFO) << Substitute("Scheduling write for ABORT_IN_PROGRESS for txn $0",
+                            txn_id_.ToString());
+    ScheduleBeginAbortTxnWrite();
+    return true;
+  }
+  if (requested == ABORTED) {
+    // An ABORT_IN_PROGRESS record has already been written, so go straight to
+    // sending ABORT_TXN ops to the participants.
+    LOG(INFO) << Substitute("Scheduling ABORT_TXNs on participants for txn $0",
+                            txn_id_.ToString());
+    AbortTxnAsync();
+    return true;
+  }
+  // No abort was requested; the caller should keep driving the commit.
+  return false;
+}
+
void CommitTasks::BeginCommitAsyncTask(int participant_idx) {
DCHECK_LT(participant_idx, participant_ids_.size());
// Status callback called with the result from the participant op. This is
@@ -218,12 +236,16 @@ void CommitTasks::BeginCommitAsyncTask(int participant_idx) {
// been committed, rather than attempting to abort or something. This is
// important to ensure retries of the commit tasks reliably result in the
// same operations being performed.
- LOG(INFO) << Substitute("Participant $0 was not found: $1",
+ LOG(INFO) << Substitute("Participant $0 was not found for BEGIN_COMMIT, aborting: $1",
participant_ids_[participant_idx], s.ToString());
+ SetNeedsBeginAbort();
} else if (PREDICT_FALSE(!s.ok())) {
// For any other kind of error, just exit without completing.
// TODO(awong): we're presuming that such errors wouldn't benefit from
// just retrying.
+ // TODO(awong): we don't expect them, but if we ever somehow find
+ // ourselves with an aborted transaction on the participant, we should
+ // probably abort here.
LOG(WARNING) << Substitute("Participant $0 BEGIN_COMMIT op returned $1",
participant_ids_[participant_idx], s.ToString());
stop_task_ = true;
@@ -235,12 +257,16 @@ void CommitTasks::BeginCommitAsyncTask(int participant_idx) {
if (IsShuttingDownCleanup()) {
return;
}
+ if (ScheduleAbortIfNecessary()) {
+ return;
+ }
Timestamp max_timestamp(Timestamp::kInitialTimestamp);
for (const auto& ts : begin_commit_timestamps_) {
max_timestamp = std::max(ts, max_timestamp);
}
DCHECK_NE(Timestamp::kInitialTimestamp, max_timestamp);
- FinalizeCommitAsync(max_timestamp);
+ set_commit_timestamp(max_timestamp);
+ ScheduleFinalizeCommitWrite();
}
};
ParticipantOpPB op_pb;
@@ -254,27 +280,26 @@ void CommitTasks::BeginCommitAsyncTask(int participant_idx) {
&begin_commit_timestamps_[participant_idx]);
}
-void CommitTasks::FinalizeCommitAsyncTask(int participant_idx, const Timestamp& commit_timestamp) {
+void CommitTasks::FinalizeCommitAsyncTask(int participant_idx) {
+ DCHECK_EQ(TxnStatePB::UNKNOWN, abort_txn_);
DCHECK_LT(participant_idx, participant_ids_.size());
// Status callback called with the result from the participant op.
scoped_refptr<CommitTasks> scoped_this(this);
auto participated_cb = [this, scoped_this = std::move(scoped_this),
- participant_idx, commit_timestamp] (const Status& s) {
+ participant_idx] (const Status& s) {
if (IsShuttingDownCleanupIfLastOp()) {
return;
}
if (PREDICT_FALSE(s.IsTimedOut())) {
LOG(WARNING) << Substitute("Retrying FINALIZE_COMMIT op for txn $0: $1",
txn_id_.ToString(), s.ToString());
- FinalizeCommitAsyncTask(participant_idx, commit_timestamp);
+ FinalizeCommitAsyncTask(participant_idx);
return;
- } else if (PREDICT_FALSE(!s.ok())) {
+ }
+ if (PREDICT_FALSE(!s.ok())) {
// Presumably the error is not transient (e.g. not found) so retrying
// won't help. But we've already begun sending out FINALIZE_TXN ops, so
// we must complete the transaction.
- // TODO(awong): revisit this; if we include an intermediate state between
- // kCommitInProgress and kCommitted, that might be an opportune moment to
- // abort.
LOG(WARNING) << Substitute("Participant $0 FINALIZE_COMMIT op returned $1",
participant_ids_[participant_idx], s.ToString());
}
@@ -284,13 +309,13 @@ void CommitTasks::FinalizeCommitAsyncTask(int participant_idx, const Timestamp&
if (IsShuttingDownCleanup()) {
return;
}
- ScheduleFinalizeCommitWrite(commit_timestamp);
+ ScheduleCompleteCommitWrite();
}
};
ParticipantOpPB op_pb;
op_pb.set_txn_id(txn_id_.value());
op_pb.set_type(ParticipantOpPB::FINALIZE_COMMIT);
- op_pb.set_finalized_commit_timestamp(commit_timestamp.value());
+ op_pb.set_finalized_commit_timestamp(commit_timestamp_.value());
txn_client_->ParticipateInTransactionAsync(
participant_ids_[participant_idx],
std::move(op_pb),
@@ -313,7 +338,8 @@ void CommitTasks::AbortTxnAsyncTask(int participant_idx) {
if (PREDICT_FALSE(s.IsNotFound())) {
// If the participant has been deleted, treat it as though it's already
// been aborted. The participant's data can't be read anyway.
- LOG(INFO) << Substitute("Participant $0 was not found: $1",
+ LOG(INFO) << Substitute("Participant $0 was not found for ABORT_TXN, proceeding "
+ "as if op succeeded: $1",
participant_ids_[participant_idx], s.ToString());
} else if (PREDICT_FALSE(!s.ok())) {
LOG(WARNING) << Substitute("Participant $0 ABORT_TXN op returned $1",
@@ -326,7 +352,7 @@ void CommitTasks::AbortTxnAsyncTask(int participant_idx) {
if (IsShuttingDownCleanup()) {
return;
}
- ScheduleAbortTxnWrite();
+ ScheduleFinalizeAbortTxnWrite();
}
};
ParticipantOpPB op_pb;
@@ -343,7 +369,7 @@ void CommitTasks::AbortTxnAsync() {
// Reset the in-flight counter to indicate we're waiting for this new set of
// tasks to complete.
if (participant_ids_.empty()) {
- ScheduleAbortTxnWrite();
+ ScheduleFinalizeAbortTxnWrite();
} else {
ops_in_flight_ = participant_ids_.size();
for (int i = 0; i < participant_ids_.size(); i++) {
@@ -352,7 +378,41 @@ void CommitTasks::AbortTxnAsync() {
}
}
-void CommitTasks::ScheduleAbortTxnWrite() {
+// Schedules, on the commit pool, the write that moves the transaction to
+// ABORT_IN_PROGRESS after these tasks requested an abort (see
+// SetNeedsBeginAbort()). If 'abort_txn_' has instead become ABORTED by the time
+// the pool task runs, ABORT_TXN ops are sent to the participants directly.
+void CommitTasks::ScheduleBeginAbortTxnWrite() {
+  // An abort must have been requested, and no participant ops may be in flight.
+  DCHECK_NE(TxnStatePB::UNKNOWN, abort_txn_);
+  DCHECK_EQ(0, ops_in_flight_);
+  // Submit the task to a threadpool.
+  // NOTE: This is called by the reactor thread that catches the BeginCommit
+  // response, or by a commit-pool task via ScheduleAbortIfNecessary(), so we
+  // can't do IO in this thread.
+  scoped_refptr<CommitTasks> scoped_this(this);
+  CHECK_OK(commit_pool_->Submit([this, scoped_this = std::move(scoped_this),
+                                 tsm = txn_status_manager_,
+                                 txn_id = txn_id_] {
+    if (stop_task_ || tsm->shutting_down()) {
+      tsm->RemoveCommitTask(txn_id, this);
+      return;
+    }
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_status_manager_);
+    // NOTE(review): if the leader lock can't be acquired, nothing is done and
+    // this task is not removed from the in-flight map here (unlike in
+    // ScheduleFinalizeCommitWrite()); presumably leadership-change handling
+    // cleans it up -- confirm.
+    if (PREDICT_TRUE(l.first_failed_status().ok())) {
+      if (abort_txn_ == ABORT_IN_PROGRESS) {
+        TabletServerErrorPB ts_error;
+        // Clear out these commit tasks so we can start new ones focused on
+        // aborting.
+        tsm->RemoveCommitTask(txn_id, this);
+        // No user is supplied (boost::none), presumably because this abort is
+        // system-initiated rather than on behalf of a client -- confirm.
+        WARN_NOT_OK(tsm->BeginAbortTransaction(txn_id.value(), boost::none, &ts_error),
+                    "Error writing to transaction status table");
+      } else {
+        // It's possible that while we were waiting to be scheduled, a client
+        // already called BeginAbortTransaction(). If so, we just need to go
+        // about aborting the transaction.
+        DCHECK_EQ(ABORTED, abort_txn_);
+        AbortTxnAsync();
+      }
+    }
+  }));
+}
+
+void CommitTasks::ScheduleFinalizeAbortTxnWrite() {
// Submit the task to a threadpool.
// NOTE: This is called by the reactor thread that catches the BeginCommit
// response, so we can't do IO in this thread.
@@ -378,36 +438,39 @@ void CommitTasks::ScheduleAbortTxnWrite() {
}));
}
-void CommitTasks::FinalizeCommitAsync(Timestamp commit_timestamp) {
+void CommitTasks::FinalizeCommitAsync() {
+ DCHECK_NE(Timestamp::kInitialTimestamp, commit_timestamp_);
// Reset the in-flight counter to indicate we're waiting for this new set of
// tasks to complete.
auto old_val = ops_in_flight_.exchange(participant_ids_.size());
DCHECK_EQ(0, old_val);
ops_in_flight_ = participant_ids_.size();
for (int i = 0; i < participant_ids_.size(); i++) {
- FinalizeCommitAsyncTask(i, commit_timestamp);
+ FinalizeCommitAsyncTask(i);
}
}
-void CommitTasks::ScheduleFinalizeCommitWrite(Timestamp commit_timestamp) {
+void CommitTasks::ScheduleCompleteCommitWrite() {
// Submit the task to a threadpool.
- // NOTE: This is called by the reactor thread that catches the BeginCommit
+ // NOTE: This is called by the reactor thread that catches the FinalizeCommit
// response, so we can't do IO in this thread.
+ DCHECK_EQ(TxnStatePB::UNKNOWN, abort_txn_);
DCHECK_EQ(0, ops_in_flight_);
scoped_refptr<CommitTasks> scoped_this(this);
CHECK_OK(commit_pool_->Submit([this, scoped_this = std::move(scoped_this),
tsm = this->txn_status_manager_,
- txn_id = this->txn_id_, commit_timestamp] {
+ txn_id = this->txn_id_] {
MAYBE_INJECT_RANDOM_LATENCY(
FLAGS_txn_status_manager_inject_latency_finalize_commit_ms);
- if (IsShuttingDownCleanup()) {
+ if (stop_task_ || tsm->shutting_down()) {
+ tsm->RemoveCommitTask(txn_id, this);
return;
}
TxnStatusManager::ScopedLeaderSharedLock l(txn_status_manager_);
if (PREDICT_TRUE(l.first_failed_status().ok())) {
TabletServerErrorPB error_pb;
- WARN_NOT_OK(tsm->FinalizeCommitTransaction(txn_id.value(), commit_timestamp, &error_pb),
+ WARN_NOT_OK(tsm->CompleteCommitTransaction(txn_id.value()),
"Error writing to transaction status table");
}
@@ -419,6 +482,53 @@ void CommitTasks::ScheduleFinalizeCommitWrite(Timestamp commit_timestamp) {
}));
}
+// Schedules, on the commit pool, the write that moves the transaction from
+// COMMIT_IN_PROGRESS to FINALIZE_IN_PROGRESS with 'commit_timestamp_'. If an
+// abort was requested in the meantime (checked both before and after the
+// write), the abort is driven instead. On any other failure, this task is
+// removed from the in-flight map.
+void CommitTasks::ScheduleFinalizeCommitWrite() {
+  // Submit the task to a threadpool.
+  // NOTE: This is called by the reactor thread that catches the BeginCommit
+  // response, so we can't do IO in this thread.
+  DCHECK_EQ(0, ops_in_flight_);
+  scoped_refptr<CommitTasks> scoped_this(this);
+  CHECK_OK(commit_pool_->Submit([this, scoped_this = std::move(scoped_this),
+                                 tsm = this->txn_status_manager_,
+                                 txn_id = this->txn_id_] {
+    MAYBE_INJECT_RANDOM_LATENCY(
+        FLAGS_txn_status_manager_inject_latency_finalize_commit_ms);
+
+    if (IsShuttingDownCleanup()) {
+      return;
+    }
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_status_manager_);
+    // TODO(awong): the race handling here specific to concurrent aborts is
+    // messy. Consider a less ad-hoc way to define on-going tasks.
+    // NOTE: the special handling of aborts is only critical in this scheduling
+    // to finalize the commit because the only non-OPEN state an abort can
+    // occur with is COMMIT_IN_PROGRESS, which is the expected state of the
+    // transaction when this is called.
+    if (PREDICT_TRUE(l.first_failed_status().ok())) {
+      // It's possible that a user called BeginAbortTransaction while we were
+      // waiting, so before attempting to commit, with the leader lock held,
+      // check if that's the case.
+      if (ScheduleAbortIfNecessary()) {
+        return;
+      }
+      TabletServerErrorPB error_pb;
+      Status s = tsm->FinalizeCommitTransaction(txn_id.value(), commit_timestamp_, &error_pb);
+      if (PREDICT_TRUE(s.ok())) {
+        // FinalizeCommitTransaction() itself starts the FINALIZE_COMMIT ops
+        // on this task (when background tasks are enabled), so the task stays
+        // registered.
+        return;
+      }
+      // It's again possible the FinalizeCommitTransaction call raced with
+      // BeginAbortTransaction, resulting in an aborted transaction. If so,
+      // schedule an abort.
+      if (ScheduleAbortIfNecessary()) {
+        return;
+      }
+      LOG(WARNING) << Substitute("Error writing to transaction status table: $0",
+                                 s.ToString());
+    }
+    // Either the leader lock couldn't be taken or the write failed for a
+    // reason other than a concurrent abort: give up on these tasks.
+    tsm->RemoveCommitTask(txn_id.value(), this);
+  }));
+}
+
+// Initializes 'highest_txn_id_' to kIdStatusDataReady.
TxnStatusManagerBuildingVisitor::TxnStatusManagerBuildingVisitor()
: highest_txn_id_(kIdStatusDataReady) {
}
@@ -560,23 +670,37 @@ Status TxnStatusManager::LoadFromTabletUnlocked() {
"Unable to initialize TxnSystemClient");
}
- unordered_map<int64_t, scoped_refptr<CommitTasks>> commits_in_flight;
unordered_map<int64_t, scoped_refptr<CommitTasks>> new_commits;
+ unordered_map<int64_t, scoped_refptr<CommitTasks>> new_finalizes;
unordered_map<int64_t, scoped_refptr<CommitTasks>> new_aborts;
if (txn_client) {
for (const auto& [txn_id, txn_entry] : txns_by_id) {
const auto& state = txn_entry->state();
- if (state == TxnStatePB::COMMIT_IN_PROGRESS) {
- new_commits.emplace(txn_id,
- new CommitTasks(txn_id, txn_entry->GetParticipantIds(),
- txn_client, commit_pool_, this));
- } else if (state == TxnStatePB::ABORT_IN_PROGRESS) {
- new_aborts.emplace(txn_id,
- new CommitTasks(txn_id, txn_entry->GetParticipantIds(),
- txn_client, commit_pool_, this));
+ switch (state) {
+ case TxnStatePB::COMMIT_IN_PROGRESS:
+ new_commits.emplace(txn_id,
+ new CommitTasks(txn_id, txn_entry->GetParticipantIds(),
+ txn_client, commit_pool_, this));
+ break;
+ case TxnStatePB::FINALIZE_IN_PROGRESS: {
+ scoped_refptr<CommitTasks> tasks(
+ new CommitTasks(txn_id, txn_entry->GetParticipantIds(),
+ txn_client, commit_pool_, this));
+ tasks->set_commit_timestamp(Timestamp(txn_entry->commit_timestamp()));
+ new_finalizes.emplace(txn_id, std::move(tasks));
+ break;
+ }
+ case TxnStatePB::ABORT_IN_PROGRESS:
+ new_aborts.emplace(txn_id,
+ new CommitTasks(txn_id, txn_entry->GetParticipantIds(),
+ txn_client, commit_pool_, this));
+ break;
+ default:
+ break;
}
}
}
+ unordered_map<int64_t, scoped_refptr<CommitTasks>> commits_in_flight;
{
std::lock_guard<simple_spinlock> l(lock_);
highest_txn_id_ = std::max(highest_txn_id, highest_txn_id_);
@@ -592,6 +716,12 @@ Status TxnStatusManager::LoadFromTabletUnlocked() {
tasks->BeginCommitAsync();
}
}
+ if (!new_finalizes.empty()) {
+ LOG(INFO) << Substitute("Starting $0 finalize tasks", new_finalizes.size());
+ for (const auto& [_, tasks] : new_finalizes) {
+ tasks->FinalizeCommitAsync();
+ }
+ }
if (!new_aborts.empty()) {
LOG(INFO) << Substitute("Starting $0 aborts task", new_aborts.size());
for (const auto& [_, tasks] : new_aborts) {
@@ -883,9 +1013,9 @@ Status TxnStatusManager::BeginTransaction(int64_t txn_id,
void CommitTasks::BeginCommitAsync() {
if (participant_ids_.empty()) {
- // If there are no participants for this transaction; just write an invalid
- // timestamp.
- ScheduleFinalizeCommitWrite(Timestamp::kInvalidTimestamp);
+ // If there are no participants for this transaction; just change its state
+ // to committed.
+ ScheduleCompleteCommitWrite();
} else {
// If there are some participants, schedule beginning commit tasks so
// we can determine a finalized commit timestamp.
@@ -914,6 +1044,7 @@ Status TxnStatusManager::BeginCommitTransaction(int64_t txn_id, const string& us
const auto& pb = txn_lock.data().pb;
const auto& state = pb.state();
if (state == TxnStatePB::COMMIT_IN_PROGRESS ||
+ state == TxnStatePB::FINALIZE_IN_PROGRESS ||
state == TxnStatePB::COMMITTED) {
return Status::OK();
}
@@ -953,7 +1084,8 @@ Status TxnStatusManager::FinalizeCommitTransaction(
TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE);
const auto& pb = txn_lock.data().pb;
const auto& state = pb.state();
- if (state == TxnStatePB::COMMITTED) {
+ if (state == TxnStatePB::FINALIZE_IN_PROGRESS ||
+ state == TxnStatePB::COMMITTED) {
return Status::OK();
}
if (PREDICT_FALSE(state != TxnStatePB::COMMIT_IN_PROGRESS)) {
@@ -963,9 +1095,50 @@ Status TxnStatusManager::FinalizeCommitTransaction(
ts_error);
}
auto* mutable_data = txn_lock.mutable_data();
- mutable_data->pb.set_state(TxnStatePB::COMMITTED);
+ mutable_data->pb.set_state(TxnStatePB::FINALIZE_IN_PROGRESS);
+ mutable_data->pb.set_commit_timestamp(commit_timestamp.value());
RETURN_NOT_OK(status_tablet_.UpdateTransaction(
txn_id, mutable_data->pb, ts_error));
+
+ if (PREDICT_TRUE(FLAGS_txn_schedule_background_tasks)) {
+ std::lock_guard<simple_spinlock> l(lock_);
+ auto& task = FindOrDie(commits_in_flight_, txn_id);
+ task->FinalizeCommitAsync();
+ }
+
+ txn_lock.Commit();
+ return Status::OK();
+}
+
+// Moves the transaction to COMMITTED and drops its entry from the in-flight
+// commit tasks. Succeeds without doing anything if it is already COMMITTED.
+// Both FINALIZE_IN_PROGRESS and COMMIT_IN_PROGRESS are accepted as starting
+// states; the latter covers e.g. transactions without participants, for which
+// BeginCommitAsync() skips straight to this step. Any other state yields an
+// IllegalState error.
+Status TxnStatusManager::CompleteCommitTransaction(int64_t txn_id) {
+  leader_lock_.AssertAcquiredForReading();
+  scoped_refptr<TransactionEntry> txn;
+  // NOTE: error details recorded here are not surfaced to callers; only the
+  // returned Status is.
+  TabletServerErrorPB ts_error;
+  RETURN_NOT_OK(GetTransaction(txn_id, boost::none, &txn, &ts_error));
+
+  TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE);
+  const auto& pb = txn_lock.data().pb;
+  const auto& state = pb.state();
+  if (state == TxnStatePB::COMMITTED) {
+    return Status::OK();
+  }
+  if (PREDICT_FALSE(state != TxnStatePB::COMMIT_IN_PROGRESS &&
+                    state != TxnStatePB::FINALIZE_IN_PROGRESS)) {
+    return ReportIllegalTxnState(
+        Substitute("transaction ID $0 is not finalizing its commit: $1",
+                   txn_id, SecureShortDebugString(pb)),
+        &ts_error);
+  }
+  auto* mutable_data = txn_lock.mutable_data();
+  mutable_data->pb.set_state(TxnStatePB::COMMITTED);
+  RETURN_NOT_OK(status_tablet_.UpdateTransaction(
+      txn_id, mutable_data->pb, &ts_error));
+
+  // The commit is fully done; no further background work is tracked for it.
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    commits_in_flight_.erase(txn_id);
+  }
+
txn_lock.Commit();
return Status::OK();
}
@@ -995,9 +1168,9 @@ Status TxnStatusManager::FinalizeAbortTransaction(int64_t txn_id) {
return Status::OK();
}
-Status TxnStatusManager::AbortTransaction(int64_t txn_id,
- const string& user,
- TabletServerErrorPB* ts_error) {
+Status TxnStatusManager::BeginAbortTransaction(int64_t txn_id,
+ const boost::optional<string>& user,
+ TabletServerErrorPB* ts_error) {
leader_lock_.AssertAcquiredForReading();
TxnSystemClient* txn_client;
@@ -1010,12 +1183,28 @@ Status TxnStatusManager::AbortTransaction(int64_t txn_id,
TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE);
const auto& pb = txn_lock.data().pb;
const auto& state = pb.state();
- if (state == TxnStatePB::ABORTED ||
- state == TxnStatePB::ABORT_IN_PROGRESS) {
+ if (state == TxnStatePB::ABORT_IN_PROGRESS) {
+ if (PREDICT_TRUE(FLAGS_txn_schedule_background_tasks)) {
+ // It's possible we don't have any tasks running even though we're in
+ // ABORT_IN_PROGRESS, e.g. if we're in the middle of aborting a commit
+ // (and have removed the commit tasks), while at the same time, we've
+ // just served a client-initiated abort and so the state is already
+ // ABORT_IN_PROGRESS. If so, we should start abort tasks.
+ std::unique_lock<simple_spinlock> l(lock_);
+ if (PREDICT_FALSE(!ContainsKey(commits_in_flight_, txn_id))) {
+ auto participant_ids = txn->GetParticipantIds();
+ auto tasks = EmplaceOrDie(&commits_in_flight_, txn_id,
+ new CommitTasks(txn_id, std::move(participant_ids),
+ txn_client, commit_pool_, this));
+ l.unlock();
+ tasks->AbortTxnAsync();
+ }
+ }
+ return Status::OK();
+ }
+ if (state == TxnStatePB::ABORTED) {
return Status::OK();
}
- // TODO(awong): if we're in COMMIT_IN_PROGRESS, we should attempt to abort
- // any in-flight commit tasks.
if (PREDICT_FALSE(state != TxnStatePB::OPEN &&
state != TxnStatePB::COMMIT_IN_PROGRESS)) {
return ReportIllegalTxnState(
@@ -1035,13 +1224,24 @@ Status TxnStatusManager::AbortTransaction(int64_t txn_id,
txn_client, commit_pool_, this));
l.unlock();
if (emplaced) {
+ // If we didn't have commit tasks on-going, finalize the abort on
+ // participants.
map_iter->second->AbortTxnAsync();
+ } else {
+ // If we did have commit tasks, set them up so they finalize the abort.
+ map_iter->second->SetNeedsFinalizeAbort();
}
}
txn_lock.Commit();
return Status::OK();
}
+// Client-facing abort entry point: starts the abort via BeginAbortTransaction(),
+// passing through the caller's 'user'. The remaining steps (ABORT_TXN ops and
+// the ABORTED record) are driven by the background tasks that call schedules,
+// when background tasks are enabled.
+Status TxnStatusManager::AbortTransaction(int64_t txn_id,
+                                          const string& user,
+                                          TabletServerErrorPB* ts_error) {
+  return BeginAbortTransaction(txn_id, user, ts_error);
+}
+
Status TxnStatusManager::GetTransactionStatus(
int64_t txn_id,
const string& user,
@@ -1058,6 +1258,9 @@ Status TxnStatusManager::GetTransactionStatus(
txn_status->set_user(pb.user());
DCHECK(pb.has_state());
txn_status->set_state(pb.state());
+ if (pb.has_commit_timestamp()) {
+ txn_status->set_commit_timestamp(pb.commit_timestamp());
+ }
return Status::OK();
}
@@ -1076,22 +1279,22 @@ Status TxnStatusManager::KeepTransactionAlive(int64_t txn_id,
const auto& pb = txn_lock.data().pb;
const auto& state = pb.state();
- if (state != TxnStatePB::OPEN &&
- state != TxnStatePB::COMMIT_IN_PROGRESS) {
- return ReportIllegalTxnState(
- Substitute("transaction ID $0 is already in terminal state: $1",
- txn_id, SecureShortDebugString(pb)),
- ts_error);
- }
// Keepalive updates are not required for a transaction in COMMIT_IN_PROGRESS
// state. The system takes care of a transaction once the client side
// initiates the commit phase.
- if (state == TxnStatePB::COMMIT_IN_PROGRESS) {
+ if (state == TxnStatePB::COMMIT_IN_PROGRESS ||
+ state == TxnStatePB::FINALIZE_IN_PROGRESS) {
return ReportIllegalTxnState(
Substitute("transaction ID $0 is in commit phase: $1",
txn_id, SecureShortDebugString(pb)),
ts_error);
}
+ if (state != TxnStatePB::OPEN) {
+ return ReportIllegalTxnState(
+ Substitute("transaction ID $0 is not available for further commits: $1",
+ txn_id, SecureShortDebugString(pb)),
+ ts_error);
+ }
DCHECK_EQ(TxnStatePB::OPEN, state);
txn->SetLastHeartbeatTime(MonoTime::Now());
diff --git a/src/kudu/transactions/txn_status_manager.h b/src/kudu/transactions/txn_status_manager.h
index 48c4d87..17a2521 100644
--- a/src/kudu/transactions/txn_status_manager.h
+++ b/src/kudu/transactions/txn_status_manager.h
@@ -26,6 +26,7 @@
#include <vector>
#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
#include <gtest/gtest_prod.h>
#include "kudu/common/timestamp.h"
@@ -33,6 +34,7 @@
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/tablet/txn_coordinator.h"
+#include "kudu/transactions/transactions.pb.h"
#include "kudu/transactions/txn_status_entry.h"
#include "kudu/transactions/txn_status_tablet.h"
#include "kudu/util/locks.h"
@@ -40,9 +42,9 @@
#include "kudu/util/status.h"
namespace kudu {
+class MonoDelta;
class ThreadPool;
-class MonoDelta;
namespace rpc {
class RpcContext;
} // namespace rpc
@@ -56,12 +58,25 @@ class TabletServerErrorPB;
} // namespace tserver
namespace transactions {
-class TxnStatusEntryPB;
class TxnStatusManager;
class TxnSystemClient;
class TxnSystemClientInitializer;
-// A handle for background tasks associated with a single transaction.
+// A handle for background tasks associated with a single transaction. The
+// following state changes are expected:
+//
+// BeginCommit FinalizeCommit CompleteCommit
+// OPEN --> COMMIT_IN_PROGRESS --> FINALIZE_IN_PROGRESS --> COMMITTED
+//
+// BeginCommit BeginAbort FinalizeAbort
+// OPEN --> COMMIT_IN_PROGRESS --> ABORT_IN_PROGRESS --> ABORTED
+//
+// AbortTxn FinalizeAbort
+// OPEN --> ABORT_IN_PROGRESS --> ABORTED
+//
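+// In each of these sequences, the first state change is initiated by clients
+// and subsequent changes are scheduled automatically in the background (RPC
+// callbacks on reactor threads, with writes submitted to the commit pool).
+// The one exception is BeginAbort on a COMMIT_IN_PROGRESS transaction, which
+// may be scheduled automatically (e.g. when a participant is not found) or be
+// initiated by a client calling AbortTransaction().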
class CommitTasks : public RefCountedThreadSafe<CommitTasks> {
public:
CommitTasks(TxnId txn_id,
@@ -76,6 +91,8 @@ class CommitTasks : public RefCountedThreadSafe<CommitTasks> {
txn_status_manager_(txn_status_manager),
ops_in_flight_(participant_ids_.size()),
begin_commit_timestamps_(participant_ids_.size(), Timestamp::kInvalidTimestamp),
+ commit_timestamp_(Timestamp::kInitialTimestamp),
+ abort_txn_(TxnStatePB::UNKNOWN),
stop_task_(false) {
}
@@ -94,12 +111,12 @@ class CommitTasks : public RefCountedThreadSafe<CommitTasks> {
// Asynchronously sends a FINALIZE_COMMIT participant op to the each
// participant in the transaction, and upon completion, schedules a commit
// record to be written to the tablet.
- void FinalizeCommitAsyncTask(int participant_idx, const Timestamp& commit_timestamp);
+ void FinalizeCommitAsyncTask(int participant_idx);
// Asynchronously sends a FINALIZE_COMMIT participant op to the participant
// at the given index. If this was the last one to complete, schedules a
- // commit record with the given timestamp to be written to the tablet.
- void FinalizeCommitAsync(Timestamp commit_timestamp);
+ // commit record with 'commit_timestamp_' to be written to the tablet.
+ void FinalizeCommitAsync();
// Asynchronously sends a ABORT_TXN participant op to each participant in the
// transaction, and upon completion, schedules an abort record to be written
@@ -114,8 +131,10 @@ class CommitTasks : public RefCountedThreadSafe<CommitTasks> {
// Schedule calls to the TxnStatusManager to be made on the commit pool.
// NOTE: these may be called on reactor threads and thus must not
// synchronously do any IO.
- void ScheduleFinalizeCommitWrite(Timestamp commit_timestamp);
- void ScheduleAbortTxnWrite();
+ void ScheduleFinalizeCommitWrite();
+ void ScheduleCompleteCommitWrite();
+ void ScheduleBeginAbortTxnWrite();
+ void ScheduleFinalizeAbortTxnWrite();
// Stops further tasks from being run. Once called calls to the above methods
// should effectively no-op.
@@ -123,6 +142,36 @@ class CommitTasks : public RefCountedThreadSafe<CommitTasks> {
stop_task_ = true;
}
+  // Asks the in-flight tasks to stop committing and instead persist an
+  // ABORT_IN_PROGRESS record, then drive the rest of the abort.
+  //
+  // NOTE: several tasks may call this, so 'abort_txn_' may already be set to
+  // ABORT_IN_PROGRESS, or to ABORTED if a client called BeginAbortTransaction()
+  // while the commit was in progress (in which case the transaction's state is
+  // already ABORT_IN_PROGRESS). Only an UNKNOWN value is replaced; otherwise
+  // this is a no-op.
+  void SetNeedsBeginAbort() {
+    TxnStatePB expected = TxnStatePB::UNKNOWN;
+    abort_txn_.compare_exchange_strong(expected, TxnStatePB::ABORT_IN_PROGRESS);
+  }
+
+  // Asks the in-flight tasks to send ABORT_TXN ops to participants and then
+  // move the transaction to ABORTED. Expected to be called only once an
+  // ABORT_IN_PROGRESS record has been persisted.
+  //
+  // NOTE: 'abort_txn_' may already be ABORT_IN_PROGRESS here if this races
+  // with a commit that failed on its own; the value is overwritten regardless.
+  void SetNeedsFinalizeAbort() {
+    abort_txn_.store(TxnStatePB::ABORTED);
+  }
+
+  // Records the timestamp these tasks send in FINALIZE_COMMIT ops to the
+  // transaction's participants. Debug builds check it hasn't been set yet.
+  void set_commit_timestamp(Timestamp commit_timestamp) {
+    DCHECK_EQ(Timestamp::kInitialTimestamp, commit_timestamp_);
+    commit_timestamp_ = commit_timestamp;
+  }
+
private:
friend class RefCountedThreadSafe<CommitTasks>;
~CommitTasks() = default;
@@ -144,6 +193,10 @@ class CommitTasks : public RefCountedThreadSafe<CommitTasks> {
// shutdown.
bool IsShuttingDownCleanup() const;
+ // Returns true if the transaction has been aborted, and schedules abort
+ // mechanics to proceed.
+ bool ScheduleAbortIfNecessary();
+
// The ID of the transaction being committed.
const TxnId txn_id_;
@@ -168,6 +221,36 @@ class CommitTasks : public RefCountedThreadSafe<CommitTasks> {
 // The commit timestamp for this transaction, i.e. the timestamp to finalize
 // across the transaction's participants. set_commit_timestamp() DCHECKs that
 // this is still Timestamp::kInitialTimestamp before assigning it.
 // NOTE(review): unlike 'abort_txn_', this is not atomic; presumably it is
 // set before any task reads it -- confirm.
 Timestamp commit_timestamp_;
+ // Whether an on-going commit should abort instead. UNKNOWN (the initial
+ // value) means no abort has been requested.
+ // If ABORT_IN_PROGRESS, an on-going commit should call
+ // BeginAbortTransaction, indicating the commit failed and needs to be
+ // cleaned up. If ABORTED, an on-going commit should call
+ // FinalizeAbortTransaction, indicating the commit was interrupted by the
+ // user, and an ABORT_IN_PROGRESS record has already been written to the
+ // table.
+ //
+ // The following state changes are expected for 'abort_txn_':
+ // SetNeedsBeginAbort
+ // UNKNOWN --> ABORT_IN_PROGRESS
+ // - The commit tasks fail (e.g. because a participant was deleted) and one
+ // of the tasks calls SetNeedsBeginAbort(). The commit task should
+ // transition to writing an ABORT_IN_PROGRESS record and then driving
+ // aborts on participants.
+ //
+ // SetNeedsBeginAbort SetNeedsFinalizeAbort
+ // UNKNOWN --> ABORT_IN_PROGRESS --> ABORTED
+ // - The commit tasks fail and one of the tasks calls SetNeedsBeginAbort().
+ // Concurrently, a user calls AbortTransaction(), persisting an
+ // ABORT_IN_PROGRESS record and then calling SetNeedsFinalizeAbort() on the
+ // in-flight commit task. The tasks should then focus on driving aborts on
+ // participants.
+ //
+ // SetNeedsFinalizeAbort
+ // UNKNOWN --> ABORTED
+ // - A user calls AbortTransaction() without the commit tasks having
+ // attempted to abort otherwise. This will have written an
+ // ABORT_IN_PROGRESS record, and the tasks should focus on driving aborts on
+ // participants.
+ //
+ // Initialized in-class because, before C++20, a default-constructed
+ // std::atomic holds an indeterminate value, while SetNeedsBeginAbort()
+ // relies on observing UNKNOWN. A constructor mem-initializer, if present,
+ // still takes precedence.
+ std::atomic<TxnStatePB> abort_txn_{TxnStatePB::UNKNOWN};
+
// Whether the task should stop executing, e.g. since an IllegalState error
// was observed on a participant, or because the TxnStatusManager changed
// leadership.
@@ -288,12 +371,23 @@ class TxnStatusManager final : public tablet::TxnCoordinator {
Status FinalizeCommitTransaction(int64_t txn_id, Timestamp commit_timestamp,
tserver::TabletServerErrorPB* ts_error) override;
+ // Updates the state of the transaction to COMMITTED, returning an error if
+ // the transaction isn't in an appropriate state.
+ // NOTE(review): given that FINALIZE_IN_PROGRESS sits between
+ // COMMIT_IN_PROGRESS and COMMITTED, the expected prior state is presumably
+ // FINALIZE_IN_PROGRESS -- confirm against the implementation.
+ Status CompleteCommitTransaction(int64_t txn_id);
+
 // Begins aborting the given transaction, returning an error if the
 // transaction doesn't exist, is committed or not yet opened, or isn't owned
 // by the given user.
 // NOTE(review): aborting is meant to be allowed while COMMIT_IN_PROGRESS
 // (before any FINALIZE_COMMIT is sent); presumably FINALIZE_IN_PROGRESS is
 // rejected like COMMITTED -- confirm. 'ts_error' presumably carries
 // tablet-server-level error details on failure.
 Status AbortTransaction(int64_t txn_id, const std::string& user,
 tserver::TabletServerErrorPB* ts_error) override;
+ // Sets the state to ABORT_IN_PROGRESS and asynchronously sends ABORT_TXN ops
+ // to each participant in the transaction. This is not necessarily called on
+ // behalf of a user (e.g. an on-going commit whose tasks failed calls it to
+ // clean up) -- as such, the 'user' field is optional.
+ // NOTE(review): presumably the ownership check is skipped when 'user' is
+ // unset -- confirm against the implementation.
+ Status BeginAbortTransaction(int64_t txn_id,
+ const boost::optional<std::string>& user,
+ tserver::TabletServerErrorPB* ts_error);
+
 // Writes a record to the TxnStatusManager indicating the given transaction
 // has been successfully aborted. An on-going commit calls this when its
 // abort state is ABORTED, i.e. after an ABORT_IN_PROGRESS record has
 // already been written.
 Status FinalizeAbortTransaction(int64_t txn_id);