You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/11/18 19:44:11 UTC

[kudu] 02/02: KUDU-2612 move keep-alive assignment to TxnStatusManager

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 2d0631fe289968094c9dc6a7b16fcc6df9a269ba
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Nov 16 22:52:04 2020 -0800

    KUDU-2612 move keep-alive assignment to TxnStatusManager
    
    To prepare for the processing of transactions' keep-alive heartbeats
    and tracking of stale/abandoned transactions, the assignment of the
    keep-alive setting for a newly started transaction has moved from
    TxnManager to TxnStatusManager.
    
    The rationale for this change is the fact that TxnStatusManager will
    have a periodic task to track the staleness of open transactions
    registered with it.  At this point we anticipate the keep-alive setting
    to be a system-wide parameter, not a per-transaction one.  With that,
    it's natural to make TxnStatusManager the single source of truth for
    the transaction keep-alive interval, so that the latter is used both
    to assign the corresponding property for newly created transactions
    and to spot stale/abandoned ones.
    
    An alternative would be keeping TxnManager as the source of truth
    for the transaction keep-alive setting, but that would entail
    persistently storing the value assigned by TxnManager on a
    per-transaction basis, which is hard to justify without a particular use case.
    
    This patch also brings in a few test scenarios and updates the relevant
    existing ones.
    
    Change-Id: Ie98688b2bbe4c673abbfc5801f89eb8182003c18
    Reviewed-on: http://gerrit.cloudera.org:8080/16737
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 .../integration-tests/txn_status_table-itest.cc    | 35 ++++++++++++++++------
 src/kudu/master/txn_manager-test.cc                |  6 ++--
 src/kudu/master/txn_manager.cc                     | 19 +++++-------
 src/kudu/master/txn_manager.h                      |  2 +-
 src/kudu/master/txn_manager.proto                  |  3 +-
 src/kudu/master/txn_manager_service.cc             |  6 ++--
 src/kudu/transactions/txn_status_manager.cc        |  9 ++++++
 src/kudu/transactions/txn_system_client.cc         | 10 +++++++
 src/kudu/transactions/txn_system_client.h          |  5 +++-
 src/kudu/tserver/tablet_service.cc                 |  4 +++
 src/kudu/tserver/tserver_admin.proto               |  8 +++++
 11 files changed, 78 insertions(+), 29 deletions(-)

diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc b/src/kudu/integration-tests/txn_status_table-itest.cc
index 37ad5b8..edeb4bb 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -66,6 +66,7 @@
 DECLARE_double(leader_failure_max_missed_heartbeat_periods);
 DECLARE_string(superuser_acl);
 DECLARE_string(user_acl);
+DECLARE_uint32(transaction_keepalive_interval_ms);
 
 using kudu::client::AuthenticationCredentialsPB;
 using kudu::client::KuduClient;
