You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2021/04/24 04:56:17 UTC

[kudu] branch master updated: KUDU-2612: rollback txn on TXN_LOCKED_ABORT

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new d21a0d3  KUDU-2612: rollback txn on TXN_LOCKED_ABORT
d21a0d3 is described below

commit d21a0d38373b84f66fa2d37732b7ec7b8fee5f16
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Thu Apr 1 17:51:21 2021 -0700

    KUDU-2612: rollback txn on TXN_LOCKED_ABORT
    
    Previously, clients were required to manually roll back transactions
    when their ops failed with TXN_LOCKED_ABORT (surfaced as an Aborted
    error). Rather than pushing this burden onto application users, this
    patch hooks in a write-completion callback that automatically rolls
    back the transaction if any of its write ops fails with TXN_LOCKED_ABORT.
    
    Of course, users are still free to Rollback() on error -- this patch is
    simply meant to automate it in case the user doesn't, giving us a bit
    more control over ensuring our transactions don't lead to a deadlock.
    
    Change-Id: I25415cad0cfb08d260e23bd8b368852a5006c1fb
    Reviewed-on: http://gerrit.cloudera.org:8080/17263
    Tested-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/integration-tests/txn_write_ops-itest.cc | 39 ++++++++++++++---------
 src/kudu/tserver/tablet_service.cc                | 32 ++++++++++++++++---
 src/kudu/tserver/ts_tablet_manager.cc             | 21 ++++++++++++
 src/kudu/tserver/ts_tablet_manager.h              | 12 ++++++-
 4 files changed, 84 insertions(+), 20 deletions(-)

diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc
index c597acb..2bab38d 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -50,8 +50,8 @@
 #include "kudu/client/write_op.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
-#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus.proxy.h"
 #include "kudu/gutil/map-util.h"
