You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/12/11 06:19:52 UTC
[kudu] branch master updated: KUDU-2612 keep-alive tracking for
transactions
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new fa21bf6 KUDU-2612 keep-alive tracking for transactions
fa21bf6 is described below
commit fa21bf61821199aedb4fbcdc1b09d0cf3ee56f51
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Thu Nov 12 22:28:33 2020 -0800
KUDU-2612 keep-alive tracking for transactions
This patch adds tracking of the liveness of distributed multi-row
transactions to TxnStatusManager and provides corresponding proxy
methods in TxnManager, so a Kudu client can now send keep-alive
requests for a transaction (the client-side implementation is planned
in a follow-up patch).
From the TxnStatusManager's perspective, the newly introduced keep-alive
RPC is represented as another type of CoordinateTransaction() request:
CoordinatorOpPB::KEEP_TXN_ALIVE. New tests covering the newly introduced
functionality are added as well.
More end-to-end tests will be added in a follow-up changelist once the Kudu
C++ client starts sending keepalive requests for started transactions.
Also, all newly introduced tests in txn_status_manager-itest.cc are
disabled because without [1] they are a bit flaky. I added a TODO
to re-enable them once [1] is committed.
[1] https://gerrit.cloudera.org/#/c/16648/
Change-Id: Iae926e02fa7ca597b63ccea90124964c3b6a1175
Reviewed-on: http://gerrit.cloudera.org:8080/16729
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/integration-tests/CMakeLists.txt | 1 +
.../integration-tests/ts_tablet_manager-itest.cc | 1 +
.../integration-tests/txn_status_manager-itest.cc | 469 +++++++++++++++++++++
.../integration-tests/txn_status_table-itest.cc | 10 +-
src/kudu/master/txn_manager-test.cc | 56 ++-
src/kudu/master/txn_manager.cc | 7 +-
src/kudu/tablet/txn_coordinator.h | 20 +-
src/kudu/transactions/txn_status_entry.cc | 8 +
src/kudu/transactions/txn_status_entry.h | 47 ++-
src/kudu/transactions/txn_status_manager-test.cc | 195 ++++++++-
src/kudu/transactions/txn_status_manager.cc | 134 +++++-
src/kudu/transactions/txn_status_manager.h | 15 +-
src/kudu/transactions/txn_status_tablet.h | 4 +
src/kudu/transactions/txn_system_client.cc | 14 +
src/kudu/transactions/txn_system_client.h | 5 +
src/kudu/tserver/tablet_service.cc | 8 +-
src/kudu/tserver/ts_tablet_manager.cc | 69 ++-
src/kudu/tserver/ts_tablet_manager.h | 18 +
18 files changed, 1023 insertions(+), 58 deletions(-)
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 98d0069..8ff6d03 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -134,6 +134,7 @@ ADD_KUDU_TEST(tombstoned_voting-stress-test RUN_SERIAL true)
ADD_KUDU_TEST(token_signer-itest)
ADD_KUDU_TEST(txn_participant-itest)
ADD_KUDU_TEST(txn_status_table-itest)
+ADD_KUDU_TEST(txn_status_manager-itest)
ADD_KUDU_TEST(location_assignment-itest
DATA_FILES ../scripts/assign-location.py)
ADD_KUDU_TEST(ts_authz-itest NUM_SHARDS 2)
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index ae1d6e9..2ffefe7 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -1233,6 +1233,7 @@ TEST_F(TxnStatusTabletManagementTest, TestTabletServerProxyCalls) {
const vector<CoordinatorOpPB::CoordinatorOpType> kOpSequence = {
CoordinatorOpPB::BEGIN_TXN,
CoordinatorOpPB::REGISTER_PARTICIPANT,
+ CoordinatorOpPB::KEEP_TXN_ALIVE,
CoordinatorOpPB::BEGIN_COMMIT_TXN,
CoordinatorOpPB::ABORT_TXN,
CoordinatorOpPB::GET_TXN_STATUS,
diff --git a/src/kudu/integration-tests/txn_status_manager-itest.cc b/src/kudu/integration-tests/txn_status_manager-itest.cc
new file mode 100644
index 0000000..00807b1
--- /dev/null
+++ b/src/kudu/integration-tests/txn_status_manager-itest.cc
@@ -0,0 +1,469 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/master/txn_manager.pb.h"
+#include "kudu/master/txn_manager.proxy.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/transactions/transactions.pb.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using kudu::cluster::ExternalMiniClusterOptions;
+using kudu::cluster::TabletIdAndTableName;
+using kudu::itest::TServerDetails;
+using kudu::itest::WaitForOpFromCurrentTerm;
+using kudu::rpc::Messenger;
+using kudu::rpc::RpcController;
+using kudu::transactions::CommitTransactionRequestPB;
+using kudu::transactions::CommitTransactionResponsePB;
+using kudu::transactions::BeginTransactionRequestPB;
+using kudu::transactions::BeginTransactionResponsePB;
+using kudu::transactions::GetTransactionStateRequestPB;
+using kudu::transactions::GetTransactionStateResponsePB;
+using kudu::transactions::KeepTransactionAliveRequestPB;
+using kudu::transactions::KeepTransactionAliveResponsePB;
+using kudu::transactions::TxnManagerServiceProxy;
+using kudu::transactions::TxnStatePB;
+using std::shared_ptr;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+class TxnStatusManagerITest : public ExternalMiniClusterITestBase {
+ public:
+ static const constexpr auto kNumTabletServers = 3;
+ static const constexpr auto kTxnTrackerIntervalMs = 100;
+ static const constexpr auto kTxnKeepaliveIntervalMs =
+ kTxnTrackerIntervalMs * 5;
+ static const constexpr auto kRaftHbIntervalMs = 50;
+ static const constexpr char* const kTxnTrackerIntervalFlag =
+ "txn_staleness_tracker_interval_ms";
+
+ TxnStatusManagerITest() {
+ cluster_opts_.num_tablet_servers = kNumTabletServers;
+
+ // Enable TxnManager in Kudu masters.
+ // TODO(aserbin): remove this customization once the flag is 'on' by default
+ cluster_opts_.extra_master_flags.emplace_back(
+ "--txn_manager_enabled=true");
+ // Scenarios based on this test fixture assume the txn status table
+ // is created at start, not on first transaction-related operation.
+ cluster_opts_.extra_master_flags.emplace_back(
+ "--txn_manager_lazily_initialized=false");
+
+ // To speed up test scenarios, set shorter intervals for the transaction
+ // keepalive interval ...
+ cluster_opts_.extra_tserver_flags.emplace_back(Substitute(
+ "--txn_keepalive_interval_ms=$0", kTxnKeepaliveIntervalMs));
+ // ... and the polling interval of the txn staleness tracker.
+ cluster_opts_.extra_tserver_flags.emplace_back(Substitute(
+ "--$0=$1", kTxnTrackerIntervalFlag, kTxnTrackerIntervalMs));
+ cluster_opts_.extra_tserver_flags.emplace_back(Substitute(
+ "--txn_staleness_tracker_disabled_interval_ms=$0", kTxnTrackerIntervalMs));
+ // Speed up Raft re-elections in case of non-responsive leader replicas.
+ cluster_opts_.extra_tserver_flags.emplace_back(Substitute(
+ "--raft_heartbeat_interval_ms=$0", kRaftHbIntervalMs));
+ cluster_opts_.extra_tserver_flags.emplace_back(
+ "--leader_failure_max_missed_heartbeat_periods=1.25");
+ }
+
+ void SetUp() override {
+ static constexpr const char* const kTxnStatusTableName =
+ "kudu_system.kudu_transactions";
+
+ // This assertion is an explicit statement that all scenarios in this test
+ // assume there is only one TxnManager instance in the cluster.
+ //
+ // TODO(aserbin): make these tests parameterized by the number of TxnManager
+ // instances (i.e. Kudu masters) and extend the scenarios
+ // as needed.
+ ASSERT_EQ(1, cluster_opts_.num_masters);
+
+ NO_FATALS(StartClusterWithOpts(cluster_opts_));
+
+ // Wait for txn status tablets created at each of the tablet servers.
+ vector<TabletIdAndTableName> tablets_info;
+ for (auto idx = 0; idx < kNumTabletServers; ++idx) {
+ vector<TabletIdAndTableName> info;
+ ASSERT_OK(cluster_->WaitForTabletsRunning(
+ cluster_->tablet_server(idx), 1, kTimeout, &info));
+ ASSERT_EQ(1, info.size());
+ tablets_info.emplace_back(std::move(*info.begin()));
+ }
+
+ const string& tablet_id = tablets_info.begin()->tablet_id;
+ for (const auto& elem : tablets_info) {
+ ASSERT_EQ(tablet_id, elem.tablet_id);
+ ASSERT_EQ(kTxnStatusTableName, elem.table_name);
+ }
+ txn_status_tablet_id_ = tablet_id;
+
+ rpc::MessengerBuilder b("txn-keepalive");
+ ASSERT_OK(b.Build(&messenger_));
+ const auto rpc_addr = cluster_->master()->bound_rpc_addr();
+ txn_manager_proxy_.reset(new TxnManagerServiceProxy(
+ messenger_, rpc_addr, rpc_addr.host()));
+ }
+
+ Status GetTxnStatusTabletLeader(string* ts_uuid) {
+ CHECK(ts_uuid);
+ TServerDetails* leader = nullptr;
+ RETURN_NOT_OK(FindTabletLeader(
+ ts_map_, txn_status_tablet_id_, kTimeout, &leader));
+ *ts_uuid = leader->uuid();
+
+ return Status::OK();
+ }
+
+ Status BeginTransaction(int64_t* txn_id, uint32_t* keepalive_ms) {
+ CHECK(txn_id);
+ CHECK(keepalive_ms);
+ RpcController ctx;
+ PrepareRpcController(&ctx);
+ BeginTransactionRequestPB req;
+ BeginTransactionResponsePB resp;
+ RETURN_NOT_OK(txn_manager_proxy_->BeginTransaction(req, &resp, &ctx));
+ if (resp.has_error()) {
+ return StatusFromPB(resp.error().status());
+ }
+ CHECK(resp.has_txn_id());
+ *txn_id = resp.txn_id();
+ CHECK(resp.has_keepalive_millis());
+ *keepalive_ms = resp.keepalive_millis();
+ return Status::OK();
+ }
+
+ Status CommitTransaction(int64_t txn_id) {
+ CHECK_GE(txn_id, 0); // Transaction IDs start at 0, so 0 is a valid ID.
+ RpcController ctx;
+ PrepareRpcController(&ctx);
+ CommitTransactionRequestPB req;
+ req.set_txn_id(txn_id);
+ CommitTransactionResponsePB resp;
+ RETURN_NOT_OK(txn_manager_proxy_->CommitTransaction(req, &resp, &ctx));
+ if (resp.has_error()) {
+ return StatusFromPB(resp.error().status());
+ }
+ return Status::OK();
+ }
+
+ Status GetTxnState(int64_t txn_id, TxnStatePB* state) {
+ CHECK(state);
+ CHECK(txn_manager_proxy_);
+ GetTransactionStateRequestPB req;
+ req.set_txn_id(txn_id);
+ GetTransactionStateResponsePB resp;
+ RpcController ctx;
+ PrepareRpcController(&ctx);
+ RETURN_NOT_OK(txn_manager_proxy_->GetTransactionState(req, &resp, &ctx));
+ if (resp.has_error()) {
+ return StatusFromPB(resp.error().status());
+ }
+ CHECK(resp.has_state());
+ *state = resp.state();
+
+ return Status::OK();
+ }
+
+ void CheckTxnState(int64_t txn_id, TxnStatePB expected_state) {
+ TxnStatePB txn_state;
+ ASSERT_OK(GetTxnState(txn_id, &txn_state));
+ ASSERT_EQ(expected_state, txn_state);
+ }
+
+ protected:
+ const string& txn_tablet_id() const {
+ return txn_status_tablet_id_;
+ }
+
+ static void PrepareRpcController(RpcController* ctx) {
+ static const MonoDelta kRpcTimeout = MonoDelta::FromSeconds(15);
+ CHECK_NOTNULL(ctx)->set_timeout(kRpcTimeout);
+ }
+
+ static const MonoDelta kTimeout;
+
+ ExternalMiniClusterOptions cluster_opts_;
+ string txn_status_tablet_id_;
+
+ shared_ptr<rpc::Messenger> messenger_;
+ unique_ptr<TxnManagerServiceProxy> txn_manager_proxy_;
+};
+
+const MonoDelta TxnStatusManagerITest::kTimeout(MonoDelta::FromSeconds(15));
+
+// TODO(aserbin): enable all scenarios below once [1] is committed. Without [1],
+// these scenarios sometimes fail upon calling GetTxnState():
+//
+// Bad status: Not found: Failed to write to server:
+// 7c968757cc19497a93b15b6c6a48e446 (127.13.78.3:33027):
+// transaction ID 0 not found, current highest txn ID: -1
+//
+// The issue here is that a non-leader replica might load
+// the information from a tablet replica that's lagging behind
+// the leader, and once the replica becomes a new leader later on,
+// the information is stale because TxnStatusManager's data
+// isn't yet reloaded upon becoming a leader. Once the
+// patch above is merged, remove this TODO and remove the
+// 'DISABLED_' prefix from the names of the scenarios below.
+//
+// [1] https://gerrit.cloudera.org/#/c/16648/
+
+// The test to verify basic functionality of the transaction tracker: it should
+// detect transactions that haven't received KeepTransactionAlive() requests
+// for longer than the transaction's keepalive interval and automatically abort
+// those.
+TEST_F(TxnStatusManagerITest, DISABLED_StaleTransactionsCleanup) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ // Check that the transaction staleness is detected and the stale transaction
+ // is aborted while the transaction is in OPEN state.
+ {
+ int64_t txn_id;
+ uint32_t keepalive_interval_ms;
+ // ASSERT_EVENTUALLY is needed: this test uses raw TxnManagerServiceProxy
+ // and doesn't wait for TxnManager to initialize in SetUp().
+ ASSERT_EVENTUALLY([&]() {
+ ASSERT_OK(BeginTransaction(&txn_id, &keepalive_interval_ms));
+ });
+
+ // Wait for longer than the transaction keep-alive interval to allow
+ // the transaction tracker to detect the staleness of the transaction
+ // and abort it. An extra margin here is to avoid flakiness due to
+ // scheduling anomalies.
+ SleepFor(MonoDelta::FromMilliseconds(3 * keepalive_interval_ms));
+ NO_FATALS(CheckTxnState(txn_id, TxnStatePB::ABORTED));
+ }
+
+ // Check that a transaction in COMMIT_IN_PROGRESS state is not treated as
+ // stale: it must not be aborted even if no keepalive requests are sent.
+ {
+ int64_t txn_id;
+ uint32_t keepalive_interval_ms;
+ ASSERT_OK(BeginTransaction(&txn_id, &keepalive_interval_ms));
+ ASSERT_OK(CommitTransaction(txn_id));
+ NO_FATALS(CheckTxnState(txn_id, TxnStatePB::COMMIT_IN_PROGRESS));
+
+ // A transaction in COMMIT_IN_PROGRESS state isn't automatically aborted
+ // even if no txn keepalive messages are received.
+ SleepFor(MonoDelta::FromMilliseconds(3 * keepalive_interval_ms));
+ NO_FATALS(CheckTxnState(txn_id, TxnStatePB::COMMIT_IN_PROGRESS));
+ }
+}
+
+// Make sure it's possible to disable and enable back the transaction
+// staleness tracking in run-time without restarting the processes hosting
+// TxnStatusManager instances (i.e. tablet servers).
+TEST_F(TxnStatusManagerITest, DISABLED_ToggleStaleTxnTrackerInRuntime) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ // Disable stale transaction tracking at run-time (tracker interval 0).
+ for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
+ auto* ts = cluster_->tablet_server(i);
+ ASSERT_OK(cluster_->SetFlag(ts, kTxnTrackerIntervalFlag, "0"));
+ }
+
+ int64_t txn_id;
+ uint32_t keepalive_interval_ms;
+ // ASSERT_EVENTUALLY is needed: this test uses raw TxnManagerServiceProxy
+ // and doesn't wait for TxnManager to initialize in SetUp().
+ ASSERT_EVENTUALLY([&]() {
+ ASSERT_OK(BeginTransaction(&txn_id, &keepalive_interval_ms));
+ });
+ ASSERT_EQ(kTxnKeepaliveIntervalMs, keepalive_interval_ms);
+
+ // Now, with no transaction staleness tracking, the transaction should
+ // not be aborted automatically even though no keepalive requests are sent.
+ SleepFor(MonoDelta::FromMilliseconds(3 * keepalive_interval_ms));
+ NO_FATALS(CheckTxnState(txn_id, TxnStatePB::OPEN));
+
+ // Re-enable stale transaction tracking at run-time.
+ for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
+ auto* ts = cluster_->tablet_server(i);
+ ASSERT_OK(cluster_->SetFlag(
+ ts, kTxnTrackerIntervalFlag, std::to_string(kTxnTrackerIntervalMs)));
+ }
+
+ // Check that the transaction staleness is detected and the stale transaction
+ // is aborted once stale transaction tracking is re-enabled.
+ SleepFor(MonoDelta::FromMilliseconds(3 * keepalive_interval_ms));
+ NO_FATALS(CheckTxnState(txn_id, TxnStatePB::ABORTED));
+}
+
+// Verify the functionality of the stale transaction tracker in TxnStatusManager
+// in case of replicated txn status table. The crux of this scenario is to make
+// sure that a transaction isn't aborted if keepalive requests are sent as
+// required even in case of Raft leader re-elections and restarts
+// of the TxnStatusManager instances.
+TEST_F(TxnStatusManagerITest, DISABLED_TxnKeepAliveMultiTxnStatusManagerInstances) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ int64_t txn_id;
+ uint32_t keepalive_interval_ms;
+ // ASSERT_EVENTUALLY is needed: this test uses raw TxnManagerServiceProxy
+ // and doesn't wait for TxnManager to initialize in SetUp().
+ ASSERT_EVENTUALLY([&]() {
+ ASSERT_OK(BeginTransaction(&txn_id, &keepalive_interval_ms));
+ });
+
+ CountDownLatch latch(1); // Counted down to stop the keepalive sender thread.
+ Status keep_txn_alive_status; // Last status seen by the sender thread.
+ thread txn_keepalive_sender([&] {
+ const auto period = MonoDelta::FromMilliseconds(keepalive_interval_ms / 2);
+ const auto timeout = MonoDelta::FromMilliseconds(keepalive_interval_ms / 4);
+ // Keepalive thread uses its own messenger and proxy.
+ shared_ptr<rpc::Messenger> m;
+ rpc::MessengerBuilder b("txn-keepalive");
+ ASSERT_OK(b.Build(&m));
+ const auto rpc_addr = cluster_->master()->bound_rpc_addr();
+ TxnManagerServiceProxy txn_manager_proxy(m, rpc_addr, rpc_addr.host());
+ do {
+ // The timeout for KeepTransactionAlive() requests should be short,
+ // otherwise this thread might miss sending the requests with proper
+ // timing.
+ RpcController ctx;
+ ctx.set_timeout(timeout);
+ KeepTransactionAliveRequestPB req;
+ req.set_txn_id(txn_id);
+ KeepTransactionAliveResponsePB resp;
+ auto s = txn_manager_proxy.KeepTransactionAlive(req, &resp, &ctx);
+ if (resp.has_error()) {
+ if (resp.error().has_status()) {
+ keep_txn_alive_status = StatusFromPB(resp.error().status());
+ } else {
+ keep_txn_alive_status = Status::RemoteError("unspecified status");
+ }
+ } else {
+ keep_txn_alive_status = s;
+ }
+ if (!keep_txn_alive_status.ok()) {
+ LOG(WARNING) << Substitute(
+ "KeepTransactionAlive() returned non-OK status: $0",
+ keep_txn_alive_status.ToString());
+ }
+ } while (!latch.WaitFor(period));
+ });
+ auto cleanup = MakeScopedCleanup([&] {
+ latch.CountDown();
+ txn_keepalive_sender.join();
+ });
+
+ // Pause tserver processes. This is to check how the stale txn tracker works
+ // in case of 'frozen' and then 'thawed' processes. The essence is to make
+ // sure the former leader doesn't abort a transaction in case of scheduling
+ // anomalies, and TxnManager forwards txn keep-alive messages to proper
+ // TxnStatusManager instance when leadership changes.
+ for (auto i = 0; i < 5; ++i) {
+ string old_leader_uuid;
+ ASSERT_OK(GetTxnStatusTabletLeader(&old_leader_uuid));
+ auto* ts = cluster_->tablet_server_by_uuid(old_leader_uuid);
+ ASSERT_EVENTUALLY([&]{
+ ASSERT_OK(ts->Pause());
+ SleepFor(MonoDelta::FromMilliseconds(
+ std::max(3 * kRaftHbIntervalMs, 2 * kTxnKeepaliveIntervalMs)));
+ ASSERT_OK(ts->Resume());
+ string new_leader_uuid;
+ ASSERT_OK(GetTxnStatusTabletLeader(&new_leader_uuid));
+ ASSERT_NE(old_leader_uuid, new_leader_uuid);
+ auto* other_ts = FindOrDie(ts_map_, new_leader_uuid);
+ // Make sure the new leader has established itself up to the point that
+ // it can write into its backing txn status tablet: this is necessary to
+ // make sure it can abort the transaction, if it finds the transaction
+ // has stalled.
+ ASSERT_OK(WaitForOpFromCurrentTerm(
+ other_ts, txn_tablet_id(), consensus::COMMITTED_OPID, kTimeout));
+ });
+ // Give the TxnStatusManager instance running with the leader tablet replica
+ // a chance to detect and abort stale transactions, if any detected.
+ SleepFor(MonoDelta::FromMilliseconds(2 * kTxnKeepaliveIntervalMs));
+ }
+
+ NO_FATALS(CheckTxnState(txn_id, TxnStatePB::OPEN));
+
+ // Restart tablet servers hosting TxnStatusManager instances. This is to
+ // check that starting TxnStatusManager doesn't abort an open transaction
+ // for which txn keepalive messages are sent as required.
+ for (auto i = 0; i < 5; ++i) {
+ string old_leader_uuid;
+ ASSERT_OK(GetTxnStatusTabletLeader(&old_leader_uuid));
+ auto* ts = cluster_->tablet_server_by_uuid(old_leader_uuid);
+ ts->Shutdown();
+ ASSERT_EVENTUALLY([&]{
+ string new_leader_uuid;
+ ASSERT_OK(GetTxnStatusTabletLeader(&new_leader_uuid));
+ ASSERT_NE(old_leader_uuid, new_leader_uuid);
+ auto* other_ts = FindOrDie(ts_map_, new_leader_uuid);
+ // Make sure the new txn status tablet leader has established itself. For
+ // the reasoning, see the comment in the 'pause scenario' scope above.
+ ASSERT_OK(WaitForOpFromCurrentTerm(
+ other_ts, txn_tablet_id(), consensus::COMMITTED_OPID, kTimeout));
+ });
+ ASSERT_OK(ts->Restart());
+ SleepFor(MonoDelta::FromMilliseconds(2 * kTxnKeepaliveIntervalMs));
+ }
+
+ NO_FATALS(CheckTxnState(txn_id, TxnStatePB::OPEN));
+
+ latch.CountDown();
+ txn_keepalive_sender.join();
+ cleanup.cancel();
+
+ // An extra sanity check: make sure the recent keepalive requests were sent
+ // successfully, as expected.
+ ASSERT_OK(keep_txn_alive_status);
+
+ // Now, when no txn keepalive heartbeats are sent, the transaction
+ // should be automatically aborted by TxnStatusManager running with the
+ // leader replica of the txn status tablet.
+ ASSERT_EVENTUALLY([&]{
+ NO_FATALS(CheckTxnState(txn_id, TxnStatePB::ABORTED));
+ });
+
+ NO_FATALS(cluster_->AssertNoCrashes());
+}
+
+} // namespace kudu
diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc b/src/kudu/integration-tests/txn_status_table-itest.cc
index aa94827..6cd5b25 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -65,7 +65,7 @@
DECLARE_double(leader_failure_max_missed_heartbeat_periods);
DECLARE_string(superuser_acl);
DECLARE_string(user_acl);
-DECLARE_uint32(transaction_keepalive_interval_ms);
+DECLARE_uint32(txn_keepalive_interval_ms);
using kudu::client::AuthenticationCredentialsPB;
using kudu::client::KuduClient;
@@ -351,7 +351,7 @@ TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) {
ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
uint32_t txn_keepalive_ms;
ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser, &txn_keepalive_ms));
- ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, txn_keepalive_ms);
+ ASSERT_EQ(FLAGS_txn_keepalive_interval_ms, txn_keepalive_ms);
ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser));
// If we write out of range, we should see an error.
@@ -365,7 +365,7 @@ TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) {
ASSERT_EQ(-1, highest_seen_txn_id);
// txn_keepalive_ms isn't assigned in case of non-OK status.
ASSERT_EQ(0, txn_keepalive_ms);
- ASSERT_NE(0, FLAGS_transaction_keepalive_interval_ms);
+ ASSERT_NE(0, FLAGS_txn_keepalive_interval_ms);
}
{
auto s = txn_sys_client_->BeginCommitTransaction(100, kUser);
@@ -431,7 +431,7 @@ TEST_F(TxnStatusTableITest, TestSystemClientBeginTransactionErrors) {
ASSERT_OK(txn_sys_client_->BeginTransaction(
1, kUser, &txn_keepalive_ms, &highest_seen_txn_id));
ASSERT_EQ(1, highest_seen_txn_id);
- ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, txn_keepalive_ms);
+ ASSERT_EQ(FLAGS_txn_keepalive_interval_ms, txn_keepalive_ms);
// Trying to start another transaction with a used ID should yield an error.
{
@@ -694,8 +694,8 @@ TEST_F(MultiServerTxnStatusTableITest, TestSystemClientLeadershipChange) {
string new_leader_uuid;
ASSERT_OK(FindLeaderId(tablet_id, &new_leader_uuid));
ASSERT_NE(new_leader_uuid, orig_leader_uuid);
+ ASSERT_OK(txn_sys_client_->BeginTransaction(2, kUser));
});
- ASSERT_OK(txn_sys_client_->BeginTransaction(2, kUser));
}
TEST_F(MultiServerTxnStatusTableITest, TestSystemClientCrashedNodes) {
diff --git a/src/kudu/master/txn_manager-test.cc b/src/kudu/master/txn_manager-test.cc
index 6d12306..79049f3 100644
--- a/src/kudu/master/txn_manager-test.cc
+++ b/src/kudu/master/txn_manager-test.cc
@@ -31,6 +31,7 @@
#include <gtest/gtest.h>
#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/sysinfo.h"
#include "kudu/master/master.h"
#include "kudu/master/mini_master.h"
@@ -56,13 +57,14 @@ using std::thread;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
+using strings::Substitute;
DECLARE_bool(txn_manager_enabled);
DECLARE_bool(txn_manager_lazily_initialized);
DECLARE_int32(rpc_service_queue_length);
DECLARE_int64(txn_manager_status_table_range_partition_span);
DECLARE_uint32(txn_manager_status_table_num_replicas);
-DECLARE_uint32(transaction_keepalive_interval_ms);
+DECLARE_uint32(txn_keepalive_interval_ms);
namespace kudu {
namespace transactions {
@@ -324,7 +326,21 @@ TEST_F(TxnManagerTest, AbortedTransactionLifecycle) {
txn_id = resp.txn_id();
ASSERT_LE(0, txn_id);
ASSERT_TRUE(resp.has_keepalive_millis());
- ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, resp.keepalive_millis());
+ ASSERT_EQ(FLAGS_txn_keepalive_interval_ms, resp.keepalive_millis());
+ TxnStatePB txn_state;
+ NO_FATALS(fetch_txn_status(txn_id, &txn_state));
+ ASSERT_EQ(TxnStatePB::OPEN, txn_state);
+ }
+
+ {
+ RpcController ctx;
+ PrepareRpcController(&ctx);
+ KeepTransactionAliveRequestPB req;
+ KeepTransactionAliveResponsePB resp;
+ req.set_txn_id(txn_id);
+ ASSERT_OK(proxy_->KeepTransactionAlive(req, &resp, &ctx));
+ ASSERT_FALSE(resp.has_error())
+ << StatusFromPB(resp.error().status()).ToString();
TxnStatePB txn_state;
NO_FATALS(fetch_txn_status(txn_id, &txn_state));
ASSERT_EQ(TxnStatePB::OPEN, txn_state);
@@ -344,9 +360,6 @@ TEST_F(TxnManagerTest, AbortedTransactionLifecycle) {
ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_state);
}
- // TODO(aserbin): add call to KeepTransactionAlive() when TxnStatusManager
- // has the functionality implemented.
-
{
RpcController ctx;
PrepareRpcController(&ctx);
@@ -360,6 +373,26 @@ TEST_F(TxnManagerTest, AbortedTransactionLifecycle) {
NO_FATALS(fetch_txn_status(txn_id, &txn_state));
ASSERT_EQ(TxnStatePB::ABORTED, txn_state);
}
+
+ // Try to send keep-alive for already aborted transaction.
+ {
+ RpcController ctx;
+ PrepareRpcController(&ctx);
+ KeepTransactionAliveRequestPB req;
+ KeepTransactionAliveResponsePB resp;
+ req.set_txn_id(txn_id);
+ ASSERT_OK(proxy_->KeepTransactionAlive(req, &resp, &ctx));
+ ASSERT_TRUE(resp.has_error());
+ auto s = StatusFromPB(resp.error().status());
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ ASSERT_STR_CONTAINS(
+ s.ToString(),
+ Substitute("transaction ID $0 is already in terminal state", txn_id));
+ // The transaction should stay in ABORTED state, of course.
+ TxnStatePB txn_state;
+ NO_FATALS(fetch_txn_status(txn_id, &txn_state));
+ ASSERT_EQ(TxnStatePB::ABORTED, txn_state);
+ }
}
TEST_F(TxnManagerTest, BeginManyTransactions) {
@@ -404,7 +437,7 @@ TEST_F(TxnManagerTest, BeginManyTransactions) {
txn_ids->emplace_back(txn_id);
}
CHECK(resp.has_keepalive_millis());
- CHECK_EQ(FLAGS_transaction_keepalive_interval_ms, resp.keepalive_millis());
+ CHECK_EQ(FLAGS_txn_keepalive_interval_ms, resp.keepalive_millis());
}
};
@@ -496,20 +529,17 @@ TEST_F(TxnManagerTest, BeginManyTransactions) {
// (hence there will be multiple TxnManager instances) and verify
// how all this works in case of frequent master re-elections.
-// KeepTransactionAlive is not yet supported.
-// TODO(aserbin): update this scenario once KeepTransactionAlive is implemented
-TEST_F(TxnManagerTest, KeepTransactionAliveRpc) {
+TEST_F(TxnManagerTest, KeepTransactionAliveNonExistingTxnId) {
RpcController ctx;
PrepareRpcController(&ctx);
KeepTransactionAliveRequestPB req;
- req.set_txn_id(0);
+ req.set_txn_id(123);
KeepTransactionAliveResponsePB resp;
ASSERT_OK(proxy_->KeepTransactionAlive(req, &resp, &ctx));
ASSERT_TRUE(resp.has_error());
auto s = StatusFromPB(resp.error().status());
- ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
- ASSERT_STR_CONTAINS(
- s.ToString(), "Not implemented: KeepTransactionAlive is not supported yet");
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 123 not found");
}
} // namespace transactions
diff --git a/src/kudu/master/txn_manager.cc b/src/kudu/master/txn_manager.cc
index 4a016bc..0d4f871 100644
--- a/src/kudu/master/txn_manager.cc
+++ b/src/kudu/master/txn_manager.cc
@@ -230,12 +230,11 @@ Status TxnManager::AbortTransaction(int64_t txn_id,
return txn_sys_client_->AbortTransaction(txn_id, username, ToDelta(deadline));
}
-Status TxnManager::KeepTransactionAlive(int64_t /* txn_id */,
- const string& /* username */,
+Status TxnManager::KeepTransactionAlive(int64_t txn_id,
+ const string& username,
const MonoTime& deadline) {
RETURN_NOT_OK(CheckInitialized(deadline));
- // TODO(aserbin): call txn_sys_client_ once the functionality is there
- return Status::NotSupported("KeepTransactionAlive is not supported yet");
+ return txn_sys_client_->KeepTransactionAlive(txn_id, username, ToDelta(deadline));
}
// This method isn't supposed to be called concurrently, so there isn't any
diff --git a/src/kudu/tablet/txn_coordinator.h b/src/kudu/tablet/txn_coordinator.h
index 8b718ce..39e6ae1 100644
--- a/src/kudu/tablet/txn_coordinator.h
+++ b/src/kudu/tablet/txn_coordinator.h
@@ -86,7 +86,6 @@ class TxnCoordinator {
virtual Status AbortTransaction(int64_t txn_id, const std::string& user,
tserver::TabletServerErrorPB* ts_error) = 0;
-
// Retrieves the status entry for the specified transaction, returning
// Status::OK() in case of success with 'txn_status' populated. In case of
// error, returns non-OK status. 'ts_error' is used to return not-the-leader
@@ -100,6 +99,13 @@ class TxnCoordinator {
transactions::TxnStatusEntryPB* txn_status,
tserver::TabletServerErrorPB* ts_error) = 0;
+ // Process keep-alive heartbeat for the specified transaction as the given
+ // user. This is to keep the transaction "alive", so the transaction would
+ // not be aborted automatically due to staleness.
+ virtual Status KeepTransactionAlive(int64_t txn_id,
+ const std::string& user,
+ tserver::TabletServerErrorPB* ts_error) = 0;
+
// Registers a participant tablet ID to the given transaction ID as the given
// user.
//
@@ -110,12 +116,18 @@ class TxnCoordinator {
const std::string& user,
tserver::TabletServerErrorPB* ts_error) = 0;
- // Populates a map from transaction ID to the list of participants associated
- // with that transaction ID.
- virtual ParticipantIdsByTxnId GetParticipantsByTxnIdForTests() const = 0;
+ // Abort transactions in non-terminal state which are considered 'stale'.
+ // The 'staleness' of a transaction is determined by particular
+ // implementation. This method can be called periodically to get rid of
+ // abandoned transactions.
+ virtual void AbortStaleTransactions() = 0;
// The highest transaction ID seen by this coordinator.
virtual int64_t highest_txn_id() const = 0;
+
+ // Populates a map from transaction ID to the list of participants associated
+ // with that transaction ID.
+ virtual ParticipantIdsByTxnId GetParticipantsByTxnIdForTests() const = 0;
};
class TxnCoordinatorFactory {
diff --git a/src/kudu/transactions/txn_status_entry.cc b/src/kudu/transactions/txn_status_entry.cc
index ed56c19..7170a0f 100644
--- a/src/kudu/transactions/txn_status_entry.cc
+++ b/src/kudu/transactions/txn_status_entry.cc
@@ -25,6 +25,7 @@
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/monotime.h"
using std::string;
using std::vector;
@@ -32,6 +33,13 @@ using std::vector;
namespace kudu {
namespace transactions {
+TransactionEntry::TransactionEntry(int64_t txn_id, std::string user)
+ : txn_id_(txn_id),
+ user_(std::move(user)),
+ last_heartbeat_time_(MonoTime::Now()),
+ state_(TxnStatePB::UNKNOWN) {
+}
+
scoped_refptr<ParticipantEntry> TransactionEntry::GetOrCreateParticipant(
const string& tablet_id) {
DCHECK(metadata_.IsReadLocked());
diff --git a/src/kudu/transactions/txn_status_entry.h b/src/kudu/transactions/txn_status_entry.h
index 69c42ad..c96c0f8 100644
--- a/src/kudu/transactions/txn_status_entry.h
+++ b/src/kudu/transactions/txn_status_entry.h
@@ -16,16 +16,20 @@
// under the License.
#pragma once
+#include <atomic>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
+#include <glog/logging.h>
+
#include "kudu/gutil/ref_counted.h"
#include "kudu/transactions/transactions.pb.h"
#include "kudu/util/cow_object.h"
#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
namespace kudu {
namespace transactions {
@@ -50,13 +54,13 @@ class ParticipantEntry : public RefCountedThreadSafe<ParticipantEntry> {
public:
typedef PersistentParticipantEntry cow_state;
- ParticipantEntry() {}
+ ParticipantEntry() = default;
const CowObject<PersistentParticipantEntry>& metadata() const { return metadata_; }
CowObject<PersistentParticipantEntry>* mutable_metadata() { return &metadata_; }
private:
friend class RefCountedThreadSafe<ParticipantEntry>;
- ~ParticipantEntry() {}
+ ~ParticipantEntry() = default;
// Mutable state for this participant with concurrent access controlled via
// copy-on-write locking.
@@ -72,9 +76,8 @@ class TransactionEntry : public RefCountedThreadSafe<TransactionEntry> {
public:
typedef PersistentTransactionEntry cow_state;
- TransactionEntry(int64_t txn_id, std::string user)
- : txn_id_(txn_id),
- user_(std::move(user)) {}
+ TransactionEntry(int64_t txn_id, std::string user);
+
const CowObject<PersistentTransactionEntry>& metadata() const { return metadata_; }
CowObject<PersistentTransactionEntry>* mutable_metadata() { return &metadata_; }
@@ -89,9 +92,26 @@ class TransactionEntry : public RefCountedThreadSafe<TransactionEntry> {
return user_;
}
+ // Return the last recorded heartbeat timestamp for the transaction.
+ MonoTime last_heartbeat_time() const {
+ return last_heartbeat_time_.load();
+ }
+ // Set the timestamp of last seen keep-alive heartbeat for the transaction.
+ void SetLastHeartbeatTime(const MonoTime& hbtime) {
+ last_heartbeat_time_.store(hbtime);
+ }
+
+ TxnStatePB state() const {
+ return state_.load();
+ }
+ void SetState(TxnStatePB state) {
+ DCHECK(metadata_.IsWriteLocked());
+ state_.store(state);
+ }
+
private:
friend class RefCountedThreadSafe<TransactionEntry>;
- ~TransactionEntry() {}
+ ~TransactionEntry() = default;
const int64_t txn_id_;
@@ -107,6 +127,21 @@ class TransactionEntry : public RefCountedThreadSafe<TransactionEntry> {
// Mutable state for the transaction status record with concurrent access
// controlled via copy-on-write locking.
CowObject<PersistentTransactionEntry> metadata_;
+
+ // Time of the last keep-alive heartbeat received for the transaction
+ // identified by txn_id_. Using std::atomic wrapper since the field can be
+ // accessed concurrently by multiple threads:
+ // * the field can be updated by concurrent KeepAlive requests from
+ // different clients sending writes in the context of the same transaction
+ // * the field is read by the stale transaction tracker in TxnStatusManager
+ std::atomic<MonoTime> last_heartbeat_time_;
+
+ // A shortcut for the state of the transaction stored in the metadata_ field.
+ // Callers should externally synchronize 'state_' with 'metadata_' by calling
+ // SetState() accordingly when TxnStatePB in 'metadata_' changes.
+ //
+ // TODO(aserbin): add a hook into CowObject::Commit() to do so automatically
+ std::atomic<TxnStatePB> state_;
};
typedef MetadataLock<TransactionEntry> TransactionEntryLock;
diff --git a/src/kudu/transactions/txn_status_manager-test.cc b/src/kudu/transactions/txn_status_manager-test.cc
index 146024f..960a742 100644
--- a/src/kudu/transactions/txn_status_manager-test.cc
+++ b/src/kudu/transactions/txn_status_manager-test.cc
@@ -32,6 +32,7 @@
#include <utility>
#include <vector>
+#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
@@ -67,6 +68,8 @@ using std::unique_ptr;
using std::unordered_set;
using std::vector;
+DECLARE_uint32(txn_keepalive_interval_ms);
+DECLARE_uint32(txn_staleness_tracker_interval_ms);
METRIC_DECLARE_entity(tablet);
namespace kudu {
@@ -86,12 +89,22 @@ class TxnStatusManagerTest : public TabletReplicaTestBase {
: TabletReplicaTestBase(TxnStatusTablet::GetSchemaWithoutIds()) {}
void SetUp() override {
+ // Using shorter intervals for transaction staleness tracking to speed up
+ // test scenarios verifying related functionality.
+ FLAGS_txn_keepalive_interval_ms = 200;
+ FLAGS_txn_staleness_tracker_interval_ms = 50;
+
NO_FATALS(TabletReplicaTestBase::SetUp());
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
+ ASSERT_OK(ResetTxnStatusManager());
+ }
+
+ Status ResetTxnStatusManager() {
txn_manager_.reset(new TxnStatusManager(tablet_replica_.get()));
- ASSERT_OK(txn_manager_->LoadFromTablet());
+ return txn_manager_->LoadFromTablet();
}
+
protected:
unique_ptr<TxnStatusManager> txn_manager_;
};
@@ -162,13 +175,10 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
// Now rebuild the underlying replica and rebuild the TxnStatusManager.
ASSERT_OK(RestartReplica());
- {
- TxnStatusManager txn_manager_reloaded(tablet_replica_.get());
- ASSERT_OK(txn_manager_reloaded.LoadFromTablet());
- ASSERT_EQ(expected_prts_by_txn_id,
- txn_manager_reloaded.GetParticipantsByTxnIdForTests());
- ASSERT_EQ(3, txn_manager_reloaded.highest_txn_id());
- }
+ NO_FATALS(ResetTxnStatusManager());
+ ASSERT_EQ(expected_prts_by_txn_id,
+ txn_manager_->GetParticipantsByTxnIdForTests());
+ ASSERT_EQ(3, txn_manager_->highest_txn_id());
// Verify that TxnStatusManager methods return Status::ServiceUnavailable()
// if the transaction status tablet's data is not loaded yet.
@@ -201,11 +211,15 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
ASSERT_TRUE(s.IsServiceUnavailable());
ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
- transactions::TxnStatusEntryPB txn_status;
+ TxnStatusEntryPB txn_status;
s = tsm.GetTransactionStatus(txn_id, kOwner, &txn_status, &ts_error);
ASSERT_TRUE(s.IsServiceUnavailable());
ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
+ s = tsm.KeepTransactionAlive(txn_id, kOwner, &ts_error);
+ ASSERT_TRUE(s.IsServiceUnavailable());
+ ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
+
s = tsm.RegisterParticipant(txn_id, kParticipant1, kOwner, &ts_error);
ASSERT_TRUE(s.IsServiceUnavailable());
ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
@@ -424,6 +438,16 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) {
// TxnStatusManager::GetTransactionStatus() method.
TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
{
+ TxnStatusEntryPB txn_status;
+ TabletServerErrorPB ts_error;
+ auto s = txn_manager_->GetTransactionStatus(
+ 1, kOwner, &txn_status, &ts_error);
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 1 not found");
+ ASSERT_FALSE(txn_status.has_state());
+ ASSERT_FALSE(txn_status.has_user());
+ }
+ {
TabletServerErrorPB ts_error;
ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, nullptr, &ts_error));
@@ -475,13 +499,8 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
ASSERT_OK(txn_manager_->BeginTransaction(4, kOwner, nullptr, &ts_error));
// Make the TxnStatusManager start from scratch.
- {
- ASSERT_OK(RestartReplica());
- decltype(txn_manager_) txn_manager_reloaded(
- new TxnStatusManager(tablet_replica_.get()));
- ASSERT_OK(txn_manager_reloaded->LoadFromTablet());
- txn_manager_ = std::move(txn_manager_reloaded);
- }
+ ASSERT_OK(RestartReplica());
+ NO_FATALS(ResetTxnStatusManager());
// Committed, aborted, and in-flight transactions should be known to the
// TxnStatusManager even after restarting the underlying replica and
@@ -546,6 +565,150 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
}
}
+// This test scenario verifies basic functionality of the
+// TxnStatusManager::KeepTransactionAlive() method w.r.t. state of the
+// transaction.
+TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
+ // Supplying not-yet-registered transaction ID.
+ {
+ TabletServerErrorPB ts_error;
+ auto s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 1 not found");
+ }
+
+ // OPEN --> COMMIT_IN_PROGRESS --> COMMITTED
+ {
+ TabletServerErrorPB ts_error;
+ ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, nullptr, &ts_error));
+ ASSERT_OK(txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error));
+ // Supplying wrong user for transaction in OPEN state.
+ {
+ auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
+ ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "transaction ID 1 not owned by stranger");
+ }
+
+ ASSERT_OK(txn_manager_->BeginCommitTransaction(1, kOwner, &ts_error));
+ auto s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "transaction ID 1 is in commit phase");
+ // Supplying wrong user for transaction in COMMIT_IN_PROGRESS state.
+ {
+ auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
+ ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "transaction ID 1 not owned by stranger");
+ }
+
+ ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1, &ts_error));
+ s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "transaction ID 1 is already in terminal state");
+ // Supplying wrong user for transaction in COMMITTED state.
+ {
+ auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
+ ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "transaction ID 1 not owned by stranger");
+ }
+ }
+
+ // OPEN --> COMMIT_IN_PROGRESS --> ABORTED
+ {
+ TabletServerErrorPB ts_error;
+ ASSERT_OK(txn_manager_->BeginTransaction(2, kOwner, nullptr, &ts_error));
+ ASSERT_OK(txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error));
+
+ ASSERT_OK(txn_manager_->BeginCommitTransaction(2, kOwner, &ts_error));
+ auto s = txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "transaction ID 2 is in commit phase");
+
+ ASSERT_OK(txn_manager_->AbortTransaction(2, kOwner, &ts_error));
+ s = txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "transaction ID 2 is already in terminal state");
+ // Supplying wrong user for transaction in ABORTED state.
+ {
+ auto s = txn_manager_->KeepTransactionAlive(2, "stranger", &ts_error);
+ ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not owned by stranger");
+ }
+ }
+
+ // OPEN --> ABORTED
+ {
+ TabletServerErrorPB ts_error;
+ ASSERT_OK(txn_manager_->BeginTransaction(3, kOwner, nullptr, &ts_error));
+ ASSERT_OK(txn_manager_->KeepTransactionAlive(3, kOwner, &ts_error));
+
+ ASSERT_OK(txn_manager_->AbortTransaction(3, kOwner, &ts_error));
+ auto s = txn_manager_->KeepTransactionAlive(3, kOwner, &ts_error);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "transaction ID 3 is already in terminal state");
+ }
+
+ // Open a new transaction just before restarting the TxnStatusManager.
+ {
+ TabletServerErrorPB ts_error;
+ ASSERT_OK(txn_manager_->BeginTransaction(4, kOwner, nullptr, &ts_error));
+ }
+
+ // Make the TxnStatusManager start from scratch.
+ ASSERT_OK(RestartReplica());
+ ASSERT_OK(ResetTxnStatusManager());
+
+ // Committed, aborted, and in-flight transactions should be known to the
+ // TxnStatusManager even after restarting the underlying replica and
+ // rebuilding the TxnStatusManager from scratch, so KeepTransactionAlive()
+ // should behave the same as if no restart has happened.
+ {
+ TabletServerErrorPB ts_error;
+ auto s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "transaction ID 1 is already in terminal state");
+ // Supplying wrong user for transaction in COMMITTED state.
+ {
+ auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
+ ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "transaction ID 1 not owned by stranger");
+ }
+ }
+ {
+ TabletServerErrorPB ts_error;
+ auto s = txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "transaction ID 2 is already in terminal state");
+ // Supplying wrong user for transaction in ABORTED state.
+ {
+ auto s = txn_manager_->KeepTransactionAlive(2, "stranger", &ts_error);
+ ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not owned by stranger");
+ }
+ }
+ {
+ TabletServerErrorPB ts_error;
+ ASSERT_OK(txn_manager_->KeepTransactionAlive(4, kOwner, &ts_error));
+ // Supplying wrong user for transaction in OPEN state.
+ {
+ auto s = txn_manager_->KeepTransactionAlive(4, "stranger", &ts_error);
+ ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "transaction ID 4 not owned by stranger");
+ }
+ }
+}
+
// Test that performing actions as the wrong user will return errors.
TEST_F(TxnStatusManagerTest, TestWrongUser) {
const string kWrongUser = "stranger";
diff --git a/src/kudu/transactions/txn_status_manager.cc b/src/kudu/transactions/txn_status_manager.cc
index 2030d88..a7f877a 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -19,6 +19,7 @@
#include <algorithm>
#include <mutex>
+#include <ostream>
#include <string>
#include <utility>
#include <vector>
@@ -40,15 +41,20 @@
#include "kudu/util/cow_object.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
-DEFINE_uint32(transaction_keepalive_interval_ms, 5000,
+DEFINE_uint32(txn_keepalive_interval_ms, 30000,
"Maximum interval (in milliseconds) between subsequent "
"keep-alive heartbeats to let the transaction status manager "
- "know that a transaction is not abandoned");
-TAG_FLAG(transaction_keepalive_interval_ms, experimental);
+ "know that a transaction is not abandoned. If the transaction "
+ "status manager does not receive a keepalive message for a "
+ "longer interval than the specified, the transaction is "
+ "automatically aborted.");
+TAG_FLAG(txn_keepalive_interval_ms, experimental);
+TAG_FLAG(txn_keepalive_interval_ms, runtime);
DEFINE_int32(txn_status_manager_inject_latency_load_from_tablet_ms, 0,
"Injects a random latency between 0 and this many milliseconds "
@@ -58,6 +64,15 @@ DEFINE_int32(txn_status_manager_inject_latency_load_from_tablet_ms, 0,
TAG_FLAG(txn_status_manager_inject_latency_load_from_tablet_ms, hidden);
TAG_FLAG(txn_status_manager_inject_latency_load_from_tablet_ms, unsafe);
+DEFINE_uint32(txn_staleness_tracker_interval_ms, 10000,
+ "Period (in milliseconds) of the task that tracks and aborts "
+ "stale/abandoned transactions. If this flag is set to 0, "
+ "TxnStatusManager doesn't automatically abort stale/abandoned "
+ "transactions even if no keepalive messages are received for "
+ "longer than defined by the --txn_keepalive_interval_ms flag.");
+TAG_FLAG(txn_staleness_tracker_interval_ms, experimental);
+TAG_FLAG(txn_staleness_tracker_interval_ms, runtime);
+
using kudu::pb_util::SecureShortDebugString;
using kudu::tablet::ParticipantIdsByTxnId;
using kudu::tserver::TabletServerErrorPB;
@@ -151,6 +166,7 @@ Status TxnStatusManager::LoadFromTablet() {
std::lock_guard<simple_spinlock> l(lock_);
highest_txn_id_ = std::max(highest_txn_id, highest_txn_id_);
txns_by_id_ = std::move(txns_by_id);
+
return Status::OK();
}
@@ -270,6 +286,7 @@ Status TxnStatusManager::BeginTransaction(int64_t txn_id,
TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE);
txn_lock.mutable_data()->pb.set_state(TxnStatePB::OPEN);
txn_lock.mutable_data()->pb.set_user(user);
+ txn->SetState(txn_lock.data().pb.state());
txn_lock.Commit();
}
std::lock_guard<simple_spinlock> l(lock_);
@@ -302,6 +319,7 @@ Status TxnStatusManager::BeginCommitTransaction(int64_t txn_id, const string& us
auto* mutable_data = txn_lock.mutable_data();
mutable_data->pb.set_state(TxnStatePB::COMMIT_IN_PROGRESS);
RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb, ts_error));
+ txn->SetState(txn_lock.data().pb.state());
txn_lock.Commit();
return Status::OK();
}
@@ -328,11 +346,13 @@ Status TxnStatusManager::FinalizeCommitTransaction(
mutable_data->pb.set_state(TxnStatePB::COMMITTED);
RETURN_NOT_OK(status_tablet_.UpdateTransaction(
txn_id, mutable_data->pb, ts_error));
+ txn->SetState(txn_lock.data().pb.state());
txn_lock.Commit();
return Status::OK();
}
-Status TxnStatusManager::AbortTransaction(int64_t txn_id, const std::string& user,
+Status TxnStatusManager::AbortTransaction(int64_t txn_id,
+ const std::string& user,
TabletServerErrorPB* ts_error) {
scoped_refptr<TransactionEntry> txn;
RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
@@ -353,6 +373,7 @@ Status TxnStatusManager::AbortTransaction(int64_t txn_id, const std::string& use
auto* mutable_data = txn_lock.mutable_data();
mutable_data->pb.set_state(TxnStatePB::ABORTED);
RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb, ts_error));
+ txn->SetState(txn_lock.data().pb.state());
txn_lock.Commit();
return Status::OK();
}
@@ -376,6 +397,42 @@ Status TxnStatusManager::GetTransactionStatus(
return Status::OK();
}
+Status TxnStatusManager::KeepTransactionAlive(int64_t txn_id,
+ const string& user,
+ TabletServerErrorPB* ts_error) {
+ scoped_refptr<TransactionEntry> txn;
+ RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
+
+ // It's a read (not write) lock because the last heartbeat time isn't
+ // persisted into the transaction status tablet. In other words, the last
+ // heartbeat time is a purely run-time piece of information for a
+ // TransactionEntry.
+ TransactionEntryLock txn_lock(txn.get(), LockMode::READ);
+
+ const auto& pb = txn_lock.data().pb;
+ const auto& state = pb.state();
+ if (state != TxnStatePB::OPEN &&
+ state != TxnStatePB::COMMIT_IN_PROGRESS) {
+ return ReportIllegalTxnState(
+ Substitute("transaction ID $0 is already in terminal state: $1",
+ txn_id, SecureShortDebugString(pb)),
+ ts_error);
+ }
+ // Keepalive updates are not required for a transaction in COMMIT_IN_PROGRESS
+ // state. The system takes care of a transaction once the client side
+ // initiates the commit phase.
+ if (state == TxnStatePB::COMMIT_IN_PROGRESS) {
+ return ReportIllegalTxnState(
+ Substitute("transaction ID $0 is in commit phase: $1",
+ txn_id, SecureShortDebugString(pb)),
+ ts_error);
+ }
+ DCHECK_EQ(TxnStatePB::OPEN, state);
+ txn->SetLastHeartbeatTime(MonoTime::Now());
+
+ return Status::OK();
+}
+
Status TxnStatusManager::RegisterParticipant(
int64_t txn_id,
const string& tablet_id,
@@ -418,6 +475,75 @@ Status TxnStatusManager::RegisterParticipant(
return Status::OK();
}
+void TxnStatusManager::AbortStaleTransactions() {
+ const MonoDelta max_staleness_interval =
+ MonoDelta::FromMilliseconds(FLAGS_txn_keepalive_interval_ms);
+
+ auto* consensus = status_tablet_.tablet_replica_->consensus();
+ DCHECK(consensus);
+ if (consensus->role() != RaftPeerPB::LEADER) {
+ // Only leader replicas abort stale transactions registered with them.
+ // As of now, keep-alive requests are sent only to leader replicas, so only
+ // they have up-to-date information about the liveness of corresponding
+ // transactions.
+ //
+ // If a non-leader replica erroneously (due to a network partition and
+ // the absence of leader leases) tried to abort a transaction, it would fail
+ // because aborting a transaction means writing into the transaction status
+ // tablet, so a non-leader replica's write attempt would be rejected by
+ // the Raft consensus protocol.
+ return;
+ }
+ TransactionsMap txns_by_id;
+ {
+ std::lock_guard<simple_spinlock> l(lock_);
+ for (const auto& elem : txns_by_id_) {
+ const auto state = elem.second->state();
+ // The tracker is interested only in open transactions. Nothing can be
+ // done with transactions in terminal states (COMMITTED, ABORTED), and the
+ // system finalizes transactions in COMMIT_IN_PROGRESS state without any
+ // participation from the client side, so txn keepalive messages are not
+ // required for those.
+ if (state == TxnStatePB::OPEN) {
+ txns_by_id.emplace(elem.first, elem.second);
+ }
+ }
+ }
+ const MonoTime now = MonoTime::Now();
+ for (auto& elem : txns_by_id) {
+ const auto& txn_id = elem.first;
+ const auto& txn_entry = elem.second;
+ const auto staleness_interval = now - txn_entry->last_heartbeat_time();
+ // Skip txns that left OPEN (e.g. commit started) after the snapshot above.
+ if (staleness_interval > max_staleness_interval && txn_entry->state() == TxnStatePB::OPEN) {
+ TabletServerErrorPB error;
+ auto s = AbortTransaction(txn_id, txn_entry->user(), &error);
+ if (PREDICT_TRUE(s.ok())) {
+ LOG(INFO) << Substitute(
+ "automatically aborted stale txn (ID $0) past $1 from "
+ "last keepalive heartbeat (effective timeout is $2)",
+ txn_id, staleness_interval.ToString(),
+ max_staleness_interval.ToString());
+ } else {
+ LOG(WARNING) << Substitute(
+ "failed to abort stale txn (ID $0) past $1 from "
+ "last keepalive heartbeat (effective timeout is $2): $3",
+ txn_id, staleness_interval.ToString(),
+ max_staleness_interval.ToString(), s.ToString());
+ if (consensus->role() != RaftPeerPB::LEADER ||
+ !status_tablet_.tablet_replica()->CheckRunning().ok()) {
+ // If the replica is no longer a leader at this point, there is
+ // no sense in processing the rest of the entries.
+ LOG(INFO) << "skipping staleness check for the rest of in-flight "
+ "txn records since this txn status tablet replica "
+ "is no longer a leader or not running";
+ return;
+ }
+ }
+ }
+ }
+}
+
ParticipantIdsByTxnId TxnStatusManager::GetParticipantsByTxnIdForTests() const {
ParticipantIdsByTxnId ret;
std::lock_guard<simple_spinlock> l(lock_);
diff --git a/src/kudu/transactions/txn_status_manager.h b/src/kudu/transactions/txn_status_manager.h
index 25ae220..cc4f27e 100644
--- a/src/kudu/transactions/txn_status_manager.h
+++ b/src/kudu/transactions/txn_status_manager.h
@@ -124,6 +124,11 @@ class TxnStatusManager final : public tablet::TxnCoordinator {
transactions::TxnStatusEntryPB* txn_status,
tserver::TabletServerErrorPB* ts_error) override;
+ // Processes keep-alive heartbeat for the specified transaction.
+ Status KeepTransactionAlive(int64_t txn_id,
+ const std::string& user,
+ tserver::TabletServerErrorPB* ts_error) override;
+
// Creates an in-memory participant, writes an entry to the status table, and
// attaches the in-memory participant to the transaction.
//
@@ -133,15 +138,19 @@ class TxnStatusManager final : public tablet::TxnCoordinator {
const std::string& user,
tserver::TabletServerErrorPB* ts_error) override;
- // Populates a map from transaction ID to the sorted list of participants
- // associated with that transaction ID.
- tablet::ParticipantIdsByTxnId GetParticipantsByTxnIdForTests() const override;
+ // Abort transactions which are still in OPEN state but haven't received
+ // keep-alive updates (see KeepTransactionAlive()) for a long time.
+ void AbortStaleTransactions() override;
int64_t highest_txn_id() const override {
std::lock_guard<simple_spinlock> l(lock_);
return highest_txn_id_;
}
+ // Populates a map from transaction ID to the sorted list of participants
+ // associated with that transaction ID.
+ tablet::ParticipantIdsByTxnId GetParticipantsByTxnIdForTests() const override;
+
private:
// Verifies that the transaction status data has already been loaded from the
// underlying tablet and the replica is a leader. Returns Status::OK() if the
diff --git a/src/kudu/transactions/txn_status_tablet.h b/src/kudu/transactions/txn_status_tablet.h
index d87dd9b..a112d3f 100644
--- a/src/kudu/transactions/txn_status_tablet.h
+++ b/src/kudu/transactions/txn_status_tablet.h
@@ -117,6 +117,10 @@ class TxnStatusTablet {
const TxnParticipantEntryPB& pb,
tserver::TabletServerErrorPB* ts_error);
+ const tablet::TabletReplica* tablet_replica() const {
+ return tablet_replica_;
+ }
+
private:
friend class TxnStatusManager;
diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc
index 2d999e1..30d83d6 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -229,6 +229,20 @@ Status TxnSystemClient::GetTransactionStatus(int64_t txn_id,
return ret;
}
+Status TxnSystemClient::KeepTransactionAlive(int64_t txn_id,
+ const string& user,
+ MonoDelta timeout) {
+ CoordinatorOpPB coordinate_txn_op;
+ coordinate_txn_op.set_type(CoordinatorOpPB::KEEP_TXN_ALIVE);
+ coordinate_txn_op.set_txn_id(txn_id);
+ coordinate_txn_op.set_user(user);
+ Synchronizer s;
+ RETURN_NOT_OK(CoordinateTransactionAsync(std::move(coordinate_txn_op),
+ timeout,
+ s.AsStatusCallback()));
+ return s.Wait();
+}
+
Status TxnSystemClient::CoordinateTransactionAsync(CoordinatorOpPB coordinate_txn_op,
const MonoDelta& timeout,
const StatusCallback& cb,
diff --git a/src/kudu/transactions/txn_system_client.h b/src/kudu/transactions/txn_system_client.h
index acf1d67..6cb4407 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -120,6 +120,11 @@ class TxnSystemClient {
TxnStatusEntryPB* txn_status,
MonoDelta timeout = MonoDelta::FromSeconds(10));
+ // Send keep-alive heartbeat for the specified transaction as the given user.
+ Status KeepTransactionAlive(int64_t txn_id,
+ const std::string& user,
+ MonoDelta timeout = MonoDelta::FromSeconds(10));
+
// Opens the transaction status table, refreshing metadata with that from the
// masters.
Status OpenTxnStatusTable();
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 12c6326..aa0b905 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -174,7 +174,7 @@ TAG_FLAG(tserver_inject_invalid_authz_token_ratio, hidden);
DECLARE_bool(raft_prepare_replacement_before_eviction);
DECLARE_int32(memory_limit_warn_threshold_percentage);
DECLARE_int32(tablet_history_max_age_sec);
-DECLARE_uint32(transaction_keepalive_interval_ms);
+DECLARE_uint32(txn_keepalive_interval_ms);
METRIC_DEFINE_counter(
server,
@@ -1201,6 +1201,7 @@ Status ValidateCoordinatorOpFields(const CoordinatorOpPB& op) {
case CoordinatorOpPB::BEGIN_COMMIT_TXN:
case CoordinatorOpPB::ABORT_TXN:
case CoordinatorOpPB::GET_TXN_STATUS:
+ case CoordinatorOpPB::KEEP_TXN_ALIVE:
if (!op.has_txn_id()) {
return Status::InvalidArgument(Substitute("Missing txn id: $0",
SecureShortDebugString(op)));
@@ -1273,6 +1274,9 @@ void TabletServiceAdminImpl::CoordinateTransaction(const CoordinateTransactionRe
s = txn_coordinator->GetTransactionStatus(
txn_id, user, &txn_status, &ts_error);
break;
+ case CoordinatorOpPB::KEEP_TXN_ALIVE:
+ s = txn_coordinator->KeepTransactionAlive(txn_id, user, &ts_error);
+ break;
default:
s = Status::InvalidArgument(Substitute("Unknown op type: $0", op.type()));
}
@@ -1289,7 +1293,7 @@ void TabletServiceAdminImpl::CoordinateTransaction(const CoordinateTransactionRe
*(resp->mutable_op_result()->mutable_txn_status()) = std::move(txn_status);
} else if (op.type() == CoordinatorOpPB::BEGIN_TXN) {
resp->mutable_op_result()->set_keepalive_millis(
- FLAGS_transaction_keepalive_interval_ms);
+ FLAGS_txn_keepalive_interval_ms);
}
if (op.type() == CoordinatorOpPB::BEGIN_TXN && !s.IsServiceUnavailable()) {
DCHECK_GE(highest_seen_txn_id, 0);
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 3b1127a..11f8a28 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -55,6 +55,7 @@
#include "kudu/tablet/tablet_bootstrap.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tablet/txn_coordinator.h"
#include "kudu/transactions/txn_status_manager.h"
#include "kudu/tserver/heartbeater.h"
#include "kudu/tserver/tablet_server.h"
@@ -143,7 +144,14 @@ DEFINE_int32(tablet_open_inject_latency_ms, 0,
"Injects latency into the tablet opening. For use in tests only.");
TAG_FLAG(tablet_open_inject_latency_ms, unsafe);
+DEFINE_uint32(txn_staleness_tracker_disabled_interval_ms, 60000,
+ "Polling interval (in milliseconds) for the disabled staleness "
+ "transaction tracker task to check whether it's been re-enabled. "
+ "This is made configurable only for testing purposes.");
+TAG_FLAG(txn_staleness_tracker_disabled_interval_ms, hidden);
+
DECLARE_bool(raft_prepare_replacement_before_eviction);
+DECLARE_uint32(txn_staleness_tracker_interval_ms);
METRIC_DEFINE_gauge_int32(server, tablets_num_not_initialized,
"Number of Not Initialized Tablets",
@@ -252,6 +260,7 @@ TSTabletManager::TSTabletManager(TabletServer* server)
: fs_manager_(server->fs_manager()),
cmeta_manager_(new ConsensusMetadataManager(fs_manager_)),
server_(server),
+ shutdown_latch_(1),
metric_registry_(server->metric_registry()),
tablet_copy_metrics_(server->metric_entity()),
state_(MANAGER_INITIALIZING) {
@@ -373,6 +382,19 @@ Status TSTabletManager::Init() {
.set_max_threads(max_delete_threads)
.Build(&delete_tablet_pool_));
+ // TODO(aserbin): if better parallelism is needed to serve higher txn volume,
+ // consider using multiple threads in this pool and schedule
+ // per-tablet-replica clean-up tasks via threadpool serial
+ // tokens to make sure no more than one clean-up task
+ // is running against a txn status tablet replica.
+ RETURN_NOT_OK(ThreadPoolBuilder("txn-status-manager")
+ .set_max_threads(1)
+ .set_max_queue_size(0)
+ .Build(&txn_status_manager_pool_));
+ RETURN_NOT_OK(txn_status_manager_pool_->Submit([this]() {
+ this->TxnStalenessTrackerTask();
+ }));
+
// Search for tablets in the metadata dir.
vector<string> tablet_ids;
RETURN_NOT_OK(fs_manager_->ListTabletIds(&tablet_ids));
@@ -1225,10 +1247,15 @@ void TSTabletManager::Shutdown() {
// Shut down the delete pool, so no new tablets are deleted after this point.
delete_tablet_pool_->Shutdown();
+ // Signal the only task running on the txn_status_manager_pool_ to wrap up.
+ shutdown_latch_.CountDown();
+ // Shut down the pool running the dedicated TxnStatusManager-related task.
+ txn_status_manager_pool_->Shutdown();
+
// Take a snapshot of the replicas list -- that way we don't have to hold
// on to the lock while shutting them down, which might cause a lock
// inversion. (see KUDU-308 for example).
- vector<scoped_refptr<TabletReplica> > replicas_to_shutdown;
+ vector<scoped_refptr<TabletReplica>> replicas_to_shutdown;
GetTabletReplicas(&replicas_to_shutdown);
for (const scoped_refptr<TabletReplica>& replica : replicas_to_shutdown) {
@@ -1347,6 +1374,46 @@ void TSTabletManager::InitLocalRaftPeerPB() {
*local_peer_pb_.mutable_last_known_addr() = HostPortToPB(hp);
}
+void TSTabletManager::TxnStalenessTrackerTask() {
+  // Periodically scan this tablet server's replicas for transaction status
+  // tablets (i.e. replicas that have a TxnCoordinator) and ask each
+  // coordinator to abort its stale transactions. The loop exits once
+  // shutdown_latch_ has been counted down (see Shutdown()).
+  while (true) {
+    // The interval flag is re-read on every iteration, so the task can be
+    // disabled (interval of 0) and re-enabled at run-time without restarting
+    // the process. While disabled, the task only wakes up every
+    // FLAGS_txn_staleness_tracker_disabled_interval_ms to re-check the flag.
+    auto interval_ms = FLAGS_txn_staleness_tracker_interval_ms;
+    bool task_enabled = true;
+    if (interval_ms == 0) {
+      interval_ms = FLAGS_txn_staleness_tracker_disabled_interval_ms;
+      task_enabled = false;
+    }
+    // Wait for either a shutdown notification or the interval to expire;
+    // a 'true' result means the latch was counted down, i.e. shutdown.
+    if (shutdown_latch_.WaitFor(MonoDelta::FromMilliseconds(interval_ms))) {
+      return;
+    }
+    if (!task_enabled) {
+      continue;
+    }
+
+    // Collect the txn status tablet replicas while holding lock_, but run the
+    // clean-up itself after the lock is released. Each map entry is bound by
+    // reference and only matching replicas are copied, so non-coordinator
+    // replicas don't pay for a refcount increment/decrement under the lock.
+    vector<scoped_refptr<TabletReplica>> replicas;
+    {
+      shared_lock<RWMutex> l(lock_);
+      for (const auto& elem : tablet_map_) {
+        const auto& r = elem.second;
+        if (r->txn_coordinator()) {
+          replicas.emplace_back(r);
+        }
+      }
+    }
+    for (const auto& r : replicas) {
+      // Bail out promptly if shutdown started while processing replicas.
+      if (shutdown_latch_.count() == 0) {
+        return;
+      }
+      auto* coordinator = DCHECK_NOTNULL(r->txn_coordinator());
+      coordinator->AbortStaleTransactions();
+    }
+  }
+}
+
void TSTabletManager::CreateReportedTabletPB(const scoped_refptr<TabletReplica>& replica,
ReportedTabletPB* reported_tablet) const {
reported_tablet->set_tablet_id(replica->tablet_id());
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 5ecc73f..3bacffa 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -38,6 +38,7 @@
#include "kudu/tserver/tablet_replica_lookup.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/util/countdown_latch.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
@@ -347,6 +348,10 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
// running state.
void InitLocalRaftPeerPB();
+ // A task that periodically checks for stale transactions registered with
+ // the corresponding transaction status tablets (if any) and aborts them.
+ // It runs on txn_status_manager_pool_ until Shutdown() is called.
+ void TxnStalenessTrackerTask();
+
// Just for tests.
void SetNextUpdateTimeForTests();
@@ -365,6 +370,14 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
// tablet_state_counts_, and last_walked_.
mutable RWMutex lock_;
+ // A latch to notify the task running on the txn_status_manager_pool_ on
+ // shutdown.
+ //
+ // TODO(aserbin): instead of using CountDownLatch, extend ConditionVariable
+ //                to work with RWMutex, and create one with lock_ (above)
+ //                to notify the task on shutdown.
+ CountDownLatch shutdown_latch_;
+
// Map from tablet ID to tablet
TabletMap tablet_map_;
@@ -399,6 +412,11 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
// Thread pool used to delete tablets asynchronously.
std::unique_ptr<ThreadPool> delete_tablet_pool_;
+ // Thread pool to run TxnStatusManager tasks. Currently, this pool runs a
+ // single long-running periodic task that aborts stale transactions
+ // registered with the corresponding transaction status tablets.
+ std::unique_ptr<ThreadPool> txn_status_manager_pool_;
+
// Ensures that we only update stats from a single thread at a time.
mutable rw_spinlock lock_update_;
MonoTime next_update_time_;