@@ -353,16 +354,23 @@ TEST_F(TxnStatusTableITest, TestTxnStatusTableColocatedWithTables) {
 TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) {
   ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
   ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
-  ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser));
+  uint32_t txn_keepalive_ms;
+  ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser, &txn_keepalive_ms));
+  ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, txn_keepalive_ms);
   ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser));
 
   // If we write out of range, we should see an error.
   {
     int64_t highest_seen_txn_id = -1;
-    auto s = txn_sys_client_->BeginTransaction(100, kUser, &highest_seen_txn_id);
+    uint32_t txn_keepalive_ms = 0;
+    auto s = txn_sys_client_->BeginTransaction(
+        100, kUser, &txn_keepalive_ms, &highest_seen_txn_id);
     ASSERT_TRUE(s.IsNotFound()) << s.ToString();
     // The 'highest_seen_txn_id' should be left untouched.
     ASSERT_EQ(-1, highest_seen_txn_id);
+    // txn_keepalive_ms isn't assigned in case of non-OK status.
+    ASSERT_EQ(0, txn_keepalive_ms);
+    ASSERT_NE(0, FLAGS_transaction_keepalive_interval_ms);
   }
   {
     auto s = txn_sys_client_->BeginCommitTransaction(100, kUser);
@@ -376,7 +384,8 @@ TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) {
   // Once we add a new range, we should be able to leverage it.
   ASSERT_OK(txn_sys_client_->AddTxnStatusTableRange(100, 200));
   int64_t highest_seen_txn_id = -1;
-  ASSERT_OK(txn_sys_client_->BeginTransaction(100, kUser, &highest_seen_txn_id));
+  ASSERT_OK(txn_sys_client_->BeginTransaction(
+      100, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id));
   ASSERT_EQ(100, highest_seen_txn_id);
   ASSERT_OK(txn_sys_client_->BeginCommitTransaction(100, kUser));
   ASSERT_OK(txn_sys_client_->AbortTransaction(100, kUser));
@@ -393,7 +402,8 @@ TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) {
   {
     int64_t highest_seen_txn_id = -1;
     auto s = txn_sys_client_->BeginTransaction(
-        1, kUser, &highest_seen_txn_id, MonoDelta::FromMilliseconds(100));
+        1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id,
+        MonoDelta::FromMilliseconds(100));
     ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
     // The 'highest_seen_txn_id' should be left untouched.
     ASSERT_EQ(-1, highest_seen_txn_id);
@@ -413,7 +423,8 @@ TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) {
 
   int64_t highest_seen_txn_id = -1;
   ASSERT_OK(txn_sys_client_->BeginTransaction(
-      1, kUser, &highest_seen_txn_id, MonoDelta::FromSeconds(3)));
+      1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id,
+      MonoDelta::FromSeconds(3)));
   ASSERT_EQ(highest_seen_txn_id, 1);
 }
 
@@ -421,13 +432,17 @@ TEST_F(TxnStatusTableITest, TestSystemClientBeginTransactionErrors) {
   ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
   ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
   int64_t highest_seen_txn_id = -1;
-  ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser, &highest_seen_txn_id));
+  uint32_t txn_keepalive_ms;
+  ASSERT_OK(txn_sys_client_->BeginTransaction(
+      1, kUser, &txn_keepalive_ms, &highest_seen_txn_id));
   ASSERT_EQ(1, highest_seen_txn_id);
