You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2021/04/24 04:56:17 UTC
[kudu] branch master updated: KUDU-2612: rollback txn on
TXN_LOCKED_ABORT
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new d21a0d3 KUDU-2612: rollback txn on TXN_LOCKED_ABORT
d21a0d3 is described below
commit d21a0d38373b84f66fa2d37732b7ec7b8fee5f16
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Thu Apr 1 17:51:21 2021 -0700
KUDU-2612: rollback txn on TXN_LOCKED_ABORT
Previously it was required that clients manually roll back transactions
when their ops failed with TXN_LOCKED_ABORT (seen as an Aborted
error). Rather than pushing the burden onto application users, this
patch hooks in a callback that automatically rolls back the transaction
if any of its write ops fails with TXN_LOCKED_ABORT.
Of course, users are still free to Rollback() on error -- this patch is
simply meant to automate it in case the user doesn't, giving us a bit
more control over ensuring our transactions don't lead to a deadlock.
Change-Id: I25415cad0cfb08d260e23bd8b368852a5006c1fb
Reviewed-on: http://gerrit.cloudera.org:8080/17263
Tested-by: Andrew Wong <aw...@cloudera.com>
Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
src/kudu/integration-tests/txn_write_ops-itest.cc | 39 ++++++++++++++---------
src/kudu/tserver/tablet_service.cc | 32 ++++++++++++++++---
src/kudu/tserver/ts_tablet_manager.cc | 21 ++++++++++++
src/kudu/tserver/ts_tablet_manager.h | 12 ++++++-
4 files changed, 84 insertions(+), 20 deletions(-)
diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc
index c597acb..2bab38d 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -50,8 +50,8 @@
#include "kudu/client/write_op.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
#include "kudu/common/schema.h"
-#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/gutil/map-util.h"
@@ -297,9 +297,7 @@ class TxnWriteOpsITest : public ExternalMiniClusterITestBase {
// Test that our deadlock prevention mechanisms work by writing across
// different tablets concurrently from multiple transactions.
-// TODO(awong): it'd be much more convenient to take control of aborting the
-// transactions ourselves, rather than relying on the application user.
-TEST_F(TxnWriteOpsITest, TestClientSideDeadlockPrevention) {
+TEST_F(TxnWriteOpsITest, TestDeadlockPrevention) {
constexpr const int kNumTxns = 8;
const vector<string> master_flags = {
"--txn_manager_enabled=true",
@@ -338,8 +336,8 @@ TEST_F(TxnWriteOpsITest, TestClientSideDeadlockPrevention) {
Status s = session->Apply(insert.release());
LOG(INFO) << Substitute("Txn $0 wrote row $1: $2",
token.txn_id(), row_key, s.ToString());
- // If the write op failed because of a locking error, roll the
- // transaction back and retry the transaction after waiting a bit.
+ // If the write op failed because of a locking error, retry the
+ // transaction after waiting a bit.
if (!s.ok()) {
vector<KuduError*> errors;
ElementDeleter d(&errors);
@@ -354,7 +352,6 @@ TEST_F(TxnWriteOpsITest, TestClientSideDeadlockPrevention) {
// in wait-die, that get retried) will still time out, after
// contending a bit with other ops.
ASSERT_TRUE(error.IsAborted() || error.IsTimedOut()) << error.ToString();
- ASSERT_OK(txn->Rollback());
needs_retry = true;
// Wait a bit before retrying the entire transaction to allow for
@@ -1063,6 +1060,8 @@ TEST_F(TxnOpDispatcherITest, LifecycleBasic) {
}
}
+// Test that when a transaction attempts to acquire a lock already held by an
+// earlier transaction, the newer transaction is aborted.
TEST_F(TxnOpDispatcherITest, BeginTxnLockAbort) {
NO_FATALS(Prepare(1));
@@ -1118,21 +1117,31 @@ TEST_F(TxnOpDispatcherITest, BeginTxnLockAbort) {
ASSERT_TRUE(row_status.IsAborted()) << row_status.ToString();
ASSERT_STR_CONTAINS(row_status.ToString(), "should be aborted");
- // We should have an extra dispatcher for the new transactional write.
- ASSERT_EQ(1 + kNumPartitions, GetTxnOpDispatchersTotalCount());
+ // The dispatcher for the new transactional write should eventually
+ // disappear because the transaction is automatically aborted.
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_EQ(kNumPartitions, GetTxnOpDispatchersTotalCount());
+ });
}
{
// Now, commit the first transaction.
ASSERT_OK(first_txn->Commit());
// All dispatchers should be unregistered once the transaction is committed.
- ASSERT_EQ(1, GetTxnOpDispatchersTotalCount());
-
- // Writes to the second transaction should now succeed.
- ASSERT_OK(InsertRows(second_txn.get(), 1, &key));
- ASSERT_EQ(1, GetTxnOpDispatchersTotalCount());
+ ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
- ASSERT_OK(second_txn->Commit());
+ // The second transaction should have been automatically aborted in its
+ // attempt to write to avoid deadlock.
+ Status s = InsertRows(second_txn.get(), 1, &key);
+ ASSERT_TRUE(s.IsIOError());
+ ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+ ASSERT_EVENTUALLY([&] {
+ bool is_complete;
+ Status commit_status;
+ ASSERT_OK(second_txn->IsCommitComplete(&is_complete, &commit_status));
+ ASSERT_TRUE(is_complete);
+ ASSERT_TRUE(commit_status.IsAborted()) << commit_status.ToString();
+ });
ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
}
}
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 56a63c9..4cd699f 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -681,7 +681,7 @@ class RpcOpCompletionCallback : public OpCompletionCallback {
: context_(context),
response_(response) {}
- virtual void OpCompleted() OVERRIDE {
+ void OpCompleted() override {
if (!status_.ok()) {
LOG(WARNING) << Substitute("failed op from $0: $1",
context_->requestor_string(), status_.ToString());
@@ -700,6 +700,24 @@ class RpcOpCompletionCallback : public OpCompletionCallback {
Response* response_;
};
+// Completion callback for writes made in the context of a multi-row
+// transaction. If the op failed with TXN_LOCKED_ABORT, 'abort_func' is invoked
+// (a non-OK result is only reported via WARN_NOT_OK) before the regular RPC
+// completion handling of RpcOpCompletionCallback::OpCompleted() runs.
+class TxnWriteCompletionCallback : public RpcOpCompletionCallback<WriteResponsePB> {
+  using Base = RpcOpCompletionCallback<WriteResponsePB>;
+
+ public:
+  TxnWriteCompletionCallback(RpcContext* context,
+                             WriteResponsePB* response,
+                             std::function<Status()> abort_func)
+      : Base(context, response),
+        abort_func_(std::move(abort_func)) {}
+
+  void OpCompleted() override {
+    const bool txn_lock_aborted = (code_ == TabletServerErrorPB::TXN_LOCKED_ABORT);
+    if (PREDICT_FALSE(txn_lock_aborted)) {
+      // Roll back the transaction on the client's behalf.
+      WARN_NOT_OK(abort_func_(), "Error running txn abort callback");
+    }
+    Base::OpCompleted();
+  }
+
+ private:
+  // Invoked to abort the write's transaction when the op fails with
+  // TXN_LOCKED_ABORT.
+  std::function<Status()> abort_func_;
+};
+
// Generic interface to handle scan results.
class ScanResultCollector {
public:
@@ -1612,17 +1630,23 @@ void TabletServiceImpl::Write(const WriteRequestPB* req,
resp->mutable_error(), s, TabletServerErrorPB::UNKNOWN_ERROR, context);
}
- op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
- new RpcOpCompletionCallback<WriteResponsePB>(context, resp)));
-
const auto deadline = context->GetClientDeadline();
const auto& username = context->remote_user().username();
if (!req->has_txn_id() ||
PREDICT_FALSE(!FLAGS_tserver_txn_write_op_handling_enabled)) {
+ op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
+ new RpcOpCompletionCallback<WriteResponsePB>(context, resp)));
+
// Submit the write operation. The RPC will be responded asynchronously.
s = replica->SubmitWrite(std::move(op_state));
} else {
+ auto abort_func = [this, txn_id = req->txn_id(), &username] {
+ return server_->tablet_manager()->ScheduleAbortTxn(txn_id, username);
+ };
+ op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
+ new TxnWriteCompletionCallback(context, resp, std::move(abort_func))));
+
// If it's a write operation in the context of a multi-row transaction,
// schedule running preliminary tasks if necessary: register the tablet as
// a participant in the transaction and begin transaction for the
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index d9b9d0f..3411f81 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -1889,6 +1889,27 @@ Status TSTabletManager::SchedulePreliminaryTasksForTxnWrite(
});
}
+// Marks 'txn_id' as having an abort in progress and submits a task that asks
+// the TxnSystemClient to abort it as 'user'. The mark is cleared once the task
+// has run, or right away if the task could not be scheduled, so that a later
+// call is able to try again.
+Status TSTabletManager::ScheduleAbortTxn(int64_t txn_id, const string& user) {
+  {
+    std::lock_guard<simple_spinlock> l(txn_aborts_lock_);
+    if (!InsertIfNotPresent(&txn_aborts_in_progress_, txn_id)) {
+      // An abort of this transaction is already pending; nothing to do.
+      return Status::OK();
+    }
+  }
+  // Clears the in-progress mark for 'txn_id'.
+  const auto unmark_in_progress = [this, txn_id] {
+    std::lock_guard<simple_spinlock> l(txn_aborts_lock_);
+    txn_aborts_in_progress_.erase(txn_id);
+  };
+  transactions::TxnSystemClient* tsc = nullptr;
+  Status s = server_->txn_client_initializer()->GetClient(&tsc);
+  if (s.ok()) {
+    DCHECK(tsc);
+    s = tsc->CheckOpenTxnStatusTable();
+  }
+  if (s.ok()) {
+    s = txn_participant_registration_pool_->Submit(
+        [tsc, txn_id, user, unmark_in_progress] () {
+          LOG(INFO) << Substitute("Sending abort request for transaction $0", txn_id);
+          WARN_NOT_OK(tsc->AbortTransaction(txn_id, user),
+                      Substitute("Error aborting transaction $0 as user $1", txn_id, user));
+          unmark_in_progress();
+        });
+  }
+  if (PREDICT_FALSE(!s.ok())) {
+    // No task will clear the mark, so drop it here: otherwise every later
+    // attempt to abort this transaction would be silently ignored.
+    unmark_in_progress();
+  }
+  return s;
+}
+
void TSTabletManager::SetNextUpdateTimeForTests() {
std::lock_guard<rw_spinlock> l(lock_update_);
next_update_time_ = MonoTime::Now();
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 1088da7..0508563 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -244,7 +244,9 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
// Update the tablet statistics if necessary.
void UpdateTabletStatsIfNecessary();
- // Schedule preliminary tasks to process
+ // Schedule preliminary tasks to begin transaction 'txn_id' started by 'user'
+ // with 'replica' as a participant, with the given deadline. Calls 'cb' if
+ // any of the tasks fail.
Status SchedulePreliminaryTasksForTxnWrite(
scoped_refptr<tablet::TabletReplica> replica,
int64_t txn_id,
@@ -252,6 +254,9 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
MonoTime deadline,
tablet::RegisteredTxnCallback cb);
+  // Schedule the asynchronous rollback (abort) of transaction 'txn_id' as
+  // 'user'. If an abort of the same transaction is already pending, this is a
+  // no-op that returns OK. A non-OK status means the abort could not be
+  // scheduled; a failure of the abort itself is only logged, not returned.
+  Status ScheduleAbortTxn(int64_t txn_id, const std::string& user);
+
private:
FRIEND_TEST(LeadershipChangeReportingTest, TestReportStatsDuringLeadershipChange);
FRIEND_TEST(TsTabletManagerTest, TestPersistBlocks);
@@ -421,6 +426,11 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
// Holds cached tablet states from tablet_map_.
std::map<tablet::TabletStatePB, int> tablet_state_counts_;
+  // Set of transactions that have a pending call to abort, indicating that
+  // further attempts to schedule such a call can be ignored. An entry is
+  // removed once its abort task has run.
+  // 'txn_aborts_lock_' protects 'txn_aborts_in_progress_'.
+  simple_spinlock txn_aborts_lock_;
+  std::unordered_set<int64_t> txn_aborts_in_progress_;
+
TSTabletManagerStatePB state_;
// Thread pool used to run tablet copy operations.