@@ -297,9 +297,7 @@ class TxnWriteOpsITest : public ExternalMiniClusterITestBase {
 
 // Test that our deadlock prevention mechanisms work by writing across
 // different tablets concurrently from multiple transactions.
-// TODO(awong): it'd be much more convenient to take control of aborting the
-// transactions ourselves, rather than relying on the application user.
-TEST_F(TxnWriteOpsITest, TestClientSideDeadlockPrevention) {
+TEST_F(TxnWriteOpsITest, TestDeadlockPrevention) {
   constexpr const int kNumTxns = 8;
   const vector<string> master_flags = {
     "--txn_manager_enabled=true",
@@ -338,8 +336,8 @@ TEST_F(TxnWriteOpsITest, TestClientSideDeadlockPrevention) {
           Status s = session->Apply(insert.release());
           LOG(INFO) << Substitute("Txn $0 wrote row $1: $2",
                                   token.txn_id(), row_key, s.ToString());
-          // If the write op failed because of a locking error, roll the
-          // transaction back and retry the transaction after waiting a bit.
+          // If the write op failed because of a locking error, retry the
+          // transaction after waiting a bit.
           if (!s.ok()) {
             vector<KuduError*> errors;
             ElementDeleter d(&errors);
@@ -354,7 +352,6 @@ TEST_F(TxnWriteOpsITest, TestClientSideDeadlockPrevention) {
             // in wait-die, that get retried) will still time out, after
             // contending a bit with other ops.
             ASSERT_TRUE(error.IsAborted() || error.IsTimedOut()) << error.ToString();
-            ASSERT_OK(txn->Rollback());
             needs_retry = true;
 
             // Wait a bit before retrying the entire transaction to allow for
@@ -1063,6 +1060,8 @@ TEST_F(TxnOpDispatcherITest, LifecycleBasic) {
   }
 }
 
+// Test that when a transaction attempts to take a lock held by an earlier
+// transaction, the newer transaction is aborted.
 TEST_F(TxnOpDispatcherITest, BeginTxnLockAbort) {
   NO_FATALS(Prepare(1));
 
@@ -1118,21 +1117,31 @@ TEST_F(TxnOpDispatcherITest, BeginTxnLockAbort) {
     ASSERT_TRUE(row_status.IsAborted()) << row_status.ToString();
     ASSERT_STR_CONTAINS(row_status.ToString(), "should be aborted");
 
-    // We should have an extra dispatcher for the new transactional write.
-    ASSERT_EQ(1 + kNumPartitions, GetTxnOpDispatchersTotalCount());
+    // The dispatcher for the new transactional write should eventually
+    // disappear because the transaction is automatically aborted.
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_EQ(kNumPartitions, GetTxnOpDispatchersTotalCount());
+    });
   }
   {
     // Now, commit the first transaction.
     ASSERT_OK(first_txn->Commit());
 
     // All dispatchers should be unregistered once the transaction is committed.
-    ASSERT_EQ(1, GetTxnOpDispatchersTotalCount());
-
-    // Writes to the second transaction should now succeed.
-    ASSERT_OK(InsertRows(second_txn.get(), 1, &key));
-    ASSERT_EQ(1, GetTxnOpDispatchersTotalCount());
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
 
-    ASSERT_OK(second_txn->Commit());
+    // The second transaction should have been automatically aborted in its
+    // attempt to write to avoid deadlock.
+    Status s = InsertRows(second_txn.get(), 1, &key);
+    ASSERT_TRUE(s.IsIOError());
+    ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
+    ASSERT_EVENTUALLY([&] {
+      bool is_complete;
+      Status commit_status;
+      ASSERT_OK(second_txn->IsCommitComplete(&is_complete, &commit_status));
+      ASSERT_TRUE(is_complete);
+      ASSERT_TRUE(commit_status.IsAborted()) << commit_status.ToString();
+    });
     ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
   }
 }
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 56a63c9..4cd699f 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -681,7 +681,7 @@ class RpcOpCompletionCallback : public OpCompletionCallback {
       : context_(context),
         response_(response) {}
 
-  virtual void OpCompleted() OVERRIDE {
+  void OpCompleted() override {
     if (!status_.ok()) {
       LOG(WARNING) << Substitute("failed op from $0: $1",
                                  context_->requestor_string(), status_.ToString());
@@ -700,6 +700,24 @@ class RpcOpCompletionCallback : public OpCompletionCallback {
   Response* response_;
 };
 
+// Calls 'abort_func_' on TXN_LOCKED_ABORT before completing the RPC.
+class TxnWriteCompletionCallback final : public RpcOpCompletionCallback<WriteResponsePB> {
+ public:
+  TxnWriteCompletionCallback(RpcContext* context, WriteResponsePB* response,
+                             std::function<Status()> abort_func)
+      : RpcOpCompletionCallback(context, response),
+        abort_func_(std::move(abort_func)) {}
+  void OpCompleted() override {
+    if (PREDICT_FALSE(code_ == TabletServerErrorPB::TXN_LOCKED_ABORT)) {
+      WARN_NOT_OK(abort_func_(), "Error running txn abort callback");
+    }
+    RpcOpCompletionCallback::OpCompleted();
+  }
+
+ private:
+  const std::function<Status()> abort_func_;
+};
+
 // Generic interface to handle scan results.
 class ScanResultCollector {
  public:
@@ -1612,17 +1630,23 @@ void TabletServiceImpl::Write(const WriteRequestPB* req,
         resp->mutable_error(), s, TabletServerErrorPB::UNKNOWN_ERROR, context);
   }
 
-  op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
-      new RpcOpCompletionCallback<WriteResponsePB>(context, resp)));
-
   const auto deadline = context->GetClientDeadline();
   const auto& username = context->remote_user().username();
 
   if (!req->has_txn_id() ||
       PREDICT_FALSE(!FLAGS_tserver_txn_write_op_handling_enabled)) {
+    op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
+        new RpcOpCompletionCallback<WriteResponsePB>(context, resp)));
+
     // Submit the write operation. The RPC will be responded asynchronously.
     s = replica->SubmitWrite(std::move(op_state));
   } else {
+    auto abort_func = [this, txn_id = req->txn_id(), &username] {
+      return server_->tablet_manager()->ScheduleAbortTxn(txn_id, username);
+    };
+    op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
+        new TxnWriteCompletionCallback(context, resp, std::move(abort_func))));
+
     // If it's a write operation in the context of a multi-row transaction,
     // schedule running preliminary tasks if necessary: register the tablet as
     // a participant in the transaction and begin transaction for the
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index d9b9d0f..3411f81 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -1889,6 +1889,44 @@ Status TSTabletManager::SchedulePreliminaryTasksForTxnWrite(
  });
 }

+// Schedules an asynchronous abort of 'txn_id' as 'user'. Attempts to abort a
+// transaction whose abort is already pending are no-ops that return OK. The
+// in-progress marker is cleared once the abort task finishes, or immediately
+// if the task could not be scheduled, so later attempts are never ignored.
+Status TSTabletManager::ScheduleAbortTxn(int64_t txn_id, const string& user) {
+  {
+    std::lock_guard<simple_spinlock> l(txn_aborts_lock_);
+    if (!InsertIfNotPresent(&txn_aborts_in_progress_, txn_id)) {
+      return Status::OK();
+    }
+  }
+  const auto unmark_abort_in_progress = [this, txn_id] {
+    std::lock_guard<simple_spinlock> l(txn_aborts_lock_);
+    txn_aborts_in_progress_.erase(txn_id);
+  };
+  transactions::TxnSystemClient* tsc = nullptr;
+  Status s = server_->txn_client_initializer()->GetClient(&tsc);
+  if (s.ok()) {
+    DCHECK(tsc);
+    s = tsc->CheckOpenTxnStatusTable();
+  }
+  if (s.ok()) {
+    s = txn_participant_registration_pool_->Submit(
+        [txn_id, tsc, user, unmark_abort_in_progress] () {
+          LOG(INFO) << Substitute("Sending abort request for transaction $0", txn_id);
+          WARN_NOT_OK(tsc->AbortTransaction(txn_id, user),
+                      Substitute("Error aborting transaction $0 as user $1", txn_id, user));
+          unmark_abort_in_progress();
+        });
+  }
+  if (!s.ok()) {
+    // The abort task was never scheduled, so nothing else will clear the
+    // marker; without this, every later abort of 'txn_id' would be skipped.
+    unmark_abort_in_progress();
+  }
+  return s;
+}
+
 void TSTabletManager::SetNextUpdateTimeForTests() {
   std::lock_guard<rw_spinlock> l(lock_update_);
   next_update_time_ = MonoTime::Now();
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 1088da7..0508563 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -244,7 +244,9 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
   // Update the tablet statistics if necessary.
   void UpdateTabletStatsIfNecessary();
 
-  // Schedule preliminary tasks to process
+  // Schedule preliminary tasks to begin transaction 'txn_id' started by 'user'
+  // with 'replica' as a participant, with the given deadline. Calls 'cb' if
+  // any of the tasks fail.
   Status SchedulePreliminaryTasksForTxnWrite(
       scoped_refptr<tablet::TabletReplica> replica,
       int64_t txn_id,
@@ -252,6 +254,9 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
       MonoTime deadline,
       tablet::RegisteredTxnCallback cb);
 
+  // Schedule the rollback of the given transaction as the given user.
+  Status ScheduleAbortTxn(int64_t txn_id, const std::string& user);
+
  private:
   FRIEND_TEST(LeadershipChangeReportingTest, TestReportStatsDuringLeadershipChange);
   FRIEND_TEST(TsTabletManagerTest, TestPersistBlocks);
@@ -421,6 +426,11 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
   // Holds cached tablet states from tablet_map_.
   std::map<tablet::TabletStatePB, int> tablet_state_counts_;
 
+  // Set of transactions that have a pending call to abort, indicating that
+  // further attempts to schedule such a call can be ignored.
+  simple_spinlock txn_aborts_lock_;
+  std::unordered_set<int64_t> txn_aborts_in_progress_;
+
   TSTabletManagerStatePB state_;
 
   // Thread pool used to run tablet copy operations.