You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/04/13 06:44:49 UTC

[kudu] branch master updated: KUDU-2612: send next txn keepalive sooner in case of timeout

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fa9beb  KUDU-2612: send next txn keepalive sooner in case of timeout
7fa9beb is described below

commit 7fa9beb0f8a6f8902610515d894f7fb79a144154
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Apr 12 16:26:26 2021 -0700

    KUDU-2612: send next txn keepalive sooner in case of timeout
    
    This patch updates the code of the C++ client to set the timeout for
    KeepTransactionAlive() RPC to be half of the target period for sending
    keepalive messages.  Also, if the RPC with the prior keepalive message
    timed out, the next keepalive message is now sent sooner: that gives the
    transaction more chances to survive before it's automatically aborted by
    the backend due to not receiving keepalive messages for a long time.
    
    This patch also makes the corresponding pieces of the Kudu C++ and
    the Java clients consistent (see https://gerrit.cloudera.org/#/c/17305).
    
    Change-Id: Ic12c5615152e61ee81e4f48e4978d4c5b1fa9828
    Reviewed-on: http://gerrit.cloudera.org:8080/17310
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/client/transaction-internal.cc | 47 +++++++++++++++++++++++++--------
 src/kudu/client/transaction-internal.h  | 11 +++++---
 2 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/src/kudu/client/transaction-internal.cc b/src/kudu/client/transaction-internal.cc
index 1436e24..d639dac 100644
--- a/src/kudu/client/transaction-internal.cc
+++ b/src/kudu/client/transaction-internal.cc
@@ -79,6 +79,25 @@ KuduTransaction::Data::Data(const sp::shared_ptr<KuduClient>& client)
   CHECK(client);
 }
 
+MonoDelta KuduTransaction::Data::GetKeepaliveRpcPeriod() const {
+  // Ideally, it would be enough to send a heartbeat message every
+  // txn_keep_alive_ms_ interval, but given scheduling irregularities,
+  // client node timer's precision, and various network delays and latencies,
+  // it's safer to schedule sending keepalive messages from the client side
+  // more frequently.
+  return MonoDelta::FromMilliseconds(
+      std::max<uint32_t>(1, txn_keep_alive_ms_ / 2));
+}
+
+MonoDelta KuduTransaction::Data::GetKeepaliveRpcTimeout() const {
+  // If something goes wrong and keepalive RPC fails, it should be possible
+  // to retry sending keepalive message a couple of times before the transaction
+  // is automatically aborted by the backend after not receiving keepalive
+  // messages for longer than the keepalive timeout for the transaction.
+  return MonoDelta::FromMilliseconds(
+      std::max<uint32_t>(1, txn_keep_alive_ms_ / 4));
+}
+
 Status KuduTransaction::Data::CreateSession(sp::shared_ptr<KuduSession>* session) {
   auto c = weak_client_.lock();
   if (!c) {
@@ -127,8 +146,6 @@ Status KuduTransaction::Data::Begin(const sp::shared_ptr<KuduTransaction>& txn)
   DCHECK_GT(txn_keep_alive_ms_, 0);
 
   // Start sending regular heartbeats for the new transaction.
-  auto next_run_after = MonoDelta::FromMilliseconds(
-      std::max<uint32_t>(1, txn_keep_alive_ms_ / 2));
   auto m = c->data_->messenger_;
   if (PREDICT_FALSE(!m)) {
     return Status::IllegalState("null messenger in Kudu client");
@@ -139,7 +156,7 @@ Status KuduTransaction::Data::Begin(const sp::shared_ptr<KuduTransaction>& txn)
       [weak_txn](const Status& s) {
         SendTxnKeepAliveTask(s, weak_txn);
       },
-      next_run_after);
+      GetKeepaliveRpcPeriod());
 
   return Status::OK();
 }
