You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/11/18 19:44:11 UTC
[kudu] 02/02: KUDU-2612 move keep-alive assignment to TxnStatusManager
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 2d0631fe289968094c9dc6a7b16fcc6df9a269ba
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Nov 16 22:52:04 2020 -0800
KUDU-2612 move keep-alive assignment to TxnStatusManager
To prepare for the processing of transactions' keep-alive heartbeats
and tracking of stale/abandoned transactions, the assignment of the
keep-alive setting for a newly started transaction has moved from
TxnManager to TxnStatusManager.
The rationale for this change is that TxnStatusManager will
have a periodic task to track the staleness of open transactions
registered with it. At this point we anticipate the keep-alive setting
to be a system-wide parameter, not a per-transaction one. With that,
it's natural to make TxnStatusManager the single source of truth for
the transaction keep-alive interval, so that the latter is used both
to assign the corresponding property for newly created transactions
and to spot stale/abandoned ones.
An alternative would be keeping TxnManager as the source of truth
for the transaction keep-alive setting, but that would entail persistently
storing the value assigned by TxnManager on a per-transaction
basis, which is hard to justify without a particular use case.
This patch also brings in a few test scenarios and updates the relevant
existing ones.
Change-Id: Ie98688b2bbe4c673abbfc5801f89eb8182003c18
Reviewed-on: http://gerrit.cloudera.org:8080/16737
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
.../integration-tests/txn_status_table-itest.cc | 35 ++++++++++++++++------
src/kudu/master/txn_manager-test.cc | 6 ++--
src/kudu/master/txn_manager.cc | 19 +++++-------
src/kudu/master/txn_manager.h | 2 +-
src/kudu/master/txn_manager.proto | 3 +-
src/kudu/master/txn_manager_service.cc | 6 ++--
src/kudu/transactions/txn_status_manager.cc | 9 ++++++
src/kudu/transactions/txn_system_client.cc | 10 +++++++
src/kudu/transactions/txn_system_client.h | 5 +++-
src/kudu/tserver/tablet_service.cc | 4 +++
src/kudu/tserver/tserver_admin.proto | 8 +++++
11 files changed, 78 insertions(+), 29 deletions(-)
diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc b/src/kudu/integration-tests/txn_status_table-itest.cc
index 37ad5b8..edeb4bb 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -66,6 +66,7 @@
DECLARE_double(leader_failure_max_missed_heartbeat_periods);
DECLARE_string(superuser_acl);
DECLARE_string(user_acl);
+DECLARE_uint32(transaction_keepalive_interval_ms);
using kudu::client::AuthenticationCredentialsPB;
using kudu::client::KuduClient;
@@ -353,16 +354,23 @@ TEST_F(TxnStatusTableITest, TestTxnStatusTableColocatedWithTables) {
TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) {
ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
- ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser));
+ uint32_t txn_keepalive_ms;
+ ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser, &txn_keepalive_ms));
+ ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, txn_keepalive_ms);
ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser));
// If we write out of range, we should see an error.
{
int64_t highest_seen_txn_id = -1;
- auto s = txn_sys_client_->BeginTransaction(100, kUser, &highest_seen_txn_id);
+ uint32_t txn_keepalive_ms = 0;
+ auto s = txn_sys_client_->BeginTransaction(
+ 100, kUser, &txn_keepalive_ms, &highest_seen_txn_id);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
// The 'highest_seen_txn_id' should be left untouched.
ASSERT_EQ(-1, highest_seen_txn_id);
+ // txn_keepalive_ms isn't assigned in case of non-OK status.
+ ASSERT_EQ(0, txn_keepalive_ms);
+ ASSERT_NE(0, FLAGS_transaction_keepalive_interval_ms);
}
{
auto s = txn_sys_client_->BeginCommitTransaction(100, kUser);
@@ -376,7 +384,8 @@ TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) {
// Once we add a new range, we should be able to leverage it.
ASSERT_OK(txn_sys_client_->AddTxnStatusTableRange(100, 200));
int64_t highest_seen_txn_id = -1;
- ASSERT_OK(txn_sys_client_->BeginTransaction(100, kUser, &highest_seen_txn_id));
+ ASSERT_OK(txn_sys_client_->BeginTransaction(
+ 100, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id));
ASSERT_EQ(100, highest_seen_txn_id);
ASSERT_OK(txn_sys_client_->BeginCommitTransaction(100, kUser));
ASSERT_OK(txn_sys_client_->AbortTransaction(100, kUser));
@@ -393,7 +402,8 @@ TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) {
{
int64_t highest_seen_txn_id = -1;
auto s = txn_sys_client_->BeginTransaction(
- 1, kUser, &highest_seen_txn_id, MonoDelta::FromMilliseconds(100));
+ 1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id,
+ MonoDelta::FromMilliseconds(100));
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
// The 'highest_seen_txn_id' should be left untouched.
ASSERT_EQ(-1, highest_seen_txn_id);
@@ -413,7 +423,8 @@ TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) {
int64_t highest_seen_txn_id = -1;
ASSERT_OK(txn_sys_client_->BeginTransaction(
- 1, kUser, &highest_seen_txn_id, MonoDelta::FromSeconds(3)));
+ 1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id,
+ MonoDelta::FromSeconds(3)));
ASSERT_EQ(highest_seen_txn_id, 1);
}
@@ -421,13 +432,17 @@ TEST_F(TxnStatusTableITest, TestSystemClientBeginTransactionErrors) {
ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
int64_t highest_seen_txn_id = -1;
- ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser, &highest_seen_txn_id));
+ uint32_t txn_keepalive_ms;
+ ASSERT_OK(txn_sys_client_->BeginTransaction(
+ 1, kUser, &txn_keepalive_ms, &highest_seen_txn_id));
ASSERT_EQ(1, highest_seen_txn_id);
+ ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, txn_keepalive_ms);
// Trying to start another transaction with a used ID should yield an error.
{
int64_t highest_seen_txn_id = -1;
- auto s = txn_sys_client_->BeginTransaction(1, kUser, &highest_seen_txn_id);
+ auto s = txn_sys_client_->BeginTransaction(
+ 1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id);
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_EQ(1, highest_seen_txn_id);
ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
@@ -436,7 +451,8 @@ TEST_F(TxnStatusTableITest, TestSystemClientBeginTransactionErrors) {
// The same should be true with a different user.
{
int64_t highest_seen_txn_id = -1;
- auto s = txn_sys_client_->BeginTransaction(1, "stranger", &highest_seen_txn_id);
+ auto s = txn_sys_client_->BeginTransaction(
+ 1, "stranger", nullptr /* txn_keepalive_ms */, &highest_seen_txn_id);
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_EQ(1, highest_seen_txn_id);
ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
@@ -478,7 +494,8 @@ TEST_F(TxnStatusTableITest, SystemClientCommitAndAbortTransaction) {
// with already used ID should yield an error.
{
int64_t highest_seen_txn_id = -1;
- auto s = txn_sys_client_->BeginTransaction(1, kUser, &highest_seen_txn_id);
+ auto s = txn_sys_client_->BeginTransaction(
+ 1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id);
ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
ASSERT_EQ(1, highest_seen_txn_id);
ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
diff --git a/src/kudu/master/txn_manager-test.cc b/src/kudu/master/txn_manager-test.cc
index 47d5fce..034ae89 100644
--- a/src/kudu/master/txn_manager-test.cc
+++ b/src/kudu/master/txn_manager-test.cc
@@ -61,7 +61,7 @@ DECLARE_bool(txn_manager_enabled);
DECLARE_bool(txn_manager_lazily_initialized);
DECLARE_int32(rpc_service_queue_length);
DECLARE_int64(txn_manager_status_table_range_partition_span);
-DECLARE_uint32(transaction_keep_alive_interval_ms);
+DECLARE_uint32(transaction_keepalive_interval_ms);
namespace kudu {
namespace transactions {
@@ -320,7 +320,7 @@ TEST_F(TxnManagerTest, AbortedTransactionLifecycle) {
txn_id = resp.txn_id();
ASSERT_LE(0, txn_id);
ASSERT_TRUE(resp.has_keepalive_millis());
- ASSERT_EQ(FLAGS_transaction_keep_alive_interval_ms, resp.keepalive_millis());
+ ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, resp.keepalive_millis());
TxnStatePB txn_state;
NO_FATALS(fetch_txn_status(txn_id, &txn_state));
ASSERT_EQ(TxnStatePB::OPEN, txn_state);
@@ -400,7 +400,7 @@ TEST_F(TxnManagerTest, BeginManyTransactions) {
txn_ids->emplace_back(txn_id);
}
CHECK(resp.has_keepalive_millis());
- CHECK_EQ(FLAGS_transaction_keep_alive_interval_ms, resp.keepalive_millis());
+ CHECK_EQ(FLAGS_transaction_keepalive_interval_ms, resp.keepalive_millis());
}
};
diff --git a/src/kudu/master/txn_manager.cc b/src/kudu/master/txn_manager.cc
index aee7ce6..f67d942 100644
--- a/src/kudu/master/txn_manager.cc
+++ b/src/kudu/master/txn_manager.cc
@@ -73,13 +73,6 @@ DEFINE_int64(txn_manager_status_table_range_partition_span, 1000000,
TAG_FLAG(txn_manager_status_table_range_partition_span, advanced);
TAG_FLAG(txn_manager_status_table_range_partition_span, experimental);
-DEFINE_uint32(transaction_keep_alive_interval_ms, 3000,
- "Maximum interval (in milliseconds) between subsequent "
- "keep-alive heartbeats from client to TxnManager to let it know "
- "that a transaction is alive");
-TAG_FLAG(transaction_keep_alive_interval_ms, runtime);
-TAG_FLAG(transaction_keep_alive_interval_ms, experimental);
-
namespace kudu {
namespace transactions {
@@ -124,9 +117,9 @@ TxnManager::~TxnManager() {
Status TxnManager::BeginTransaction(const string& username,
const MonoTime& deadline,
int64_t* txn_id,
- int32_t* keep_alive_interval_ms) {
+ uint32_t* txn_keepalive_ms) {
DCHECK(txn_id);
- DCHECK(keep_alive_interval_ms);
+ DCHECK(txn_keepalive_ms);
RETURN_NOT_OK(CheckInitialized(deadline));
// TxnManager uses next_txn_id_ as a hint for next transaction identifier.
@@ -142,11 +135,13 @@ Status TxnManager::BeginTransaction(const string& username,
// the response and passed back to be used here as a hint for
// txn ID on the next request.
int64_t try_txn_id = next_txn_id_++;
+ uint32_t keepalive_ms = 0;
auto s = Status::TimedOut("timed out while trying to find txn_id");
while (MonoTime::Now() < deadline) {
int64_t highest_seen_txn_id = -1;
s = txn_sys_client_->BeginTransaction(
- try_txn_id, username, &highest_seen_txn_id, ToDelta(deadline));
+ try_txn_id, username, &keepalive_ms,
+ &highest_seen_txn_id, ToDelta(deadline));
if (s.ok()) {
DCHECK_GE(highest_seen_txn_id, 0);
// The idea is to make the thread that has gotten a transaction reserved
@@ -197,8 +192,10 @@ Status TxnManager::BeginTransaction(const string& username,
break;
}
if (s.ok()) {
+ DCHECK_GT(try_txn_id, -1);
+ DCHECK_GT(keepalive_ms, 0);
*txn_id = try_txn_id;
- *keep_alive_interval_ms = FLAGS_transaction_keep_alive_interval_ms;
+ *txn_keepalive_ms = keepalive_ms;
}
return s;
}
diff --git a/src/kudu/master/txn_manager.h b/src/kudu/master/txn_manager.h
index f852799..01bd082 100644
--- a/src/kudu/master/txn_manager.h
+++ b/src/kudu/master/txn_manager.h
@@ -55,7 +55,7 @@ class TxnManager final {
Status BeginTransaction(const std::string& username,
const MonoTime& deadline,
int64_t* txn_id,
- int32_t* keep_alive_interval_ms);
+ uint32_t* txn_keepalive_ms);
// Initiate the commit phase for the transaction. The control is returned
// right after initiating the commit phase: the caller can check for the
diff --git a/src/kudu/master/txn_manager.proto b/src/kudu/master/txn_manager.proto
index cec41a7..03efa52 100644
--- a/src/kudu/master/txn_manager.proto
+++ b/src/kudu/master/txn_manager.proto
@@ -61,7 +61,8 @@ message BeginTransactionResponsePB {
// The keep-alive interval (in milliseconds) to keep the transaction alive.
// TxnManager expects the client to send keep-alive heartbeats spaced by
- // keepalive_millis interval.
+ // keepalive_millis interval or shorter, otherwise the transaction may be
+ // automatically aborted as a stale/abandoned one.
optional uint32 keepalive_millis = 3;
}
diff --git a/src/kudu/master/txn_manager_service.cc b/src/kudu/master/txn_manager_service.cc
index a86b965..de26065 100644
--- a/src/kudu/master/txn_manager_service.cc
+++ b/src/kudu/master/txn_manager_service.cc
@@ -71,15 +71,15 @@ void TxnManagerServiceImpl::BeginTransaction(
BeginTransactionResponsePB* resp,
RpcContext* ctx) {
int64_t txn_id;
- int32_t keep_alive_interval_ms;
+ uint32_t txn_keepalive_ms;
const auto s = server_->txn_manager()->BeginTransaction(
ctx->remote_user().username(),
ctx->GetClientDeadline(),
&txn_id,
- &keep_alive_interval_ms);
+ &txn_keepalive_ms);
if (PREDICT_TRUE(s.ok())) {
resp->set_txn_id(txn_id);
- resp->set_keepalive_millis(keep_alive_interval_ms);
+ resp->set_keepalive_millis(txn_keepalive_ms);
}
CheckRespErrorOrSetUnknown(s, resp);
return ctx->RespondSuccess();
diff --git a/src/kudu/transactions/txn_status_manager.cc b/src/kudu/transactions/txn_status_manager.cc
index e6bacb3..74508a8 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -24,18 +24,27 @@
#include <vector>
#include <boost/optional/optional.hpp>
+#include <gflags/gflags.h>
#include <glog/logging.h>
+#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/transactions/transactions.pb.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/cow_object.h"
+#include "kudu/util/flag_tags.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
+DEFINE_uint32(transaction_keepalive_interval_ms, 5000,
+ "Maximum interval (in milliseconds) between subsequent "
+ "keep-alive heartbeats to let the transaction status manager "
+ "know that a transaction is not abandoned");
+TAG_FLAG(transaction_keepalive_interval_ms, experimental);
+
using kudu::pb_util::SecureShortDebugString;
using kudu::tablet::ParticipantIdsByTxnId;
using kudu::tserver::TabletServerErrorPB;
diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc
index 092eb5d..cfefae4 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -125,6 +125,7 @@ Status TxnSystemClient::OpenTxnStatusTable() {
Status TxnSystemClient::BeginTransaction(int64_t txn_id,
const string& user,
+ uint32_t* txn_keepalive_ms,
int64_t* highest_seen_txn_id,
MonoDelta timeout) {
CoordinatorOpPB coordinate_txn_op;
@@ -138,6 +139,15 @@ Status TxnSystemClient::BeginTransaction(int64_t txn_id,
s.AsStatusCallback(),
&result));
const auto ret = s.Wait();
+ if (ret.ok()) {
+ DCHECK(result.has_highest_seen_txn_id());
+ DCHECK(result.has_keepalive_millis());
+ if (txn_keepalive_ms) {
+ *txn_keepalive_ms = result.keepalive_millis();
+ }
+ }
+ // The 'highest_seen_txn_id' field in the 'result' can be set in some
+ // non-OK cases as well.
if (result.has_highest_seen_txn_id() && highest_seen_txn_id) {
*highest_seen_txn_id = result.highest_seen_txn_id();
}
diff --git a/src/kudu/transactions/txn_system_client.h b/src/kudu/transactions/txn_system_client.h
index c5269b7..f5b3a21 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -84,9 +84,12 @@ class TxnSystemClient {
// parameter (if not null) is set to the highest transaction identifier
// observed by corresponding TxnStatusManager. Otherwise, the
// 'highest_seen_txn_id' parameter is unset (e.g., in case of the requeset
- // to TxnStatusManager timed out).
+ // to TxnStatusManager timed out). The 'txn_keepalive_ms' output parameter
+ // (if not null) is set to the transaction's keep-alive interval in
+ // milliseconds in case of success; otherwise it is left untouched.
Status BeginTransaction(int64_t txn_id, const
std::string& user,
+ uint32_t* txn_keepalive_ms = nullptr,
int64_t* highest_seen_txn_id = nullptr,
MonoDelta timeout = MonoDelta::FromSeconds(10));
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index a73c5a0..11c1f08 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -173,6 +173,7 @@ TAG_FLAG(tserver_inject_invalid_authz_token_ratio, hidden);
DECLARE_bool(raft_prepare_replacement_before_eviction);
DECLARE_int32(memory_limit_warn_threshold_percentage);
DECLARE_int32(tablet_history_max_age_sec);
+DECLARE_uint32(transaction_keepalive_interval_ms);
METRIC_DEFINE_counter(
server,
@@ -1283,6 +1284,9 @@ void TabletServiceAdminImpl::CoordinateTransaction(const CoordinateTransactionRe
} else if (op.type() == CoordinatorOpPB::GET_TXN_STATUS) {
// Populate corresponding field in the response.
*(resp->mutable_op_result()->mutable_txn_status()) = std::move(txn_status);
+ } else if (op.type() == CoordinatorOpPB::BEGIN_TXN) {
+ resp->mutable_op_result()->set_keepalive_millis(
+ FLAGS_transaction_keepalive_interval_ms);
}
if (op.type() == CoordinatorOpPB::BEGIN_TXN && !s.IsServiceUnavailable()) {
DCHECK_GE(highest_seen_txn_id, 0);
diff --git a/src/kudu/tserver/tserver_admin.proto b/src/kudu/tserver/tserver_admin.proto
index bd1f881..e95a12a 100644
--- a/src/kudu/tserver/tserver_admin.proto
+++ b/src/kudu/tserver/tserver_admin.proto
@@ -35,6 +35,7 @@ message CoordinatorOpPB {
BEGIN_COMMIT_TXN = 3;
ABORT_TXN = 4;
GET_TXN_STATUS = 5;
+ KEEP_TXN_ALIVE = 6;
}
optional CoordinatorOpType type = 1;
optional int64 txn_id = 2;
@@ -62,6 +63,13 @@ message CoordinatorOpResultPB {
// to a request of the BEGIN_TXN type (in success and error cases).
optional int64 highest_seen_txn_id = 3;
+ // The keep-alive interval for the transaction to begin. This field
+ // is set only if responding to requests of the BEGIN_TXN type.
+ //
+ // TODO(aserbin): it might make sense to populate this field also for
+ // responses of the GET_TXN_STATUS and KEEP_TXN_ALIVE types.
+ optional uint32 keepalive_millis = 4;
+
// TODO(awong): populate this with some application-level results, like the
// actual transaction ID assigned, the next highest transaction ID available,
// etc.