You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/12/11 06:19:52 UTC

[kudu] branch master updated: KUDU-2612 keep-alive tracking for transactions

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new fa21bf6  KUDU-2612 keep-alive tracking for transactions
fa21bf6 is described below

commit fa21bf61821199aedb4fbcdc1b09d0cf3ee56f51
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Thu Nov 12 22:28:33 2020 -0800

    KUDU-2612 keep-alive tracking for transactions
    
    This patch introduces the functionality of tracking the liveness of
    a distributed multi-row transaction into TxnStatusManager and
    provides corresponding proxy methods in TxnManager, so a Kudu client
    now can send keep-alive requests for a transaction (the implementation
    of the latter is planned in a follow-up patch).
    
    From the TxnStatusManager, the newly introduced keep-alive RPC is
    represented as another type of CoordinateTransaction() request:
    CoordinatorOpPB::KEEP_TXN_ALIVE.  New tests to cover the existing
    functionality are added as well.
    
    More end-to-end tests will be added by a follow-up changelist once the
    Kudu C++ client starts sending keepalive requests for started transactions.
    
    Also, all newly introduced tests in txn_status_manager-itest.cc are
    disabled because without [1] they are a bit flaky.  I added a TODO
    to re-enable those once [1] is committed.
    
    [1] https://gerrit.cloudera.org/#/c/16648/
    
    Change-Id: Iae926e02fa7ca597b63ccea90124964c3b6a1175
    Reviewed-on: http://gerrit.cloudera.org:8080/16729
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/integration-tests/CMakeLists.txt          |   1 +
 .../integration-tests/ts_tablet_manager-itest.cc   |   1 +
 .../integration-tests/txn_status_manager-itest.cc  | 469 +++++++++++++++++++++
 .../integration-tests/txn_status_table-itest.cc    |  10 +-
 src/kudu/master/txn_manager-test.cc                |  56 ++-
 src/kudu/master/txn_manager.cc                     |   7 +-
 src/kudu/tablet/txn_coordinator.h                  |  20 +-
 src/kudu/transactions/txn_status_entry.cc          |   8 +
 src/kudu/transactions/txn_status_entry.h           |  47 ++-
 src/kudu/transactions/txn_status_manager-test.cc   | 195 ++++++++-
 src/kudu/transactions/txn_status_manager.cc        | 134 +++++-
 src/kudu/transactions/txn_status_manager.h         |  15 +-
 src/kudu/transactions/txn_status_tablet.h          |   4 +
 src/kudu/transactions/txn_system_client.cc         |  14 +
 src/kudu/transactions/txn_system_client.h          |   5 +
 src/kudu/tserver/tablet_service.cc                 |   8 +-
 src/kudu/tserver/ts_tablet_manager.cc              |  69 ++-
 src/kudu/tserver/ts_tablet_manager.h               |  18 +
 18 files changed, 1023 insertions(+), 58 deletions(-)

diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 98d0069..8ff6d03 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -134,6 +134,7 @@ ADD_KUDU_TEST(tombstoned_voting-stress-test RUN_SERIAL true)
 ADD_KUDU_TEST(token_signer-itest)
 ADD_KUDU_TEST(txn_participant-itest)
 ADD_KUDU_TEST(txn_status_table-itest)