@@ -261,13 +278,11 @@ Status KuduTransaction::Data::Deserialize(
     auto m = client->data_->messenger_;
     if (PREDICT_TRUE(m)) {
       sp::weak_ptr<KuduTransaction> weak_txn(ret);
-      auto next_run_after = MonoDelta::FromMilliseconds(
-          std::max<uint32_t>(1, ret->data_->txn_keep_alive_ms_ / 2));
       m->ScheduleOnReactor(
           [weak_txn](const Status& s) {
             SendTxnKeepAliveTask(s, weak_txn);
           },
-          next_run_after);
+          ret->data_->GetKeepaliveRpcPeriod());
     }
   }
 
@@ -388,9 +403,7 @@ void KuduTransaction::Data::SendTxnKeepAliveTask(
   }
 
   const auto& txn_id = txn->data_->txn_id_;
-  const auto next_run_after = MonoDelta::FromMilliseconds(
-      std::max<uint32_t>(1, txn->data_->txn_keep_alive_ms_ / 2));
-  auto deadline = MonoTime::Now() + next_run_after;
+  auto deadline = MonoTime::Now() + txn->data_->GetKeepaliveRpcTimeout();
 
   sp::shared_ptr<KeepaliveRpcCtx> ctx(new KeepaliveRpcCtx);
   ctx->weak_txn = weak_txn;
@@ -446,11 +459,23 @@ void KuduTransaction::Data::TxnKeepAliveCb(
   if (PREDICT_FALSE(!c)) {
     return;
   }
+  // If there was an error with the prior request, send the next one sooner
+  // since one heartbeat has just been missed. If the prior request timed out,
+  // send the next one as soon as possible. It's been a long interval since the
+  // previous successfully sent keepalive message and it's been a long enough
+  // interval since issuing the previous KeepTransactionAlive() RPC, so this
+  // should not put too much pressure on the cluster (assuming the keepalive
+  // interval for a transaction is on the order of a minute). Sending the next
+  // message sooner increases the chances of eventually getting through in such a case.
+  const auto next_run_after =
+      s.ok() ? txn->data_->GetKeepaliveRpcPeriod()
+             : (s.IsTimedOut() ? MonoDelta::FromMilliseconds(1)
+                               : txn->data_->GetKeepaliveRpcTimeout());
+  DCHECK_GE(txn->data_->GetKeepaliveRpcPeriod().ToMilliseconds(),
+            txn->data_->GetKeepaliveRpcTimeout().ToMilliseconds());
   auto m = c->data_->messenger_;
   if (PREDICT_TRUE(m)) {
     auto weak_txn = ctx->weak_txn;
-    const auto next_run_after = MonoDelta::FromMilliseconds(
-        std::max<uint32_t>(1, txn->data_->txn_keep_alive_ms_ / 2));
     m->ScheduleOnReactor(
         [weak_txn](const Status& s) {
           SendTxnKeepAliveTask(s, weak_txn);
diff --git a/src/kudu/client/transaction-internal.h b/src/kudu/client/transaction-internal.h
index 19bb414..9835f4a 100644
--- a/src/kudu/client/transaction-internal.h
+++ b/src/kudu/client/transaction-internal.h
@@ -24,12 +24,10 @@
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
 #include "kudu/common/txn_id.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
-
-class MonoTime;
-
 namespace client {
 
 struct KeepaliveRpcCtx;
@@ -51,6 +49,13 @@ class KuduTransaction::Data {
  public:
   explicit Data(const sp::shared_ptr<KuduClient>& client);
 
+  // A utility method returning the period for sending txn keepalive messages
+  // (i.e. issuing KeepTransactionAlive() RPCs) for this multi-row transaction.
+  MonoDelta GetKeepaliveRpcPeriod() const;
+
+  // A utility method returning the timeout for KeepTransactionAlive() RPCs.
+  MonoDelta GetKeepaliveRpcTimeout() const;
+
   Status CreateSession(sp::shared_ptr<KuduSession>* session);
 
   Status Begin(const sp::shared_ptr<KuduTransaction>& txn);