+  ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, txn_keepalive_ms);
 
   // Trying to start another transaction with a used ID should yield an error.
   {
     int64_t highest_seen_txn_id = -1;
-    auto s = txn_sys_client_->BeginTransaction(1, kUser, &highest_seen_txn_id);
+    auto s = txn_sys_client_->BeginTransaction(
+        1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id);
     ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_EQ(1, highest_seen_txn_id);
     ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
@@ -436,7 +451,8 @@ TEST_F(TxnStatusTableITest, TestSystemClientBeginTransactionErrors) {
   // The same should be true with a different user.
   {
     int64_t highest_seen_txn_id = -1;
-    auto s = txn_sys_client_->BeginTransaction(1, "stranger", &highest_seen_txn_id);
+    auto s = txn_sys_client_->BeginTransaction(
+        1, "stranger", nullptr /* txn_keepalive_ms */, &highest_seen_txn_id);
     ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_EQ(1, highest_seen_txn_id);
     ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
@@ -478,7 +494,8 @@ TEST_F(TxnStatusTableITest, SystemClientCommitAndAbortTransaction) {
   // with already used ID should yield an error.
   {
     int64_t highest_seen_txn_id = -1;
-    auto s = txn_sys_client_->BeginTransaction(1, kUser, &highest_seen_txn_id);
+    auto s = txn_sys_client_->BeginTransaction(
+        1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id);
     ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_EQ(1, highest_seen_txn_id);
     ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
diff --git a/src/kudu/master/txn_manager-test.cc b/src/kudu/master/txn_manager-test.cc
index 47d5fce..034ae89 100644
--- a/src/kudu/master/txn_manager-test.cc
+++ b/src/kudu/master/txn_manager-test.cc
@@ -61,7 +61,7 @@ DECLARE_bool(txn_manager_enabled);
 DECLARE_bool(txn_manager_lazily_initialized);
 DECLARE_int32(rpc_service_queue_length);
 DECLARE_int64(txn_manager_status_table_range_partition_span);
-DECLARE_uint32(transaction_keep_alive_interval_ms);
+DECLARE_uint32(transaction_keepalive_interval_ms);
 
 namespace kudu {
 namespace transactions {
@@ -320,7 +320,7 @@ TEST_F(TxnManagerTest, AbortedTransactionLifecycle) {
     txn_id = resp.txn_id();
     ASSERT_LE(0, txn_id);
     ASSERT_TRUE(resp.has_keepalive_millis());
-    ASSERT_EQ(FLAGS_transaction_keep_alive_interval_ms, resp.keepalive_millis());
+    ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, resp.keepalive_millis());
     TxnStatePB txn_state;
     NO_FATALS(fetch_txn_status(txn_id, &txn_state));
     ASSERT_EQ(TxnStatePB::OPEN, txn_state);
@@ -400,7 +400,7 @@ TEST_F(TxnManagerTest, BeginManyTransactions) {
         txn_ids->emplace_back(txn_id);
       }
       CHECK(resp.has_keepalive_millis());
-      CHECK_EQ(FLAGS_transaction_keep_alive_interval_ms, resp.keepalive_millis());
+      CHECK_EQ(FLAGS_transaction_keepalive_interval_ms, resp.keepalive_millis());
     }
   };
 
diff --git a/src/kudu/master/txn_manager.cc b/src/kudu/master/txn_manager.cc
index aee7ce6..f67d942 100644
--- a/src/kudu/master/txn_manager.cc
+++ b/src/kudu/master/txn_manager.cc
@@ -73,13 +73,6 @@ DEFINE_int64(txn_manager_status_table_range_partition_span, 1000000,
 TAG_FLAG(txn_manager_status_table_range_partition_span, advanced);
 TAG_FLAG(txn_manager_status_table_range_partition_span, experimental);
 
-DEFINE_uint32(transaction_keep_alive_interval_ms, 3000,
-              "Maximum interval (in milliseconds) between subsequent "
-              "keep-alive heartbeats from client to TxnManager to let it know "
-              "that a transaction is alive");
-TAG_FLAG(transaction_keep_alive_interval_ms, runtime);
-TAG_FLAG(transaction_keep_alive_interval_ms, experimental);
-
 namespace kudu {
 namespace transactions {
 
@@ -124,9 +117,9 @@ TxnManager::~TxnManager() {
 Status TxnManager::BeginTransaction(const string& username,
                                     const MonoTime& deadline,
                                     int64_t* txn_id,
-                                    int32_t* keep_alive_interval_ms) {
+                                    uint32_t* txn_keepalive_ms) {
   DCHECK(txn_id);
-  DCHECK(keep_alive_interval_ms);
+  DCHECK(txn_keepalive_ms);
   RETURN_NOT_OK(CheckInitialized(deadline));
 
   // TxnManager uses next_txn_id_ as a hint for next transaction identifier.
@@ -142,11 +135,13 @@ Status TxnManager::BeginTransaction(const string& username,
   //                the response and passed back to be used here as a hint for
   //                txn ID on the next request.
   int64_t try_txn_id = next_txn_id_++;
+  uint32_t keepalive_ms = 0;
   auto s = Status::TimedOut("timed out while trying to find txn_id");
   while (MonoTime::Now() < deadline) {
     int64_t highest_seen_txn_id = -1;
     s = txn_sys_client_->BeginTransaction(
-        try_txn_id, username, &highest_seen_txn_id, ToDelta(deadline));
+        try_txn_id, username, &keepalive_ms,
+        &highest_seen_txn_id, ToDelta(deadline));
     if (s.ok()) {
       DCHECK_GE(highest_seen_txn_id, 0);
       // The idea is to make the thread that has gotten a transaction reserved
@@ -197,8 +192,10 @@ Status TxnManager::BeginTransaction(const string& username,
     break;
   }
   if (s.ok()) {
+    DCHECK_GT(try_txn_id, -1);
+    DCHECK_GT(keepalive_ms, 0);
     *txn_id = try_txn_id;
-    *keep_alive_interval_ms = FLAGS_transaction_keep_alive_interval_ms;
+    *txn_keepalive_ms = keepalive_ms;
   }
   return s;
 }
diff --git a/src/kudu/master/txn_manager.h b/src/kudu/master/txn_manager.h
index f852799..01bd082 100644
--- a/src/kudu/master/txn_manager.h
+++ b/src/kudu/master/txn_manager.h
@@ -55,7 +55,7 @@ class TxnManager final {
   Status BeginTransaction(const std::string& username,
                           const MonoTime& deadline,
                           int64_t* txn_id,
-                          int32_t* keep_alive_interval_ms);
+                          uint32_t* txn_keepalive_ms);
 
   // Initiate the commit phase for the transaction. The control is returned
   // right after initiating the commit phase: the caller can check for the
diff --git a/src/kudu/master/txn_manager.proto b/src/kudu/master/txn_manager.proto
index cec41a7..03efa52 100644
--- a/src/kudu/master/txn_manager.proto
+++ b/src/kudu/master/txn_manager.proto
@@ -61,7 +61,8 @@ message BeginTransactionResponsePB {
 
   // The keep-alive interval (in milliseconds) to keep the transaction alive.
   // TxnManager expects the client to send keep-alive heartbeats spaced by
-  // keepalive_millis interval.
+  // keepalive_millis interval or shorter, otherwise the transaction may be
+  // automatically aborted as a stale/abandoned one.
   optional uint32 keepalive_millis = 3;
 }
 
diff --git a/src/kudu/master/txn_manager_service.cc b/src/kudu/master/txn_manager_service.cc
index a86b965..de26065 100644
--- a/src/kudu/master/txn_manager_service.cc
+++ b/src/kudu/master/txn_manager_service.cc
@@ -71,15 +71,15 @@ void TxnManagerServiceImpl::BeginTransaction(
     BeginTransactionResponsePB* resp,
     RpcContext* ctx) {
   int64_t txn_id;
-  int32_t keep_alive_interval_ms;
+  uint32_t txn_keepalive_ms;
   const auto s = server_->txn_manager()->BeginTransaction(
       ctx->remote_user().username(),
       ctx->GetClientDeadline(),
       &txn_id,
-      &keep_alive_interval_ms);
+      &txn_keepalive_ms);
   if (PREDICT_TRUE(s.ok())) {
     resp->set_txn_id(txn_id);
-    resp->set_keepalive_millis(keep_alive_interval_ms);
+    resp->set_keepalive_millis(txn_keepalive_ms);
   }
   CheckRespErrorOrSetUnknown(s, resp);
   return ctx->RespondSuccess();
diff --git a/src/kudu/transactions/txn_status_manager.cc b/src/kudu/transactions/txn_status_manager.cc
index e6bacb3..74508a8 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -24,18 +24,27 @@
 #include <vector>
 
 #include <boost/optional/optional.hpp>
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 
+#include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/transactions/transactions.pb.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/cow_object.h"
+#include "kudu/util/flag_tags.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 
+DEFINE_uint32(transaction_keepalive_interval_ms, 5000,
+              "Maximum interval (in milliseconds) between subsequent "
+              "keep-alive heartbeats to let the transaction status manager "
+              "know that a transaction is not abandoned");
+TAG_FLAG(transaction_keepalive_interval_ms, experimental);
+
 using kudu::pb_util::SecureShortDebugString;
 using kudu::tablet::ParticipantIdsByTxnId;
 using kudu::tserver::TabletServerErrorPB;
diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc
index 092eb5d..cfefae4 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -125,6 +125,7 @@ Status TxnSystemClient::OpenTxnStatusTable() {
 
 Status TxnSystemClient::BeginTransaction(int64_t txn_id,
                                          const string& user,
+                                         uint32_t* txn_keepalive_ms,
                                          int64_t* highest_seen_txn_id,
                                          MonoDelta timeout) {
   CoordinatorOpPB coordinate_txn_op;
@@ -138,6 +139,15 @@ Status TxnSystemClient::BeginTransaction(int64_t txn_id,
                                            s.AsStatusCallback(),
                                            &result));
   const auto ret = s.Wait();
+  if (ret.ok()) {
+    DCHECK(result.has_highest_seen_txn_id());
+    DCHECK(result.has_keepalive_millis());
+    if (txn_keepalive_ms) {
+      *txn_keepalive_ms = result.keepalive_millis();
+    }
+  }
+  // The 'highest_seen_txn_id' field in the 'result' can be set in case of
+  // some non-OK cases as well.
   if (result.has_highest_seen_txn_id() && highest_seen_txn_id) {
     *highest_seen_txn_id = result.highest_seen_txn_id();
   }
diff --git a/src/kudu/transactions/txn_system_client.h b/src/kudu/transactions/txn_system_client.h
index c5269b7..f5b3a21 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -84,9 +84,12 @@ class TxnSystemClient {
   // parameter (if not null) is set to the highest transaction identifier
   // observed by corresponding TxnStatusManager. Otherwise, the
   // 'highest_seen_txn_id' parameter is unset (e.g., in case of the requeset
-  // to TxnStatusManager timed out).
+  // to TxnStatusManager timed out). The 'txn_keepalive_ms' output parameter
+  // (if not null) is populated with the transaction's keep-alive interval in
+  // milliseconds in case of success; otherwise it is left unset.
   Status BeginTransaction(int64_t txn_id, const
                           std::string& user,
+                          uint32_t* txn_keepalive_ms = nullptr,
                           int64_t* highest_seen_txn_id = nullptr,
                           MonoDelta timeout = MonoDelta::FromSeconds(10));
 
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index a73c5a0..11c1f08 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -173,6 +173,7 @@ TAG_FLAG(tserver_inject_invalid_authz_token_ratio, hidden);
 DECLARE_bool(raft_prepare_replacement_before_eviction);
 DECLARE_int32(memory_limit_warn_threshold_percentage);
 DECLARE_int32(tablet_history_max_age_sec);
+DECLARE_uint32(transaction_keepalive_interval_ms);
 
 METRIC_DEFINE_counter(
     server,
@@ -1283,6 +1284,9 @@ void TabletServiceAdminImpl::CoordinateTransaction(const CoordinateTransactionRe
   } else if (op.type() == CoordinatorOpPB::GET_TXN_STATUS) {
     // Populate corresponding field in the response.
     *(resp->mutable_op_result()->mutable_txn_status()) = std::move(txn_status);
+  } else if (op.type() == CoordinatorOpPB::BEGIN_TXN) {
+    resp->mutable_op_result()->set_keepalive_millis(
+        FLAGS_transaction_keepalive_interval_ms);
   }
   if (op.type() == CoordinatorOpPB::BEGIN_TXN && !s.IsServiceUnavailable()) {
     DCHECK_GE(highest_seen_txn_id, 0);
diff --git a/src/kudu/tserver/tserver_admin.proto b/src/kudu/tserver/tserver_admin.proto
index bd1f881..e95a12a 100644
--- a/src/kudu/tserver/tserver_admin.proto
+++ b/src/kudu/tserver/tserver_admin.proto
@@ -35,6 +35,7 @@ message CoordinatorOpPB {
     BEGIN_COMMIT_TXN = 3;
     ABORT_TXN = 4;
     GET_TXN_STATUS = 5;
+    KEEP_TXN_ALIVE = 6;
   }
   optional CoordinatorOpType type = 1;
   optional int64 txn_id = 2;
@@ -62,6 +63,13 @@ message CoordinatorOpResultPB {
   // to a request of the BEGIN_TXN type (in success and error cases).
   optional int64 highest_seen_txn_id = 3;
 
+  // The keep-alive interval for the transaction to begin. This field
+  // is set only if responding to requests of the BEGIN_TXN type.
+  //
+  // TODO(aserbin): it might make sense to populate this field also for
+  //                responses of the GET_TXN_STATUS and KEEP_TXN_ALIVE types.
+  optional uint32 keepalive_millis = 4;
+
   // TODO(awong): populate this with some application-level results, like the
   // actual transaction ID assigned, the next highest transaction ID available,
   // etc.