+ADD_KUDU_TEST(txn_status_manager-itest)
 ADD_KUDU_TEST(location_assignment-itest
   DATA_FILES ../scripts/assign-location.py)
 ADD_KUDU_TEST(ts_authz-itest NUM_SHARDS 2)
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index ae1d6e9..2ffefe7 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -1233,6 +1233,7 @@ TEST_F(TxnStatusTabletManagementTest, TestTabletServerProxyCalls) {
   const vector<CoordinatorOpPB::CoordinatorOpType> kOpSequence = {
     CoordinatorOpPB::BEGIN_TXN,
     CoordinatorOpPB::REGISTER_PARTICIPANT,
+    CoordinatorOpPB::KEEP_TXN_ALIVE,
     CoordinatorOpPB::BEGIN_COMMIT_TXN,
     CoordinatorOpPB::ABORT_TXN,
     CoordinatorOpPB::GET_TXN_STATUS,
diff --git a/src/kudu/integration-tests/txn_status_manager-itest.cc b/src/kudu/integration-tests/txn_status_manager-itest.cc
new file mode 100644
index 0000000..00807b1
--- /dev/null
+++ b/src/kudu/integration-tests/txn_status_manager-itest.cc
@@ -0,0 +1,469 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/master/txn_manager.pb.h"
+#include "kudu/master/txn_manager.proxy.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/transactions/transactions.pb.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using kudu::cluster::ExternalMiniClusterOptions;
+using kudu::cluster::TabletIdAndTableName;
+using kudu::itest::TServerDetails;
+using kudu::itest::WaitForOpFromCurrentTerm;
+using kudu::rpc::Messenger;
+using kudu::rpc::RpcController;
+using kudu::transactions::CommitTransactionRequestPB;
+using kudu::transactions::CommitTransactionResponsePB;
+using kudu::transactions::BeginTransactionRequestPB;
+using kudu::transactions::BeginTransactionResponsePB;
+using kudu::transactions::GetTransactionStateRequestPB;
+using kudu::transactions::GetTransactionStateResponsePB;
+using kudu::transactions::KeepTransactionAliveRequestPB;
+using kudu::transactions::KeepTransactionAliveResponsePB;
+using kudu::transactions::TxnManagerServiceProxy;
+using kudu::transactions::TxnStatePB;
+using std::shared_ptr;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+class TxnStatusManagerITest : public ExternalMiniClusterITestBase {
+ public:
+  static const constexpr auto kNumTabletServers = 3;
+  static const constexpr auto kTxnTrackerIntervalMs = 100;
+  static const constexpr auto kTxnKeepaliveIntervalMs =
+      kTxnTrackerIntervalMs * 5;
+  static const constexpr auto kRaftHbIntervalMs = 50;
+  static const constexpr char* const kTxnTrackerIntervalFlag =
+      "txn_staleness_tracker_interval_ms";
+
+  TxnStatusManagerITest() {
+    cluster_opts_.num_tablet_servers = kNumTabletServers;
+
+    // Enable TxnManager in Kudu masters.
+    // TODO(aserbin): remove this customization once the flag is 'on' by default
+    cluster_opts_.extra_master_flags.emplace_back(
+        "--txn_manager_enabled=true");
+    // Scenarios based on this test fixture assume the txn status table
+    // is created at start, not on first transaction-related operation.
+    cluster_opts_.extra_master_flags.emplace_back(
+        "--txn_manager_lazily_initialized=false");
+
+    // To speed up test scenarios, set shorter intervals for the transaction
+    // keepalive interval ...
+    cluster_opts_.extra_tserver_flags.emplace_back(Substitute(
+        "--txn_keepalive_interval_ms=$0", kTxnKeepaliveIntervalMs));
+    // ... and the polling interval of the txn staleness tracker.
+    cluster_opts_.extra_tserver_flags.emplace_back(Substitute(
+        "--$0=$1", kTxnTrackerIntervalFlag, kTxnTrackerIntervalMs));
+    cluster_opts_.extra_tserver_flags.emplace_back(Substitute(
+        "--txn_staleness_tracker_disabled_interval_ms=$0", kTxnTrackerIntervalMs));
+    // Speed up Raft re-elections in case of non-responsive leader replicas.
+    cluster_opts_.extra_tserver_flags.emplace_back(Substitute(
+        "--raft_heartbeat_interval_ms=$0", kRaftHbIntervalMs));
+    cluster_opts_.extra_tserver_flags.emplace_back(
+        "--leader_failure_max_missed_heartbeat_periods=1.25");
+  }
+
+  void SetUp() override {
+    static constexpr const char* const kTxnStatusTableName =
+        "kudu_system.kudu_transactions";
+
+    // This assertion is an explicit statement that all scenarios in this test
+    // assume there is only one TxnManager instance in the cluster.
+    //
+    // TODO(aserbin): make these tests parameterized by the number of
+    //                TxnManager instances (i.e. Kudu masters) and extend the
+    //                scenarios as needed.
+    ASSERT_EQ(1, cluster_opts_.num_masters);
+
+    NO_FATALS(StartClusterWithOpts(cluster_opts_));
+
+    // Wait for txn status tablets created at each of the tablet servers.
+    vector<TabletIdAndTableName> tablets_info;
+    for (auto idx = 0; idx < kNumTabletServers; ++idx) {
+      vector<TabletIdAndTableName> info;
+      ASSERT_OK(cluster_->WaitForTabletsRunning(
+          cluster_->tablet_server(idx), 1, kTimeout, &info));
+      ASSERT_EQ(1, info.size());
+      tablets_info.emplace_back(std::move(*info.begin()));
+    }
+
+    const string& tablet_id = tablets_info.begin()->tablet_id;
+    for (const auto& elem : tablets_info) {
+      ASSERT_EQ(tablet_id, elem.tablet_id);
+      ASSERT_EQ(kTxnStatusTableName, elem.table_name);
+    }
+    txn_status_tablet_id_ = tablet_id;
+
+    rpc::MessengerBuilder b("txn-keepalive");
+    ASSERT_OK(b.Build(&messenger_));
+    const auto rpc_addr = cluster_->master()->bound_rpc_addr();
+    txn_manager_proxy_.reset(new TxnManagerServiceProxy(
+        messenger_, rpc_addr, rpc_addr.host()));
+  }
+
+  Status GetTxnStatusTabletLeader(string* ts_uuid) {
+    CHECK(ts_uuid);
+    TServerDetails* leader;
+    RETURN_NOT_OK(FindTabletLeader(
+        ts_map_, txn_status_tablet_id_, kTimeout, &leader));
+    *ts_uuid = leader->uuid();
+
+    return Status::OK();
+  }
+
+  Status BeginTransaction(int64_t* txn_id, uint32_t* keepalive_ms) {
+    CHECK(txn_id);
+    RpcController ctx;
+    PrepareRpcController(&ctx);
+    BeginTransactionRequestPB req;
+    BeginTransactionResponsePB resp;
+    RETURN_NOT_OK(txn_manager_proxy_->BeginTransaction(req, &resp, &ctx));
+    if (resp.has_error()) {
+      return StatusFromPB(resp.error().status());
+    }
+    CHECK(resp.has_txn_id());
+    *txn_id = resp.txn_id();
+    CHECK(resp.has_keepalive_millis());
+    *keepalive_ms = resp.keepalive_millis();
+
+    return Status::OK();
+  }
+
+  Status CommitTransaction(int64_t txn_id) {
+    CHECK(txn_id);
+    RpcController ctx;
+    PrepareRpcController(&ctx);
+    CommitTransactionRequestPB req;
+    req.set_txn_id(txn_id);
+    CommitTransactionResponsePB resp;
+    RETURN_NOT_OK(txn_manager_proxy_->CommitTransaction(req, &resp, &ctx));
+    if (resp.has_error()) {
+      return StatusFromPB(resp.error().status());
+    }
+    return Status::OK();
+  }
+
+  Status GetTxnState(int64_t txn_id, TxnStatePB* state) {
+    CHECK(state);
+    CHECK(txn_manager_proxy_);
+    GetTransactionStateRequestPB req;
+    req.set_txn_id(txn_id);
+    GetTransactionStateResponsePB resp;
+    RpcController ctx;
+    PrepareRpcController(&ctx);
+    RETURN_NOT_OK(txn_manager_proxy_->GetTransactionState(req, &resp, &ctx));
+    if (resp.has_error()) {
+      return StatusFromPB(resp.error().status());
+    }
+    CHECK(resp.has_state());
+    *state = resp.state();
+
+    return Status::OK();
+  }
+
+  void CheckTxnState(int64_t txn_id, TxnStatePB expected_state) {
+    TxnStatePB txn_state;
+    ASSERT_OK(GetTxnState(txn_id, &txn_state));
+    ASSERT_EQ(expected_state, txn_state);
+  }
+
+ protected:
+  const string& txn_tablet_id() const {
+    return txn_status_tablet_id_;
+  }
+
+  static void PrepareRpcController(RpcController* ctx) {
+    static const MonoDelta kRpcTimeout = MonoDelta::FromSeconds(15);
+    CHECK_NOTNULL(ctx)->set_timeout(kRpcTimeout);
+  }
+
+  static const MonoDelta kTimeout;
+
+  ExternalMiniClusterOptions cluster_opts_;
+  string txn_status_tablet_id_;
+
+  shared_ptr<rpc::Messenger> messenger_;
+  unique_ptr<TxnManagerServiceProxy> txn_manager_proxy_;
+};
+
+const MonoDelta TxnStatusManagerITest::kTimeout = MonoDelta::FromSeconds(15);
+
+// TODO(aserbin): enable all scenarios below once [1] is committed. Without [1],
+//                these scenarios sometimes fail upon calling GetTxnState():
+//
+//   Bad status: Not found: Failed to write to server:
+//   7c968757cc19497a93b15b6c6a48e446 (127.13.78.3:33027):
+//   transaction ID 0 not found, current highest txn ID: -1
+//
+//                The issue here is that a non-leader replica might load
+//                the information from a tablet that's lagging behind the
+//                leader, and once the replica becomes a new leader later on,
+//                the information is stale because TxnStatusManager's data
+//                isn't yet reloaded upon becoming a leader. Once the patch
+//                above is merged, remove this TODO and remove the 'DISABLED_'
+//                prefix from the names of the scenarios below.
+//
+//                [1] https://gerrit.cloudera.org/#/c/16648/
+
+// The test to verify basic functionality of the transaction tracker: it should
+// detect transactions that haven't received KeepTransactionAlive() requests
+// for longer than the transaction's keepalive interval and automatically abort
+// those.
+TEST_F(TxnStatusManagerITest, DISABLED_StaleTransactionsCleanup) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // Check that the transaction staleness is detected and the stale transaction
+  // is aborted while the transaction is in OPEN state.
+  {
+    int64_t txn_id;
+    uint32_t keepalive_interval_ms;
+    // ASSERT_EVENTUALLY is needed because this test uses raw TxnManagerProxy,
+    // and we don't wait for TxnManager to initialize in SetUp().
+    ASSERT_EVENTUALLY([&]() {
+      ASSERT_OK(BeginTransaction(&txn_id, &keepalive_interval_ms));
+    });
+
+    // Wait for longer than the transaction keep-alive interval to allow
+    // the transaction tracker to detect the staleness of the transaction
+    // and abort it. An extra margin here is to avoid flakiness due to
+    // scheduling anomalies.
+    SleepFor(MonoDelta::FromMilliseconds(3 * keepalive_interval_ms));
+    NO_FATALS(CheckTxnState(txn_id, TxnStatePB::ABORTED));
+  }
+
+  // Check that a transaction in COMMIT_IN_PROGRESS state is not aborted by the
+  // stale transaction tracker even if no keepalive requests are sent for it.
+  {
+    int64_t txn_id;
+    uint32_t keepalive_interval_ms;
+    ASSERT_OK(BeginTransaction(&txn_id, &keepalive_interval_ms));
+    ASSERT_OK(CommitTransaction(txn_id));
+    NO_FATALS(CheckTxnState(txn_id, TxnStatePB::COMMIT_IN_PROGRESS));
+
+    // A transaction in COMMIT_IN_PROGRESS state isn't automatically aborted
+    // even if no txn keepalive messages are received.
+    SleepFor(MonoDelta::FromMilliseconds(3 * keepalive_interval_ms));
+    NO_FATALS(CheckTxnState(txn_id, TxnStatePB::COMMIT_IN_PROGRESS));
+  }
+}
+
+// Make sure it's possible to disable and enable back the transaction
+// staleness tracking in run-time without restarting the processes hosting
+// TxnStatusManager instances (i.e. tablet servers).
+TEST_F(TxnStatusManagerITest, DISABLED_ToggleStaleTxnTrackerInRuntime) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // Disable stale transaction tracking in run-time.
+  for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
+    auto* ts = cluster_->tablet_server(i);
+    ASSERT_OK(cluster_->SetFlag(ts, kTxnTrackerIntervalFlag, "0"));
+  }
+
+  int64_t txn_id;
+  uint32_t keepalive_interval_ms;
+  // ASSERT_EVENTUALLY is needed because this test uses raw TxnManagerProxy,
+  // and we don't wait for TxnManager to initialize in SetUp().
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_OK(BeginTransaction(&txn_id, &keepalive_interval_ms));
+  });
+  ASSERT_EQ(kTxnKeepaliveIntervalMs, keepalive_interval_ms);
+
+  // Now, with no transaction staleness tracking, the transaction should
+  // not be aborted automatically even if not sending keepalive requests.
+  SleepFor(MonoDelta::FromMilliseconds(3 * keepalive_interval_ms));
+  NO_FATALS(CheckTxnState(txn_id, TxnStatePB::OPEN));
+
+  // Re-enable stale transaction tracking in run-time.
+  for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
+    auto* ts = cluster_->tablet_server(i);
+    ASSERT_OK(cluster_->SetFlag(
+        ts, kTxnTrackerIntervalFlag, std::to_string(kTxnTrackerIntervalMs)));
+  }
+
+  // Check that the transaction staleness is detected and the stale transaction
+  // is aborted once stale transaction tracking is re-enabled.
+  SleepFor(MonoDelta::FromMilliseconds(3 * keepalive_interval_ms));
+  NO_FATALS(CheckTxnState(txn_id, TxnStatePB::ABORTED));
+}
+
+// Verify the functionality of the stale transaction tracker in TxnStatusManager
+// in case of replicated txn status table. The crux of this scenario is to make
+// sure that a transaction isn't aborted if keepalive requests are sent as
+// required even in case of Raft leader re-elections and restarts
+// of the TxnStatusManager instances.
+TEST_F(TxnStatusManagerITest, DISABLED_TxnKeepAliveMultiTxnStatusManagerInstances) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  int64_t txn_id;
+  uint32_t keepalive_interval_ms;
+  // ASSERT_EVENTUALLY is needed because this test uses raw TxnManagerProxy,
+  // and we don't wait for TxnManager to initialize in SetUp().
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_OK(BeginTransaction(&txn_id, &keepalive_interval_ms));
+  });
+
+  CountDownLatch latch(1);
+  Status keep_txn_alive_status;
+  thread txn_keepalive_sender([&] {
+    const auto period = MonoDelta::FromMilliseconds(keepalive_interval_ms / 2);
+    const auto timeout = MonoDelta::FromMilliseconds(keepalive_interval_ms / 4);
+    // Keepalive thread uses its own messenger and proxy.
+    shared_ptr<rpc::Messenger> m;
+    rpc::MessengerBuilder b("txn-keepalive");
+    ASSERT_OK(b.Build(&m));
+    const auto rpc_addr = cluster_->master()->bound_rpc_addr();
+    TxnManagerServiceProxy txn_manager_proxy(m, rpc_addr, rpc_addr.host());
+    do {
+      // The timeout for KeepTransactionAlive() requests should be short,
+      // otherwise this thread might miss sending the requests with proper
+      // timing.
+      RpcController ctx;
+      ctx.set_timeout(timeout);
+      KeepTransactionAliveRequestPB req;
+      req.set_txn_id(txn_id);
+      KeepTransactionAliveResponsePB resp;
+      auto s = txn_manager_proxy.KeepTransactionAlive(req, &resp, &ctx);
+      if (resp.has_error()) {
+        if (resp.error().has_status()) {
+          keep_txn_alive_status = StatusFromPB(resp.error().status());
+        } else {
+          keep_txn_alive_status = Status::RemoteError("unspecified status");
+        }
+      } else {
+        keep_txn_alive_status = s;
+      }
+      if (!keep_txn_alive_status.ok()) {
+        LOG(WARNING) << Substitute(
+            "KeepTransactionAlive() returned non-OK status: $0",
+            keep_txn_alive_status.ToString());
+      }
+    } while (!latch.WaitFor(period));
+  });
+  auto cleanup = MakeScopedCleanup([&] {
+    latch.CountDown();
+    txn_keepalive_sender.join();
+  });
+
+  // Pause tserver processes. This is to check how the stale txn tracker works
+  // in case of 'frozen' and then 'thawed' processes. The essence is to make
+  // sure the former leader doesn't abort a transaction in case of scheduling
+  // anomalies, and TxnManager forwards txn keep-alive messages to the proper
+  // TxnStatusManager instance when leadership changes.
+  for (auto i = 0; i < 5; ++i) {
+    string old_leader_uuid;
+    ASSERT_OK(GetTxnStatusTabletLeader(&old_leader_uuid));
+    auto* ts = cluster_->tablet_server_by_uuid(old_leader_uuid);
+    ASSERT_EVENTUALLY([&]{
+      ASSERT_OK(ts->Pause());
+      SleepFor(MonoDelta::FromMilliseconds(
+          std::max(3 * kRaftHbIntervalMs, 2 * kTxnKeepaliveIntervalMs)));
+      ASSERT_OK(ts->Resume());
+      string new_leader_uuid;
+      ASSERT_OK(GetTxnStatusTabletLeader(&new_leader_uuid));
+      ASSERT_NE(old_leader_uuid, new_leader_uuid);
+      auto* other_ts = FindOrDie(ts_map_, new_leader_uuid);
+      // Make sure the new leader has established itself up to the point that
+      // it can write into its backing txn status tablet: this is necessary to
+      // make sure it can abort the transaction, if it finds the transaction
+      // has stalled.
+      ASSERT_OK(WaitForOpFromCurrentTerm(
+          other_ts, txn_tablet_id(), consensus::COMMITTED_OPID, kTimeout));
+    });
+    // Give the TxnStatusManager instance running with the leader tablet replica
+    // a chance to detect and abort stale transactions, if any detected.
+    SleepFor(MonoDelta::FromMilliseconds(2 * kTxnKeepaliveIntervalMs));
+  }
+
+  NO_FATALS(CheckTxnState(txn_id, TxnStatePB::OPEN));
+
+  // Restart tablet servers hosting TxnStatusManager instances. This is to
+  // check that starting TxnStatusManager doesn't abort an open transaction
+  // for which txn keepalive messages are being sent as required.
+  for (auto i = 0; i < 5; ++i) {
+    string old_leader_uuid;
+    ASSERT_OK(GetTxnStatusTabletLeader(&old_leader_uuid));
+    auto* ts = cluster_->tablet_server_by_uuid(old_leader_uuid);
+    ts->Shutdown();
+    ASSERT_EVENTUALLY([&]{
+      string new_leader_uuid;
+      ASSERT_OK(GetTxnStatusTabletLeader(&new_leader_uuid));
+      ASSERT_NE(old_leader_uuid, new_leader_uuid);
+      auto* other_ts = FindOrDie(ts_map_, new_leader_uuid);
+      // Make sure the new txn status tablet leader has established itself. For
+      // the reasoning, see the comment in the 'pause scenario' scope above.
+      ASSERT_OK(WaitForOpFromCurrentTerm(
+          other_ts, txn_tablet_id(), consensus::COMMITTED_OPID, kTimeout));
+    });
+    ASSERT_OK(ts->Restart());
+    SleepFor(MonoDelta::FromMilliseconds(2 * kTxnKeepaliveIntervalMs));
+  }
+
+  NO_FATALS(CheckTxnState(txn_id, TxnStatePB::OPEN));
+
+  latch.CountDown();
+  txn_keepalive_sender.join();
+  cleanup.cancel();
+
+  // An extra sanity check: make sure the recent keepalive requests were sent
+  // successfully, as expected.
+  ASSERT_OK(keep_txn_alive_status);
+
+  // Now, when no txn keepalive heartbeats are sent, the transaction
+  // should be automatically aborted by TxnStatusManager running with the
+  // leader replica of the txn status tablet.
+  ASSERT_EVENTUALLY([&]{
+    NO_FATALS(CheckTxnState(txn_id, TxnStatePB::ABORTED));
+  });
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+}
+
+} // namespace kudu
diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc b/src/kudu/integration-tests/txn_status_table-itest.cc
index aa94827..6cd5b25 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -65,7 +65,7 @@
 DECLARE_double(leader_failure_max_missed_heartbeat_periods);
 DECLARE_string(superuser_acl);
 DECLARE_string(user_acl);
