You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2021/04/28 22:47:00 UTC

[kudu] branch master updated: KUDU-2612: retry abort task on timeout

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 73310d9  KUDU-2612: retry abort task on timeout
73310d9 is described below

commit 73310d955b4d5d7ec413217d99517cc4b7794975
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Wed Apr 28 13:59:58 2021 -0700

    KUDU-2612: retry abort task on timeout
    
    In the event of a node failure, the abort that happens automatically
    to avoid a deadlock may time out. Rather than leaving that failure as
    is, this patch makes the abort task retry when it times out.
    
    To expedite testing, this also introduces a new TxnSystemClient flag,
    --txn_system_client_op_timeout_ms, which makes the default op timeout
    (still 10 seconds) configurable so that tests can lower it.
    
    This is a follow-up to d21a0d3.
    
    Change-Id: I303a9a8c85a2191594a22d907770a82da5060f19
    Reviewed-on: http://gerrit.cloudera.org:8080/17357
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/integration-tests/txn_write_ops-itest.cc | 77 ++++++++++++++++++++++-
 src/kudu/transactions/txn_system_client.cc        | 24 +++++++
 src/kudu/transactions/txn_system_client.h         | 12 ++--
 src/kudu/tserver/ts_tablet_manager.cc             | 14 ++++-
 4 files changed, 116 insertions(+), 11 deletions(-)

diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc
index 7554b78..b53c796 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -140,8 +140,10 @@ DECLARE_bool(txn_manager_enabled);
 DECLARE_bool(txn_manager_lazily_initialized);
 DECLARE_int32(txn_participant_begin_op_inject_latency_ms);
 DECLARE_int32(txn_participant_registration_inject_latency_ms);
+DECLARE_int64(txn_system_client_op_timeout_ms);
 DECLARE_uint32(tablet_max_pending_txn_write_ops);
 DECLARE_uint32(txn_manager_status_table_num_replicas);
