You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2021/04/28 22:47:00 UTC
[kudu] branch master updated: KUDU-2612: retry abort task on timeout
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 73310d9 KUDU-2612: retry abort task on timeout
73310d9 is described below
commit 73310d955b4d5d7ec413217d99517cc4b7794975
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Wed Apr 28 13:59:58 2021 -0700
KUDU-2612: retry abort task on timeout
It's possible that, in the event of a node failure, the abort that
happens automatically to avoid a deadlock will time out. Instead of
leaving the failure as is, this patch updates the task to be retried.
To expedite testing, this also introduces a new TxnSystemClient flag that
makes the default op timeout configurable, so that tests can lower it.
This is a follow-up to d21a0d3.
Change-Id: I303a9a8c85a2191594a22d907770a82da5060f19
Reviewed-on: http://gerrit.cloudera.org:8080/17357
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins
---
src/kudu/integration-tests/txn_write_ops-itest.cc | 77 ++++++++++++++++++++++-
src/kudu/transactions/txn_system_client.cc | 24 +++++++
src/kudu/transactions/txn_system_client.h | 12 ++--
src/kudu/tserver/ts_tablet_manager.cc | 14 ++++-
4 files changed, 116 insertions(+), 11 deletions(-)
diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc
index 7554b78..b53c796 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -140,8 +140,10 @@ DECLARE_bool(txn_manager_enabled);
DECLARE_bool(txn_manager_lazily_initialized);
DECLARE_int32(txn_participant_begin_op_inject_latency_ms);
DECLARE_int32(txn_participant_registration_inject_latency_ms);
+DECLARE_int64(txn_system_client_op_timeout_ms);
DECLARE_uint32(tablet_max_pending_txn_write_ops);
DECLARE_uint32(txn_manager_status_table_num_replicas);
+DECLARE_uint32(txn_staleness_tracker_interval_ms);
namespace kudu {
@@ -930,10 +932,13 @@ class TxnOpDispatcherITest : public KuduTest {
CHECK_OK(BuildSchema(&schema_));
}
- void Prepare(int num_tservers) {
+ void Prepare(int num_tservers, bool create_table = true, int num_replicas = 0) {
+ if (num_replicas == 0) {
+ num_replicas = num_tservers;
+ }
FLAGS_txn_manager_enabled = true;
FLAGS_txn_manager_lazily_initialized = false;
- FLAGS_txn_manager_status_table_num_replicas = num_tservers;
+ FLAGS_txn_manager_status_table_num_replicas = num_replicas;
InternalMiniClusterOptions opts;
opts.num_tablet_servers = num_tservers;
@@ -943,7 +948,9 @@ class TxnOpDispatcherITest : public KuduTest {
KuduClientBuilder builder;
builder.default_admin_operation_timeout(kTimeout);
ASSERT_OK(cluster_->CreateClient(&builder, &client_));
- ASSERT_OK(CreateTable(num_tservers));
+ if (create_table) {
+ ASSERT_OK(CreateTable(num_replicas));
+ }
for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
auto* ts = cluster_->mini_tablet_server(i);
ASSERT_OK(ts->WaitStarted());
@@ -1163,6 +1170,70 @@ TEST_F(TxnOpDispatcherITest, LifecycleBasic) {
}
}
+// Test that the automatic abort to avoid deadlock gets retried if the op times
+// out.
+TEST_F(TxnOpDispatcherITest, TestRetryWaitDieAbortsWhenTServerUnavailable) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ // Disable the staleness tracker so we know any aborts were done by the
+ // wait-die deadlock prevention.
+ FLAGS_txn_staleness_tracker_interval_ms = 0;
+ // Set a low system client timeout to make sure our abort task retries.
+ FLAGS_txn_system_client_op_timeout_ms = 1000;
+
+ NO_FATALS(Prepare(/*num_tservers*/2, /*create_table*/false, /*num_replicas*/1));
+ // First, figure out which tablet server hosts the TxnStatusManager.
+ tserver::MiniTabletServer* tsm_server = nullptr;
+ ASSERT_EVENTUALLY([&] {
+ for (int i = 0; i < cluster_->num_tablet_servers() && tsm_server == nullptr; i++) {
+ auto* mts = cluster_->mini_tablet_server(i);
+ auto* tablet_manager = mts->server()->tablet_manager();
+ vector<scoped_refptr<TabletReplica>> replicas;
+ tablet_manager->GetTabletReplicas(&replicas);
+ if (!replicas.empty()) {
+ tsm_server = mts;
+ }
+ }
+ ASSERT_FALSE(tsm_server == nullptr);
+ });
+
+ // Create a single-tablet table so shutting down the TxnStatusManager doesn't
+ // affect writes.
+ unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+ ASSERT_OK(table_creator->table_name(kTableName)
+ .schema(&schema_)
+ .set_range_partition_columns({ "key" })
+ .num_replicas(1)
+ .Create());
+ ASSERT_OK(client_->OpenTable(kTableName, &table_));
+ shared_ptr<KuduTransaction> first_txn;
+ shared_ptr<KuduTransaction> second_txn;
+ ASSERT_OK(client_->NewTransaction(&first_txn));
+ ASSERT_OK(client_->NewTransaction(&second_txn));
+
+ int64_t key = 0;
+ ASSERT_OK(InsertRows(first_txn.get(), 1, &key));
+
+ // The second transaction should always fail because it's attempting to lock
+ // a tablet that's already locked.
+ Status s = InsertRows(second_txn.get(), 1, &key);
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+
+ // Immediately shutdown, reducing the likelihood that the automatic abort
+ // task will complete. Then sleep for long enough that the system client
+ // would timeout and try again.
+ tsm_server->Shutdown();
+ SleepFor(MonoDelta::FromMilliseconds(3 * FLAGS_txn_system_client_op_timeout_ms));
+ ASSERT_OK(tsm_server->Restart());
+ ASSERT_EVENTUALLY([&] {
+ bool is_complete = false;
+ Status completion_status;
+ ASSERT_OK(second_txn->IsCommitComplete(&is_complete, &completion_status));
+ ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
+ ASSERT_TRUE(is_complete);
+ });
+}
+
// Test that when attempting to lock a transaction that is locked by an earlier
// transaction, we abort the newer transaction.
TEST_F(TxnOpDispatcherITest, BeginTxnLockAbort) {
diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc
index 927b1ed..83e9ead 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -61,6 +61,12 @@ DEFINE_bool(disable_txn_system_client_init, false,
"client connections.");
TAG_FLAG(disable_txn_system_client_init, unsafe);
+DEFINE_int64(txn_system_client_op_timeout_ms, 10 * 1000,
+ "Op timeout used by the TxnSystemClient when making transactions-related "
+ "RPCs to the TxnStatusManager.");
+TAG_FLAG(txn_system_client_op_timeout_ms, advanced);
+TAG_FLAG(txn_system_client_op_timeout_ms, runtime);
+
DECLARE_int64(rpc_negotiation_timeout_ms);
using kudu::client::KuduClient;
@@ -185,6 +191,9 @@ Status TxnSystemClient::BeginTransaction(int64_t txn_id,
uint32_t* txn_keepalive_ms,
int64_t* highest_seen_txn_id,
MonoDelta timeout) {
+ if (!timeout.Initialized()) {
+ timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
+ }
CoordinatorOpPB coordinate_txn_op;
coordinate_txn_op.set_type(CoordinatorOpPB::BEGIN_TXN);
coordinate_txn_op.set_txn_id(txn_id);
@@ -213,6 +222,9 @@ Status TxnSystemClient::BeginTransaction(int64_t txn_id,
Status TxnSystemClient::RegisterParticipant(int64_t txn_id, const string& participant_id,
const string& user, MonoDelta timeout) {
+ if (!timeout.Initialized()) {
+ timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
+ }
CoordinatorOpPB coordinate_txn_op;
coordinate_txn_op.set_type(CoordinatorOpPB::REGISTER_PARTICIPANT);
coordinate_txn_op.set_txn_id(txn_id);
@@ -228,6 +240,9 @@ Status TxnSystemClient::RegisterParticipant(int64_t txn_id, const string& partic
Status TxnSystemClient::BeginCommitTransaction(int64_t txn_id,
const string& user,
MonoDelta timeout) {
+ if (!timeout.Initialized()) {
+ timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
+ }
CoordinatorOpPB coordinate_txn_op;
coordinate_txn_op.set_type(CoordinatorOpPB::BEGIN_COMMIT_TXN);
coordinate_txn_op.set_txn_id(txn_id);
@@ -242,6 +257,9 @@ Status TxnSystemClient::BeginCommitTransaction(int64_t txn_id,
Status TxnSystemClient::AbortTransaction(int64_t txn_id,
const string& user,
MonoDelta timeout) {
+ if (!timeout.Initialized()) {
+ timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
+ }
CoordinatorOpPB coordinate_txn_op;
coordinate_txn_op.set_type(CoordinatorOpPB::ABORT_TXN);
coordinate_txn_op.set_txn_id(txn_id);
@@ -257,6 +275,9 @@ Status TxnSystemClient::GetTransactionStatus(int64_t txn_id,
const string& user,
TxnStatusEntryPB* txn_status,
MonoDelta timeout) {
+ if (!timeout.Initialized()) {
+ timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
+ }
DCHECK(txn_status);
CoordinatorOpPB coordinate_txn_op;
coordinate_txn_op.set_type(CoordinatorOpPB::GET_TXN_STATUS);
@@ -284,6 +305,9 @@ Status TxnSystemClient::GetTransactionStatus(int64_t txn_id,
Status TxnSystemClient::KeepTransactionAlive(int64_t txn_id,
const string& user,
MonoDelta timeout) {
+ if (!timeout.Initialized()) {
+ timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
+ }
CoordinatorOpPB coordinate_txn_op;
coordinate_txn_op.set_type(CoordinatorOpPB::KEEP_TXN_ALIVE);
coordinate_txn_op.set_txn_id(txn_id);
diff --git a/src/kudu/transactions/txn_system_client.h b/src/kudu/transactions/txn_system_client.h
index 0c5bc3e..6b4c193 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -102,24 +102,24 @@ class TxnSystemClient {
std::string& user,
uint32_t* txn_keepalive_ms = nullptr,
int64_t* highest_seen_txn_id = nullptr,
- MonoDelta timeout = MonoDelta::FromSeconds(10));
+ MonoDelta timeout = MonoDelta());
// Attempts to register the given participant with the given transaction.
// Returns an error if the transaction hasn't yet been started, or if the
// 'user' isn't permitted to modify the transaction.
Status RegisterParticipant(int64_t txn_id, const std::string& participant_id,
const std::string& user,
- MonoDelta timeout = MonoDelta::FromSeconds(10));
+ MonoDelta timeout = MonoDelta());
// Initiates committing a transaction with the given identifier.
Status BeginCommitTransaction(int64_t txn_id,
const std::string& user,
- MonoDelta timeout = MonoDelta::FromSeconds(10));
+ MonoDelta timeout = MonoDelta());
// Aborts a transaction with the given identifier.
Status AbortTransaction(int64_t txn_id,
const std::string& user,
- MonoDelta timeout = MonoDelta::FromSeconds(10));
+ MonoDelta timeout = MonoDelta());
// Retrieves transactions status. On success, returns Status::OK() and stores
// the result status in the 'txn_status' output parameter. On failure,
@@ -127,12 +127,12 @@ class TxnSystemClient {
Status GetTransactionStatus(int64_t txn_id,
const std::string& user,
TxnStatusEntryPB* txn_status,
- MonoDelta timeout = MonoDelta::FromSeconds(10));
+ MonoDelta timeout = MonoDelta());
// Send keep-alive heartbeat for the specified transaction as the given user.
Status KeepTransactionAlive(int64_t txn_id,
const std::string& user,
- MonoDelta timeout = MonoDelta::FromSeconds(10));
+ MonoDelta timeout = MonoDelta());
// Opens the transaction status table, refreshing metadata with that from the
// masters.
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 3411f81..e6bd634 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -1903,8 +1903,18 @@ Status TSTabletManager::ScheduleAbortTxn(int64_t txn_id, const string& user) {
return txn_participant_registration_pool_->Submit(
[this, txn_id, tsc, user] () {
LOG(INFO) << Substitute("Sending abort request for transaction $0", txn_id);
- WARN_NOT_OK(tsc->AbortTransaction(txn_id, user),
- Substitute("Error aborting transaction $0 as user $1", txn_id, user));
+ auto s = tsc->AbortTransaction(txn_id, user);
+ if (s.IsTimedOut()) {
+ // Presumably this was a transient error. Try again.
+ {
+ std::lock_guard<simple_spinlock> l(txn_aborts_lock_);
+ txn_aborts_in_progress_.erase(txn_id);
+ }
+ WARN_NOT_OK(ScheduleAbortTxn(txn_id, user),
+ Substitute("Could not reschedule abort of transaction $0", txn_id));
+ return;
+ }
+ WARN_NOT_OK(s, Substitute("Error aborting transaction $0 as user $1", txn_id, user));
std::lock_guard<simple_spinlock> l(txn_aborts_lock_);
txn_aborts_in_progress_.erase(txn_id);
});