-DECLARE_uint32(transaction_keepalive_interval_ms);
+DECLARE_uint32(txn_keepalive_interval_ms);
 
 using kudu::client::AuthenticationCredentialsPB;
 using kudu::client::KuduClient;
@@ -351,7 +351,7 @@ TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) {
   ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
   uint32_t txn_keepalive_ms;
   ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser, &txn_keepalive_ms));
-  ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, txn_keepalive_ms);
+  ASSERT_EQ(FLAGS_txn_keepalive_interval_ms, txn_keepalive_ms);
   ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser));
 
   // If we write out of range, we should see an error.
@@ -365,7 +365,7 @@ TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) {
     ASSERT_EQ(-1, highest_seen_txn_id);
     // txn_keepalive_ms isn't assigned in case of non-OK status.
     ASSERT_EQ(0, txn_keepalive_ms);
-    ASSERT_NE(0, FLAGS_transaction_keepalive_interval_ms);
+    ASSERT_NE(0, FLAGS_txn_keepalive_interval_ms);
   }
   {
     auto s = txn_sys_client_->BeginCommitTransaction(100, kUser);
@@ -431,7 +431,7 @@ TEST_F(TxnStatusTableITest, TestSystemClientBeginTransactionErrors) {
   ASSERT_OK(txn_sys_client_->BeginTransaction(
       1, kUser, &txn_keepalive_ms, &highest_seen_txn_id));
   ASSERT_EQ(1, highest_seen_txn_id);
-  ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, txn_keepalive_ms);
+  ASSERT_EQ(FLAGS_txn_keepalive_interval_ms, txn_keepalive_ms);
 
   // Trying to start another transaction with a used ID should yield an error.
   {
@@ -694,8 +694,8 @@ TEST_F(MultiServerTxnStatusTableITest, TestSystemClientLeadershipChange) {
     string new_leader_uuid;
     ASSERT_OK(FindLeaderId(tablet_id, &new_leader_uuid));
     ASSERT_NE(new_leader_uuid, orig_leader_uuid);
+    ASSERT_OK(txn_sys_client_->BeginTransaction(2, kUser));
   });
-  ASSERT_OK(txn_sys_client_->BeginTransaction(2, kUser));
 }
 
 TEST_F(MultiServerTxnStatusTableITest, TestSystemClientCrashedNodes) {
diff --git a/src/kudu/master/txn_manager-test.cc b/src/kudu/master/txn_manager-test.cc
index 6d12306..79049f3 100644
--- a/src/kudu/master/txn_manager-test.cc
+++ b/src/kudu/master/txn_manager-test.cc
@@ -31,6 +31,7 @@
 #include <gtest/gtest.h>
 
 #include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/sysinfo.h"
 #include "kudu/master/master.h"
 #include "kudu/master/mini_master.h"
@@ -56,13 +57,14 @@ using std::thread;
 using std::unique_ptr;
 using std::unordered_set;
 using std::vector;
+using strings::Substitute;
 
 DECLARE_bool(txn_manager_enabled);
 DECLARE_bool(txn_manager_lazily_initialized);
 DECLARE_int32(rpc_service_queue_length);
 DECLARE_int64(txn_manager_status_table_range_partition_span);
 DECLARE_uint32(txn_manager_status_table_num_replicas);
-DECLARE_uint32(transaction_keepalive_interval_ms);
+DECLARE_uint32(txn_keepalive_interval_ms);
 
 namespace kudu {
 namespace transactions {
@@ -324,7 +326,21 @@ TEST_F(TxnManagerTest, AbortedTransactionLifecycle) {
     txn_id = resp.txn_id();
     ASSERT_LE(0, txn_id);
     ASSERT_TRUE(resp.has_keepalive_millis());
-    ASSERT_EQ(FLAGS_transaction_keepalive_interval_ms, resp.keepalive_millis());
+    ASSERT_EQ(FLAGS_txn_keepalive_interval_ms, resp.keepalive_millis());
+    TxnStatePB txn_state;
+    NO_FATALS(fetch_txn_status(txn_id, &txn_state));
+    ASSERT_EQ(TxnStatePB::OPEN, txn_state);
+  }
+
+  {
+    RpcController ctx;
+    PrepareRpcController(&ctx);
+    KeepTransactionAliveRequestPB req;
+    KeepTransactionAliveResponsePB resp;
+    req.set_txn_id(txn_id);
+    ASSERT_OK(proxy_->KeepTransactionAlive(req, &resp, &ctx));
+    ASSERT_FALSE(resp.has_error())
+        << StatusFromPB(resp.error().status()).ToString();
     TxnStatePB txn_state;
     NO_FATALS(fetch_txn_status(txn_id, &txn_state));
     ASSERT_EQ(TxnStatePB::OPEN, txn_state);
@@ -344,9 +360,6 @@ TEST_F(TxnManagerTest, AbortedTransactionLifecycle) {
     ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_state);
   }
 
-  // TODO(aserbin): add call to KeepTransactionAlive() when TxnStatusManager
-  //                has the functionality implemented.
-
   {
     RpcController ctx;
     PrepareRpcController(&ctx);
@@ -360,6 +373,26 @@ TEST_F(TxnManagerTest, AbortedTransactionLifecycle) {
     NO_FATALS(fetch_txn_status(txn_id, &txn_state));
     ASSERT_EQ(TxnStatePB::ABORTED, txn_state);
   }
+
+  // Try to send keep-alive for already aborted transaction.
+  {
+    RpcController ctx;
+    PrepareRpcController(&ctx);
+    KeepTransactionAliveRequestPB req;
+    KeepTransactionAliveResponsePB resp;
+    req.set_txn_id(txn_id);
+    ASSERT_OK(proxy_->KeepTransactionAlive(req, &resp, &ctx));
+    ASSERT_TRUE(resp.has_error());
+    auto s = StatusFromPB(resp.error().status());
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(
+        s.ToString(),
+        Substitute("transaction ID $0 is already in terminal state", txn_id));
+    // The transaction should stay in ABORTED state, of course.
+    TxnStatePB txn_state;
+    NO_FATALS(fetch_txn_status(txn_id, &txn_state));
+    ASSERT_EQ(TxnStatePB::ABORTED, txn_state);
+  }
 }
 
 TEST_F(TxnManagerTest, BeginManyTransactions) {
@@ -404,7 +437,7 @@ TEST_F(TxnManagerTest, BeginManyTransactions) {
         txn_ids->emplace_back(txn_id);
       }
       CHECK(resp.has_keepalive_millis());
-      CHECK_EQ(FLAGS_transaction_keepalive_interval_ms, resp.keepalive_millis());
+      CHECK_EQ(FLAGS_txn_keepalive_interval_ms, resp.keepalive_millis());
     }
   };
 
@@ -496,20 +529,17 @@ TEST_F(TxnManagerTest, BeginManyTransactions) {
 //                (hence there will be multiple TxnManager instances) and verify
 //                how all this works in case of frequent master re-elections.
 
-// KeepTransactionAlive is not yet supported.
-// TODO(aserbin): update this scenario once KeepTransactionAlive is implemented
-TEST_F(TxnManagerTest, KeepTransactionAliveRpc) {
+TEST_F(TxnManagerTest, KeepTransactionAliveNonExistingTxnId) {
   RpcController ctx;
   PrepareRpcController(&ctx);
   KeepTransactionAliveRequestPB req;
-  req.set_txn_id(0);
+  req.set_txn_id(123);
   KeepTransactionAliveResponsePB resp;
   ASSERT_OK(proxy_->KeepTransactionAlive(req, &resp, &ctx));
   ASSERT_TRUE(resp.has_error());
   auto s = StatusFromPB(resp.error().status());
-  ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
-  ASSERT_STR_CONTAINS(
-      s.ToString(), "Not implemented: KeepTransactionAlive is not supported yet");
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 123 not found");
 }
 
 } // namespace transactions
diff --git a/src/kudu/master/txn_manager.cc b/src/kudu/master/txn_manager.cc
index 4a016bc..0d4f871 100644
--- a/src/kudu/master/txn_manager.cc
+++ b/src/kudu/master/txn_manager.cc
@@ -230,12 +230,11 @@ Status TxnManager::AbortTransaction(int64_t txn_id,
   return txn_sys_client_->AbortTransaction(txn_id, username, ToDelta(deadline));
 }
 
-Status TxnManager::KeepTransactionAlive(int64_t /* txn_id */,
-                                        const string& /* username */,
+Status TxnManager::KeepTransactionAlive(int64_t txn_id,
+                                        const string& username,
                                         const MonoTime& deadline) {
   RETURN_NOT_OK(CheckInitialized(deadline));
-  // TODO(aserbin): call txn_sys_client_ once the functionality is there
-  return Status::NotSupported("KeepTransactionAlive is not supported yet");
+  return txn_sys_client_->KeepTransactionAlive(txn_id, username, ToDelta(deadline));
 }
 
 // This method isn't supposed to be called concurrently, so there isn't any
diff --git a/src/kudu/tablet/txn_coordinator.h b/src/kudu/tablet/txn_coordinator.h
index 8b718ce..39e6ae1 100644
--- a/src/kudu/tablet/txn_coordinator.h
+++ b/src/kudu/tablet/txn_coordinator.h
@@ -86,7 +86,6 @@ class TxnCoordinator {
   virtual Status AbortTransaction(int64_t txn_id, const std::string& user,
                                   tserver::TabletServerErrorPB* ts_error) = 0;
 
-
   // Retrieves the status entry for the specified transaction, returning
   // Status::OK() in case of success with 'txn_status' populated. In case of
   // error, returns non-OK status. 'ts_error' is used to return not-the-leader
@@ -100,6 +99,13 @@ class TxnCoordinator {
       transactions::TxnStatusEntryPB* txn_status,
       tserver::TabletServerErrorPB* ts_error) = 0;
 
+  // Process keep-alive heartbeat for the specified transaction as the given
+  // user. This is to keep the transaction "alive", so the transaction would
+  // not be aborted automatically due to staleness.
+  virtual Status KeepTransactionAlive(int64_t txn_id,
+                                      const std::string& user,
+                                      tserver::TabletServerErrorPB* ts_error) = 0;
+
   // Registers a participant tablet ID to the given transaction ID as the given
   // user.
   //
@@ -110,12 +116,18 @@ class TxnCoordinator {
                                      const std::string& user,
                                      tserver::TabletServerErrorPB* ts_error) = 0;
 
-  // Populates a map from transaction ID to the list of participants associated
-  // with that transaction ID.
-  virtual ParticipantIdsByTxnId GetParticipantsByTxnIdForTests() const = 0;
+  // Abort transactions in non-terminal state which are considered 'stale'.
+  // The 'staleness' of a transaction is determined by particular
+  // implementation. This method can be called periodically to get rid of
+  // abandoned transactions.
+  virtual void AbortStaleTransactions() = 0;
 
   // The highest transaction ID seen by this coordinator.
   virtual int64_t highest_txn_id() const = 0;
+
+  // Populates a map from transaction ID to the list of participants associated
+  // with that transaction ID.
+  virtual ParticipantIdsByTxnId GetParticipantsByTxnIdForTests() const = 0;
 };
 
 class TxnCoordinatorFactory {
diff --git a/src/kudu/transactions/txn_status_entry.cc b/src/kudu/transactions/txn_status_entry.cc
index ed56c19..7170a0f 100644
--- a/src/kudu/transactions/txn_status_entry.cc
+++ b/src/kudu/transactions/txn_status_entry.cc
@@ -25,6 +25,7 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/util/monotime.h"
 
 using std::string;
 using std::vector;
@@ -32,6 +33,13 @@ using std::vector;
 namespace kudu {
 namespace transactions {
 
+TransactionEntry::TransactionEntry(int64_t txn_id, std::string user)
+    : txn_id_(txn_id),
+      user_(std::move(user)),
+      last_heartbeat_time_(MonoTime::Now()),
+      state_(TxnStatePB::UNKNOWN) {
+}
+
 scoped_refptr<ParticipantEntry> TransactionEntry::GetOrCreateParticipant(
     const string& tablet_id) {
   DCHECK(metadata_.IsReadLocked());
diff --git a/src/kudu/transactions/txn_status_entry.h b/src/kudu/transactions/txn_status_entry.h
index 69c42ad..c96c0f8 100644
--- a/src/kudu/transactions/txn_status_entry.h
+++ b/src/kudu/transactions/txn_status_entry.h
@@ -16,16 +16,20 @@
 // under the License.
 #pragma once
 
+#include <atomic>
 #include <cstdint>
 #include <string>
 #include <unordered_map>
 #include <utility>
 #include <vector>
 
+#include <glog/logging.h>
+
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/transactions/transactions.pb.h"
 #include "kudu/util/cow_object.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
 
 namespace kudu {
 namespace transactions {
@@ -50,13 +54,13 @@ class ParticipantEntry : public RefCountedThreadSafe<ParticipantEntry> {
  public:
   typedef PersistentParticipantEntry cow_state;
 
-  ParticipantEntry() {}
+  ParticipantEntry() = default;
   const CowObject<PersistentParticipantEntry>& metadata() const { return metadata_; }
   CowObject<PersistentParticipantEntry>* mutable_metadata() { return &metadata_; }
 
  private:
   friend class RefCountedThreadSafe<ParticipantEntry>;
-  ~ParticipantEntry() {}
+  ~ParticipantEntry() = default;
 
   // Mutable state for this participant with concurrent access controlled via
   // copy-on-write locking.
@@ -72,9 +76,8 @@ class TransactionEntry : public RefCountedThreadSafe<TransactionEntry> {
  public:
   typedef PersistentTransactionEntry cow_state;
 
-  TransactionEntry(int64_t txn_id, std::string user)
-      : txn_id_(txn_id),
-        user_(std::move(user)) {}
+  TransactionEntry(int64_t txn_id, std::string user);
+
   const CowObject<PersistentTransactionEntry>& metadata() const { return metadata_; }
   CowObject<PersistentTransactionEntry>* mutable_metadata() { return &metadata_; }
 
@@ -89,9 +92,26 @@ class TransactionEntry : public RefCountedThreadSafe<TransactionEntry> {
     return user_;
   }
 
+  // Return the last recorded heartbeat timestamp for the transaction.
+  MonoTime last_heartbeat_time() const {
+    return last_heartbeat_time_.load();
+  }
+  // Set the timestamp of last seen keep-alive heartbeat for the transaction.
+  void SetLastHeartbeatTime(const MonoTime& hbtime) {
+    last_heartbeat_time_.store(hbtime);
+  }
+
+  TxnStatePB state() const {
+    return state_.load();
+  }
+  void SetState(TxnStatePB state) {
+    DCHECK(metadata_.IsWriteLocked());
+    state_.store(state);
+  }
+
  private:
   friend class RefCountedThreadSafe<TransactionEntry>;
-  ~TransactionEntry() {}
+  ~TransactionEntry() = default;
 
   const int64_t txn_id_;
 
@@ -107,6 +127,21 @@ class TransactionEntry : public RefCountedThreadSafe<TransactionEntry> {
   // Mutable state for the transaction status record with concurrent access
   // controlled via copy-on-write locking.
   CowObject<PersistentTransactionEntry> metadata_;
+
+  // Time of the last keep-alive heartbeat received for the transaction
+  // identified by txn_id_. Using std::atomic wrapper since the field can be
+  // accessed concurrently by multiple threads:
+  //   * the field can be updated by concurrent KeepAlive requests from
+  //     different clients sending writes in the context of the same transaction
+  //   * the field is read by the stale transaction tracker in TxnStatusManager
+  std::atomic<MonoTime> last_heartbeat_time_;
+
+  // A shortcut for the state of the transaction stored in the metadata_ field.
+  // Callers should externally synchronize 'state_' with 'metadata_' by calling
+  // SetState() accordingly when TxnStatePB in 'metadata_' changes.
+  //
+  // TODO(aserbin): add a hook into CowObject::Commit() to do so automatically
+  std::atomic<TxnStatePB> state_;
 };
 
 typedef MetadataLock<TransactionEntry> TransactionEntryLock;
diff --git a/src/kudu/transactions/txn_status_manager-test.cc b/src/kudu/transactions/txn_status_manager-test.cc
index 146024f..960a742 100644
--- a/src/kudu/transactions/txn_status_manager-test.cc
+++ b/src/kudu/transactions/txn_status_manager-test.cc
@@ -32,6 +32,7 @@
 #include <utility>
 #include <vector>
 
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -67,6 +68,8 @@ using std::unique_ptr;
 using std::unordered_set;
 using std::vector;
 
+DECLARE_uint32(txn_keepalive_interval_ms);
+DECLARE_uint32(txn_staleness_tracker_interval_ms);
 METRIC_DECLARE_entity(tablet);
 
 namespace kudu {
@@ -86,12 +89,22 @@ class TxnStatusManagerTest : public TabletReplicaTestBase {
       : TabletReplicaTestBase(TxnStatusTablet::GetSchemaWithoutIds()) {}
 
   void SetUp() override {
+    // Using shorter intervals for transaction staleness tracking to speed up
+    // test scenarios verifying related functionality.
+    FLAGS_txn_keepalive_interval_ms = 200;
+    FLAGS_txn_staleness_tracker_interval_ms = 50;
+
     NO_FATALS(TabletReplicaTestBase::SetUp());
     ConsensusBootstrapInfo info;
     ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
+    ASSERT_OK(ResetTxnStatusManager());
+  }
+
+  Status ResetTxnStatusManager() {
     txn_manager_.reset(new TxnStatusManager(tablet_replica_.get()));
-    ASSERT_OK(txn_manager_->LoadFromTablet());
+    return txn_manager_->LoadFromTablet();
   }
+
  protected:
   unique_ptr<TxnStatusManager> txn_manager_;
 };
@@ -162,13 +175,10 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
 
   // Now rebuild the underlying replica and rebuild the TxnStatusManager.
   ASSERT_OK(RestartReplica());
-  {
-    TxnStatusManager txn_manager_reloaded(tablet_replica_.get());
-    ASSERT_OK(txn_manager_reloaded.LoadFromTablet());
-    ASSERT_EQ(expected_prts_by_txn_id,
-              txn_manager_reloaded.GetParticipantsByTxnIdForTests());
-    ASSERT_EQ(3, txn_manager_reloaded.highest_txn_id());
-  }
+  NO_FATALS(ResetTxnStatusManager());
+  ASSERT_EQ(expected_prts_by_txn_id,
+            txn_manager_->GetParticipantsByTxnIdForTests());
+  ASSERT_EQ(3, txn_manager_->highest_txn_id());
 
   // Verify that TxnStatusManager methods return Status::ServiceUnavailable()
   // if the transaction status tablet's data is not loaded yet.
@@ -201,11 +211,15 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
       ASSERT_TRUE(s.IsServiceUnavailable());
       ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
 
-      transactions::TxnStatusEntryPB txn_status;
+      TxnStatusEntryPB txn_status;
       s = tsm.GetTransactionStatus(txn_id, kOwner, &txn_status, &ts_error);
       ASSERT_TRUE(s.IsServiceUnavailable());
       ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
 
+      s = tsm.KeepTransactionAlive(txn_id, kOwner, &ts_error);
+      ASSERT_TRUE(s.IsServiceUnavailable());
+      ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
+
       s = tsm.RegisterParticipant(txn_id, kParticipant1, kOwner, &ts_error);
       ASSERT_TRUE(s.IsServiceUnavailable());
       ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
@@ -424,6 +438,16 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) {
 // TxnStatusManager::GetTransactionStatus() method.
 TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
   {
+    TxnStatusEntryPB txn_status;
+    TabletServerErrorPB ts_error;
+    auto s = txn_manager_->GetTransactionStatus(
+        1, kOwner, &txn_status, &ts_error);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 1 not found");
+    ASSERT_FALSE(txn_status.has_state());
+    ASSERT_FALSE(txn_status.has_user());
+  }
+  {
     TabletServerErrorPB ts_error;
     ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, nullptr, &ts_error));
 
@@ -475,13 +499,8 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
   ASSERT_OK(txn_manager_->BeginTransaction(4, kOwner, nullptr, &ts_error));
 
   // Make the TxnStatusManager start from scratch.
-  {
-    ASSERT_OK(RestartReplica());
-    decltype(txn_manager_) txn_manager_reloaded(
-        new TxnStatusManager(tablet_replica_.get()));
-    ASSERT_OK(txn_manager_reloaded->LoadFromTablet());
-    txn_manager_ = std::move(txn_manager_reloaded);
-  }
+  ASSERT_OK(RestartReplica());
+  NO_FATALS(ResetTxnStatusManager());
 
   // Committed, aborted, and in-flight transactions should be known to the
   // TxnStatusManager even after restarting the underlying replica and
@@ -546,6 +565,150 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
   }
 }
 
+// This test scenario verifies basic functionality of the
+// TxnStatusManager::KeepTransactionAlive() method w.r.t. state of the
+// transaction.
+TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
+  // Supplying not-yet-registered transaction ID.
+  {
+    TabletServerErrorPB ts_error;
+    auto s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 1 not found");
+  }
+
+  // OPEN --> COMMIT_IN_PROGRESS --> COMMITTED
+  {
+    TabletServerErrorPB ts_error;
+    ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, nullptr, &ts_error));
+    ASSERT_OK(txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error));
+    // Supplying wrong user for transaction in OPEN state.
+    {
+      auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
+      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(),
+                          "transaction ID 1 not owned by stranger");
+    }
+
+    ASSERT_OK(txn_manager_->BeginCommitTransaction(1, kOwner, &ts_error));
+    auto s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "transaction ID 1 is in commit phase");
+    // Supplying wrong user for transaction in COMMIT_IN_PROGRESS state.
+    {
+      auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
+      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(),
+                          "transaction ID 1 not owned by stranger");
+    }
+
+    ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1, &ts_error));
+    s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "transaction ID 1 is already in terminal state");
+    // Supplying wrong user for transaction in COMMITTED state.
+    {
+      auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
+      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(),
+                          "transaction ID 1 not owned by stranger");
+    }
+  }
+
+  // OPEN --> COMMIT_IN_PROGRESS --> ABORTED
+  {
+    TabletServerErrorPB ts_error;
+    ASSERT_OK(txn_manager_->BeginTransaction(2, kOwner, nullptr, &ts_error));
+    ASSERT_OK(txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error));
+
+    ASSERT_OK(txn_manager_->BeginCommitTransaction(2, kOwner, &ts_error));
+    auto s = txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "transaction ID 2 is in commit phase");
+
+    ASSERT_OK(txn_manager_->AbortTransaction(2, kOwner, &ts_error));
+    s = txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "transaction ID 2 is already in terminal state");
+    // Supplying wrong user for transaction in ABORTED state.
+    {
+      auto s = txn_manager_->KeepTransactionAlive(2, "stranger", &ts_error);
+      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not owned by stranger");
+    }
+  }
+
+  // OPEN --> ABORTED
+  {
+    TabletServerErrorPB ts_error;
+    ASSERT_OK(txn_manager_->BeginTransaction(3, kOwner, nullptr, &ts_error));
+    ASSERT_OK(txn_manager_->KeepTransactionAlive(3, kOwner, &ts_error));
+
+    ASSERT_OK(txn_manager_->AbortTransaction(3, kOwner, &ts_error));
+    auto s = txn_manager_->KeepTransactionAlive(3, kOwner, &ts_error);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "transaction ID 3 is already in terminal state");
+  }
+
+  // Open a new transaction just before restarting the TxnStatusManager.
+  {
+    TabletServerErrorPB ts_error;
+    ASSERT_OK(txn_manager_->BeginTransaction(4, kOwner, nullptr, &ts_error));
+  }
+
+  // Make the TxnStatusManager start from scratch.
+  ASSERT_OK(RestartReplica());
+  ASSERT_OK(ResetTxnStatusManager());
+
+  // Committed, aborted, and in-flight transactions should be known to the
+  // TxnStatusManager even after restarting the underlying replica and
+  // rebuilding the TxnStatusManager from scratch, so KeepTransactionAlive()
+  // should behave the same as if no restart has happened.
+  {
+    TabletServerErrorPB ts_error;
+    auto s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "transaction ID 1 is already in terminal state");
+    // Supplying wrong user for transaction in COMMITTED state.
+    {
+      auto s = txn_manager_->KeepTransactionAlive(1, "stranger", &ts_error);
+      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(),
+                          "transaction ID 1 not owned by stranger");
+    }
+  }
+  {
+    TabletServerErrorPB ts_error;
+    auto s = txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "transaction ID 2 is already in terminal state");
+    // Supplying wrong user for transaction in ABORTED state.
+    {
+      auto s = txn_manager_->KeepTransactionAlive(2, "stranger", &ts_error);
+      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not owned by stranger");
+    }
+  }
+  {
+    TabletServerErrorPB ts_error;
+    ASSERT_OK(txn_manager_->KeepTransactionAlive(4, kOwner, &ts_error));
+    // Supplying wrong user for transaction in OPEN state.
+    {
+      auto s = txn_manager_->KeepTransactionAlive(4, "stranger", &ts_error);
+      ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(),
+                          "transaction ID 4 not owned by stranger");
+    }
+  }
+}
+
 // Test that performing actions as the wrong user will return errors.
 TEST_F(TxnStatusManagerTest, TestWrongUser) {
   const string kWrongUser = "stranger";
diff --git a/src/kudu/transactions/txn_status_manager.cc b/src/kudu/transactions/txn_status_manager.cc
index 2030d88..a7f877a 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <mutex>
+#include <ostream>
 #include <string>
 #include <utility>
 #include <vector>
@@ -40,15 +41,20 @@
 #include "kudu/util/cow_object.h"
 #include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 
-DEFINE_uint32(transaction_keepalive_interval_ms, 5000,
+DEFINE_uint32(txn_keepalive_interval_ms, 30000,
               "Maximum interval (in milliseconds) between subsequent "
               "keep-alive heartbeats to let the transaction status manager "
-              "know that a transaction is not abandoned");
-TAG_FLAG(transaction_keepalive_interval_ms, experimental);
+              "know that a transaction is not abandoned. If the transaction "
+              "status manager does not receive a keepalive message for a "
+              "longer interval than the specified, the transaction is "
+              "automatically aborted.");
+TAG_FLAG(txn_keepalive_interval_ms, experimental);
+TAG_FLAG(txn_keepalive_interval_ms, runtime);
 
 DEFINE_int32(txn_status_manager_inject_latency_load_from_tablet_ms, 0,
              "Injects a random latency between 0 and this many milliseconds "
@@ -58,6 +64,15 @@ DEFINE_int32(txn_status_manager_inject_latency_load_from_tablet_ms, 0,
 TAG_FLAG(txn_status_manager_inject_latency_load_from_tablet_ms, hidden);
 TAG_FLAG(txn_status_manager_inject_latency_load_from_tablet_ms, unsafe);
 
+DEFINE_uint32(txn_staleness_tracker_interval_ms, 10000,
+              "Period (in milliseconds) of the task that tracks and aborts "
+              "stale/abandoned transactions. If this flag is set to 0, "
+              "TxnStatusManager doesn't automatically abort stale/abandoned "
+              "transactions even if no keepalive messages are received for "
+              "longer than defined by the --txn_keepalive_interval_ms flag.");
+TAG_FLAG(txn_staleness_tracker_interval_ms, experimental);
+TAG_FLAG(txn_staleness_tracker_interval_ms, runtime);
+
 using kudu::pb_util::SecureShortDebugString;
 using kudu::tablet::ParticipantIdsByTxnId;
 using kudu::tserver::TabletServerErrorPB;
@@ -151,6 +166,7 @@ Status TxnStatusManager::LoadFromTablet() {
   std::lock_guard<simple_spinlock> l(lock_);
   highest_txn_id_ = std::max(highest_txn_id, highest_txn_id_);
   txns_by_id_ = std::move(txns_by_id);
+
   return Status::OK();
 }
 
@@ -270,6 +286,7 @@ Status TxnStatusManager::BeginTransaction(int64_t txn_id,
     TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE);
     txn_lock.mutable_data()->pb.set_state(TxnStatePB::OPEN);
     txn_lock.mutable_data()->pb.set_user(user);
+    txn->SetState(txn_lock.data().pb.state());
     txn_lock.Commit();
   }
   std::lock_guard<simple_spinlock> l(lock_);
@@ -302,6 +319,7 @@ Status TxnStatusManager::BeginCommitTransaction(int64_t txn_id, const string& us
   auto* mutable_data = txn_lock.mutable_data();
   mutable_data->pb.set_state(TxnStatePB::COMMIT_IN_PROGRESS);
   RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb, ts_error));
+  txn->SetState(txn_lock.data().pb.state());
   txn_lock.Commit();
   return Status::OK();
 }