+DECLARE_uint32(txn_staleness_tracker_interval_ms);
 
 namespace kudu {
 
@@ -930,10 +932,13 @@ class TxnOpDispatcherITest : public KuduTest {
     CHECK_OK(BuildSchema(&schema_));
   }
 
-  void Prepare(int num_tservers) {
+  void Prepare(int num_tservers, bool create_table = true, int num_replicas = 0) {
+    if (num_replicas == 0) {
+      num_replicas = num_tservers;
+    }
     FLAGS_txn_manager_enabled = true;
     FLAGS_txn_manager_lazily_initialized = false;
-    FLAGS_txn_manager_status_table_num_replicas = num_tservers;
+    FLAGS_txn_manager_status_table_num_replicas = num_replicas;
 
     InternalMiniClusterOptions opts;
     opts.num_tablet_servers = num_tservers;
@@ -943,7 +948,9 @@ class TxnOpDispatcherITest : public KuduTest {
     KuduClientBuilder builder;
     builder.default_admin_operation_timeout(kTimeout);
     ASSERT_OK(cluster_->CreateClient(&builder, &client_));
-    ASSERT_OK(CreateTable(num_tservers));
+    if (create_table) {
+      ASSERT_OK(CreateTable(num_replicas));
+    }
     for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
       auto* ts = cluster_->mini_tablet_server(i);
       ASSERT_OK(ts->WaitStarted());
@@ -1163,6 +1170,70 @@ TEST_F(TxnOpDispatcherITest, LifecycleBasic) {
   }
 }
 
+// Test that the abort issued automatically by wait-die deadlock prevention is
+// retried when its RPC to the TxnStatusManager times out.
+TEST_F(TxnOpDispatcherITest, TestRetryWaitDieAbortsWhenTServerUnavailable) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // Disable the staleness tracker so that any abort observed below must have
+  // come from wait-die deadlock prevention.
+  FLAGS_txn_staleness_tracker_interval_ms = 0;
+  // Use a short TxnSystemClient op timeout so the abort task times out and retries.
+  FLAGS_txn_system_client_op_timeout_ms = 1000;
+
+  NO_FATALS(Prepare(/*num_tservers*/2, /*create_table*/false, /*num_replicas*/1));
+  // First, find which tablet server hosts the single TxnStatusManager replica.
+  tserver::MiniTabletServer* tsm_server = nullptr;
+  ASSERT_EVENTUALLY([&] {
+    for (int i = 0; i < cluster_->num_tablet_servers() && tsm_server == nullptr; i++) {
+      auto* mts = cluster_->mini_tablet_server(i);
+      auto* tablet_manager = mts->server()->tablet_manager();
+      vector<scoped_refptr<TabletReplica>> replicas;
+      tablet_manager->GetTabletReplicas(&replicas);
+      if (!replicas.empty()) {
+        tsm_server = mts;
+      }
+    }
+    ASSERT_FALSE(tsm_server == nullptr);
+  });
+
+  // Create a single-tablet table so both transactions contend for one tablet's
+  // lock; all writes below happen before the TxnStatusManager is shut down.
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  ASSERT_OK(table_creator->table_name(kTableName)
+            .schema(&schema_)
+            .set_range_partition_columns({ "key" })
+            .num_replicas(1)
+            .Create());
+  ASSERT_OK(client_->OpenTable(kTableName, &table_));
+  shared_ptr<KuduTransaction> first_txn;
+  shared_ptr<KuduTransaction> second_txn;
+  ASSERT_OK(client_->NewTransaction(&first_txn));
+  ASSERT_OK(client_->NewTransaction(&second_txn));
+
+  int64_t key = 0;
+  ASSERT_OK(InsertRows(first_txn.get(), 1, &key));
+
+  // The second transaction should always fail: it attempts to lock the tablet
+  // that the first transaction has already locked.
+  Status s = InsertRows(second_txn.get(), 1, &key);
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+
+  // Shut down the TxnStatusManager's tserver immediately, making it unlikely
+  // that the automatic abort completes. Then sleep long enough (3x the op
+  // timeout) for the abort RPC to time out and the abort task to be retried.
+  tsm_server->Shutdown();
+  SleepFor(MonoDelta::FromMilliseconds(3 * FLAGS_txn_system_client_op_timeout_ms));
+  ASSERT_OK(tsm_server->Restart());
+  ASSERT_EVENTUALLY([&] {  // After the restart, the retried abort should complete.
+    bool is_complete = false;
+    Status completion_status;
+    ASSERT_OK(second_txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
+    ASSERT_TRUE(is_complete);
+  });
+}
+
 // Test that when attempting to lock a transaction that is locked by an earlier
 // transaction, we abort the newer transaction.
 TEST_F(TxnOpDispatcherITest, BeginTxnLockAbort) {
diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc
index 927b1ed..83e9ead 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -61,6 +61,12 @@ DEFINE_bool(disable_txn_system_client_init, false,
             "client connections.");
 TAG_FLAG(disable_txn_system_client_init, unsafe);
 
+DEFINE_int64(txn_system_client_op_timeout_ms, 10 * 1000,  // Same 10s as the old per-call defaults.
+             "Op timeout used by the TxnSystemClient when making transactions-related "
+             "RPCs to the TxnStatusManager.");
+TAG_FLAG(txn_system_client_op_timeout_ms, advanced);
+TAG_FLAG(txn_system_client_op_timeout_ms, runtime);  // Read per call when no explicit timeout is given.
+
 DECLARE_int64(rpc_negotiation_timeout_ms);
 
 using kudu::client::KuduClient;
@@ -185,6 +191,9 @@ Status TxnSystemClient::BeginTransaction(int64_t txn_id,
                                          uint32_t* txn_keepalive_ms,
                                          int64_t* highest_seen_txn_id,
                                          MonoDelta timeout) {
+  if (!timeout.Initialized()) {
+    timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
+  }
   CoordinatorOpPB coordinate_txn_op;
   coordinate_txn_op.set_type(CoordinatorOpPB::BEGIN_TXN);
   coordinate_txn_op.set_txn_id(txn_id);
@@ -213,6 +222,9 @@ Status TxnSystemClient::BeginTransaction(int64_t txn_id,
 
 Status TxnSystemClient::RegisterParticipant(int64_t txn_id, const string& participant_id,
                                             const string& user, MonoDelta timeout) {
+  if (!timeout.Initialized()) {
+    timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
+  }
   CoordinatorOpPB coordinate_txn_op;
   coordinate_txn_op.set_type(CoordinatorOpPB::REGISTER_PARTICIPANT);
   coordinate_txn_op.set_txn_id(txn_id);
@@ -228,6 +240,9 @@ Status TxnSystemClient::RegisterParticipant(int64_t txn_id, const string& partic
 Status TxnSystemClient::BeginCommitTransaction(int64_t txn_id,
                                                const string& user,
                                                MonoDelta timeout) {
+  if (!timeout.Initialized()) {
+    timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
+  }
   CoordinatorOpPB coordinate_txn_op;
   coordinate_txn_op.set_type(CoordinatorOpPB::BEGIN_COMMIT_TXN);
   coordinate_txn_op.set_txn_id(txn_id);
@@ -242,6 +257,9 @@ Status TxnSystemClient::BeginCommitTransaction(int64_t txn_id,
 Status TxnSystemClient::AbortTransaction(int64_t txn_id,
                                          const string& user,
                                          MonoDelta timeout) {
+  if (!timeout.Initialized()) {
+    timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
+  }
   CoordinatorOpPB coordinate_txn_op;
   coordinate_txn_op.set_type(CoordinatorOpPB::ABORT_TXN);
   coordinate_txn_op.set_txn_id(txn_id);
@@ -257,6 +275,9 @@ Status TxnSystemClient::GetTransactionStatus(int64_t txn_id,
                                              const string& user,
                                              TxnStatusEntryPB* txn_status,
                                              MonoDelta timeout) {
+  if (!timeout.Initialized()) {
+    timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
+  }
   DCHECK(txn_status);
   CoordinatorOpPB coordinate_txn_op;
   coordinate_txn_op.set_type(CoordinatorOpPB::GET_TXN_STATUS);
@@ -284,6 +305,9 @@ Status TxnSystemClient::GetTransactionStatus(int64_t txn_id,
 Status TxnSystemClient::KeepTransactionAlive(int64_t txn_id,
                                              const string& user,
                                              MonoDelta timeout) {
+  if (!timeout.Initialized()) {
+    timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
+  }
   CoordinatorOpPB coordinate_txn_op;
   coordinate_txn_op.set_type(CoordinatorOpPB::KEEP_TXN_ALIVE);
   coordinate_txn_op.set_txn_id(txn_id);
diff --git a/src/kudu/transactions/txn_system_client.h b/src/kudu/transactions/txn_system_client.h
index 0c5bc3e..6b4c193 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -102,24 +102,24 @@ class TxnSystemClient {
                           std::string& user,
                           uint32_t* txn_keepalive_ms = nullptr,
                           int64_t* highest_seen_txn_id = nullptr,
-                          MonoDelta timeout = MonoDelta::FromSeconds(10));
+                          MonoDelta timeout = MonoDelta());
 
   // Attempts to register the given participant with the given transaction.
   // Returns an error if the transaction hasn't yet been started, or if the
   // 'user' isn't permitted to modify the transaction.
   Status RegisterParticipant(int64_t txn_id, const std::string& participant_id,
                              const std::string& user,
-                             MonoDelta timeout = MonoDelta::FromSeconds(10));
+                             MonoDelta timeout = MonoDelta());
 
   // Initiates committing a transaction with the given identifier.
   Status BeginCommitTransaction(int64_t txn_id,
                                 const std::string& user,
-                                MonoDelta timeout = MonoDelta::FromSeconds(10));
+                                MonoDelta timeout = MonoDelta());
 
   // Aborts a transaction with the given identifier.
   Status AbortTransaction(int64_t txn_id,
                           const std::string& user,
-                          MonoDelta timeout = MonoDelta::FromSeconds(10));
+                          MonoDelta timeout = MonoDelta());
 
   // Retrieves transactions status. On success, returns Status::OK() and stores
   // the result status in the 'txn_status' output parameter. On failure,
@@ -127,12 +127,12 @@ class TxnSystemClient {
   Status GetTransactionStatus(int64_t txn_id,
                               const std::string& user,
                               TxnStatusEntryPB* txn_status,
-                              MonoDelta timeout = MonoDelta::FromSeconds(10));
+                              MonoDelta timeout = MonoDelta());
 
   // Send keep-alive heartbeat for the specified transaction as the given user.
   Status KeepTransactionAlive(int64_t txn_id,
                               const std::string& user,
-                              MonoDelta timeout = MonoDelta::FromSeconds(10));
+                              MonoDelta timeout = MonoDelta());
 
   // Opens the transaction status table, refreshing metadata with that from the
   // masters.
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 3411f81..e6bd634 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -1903,8 +1903,18 @@ Status TSTabletManager::ScheduleAbortTxn(int64_t txn_id, const string& user) {
   return txn_participant_registration_pool_->Submit(
       [this, txn_id, tsc, user] () {
         LOG(INFO) << Substitute("Sending abort request for transaction $0", txn_id);
-        WARN_NOT_OK(tsc->AbortTransaction(txn_id, user),
-                    Substitute("Error aborting transaction $0 as user $1", txn_id, user));
+        auto s = tsc->AbortTransaction(txn_id, user);
+        if (s.IsTimedOut()) {
+          // Presumably this was a transient error. Try again.
+          {
+            std::lock_guard<simple_spinlock> l(txn_aborts_lock_);
+            txn_aborts_in_progress_.erase(txn_id);
+          }
+          WARN_NOT_OK(ScheduleAbortTxn(txn_id, user),
+              Substitute("Could not reschedule abort of transaction $0", txn_id));
+          return;
+        }
+        WARN_NOT_OK(s, Substitute("Error aborting transaction $0 as user $1", txn_id, user));
         std::lock_guard<simple_spinlock> l(txn_aborts_lock_);
         txn_aborts_in_progress_.erase(txn_id);
       });