@@ -328,11 +346,13 @@ Status TxnStatusManager::FinalizeCommitTransaction(
   mutable_data->pb.set_state(TxnStatePB::COMMITTED);
   RETURN_NOT_OK(status_tablet_.UpdateTransaction(
       txn_id, mutable_data->pb, ts_error));
+  txn->SetState(txn_lock.data().pb.state());
   txn_lock.Commit();
   return Status::OK();
 }
 
-Status TxnStatusManager::AbortTransaction(int64_t txn_id, const std::string& user,
+Status TxnStatusManager::AbortTransaction(int64_t txn_id,
+                                          const std::string& user,
                                           TabletServerErrorPB* ts_error) {
   scoped_refptr<TransactionEntry> txn;
   RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
@@ -353,6 +373,7 @@ Status TxnStatusManager::AbortTransaction(int64_t txn_id, const std::string& use
   auto* mutable_data = txn_lock.mutable_data();
   mutable_data->pb.set_state(TxnStatePB::ABORTED);
   RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb, ts_error));
+  txn->SetState(txn_lock.data().pb.state());
   txn_lock.Commit();
   return Status::OK();
 }
@@ -376,6 +397,42 @@ Status TxnStatusManager::GetTransactionStatus(
   return Status::OK();
 }
 
+Status TxnStatusManager::KeepTransactionAlive(int64_t txn_id,
+                                              const string& user,
+                                              TabletServerErrorPB* ts_error) {
+  scoped_refptr<TransactionEntry> txn;
+  RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
+
+  // It's a read (not write) lock because the last heartbeat time isn't
+  // persisted into the transaction status tablet. In other words, the last
+  // heartbeat time is a purely run-time piece of information for a
+  // TransactionEntry.
+  TransactionEntryLock txn_lock(txn.get(), LockMode::READ);
+
+  const auto& pb = txn_lock.data().pb;
+  const auto& state = pb.state();
+  if (state != TxnStatePB::OPEN &&
+      state != TxnStatePB::COMMIT_IN_PROGRESS) {
+    return ReportIllegalTxnState(
+        Substitute("transaction ID $0 is already in terminal state: $1",
+                   txn_id, SecureShortDebugString(pb)),
+        ts_error);
+  }
+  // Keepalive updates are not required for a transaction in COMMIT_IN_PROGRESS
+  // state. The system takes care of a transaction once the client side
+  // initiates the commit phase.
+  if (state == TxnStatePB::COMMIT_IN_PROGRESS) {
+    return ReportIllegalTxnState(
+        Substitute("transaction ID $0 is in commit phase: $1",
+                   txn_id, SecureShortDebugString(pb)),
+        ts_error);
+  }
+  DCHECK_EQ(TxnStatePB::OPEN, state);
+  txn->SetLastHeartbeatTime(MonoTime::Now());
+
+  return Status::OK();
+}
+
 Status TxnStatusManager::RegisterParticipant(
     int64_t txn_id,
     const string& tablet_id,
@@ -418,6 +475,75 @@ Status TxnStatusManager::RegisterParticipant(
   return Status::OK();
 }
 
+void TxnStatusManager::AbortStaleTransactions() {
+  const MonoDelta max_staleness_interval =
+      MonoDelta::FromMilliseconds(FLAGS_txn_keepalive_interval_ms);
+
+  auto* consensus = status_tablet_.tablet_replica_->consensus();
+  DCHECK(consensus);
+  if (consensus->role() != RaftPeerPB::LEADER) {
+    // Only leader replicas abort stale transactions registered with them.
+    // As of now, keep-alive requests are sent only to leader replicas, so only
+    // they have up-to-date information about the liveliness of corresponding
+    // transactions.
+    //
+    // If a non-leader replica erroneously (due to a network partition and
+    // the absence of leader leases) tried to abort a transaction, it would fail
+    // because aborting a transaction means writing into the transaction status
+    // tablet, so a non-leader replica's write attempt would be rejected by
+    // the Raft consensus protocol.
+    return;
+  }
+  TransactionsMap txns_by_id;
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    for (const auto& elem : txns_by_id_) {
+      const auto state = elem.second->state();
+      // The tracker is interested only in open transactions. It's not concerned
+      // about transactions in terminal states (i.e. COMMITTED, ABORTED): there
+      // is nothing that can be done with those. As for transactions in
+      // COMMIT_IN_PROGRESS state, the system should take care of those
+      // without any participation from the client side, so txn keepalive
+      // messages are not required while the system tries to finalize those.
+      if (state == TxnStatePB::OPEN) {
+        txns_by_id.emplace(elem.first, elem.second);
+      }
+    }
+  }
+  const MonoTime now = MonoTime::Now();
+  for (auto& elem : txns_by_id) {
+    const auto& txn_id = elem.first;
+    const auto& txn_entry = elem.second;
+    const auto staleness_interval = now - txn_entry->last_heartbeat_time();
+    if (staleness_interval > max_staleness_interval) {
+      TabletServerErrorPB error;
+      auto s = AbortTransaction(txn_id, txn_entry->user(), &error);
+      if (PREDICT_TRUE(s.ok())) {
+        LOG(INFO) << Substitute(
+            "automatically aborted stale txn (ID $0) past $1 from "
+            "last keepalive heartbeat (effective timeout is $2)",
+            txn_id, staleness_interval.ToString(),
+            max_staleness_interval.ToString());
+      } else {
+        LOG(WARNING) << Substitute(
+            "failed to abort stale txn (ID $0) past $1 from "
+            "last keepalive heartbeat (effective timeout is $2): $3",
+            txn_id, staleness_interval.ToString(),
+            max_staleness_interval.ToString(), s.ToString());
+        if (consensus->role() != RaftPeerPB::LEADER ||
+            !status_tablet_.tablet_replica()->CheckRunning().ok()) {
+          // If the replica is no longer a leader at this point, there is
+          // no sense in processing the rest of the entries.
+          LOG(INFO) << "skipping staleness check for the rest of in-flight "
+                       "txn records since this txn status tablet replica "
+                       "is no longer a leader or not running";
+          return;
+        }
+      }
+    }
+  }
+}
+
 ParticipantIdsByTxnId TxnStatusManager::GetParticipantsByTxnIdForTests() const {
   ParticipantIdsByTxnId ret;
   std::lock_guard<simple_spinlock> l(lock_);
diff --git a/src/kudu/transactions/txn_status_manager.h b/src/kudu/transactions/txn_status_manager.h
index 25ae220..cc4f27e 100644
--- a/src/kudu/transactions/txn_status_manager.h
+++ b/src/kudu/transactions/txn_status_manager.h
@@ -124,6 +124,11 @@ class TxnStatusManager final : public tablet::TxnCoordinator {
                               transactions::TxnStatusEntryPB* txn_status,
                               tserver::TabletServerErrorPB* ts_error) override;
 
+  // Processes keep-alive heartbeat for the specified transaction.
+  Status KeepTransactionAlive(int64_t txn_id,
+                              const std::string& user,
+                              tserver::TabletServerErrorPB* ts_error) override;
+
   // Creates an in-memory participant, writes an entry to the status table, and
   // attaches the in-memory participant to the transaction.
   //
@@ -133,15 +138,19 @@ class TxnStatusManager final : public tablet::TxnCoordinator {
                              const std::string& user,
                              tserver::TabletServerErrorPB* ts_error) override;
 
-  // Populates a map from transaction ID to the sorted list of participants
-  // associated with that transaction ID.
-  tablet::ParticipantIdsByTxnId GetParticipantsByTxnIdForTests() const override;
+  // Abort transactions which are still in the OPEN state but haven't received
+  // keep-alive updates (see KeepTransactionAlive()) for a long time.
+  void AbortStaleTransactions() override;
 
   int64_t highest_txn_id() const override {
     std::lock_guard<simple_spinlock> l(lock_);
     return highest_txn_id_;
   }
 
+  // Populates a map from transaction ID to the sorted list of participants
+  // associated with that transaction ID.
+  tablet::ParticipantIdsByTxnId GetParticipantsByTxnIdForTests() const override;
+
  private:
   // Verifies that the transaction status data has already been loaded from the
   // underlying tablet and the replica is a leader. Returns Status::OK() if the
diff --git a/src/kudu/transactions/txn_status_tablet.h b/src/kudu/transactions/txn_status_tablet.h
index d87dd9b..a112d3f 100644
--- a/src/kudu/transactions/txn_status_tablet.h
+++ b/src/kudu/transactions/txn_status_tablet.h
@@ -117,6 +117,10 @@ class TxnStatusTablet {
                            const TxnParticipantEntryPB& pb,
                            tserver::TabletServerErrorPB* ts_error);
 
+  const tablet::TabletReplica* tablet_replica() const {
+    return tablet_replica_;
+  }
+
  private:
   friend class TxnStatusManager;
 
diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc
index 2d999e1..30d83d6 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -229,6 +229,20 @@ Status TxnSystemClient::GetTransactionStatus(int64_t txn_id,
   return ret;
 }
 
+Status TxnSystemClient::KeepTransactionAlive(int64_t txn_id,
+                                             const string& user,
+                                             MonoDelta timeout) {
+  CoordinatorOpPB coordinate_txn_op;
+  coordinate_txn_op.set_type(CoordinatorOpPB::KEEP_TXN_ALIVE);
+  coordinate_txn_op.set_txn_id(txn_id);
+  coordinate_txn_op.set_user(user);
+  Synchronizer s;
+  RETURN_NOT_OK(CoordinateTransactionAsync(std::move(coordinate_txn_op),
+                                           timeout,
+                                           s.AsStatusCallback()));
+  return s.Wait();
+}
+
 Status TxnSystemClient::CoordinateTransactionAsync(CoordinatorOpPB coordinate_txn_op,
                                                    const MonoDelta& timeout,
                                                    const StatusCallback& cb,
diff --git a/src/kudu/transactions/txn_system_client.h b/src/kudu/transactions/txn_system_client.h
index acf1d67..6cb4407 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -120,6 +120,11 @@ class TxnSystemClient {
                               TxnStatusEntryPB* txn_status,
                               MonoDelta timeout = MonoDelta::FromSeconds(10));
 
+  // Send keep-alive heartbeat for the specified transaction as the given user.
+  Status KeepTransactionAlive(int64_t txn_id,
+                              const std::string& user,
+                              MonoDelta timeout = MonoDelta::FromSeconds(10));
+
   // Opens the transaction status table, refreshing metadata with that from the
   // masters.
   Status OpenTxnStatusTable();
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 12c6326..aa0b905 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -174,7 +174,7 @@ TAG_FLAG(tserver_inject_invalid_authz_token_ratio, hidden);
 DECLARE_bool(raft_prepare_replacement_before_eviction);
 DECLARE_int32(memory_limit_warn_threshold_percentage);
 DECLARE_int32(tablet_history_max_age_sec);
-DECLARE_uint32(transaction_keepalive_interval_ms);
+DECLARE_uint32(txn_keepalive_interval_ms);
 
 METRIC_DEFINE_counter(
     server,
@@ -1201,6 +1201,7 @@ Status ValidateCoordinatorOpFields(const CoordinatorOpPB& op) {
     case CoordinatorOpPB::BEGIN_COMMIT_TXN:
     case CoordinatorOpPB::ABORT_TXN:
     case CoordinatorOpPB::GET_TXN_STATUS:
+    case CoordinatorOpPB::KEEP_TXN_ALIVE:
       if (!op.has_txn_id()) {
         return Status::InvalidArgument(Substitute("Missing txn id: $0",
                                                   SecureShortDebugString(op)));
@@ -1273,6 +1274,9 @@ void TabletServiceAdminImpl::CoordinateTransaction(const CoordinateTransactionRe
       s = txn_coordinator->GetTransactionStatus(
           txn_id, user, &txn_status, &ts_error);
       break;
+    case CoordinatorOpPB::KEEP_TXN_ALIVE:
+      s = txn_coordinator->KeepTransactionAlive(txn_id, user, &ts_error);
+      break;
     default:
       s = Status::InvalidArgument(Substitute("Unknown op type: $0", op.type()));
   }
@@ -1289,7 +1293,7 @@ void TabletServiceAdminImpl::CoordinateTransaction(const CoordinateTransactionRe
     *(resp->mutable_op_result()->mutable_txn_status()) = std::move(txn_status);
   } else if (op.type() == CoordinatorOpPB::BEGIN_TXN) {
     resp->mutable_op_result()->set_keepalive_millis(
-        FLAGS_transaction_keepalive_interval_ms);
+        FLAGS_txn_keepalive_interval_ms);
   }
   if (op.type() == CoordinatorOpPB::BEGIN_TXN && !s.IsServiceUnavailable()) {
     DCHECK_GE(highest_seen_txn_id, 0);
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 3b1127a..11f8a28 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -55,6 +55,7 @@
 #include "kudu/tablet/tablet_bootstrap.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
+#include "kudu/tablet/txn_coordinator.h"
 #include "kudu/transactions/txn_status_manager.h"
 #include "kudu/tserver/heartbeater.h"
 #include "kudu/tserver/tablet_server.h"
@@ -143,7 +144,14 @@ DEFINE_int32(tablet_open_inject_latency_ms, 0,
             "Injects latency into the tablet opening. For use in tests only.");
 TAG_FLAG(tablet_open_inject_latency_ms, unsafe);
 
+DEFINE_uint32(txn_staleness_tracker_disabled_interval_ms, 60000,
+              "Polling interval (in milliseconds) for the disabled staleness "
+              "transaction tracker task to check whether it's been re-enabled. "
+              "This is made configurable only for testing purposes.");
+TAG_FLAG(txn_staleness_tracker_disabled_interval_ms, hidden);
+
 DECLARE_bool(raft_prepare_replacement_before_eviction);
+DECLARE_uint32(txn_staleness_tracker_interval_ms);
 
 METRIC_DEFINE_gauge_int32(server, tablets_num_not_initialized,
                           "Number of Not Initialized Tablets",
@@ -252,6 +260,7 @@ TSTabletManager::TSTabletManager(TabletServer* server)
   : fs_manager_(server->fs_manager()),
     cmeta_manager_(new ConsensusMetadataManager(fs_manager_)),
     server_(server),
+    shutdown_latch_(1),
     metric_registry_(server->metric_registry()),
     tablet_copy_metrics_(server->metric_entity()),
     state_(MANAGER_INITIALIZING) {
@@ -373,6 +382,19 @@ Status TSTabletManager::Init() {
                 .set_max_threads(max_delete_threads)
                 .Build(&delete_tablet_pool_));
 
+  // TODO(aserbin): if better parallelism is needed to serve higher txn volume,
+  //                consider using multiple threads in this pool and schedule
+  //                per-tablet-replica clean-up tasks via threadpool serial
+  //                tokens to make sure no more than one clean-up task
+  //                is running against a txn status tablet replica.
+  RETURN_NOT_OK(ThreadPoolBuilder("txn-status-manager")
+                .set_max_threads(1)
+                .set_max_queue_size(0)
+                .Build(&txn_status_manager_pool_));
+  RETURN_NOT_OK(txn_status_manager_pool_->Submit([this]() {
+    this->TxnStalenessTrackerTask();
+  }));
+
   // Search for tablets in the metadata dir.
   vector<string> tablet_ids;
   RETURN_NOT_OK(fs_manager_->ListTabletIds(&tablet_ids));
@@ -1225,10 +1247,15 @@ void TSTabletManager::Shutdown() {
   // Shut down the delete pool, so no new tablets are deleted after this point.
   delete_tablet_pool_->Shutdown();
 
+  // Signal the only task running on the txn_status_manager_pool_ to wrap up.
+  shutdown_latch_.CountDown();
+  // Shut down the pool running the dedicated TxnStatusManager-related task.
+  txn_status_manager_pool_->Shutdown();
+
   // Take a snapshot of the replicas list -- that way we don't have to hold
   // on to the lock while shutting them down, which might cause a lock
   // inversion. (see KUDU-308 for example).
-  vector<scoped_refptr<TabletReplica> > replicas_to_shutdown;
+  vector<scoped_refptr<TabletReplica>> replicas_to_shutdown;
   GetTabletReplicas(&replicas_to_shutdown);
 
   for (const scoped_refptr<TabletReplica>& replica : replicas_to_shutdown) {
@@ -1347,6 +1374,46 @@ void TSTabletManager::InitLocalRaftPeerPB() {
   *local_peer_pb_.mutable_last_known_addr() = HostPortToPB(hp);
 }
 
+void TSTabletManager::TxnStalenessTrackerTask() {  // Loops until shutdown_latch_ is counted down.
+  while (true) {
+    // The flags are re-read on every iteration, so the staleness tracking can be
+    // disabled (interval set to 0) and re-enabled at run-time without a restart.
+    auto interval_ms = FLAGS_txn_staleness_tracker_interval_ms;
+    bool task_enabled = true;
+    if (interval_ms == 0) {
+      // The task is disabled: keep polling at the 'disabled' interval instead.
+      interval_ms = FLAGS_txn_staleness_tracker_disabled_interval_ms;
+      task_enabled = false;
+    }
+    // Wait for a notification on shutdown or a timeout expiration.
+    if (shutdown_latch_.WaitFor(MonoDelta::FromMilliseconds(interval_ms))) {
+      return;  // Shutdown has been signaled.
+    }
+    if (!task_enabled) {
+      continue;  // Nothing to do this round; go back and re-check the flag.
+    }
+
+    vector<scoped_refptr<TabletReplica>> replicas;
+    {
+      shared_lock<RWMutex> l(lock_);  // Held only while collecting references from tablet_map_.
+      for (const auto& elem : tablet_map_) {
+        auto r = elem.second;
+        // Find txn status tablet replicas: the ones that have a txn coordinator.
+        if (r->txn_coordinator()) {
+          replicas.emplace_back(std::move(r));
+        }
+      }
+    }
+    for (auto& r : replicas) {  // Runs after lock_ has been released.
+      if (shutdown_latch_.count() == 0) {
+        return;  // Stop early if shutdown was signaled in the meantime.
+      }
+      auto* coordinator = DCHECK_NOTNULL(r->txn_coordinator());
+      coordinator->AbortStaleTransactions();
+    }
+  }
+}
+
 void TSTabletManager::CreateReportedTabletPB(const scoped_refptr<TabletReplica>& replica,
                                              ReportedTabletPB* reported_tablet) const {
   reported_tablet->set_tablet_id(replica->tablet_id());
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 5ecc73f..3bacffa 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -38,6 +38,7 @@
 #include "kudu/tserver/tablet_replica_lookup.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/util/countdown_latch.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
@@ -347,6 +348,10 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
   // running state.
   void InitLocalRaftPeerPB();
 
+  // A task to check for the staleness of transactions registered with
+  // corresponding transaction status tablets (if any).
+  void TxnStalenessTrackerTask();
+
   // Just for tests.
   void SetNextUpdateTimeForTests();
 
@@ -365,6 +370,14 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
   // tablet_state_counts_, and last_walked_.
   mutable RWMutex lock_;
 
+  // A latch to notify the task running on the txn_status_manager_pool_ on
+  // shutdown.
+  //
+  // TODO(aserbin): instead of using CountDownLatch, extend ConditionVariable
+  //                to be able to work with RWMutex and use lock_ from above
+  //                to create one to notify the task on shutdown
+  CountDownLatch shutdown_latch_;
+
   // Map from tablet ID to tablet
   TabletMap tablet_map_;
 
@@ -399,6 +412,11 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
   // Thread pool used to delete tablets asynchronously.
   std::unique_ptr<ThreadPool> delete_tablet_pool_;
 
+  // Thread pool to run TxnStatusManager tasks. As of now, this pool is
+  // to run a long-running single periodic task to abort stale transactions
+  // registered with corresponding transaction status tablets.
+  std::unique_ptr<ThreadPool> txn_status_manager_pool_;
+
   // Ensures that we only update stats from a single thread at a time.
   mutable rw_spinlock lock_update_;
   MonoTime next_update_time_;