You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2021/04/28 06:04:23 UTC

[kudu] branch master updated: [client] retry master RPCs on network errors

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 96047a8  [client] retry master RPCs on network errors
96047a8 is described below

commit 96047a8d861d61673b9e2589930eccd78a16e483
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Fri Apr 23 14:03:40 2021 -0700

    [client] retry master RPCs on network errors
    
    AsyncLeaderMasterRpc and AsyncRandomTxnManagerRpc fail fast when they run
    into a network error. We expect such errors in a couple of scenarios:
    - the client is misconfigured and is pointing at the wrong masters
    - the masters are down or otherwise unavailable
    
    The current handling is convenient in the former case, since users
    can immediately adjust their application or tooling with the correct
    masters list. However, this seems to be a far less expected scenario
    than the latter. So, this patch updates the behavior to retry on such
    errors, as we would an RPC timeout. The end result is that
    leader-master-bound requests should be more resilient to master
    failures.
    
    The primary motivation for this is to make TxnManager calls more robust
    to master failures -- a later patch will leverage this behavior.
    
    Change-Id: Iae2febb9e890acf6f7efd5cce3cb7e4f7b5f683d
    Reviewed-on: http://gerrit.cloudera.org:8080/17343
    Tested-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/client/client-test.cc                     | 73 ++++++++++------------
 src/kudu/client/master_proxy_rpc.cc                | 14 ++---
 src/kudu/client/transaction-internal.cc            | 12 ++--
 src/kudu/client/txn_manager_proxy_rpc.cc           | 17 ++---
 src/kudu/integration-tests/master_hms-itest.cc     |  6 +-
 src/kudu/integration-tests/txn_commit-itest.cc     |  2 +-
 .../integration-tests/txn_status_table-itest.cc    | 45 ++++++++++++-
 7 files changed, 97 insertions(+), 72 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 76bf022..7ab3e85 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -1010,7 +1010,7 @@ TEST_F(ClientTest, TestMasterDown) {
   shared_ptr<KuduTable> t;
   client_->data_->default_admin_operation_timeout_ = MonoDelta::FromSeconds(1);
   Status s = client_->OpenTable("other-tablet", &t);
-  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
 }
 
 // Test that we get errors when we try incorrectly configuring the scanner, and
@@ -3564,19 +3564,13 @@ void ClientTest::DoTestWriteWithDeadServer(WhichServerToKill which) {
   ASSERT_TRUE(s.IsIOError()) << s.ToString();
 
   unique_ptr<KuduError> error = GetSingleErrorFromSession(session.get());
-  switch (which) {
-    case DEAD_MASTER:
-      // Only one master, so no retry for finding the new leader master.
-      ASSERT_TRUE(error->status().IsNetworkError());
-      break;
-    case DEAD_TSERVER:
-      ASSERT_TRUE(error->status().IsTimedOut());
-      // TODO(KUDU-1466) Re-enable this assertion once the jira gets solved. We can't actually
-      // make an assertion on the reason for the timeout since sometimes tablet server connection
-      // errors get reported as GetTabletLocations timeouts.
-      // ASSERT_STR_CONTAINS(error->status().ToString(), "Connection refused");
-      break;
-  }
+  ASSERT_TRUE(error->status().IsTimedOut());
+  // TODO(KUDU-1466) Re-enable this assertion once the jira gets solved. We can't actually
+  // make an assertion on the reason for the timeout since sometimes tablet server connection
+  // errors get reported as GetTabletLocations timeouts.
+  // if (which == DEAD_TSERVER) {
+  //   ASSERT_STR_CONTAINS(error->status().ToString(), "Connection refused");
+  // }
 
   ASSERT_EQ(error->failed_op().ToString(),
             R"(INSERT int32 key=1, int32 int_val=1, string string_val="x")");
@@ -7456,6 +7450,10 @@ TEST_F(ClientTest, AttemptToControlTxnByOtherUser) {
 
 TEST_F(ClientTest, NoTxnManager) {
   shared_ptr<KuduTransaction> txn;
+  KuduClientBuilder builder;
+  builder.default_admin_operation_timeout(MonoDelta::FromSeconds(1));
+  builder.default_rpc_timeout(MonoDelta::FromMilliseconds(100));
+  ASSERT_OK(cluster_->CreateClient(&builder, &client_));
   ASSERT_OK(client_->NewTransaction(&txn));
 
   // Shutdown all masters: a TxnManager is a part of master, so after shutting
@@ -7465,8 +7463,7 @@ TEST_F(ClientTest, NoTxnManager) {
   {
     shared_ptr<KuduTransaction> txn;
     auto s = client_->NewTransaction(&txn);
-    ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
-    ASSERT_STR_CONTAINS(s.ToString(), "Client connection negotiation failed");
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
   }
 
   const vector<pair<string, Status>> txn_ctl_results = {
@@ -7476,8 +7473,7 @@ TEST_F(ClientTest, NoTxnManager) {
   for (const auto& op_and_status : txn_ctl_results) {
     SCOPED_TRACE(op_and_status.first);
     const auto& s = op_and_status.second;
-    ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
-    ASSERT_STR_CONTAINS(s.ToString(), "Client connection negotiation failed");
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
   }
 }
 
@@ -7601,13 +7597,14 @@ TEST_F(ClientTest, TxnKeepAlive) {
 TEST_F(ClientTest, TxnKeepAliveAndUnavailableTxnManagerShortTime) {
   SKIP_IF_SLOW_NOT_ALLOWED();
 
-#if defined(THREAD_SANITIZER) || defined(ADDRESS_SANITIZER)
-  // ASAN and TSAN use longer intervals to avoid flakiness: it takes some
-  // time to restart masters, and that adds up to the unavailability interval.
   constexpr const auto kUnavailabilityIntervalMs = 5000;
-#else
-  constexpr const auto kUnavailabilityIntervalMs = 1000;
-#endif
+  // Use a short timeout for the RPC so our attempts to commit don't trigger a
+  // keepalive failure.
+  const auto kShortTimeout = MonoDelta::FromMilliseconds(kUnavailabilityIntervalMs / 2);
+  KuduClientBuilder builder;
+  builder.default_admin_operation_timeout(kShortTimeout);
+  builder.default_rpc_timeout(kShortTimeout);
+  ASSERT_OK(cluster_->CreateClient(&builder, &client_));
 
   // To avoid flakiness, set the txn keepalive interval longer than it is in
   // other scenarios (NOTE: it's still much shorter than it's default value
@@ -7627,19 +7624,12 @@ TEST_F(ClientTest, TxnKeepAliveAndUnavailableTxnManagerShortTime) {
   // RPC calls from clients to corresponding TxnStatusManager.
   cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY);
 
-  // An attempt to commit a transaction should fail right away (i.e. no retries)
-  // due to unreachable masters.
+  // An attempt to commit a transaction should fail due to unreachable masters.
   {
     auto s = txn->Commit(false /* wait */);
-    ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
-    ASSERT_STR_CONTAINS(s.ToString(), "connection negotiation failed");
-    ASSERT_STR_CONTAINS(s.ToString(), "Connection refused");
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
   }
 
-  // Wait for some time, but not too long to allow the system to get txn
-  // keepalive heartbeat before the timeout.
-  SleepFor(MonoDelta::FromMilliseconds(kUnavailabilityIntervalMs));
-
   // Start masters back.
   for (auto idx = 0; idx < cluster_->num_masters(); ++idx) {
     ASSERT_OK(cluster_->mini_master(idx)->Restart());
@@ -7654,6 +7644,16 @@ TEST_F(ClientTest, TxnKeepAliveAndUnavailableTxnManagerShortTime) {
 // aborted if Kudu masters are not available for time period longer than
 // the txn keepalive interval.
 TEST_F(ClientTest, TxnKeepAliveAndUnavailableTxnManagerLongTime) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // Set the timeout to be long so when we fail to commit, the system will
+  // automatically abort the transaction.
+  KuduClientBuilder builder;
+  builder.default_admin_operation_timeout(
+      MonoDelta::FromMilliseconds(5 * FLAGS_txn_keepalive_interval_ms));
+  builder.default_rpc_timeout(MonoDelta::FromSeconds(1));
+  ASSERT_OK(cluster_->CreateClient(&builder, &client_));
+
   shared_ptr<KuduTransaction> txn;
   ASSERT_OK(client_->NewTransaction(&txn));
 
@@ -7661,15 +7661,10 @@ TEST_F(ClientTest, TxnKeepAliveAndUnavailableTxnManagerLongTime) {
   // RPC calls from clients to corresponding TxnStatusManager.
   cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY);
 
-  // Wait for some time to allow the system to detect stale transactions.
-  SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_txn_keepalive_interval_ms));
-
   // An attempt to commit a transaction should fail due to unreachable masters.
   {
     auto s = txn->Commit(false /* wait */);
-    ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
-    ASSERT_STR_CONTAINS(s.ToString(), "connection negotiation failed");
-    ASSERT_STR_CONTAINS(s.ToString(), "Connection refused");
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
   }
 
   // Start masters back.
diff --git a/src/kudu/client/master_proxy_rpc.cc b/src/kudu/client/master_proxy_rpc.cc
index 92c7bda..975a7af 100644
--- a/src/kudu/client/master_proxy_rpc.cc
+++ b/src/kudu/client/master_proxy_rpc.cc
@@ -227,16 +227,10 @@ bool AsyncLeaderMasterRpc<ReqClass, RespClass>::RetryOrReconnectIfNecessary(
     return true;
   }
   // Network errors may be caused by errors in connecting sockets, which could
-  // mean a master is down or doesn't exist. If there's another master to
-  // connect to, connect to it. Otherwise, don't bother retrying.
-  if (s.IsNetworkError()) {
-    if (is_multi_master) {
-      ResetMasterLeaderAndRetry(CredentialsPolicy::ANY_CREDENTIALS);
-      return true;
-    }
-  }
-  if (s.IsTimedOut()) {
-    // If we timed out before the deadline and there's still time left for the
+  // mean a master is down or doesn't exist. This may be transient, so treat
+  // them as we would a timeout and retry.
+  if (s.IsTimedOut() || s.IsNetworkError()) {
+    // If we got here before the deadline and there's still time left for the
     // operation, try to reconnect to the master(s).
     if (MonoTime::Now() < retrier().deadline()) {
       if (is_multi_master) {
diff --git a/src/kudu/client/transaction-internal.cc b/src/kudu/client/transaction-internal.cc
index 0aed483..f702716 100644
--- a/src/kudu/client/transaction-internal.cc
+++ b/src/kudu/client/transaction-internal.cc
@@ -389,6 +389,7 @@ struct KeepaliveRpcCtx {
   unique_ptr<AsyncRandomTxnManagerRpc<KeepTransactionAliveRequestPB,
                                       KeepTransactionAliveResponsePB>> rpc;
   sp::weak_ptr<KuduTransaction> weak_txn;
+  sp::shared_ptr<KuduClient> client;
 };
 
 void KuduTransaction::Data::SendTxnKeepAliveTask(
@@ -423,6 +424,10 @@ void KuduTransaction::Data::SendTxnKeepAliveTask(
   sp::shared_ptr<KeepaliveRpcCtx> ctx(new KeepaliveRpcCtx);
   ctx->weak_txn = weak_txn;
 
+  // We need to ensure the KuduClient outlives each RPC, so maintain a
+  // reference to the KuduClient until the callback is called.
+  ctx->client = c;
+
   {
     KeepTransactionAliveRequestPB req;
     req.set_txn_id(txn_id);
@@ -470,10 +475,7 @@ void KuduTransaction::Data::TxnKeepAliveCb(
 
   // Re-schedule the task, so it will send another keepalive heartbeat as
   // necessary.
-  sp::shared_ptr<KuduClient> c(txn->data_->weak_client_.lock());
-  if (PREDICT_FALSE(!c)) {
-    return;
-  }
+  //
   // If there was an error with the prior request, send the next one sooner
   // since one heartbeat has just been missed. If the prior request timed out,
   // send the next one as soon as possible. It's been a long interval since the
@@ -488,7 +490,7 @@ void KuduTransaction::Data::TxnKeepAliveCb(
                                : txn->data_->GetKeepaliveRpcTimeout());
   DCHECK_GE(txn->data_->GetKeepaliveRpcPeriod().ToMilliseconds(),
             txn->data_->GetKeepaliveRpcTimeout().ToMilliseconds());
-  auto m = c->data_->messenger_;
+  auto m = ctx->client->data_->messenger_;
   if (PREDICT_TRUE(m)) {
     auto weak_txn = ctx->weak_txn;
     m->ScheduleOnReactor(
diff --git a/src/kudu/client/txn_manager_proxy_rpc.cc b/src/kudu/client/txn_manager_proxy_rpc.cc
index c58d2df..cf72650 100644
--- a/src/kudu/client/txn_manager_proxy_rpc.cc
+++ b/src/kudu/client/txn_manager_proxy_rpc.cc
@@ -170,18 +170,11 @@ bool AsyncRandomTxnManagerRpc<ReqClass, RespClass>::RetryIfNecessary(
   }
 
   // Network errors may be caused by errors in connecting sockets, which could
-  // mean a TxnManager is down or doesn't exist. If there's another TxnManager
-  // to connect to, connect to it. Otherwise, don't bother retrying.
-  if (s.IsNetworkError()) {
-    if (multi_txn_manager_) {
-      ResetTxnManagerAndRetry(s);
-      return true;
-    }
-  }
-  if (s.IsTimedOut()) {
-    // If establishing connection failed with time out error before the overall
-    // deadline for RPC operation, retry the operation; if multiple TxnManagers
-    // are available, try with another one.
+  // mean a master is down or doesn't exist. This may be transient, so treat
+  // them as we would a timeout and retry.
+  if (s.IsTimedOut() || s.IsNetworkError()) {
+    // If we got here before the deadline and there's still time left for the
+    // operation, try to reconnect to the master(s).
     if (MonoTime::Now() < retrier().deadline()) {
       if (multi_txn_manager_) {
         ResetTxnManagerAndRetry(s);
diff --git a/src/kudu/integration-tests/master_hms-itest.cc b/src/kudu/integration-tests/master_hms-itest.cc
index c0ae05e..b58e8a8 100644
--- a/src/kudu/integration-tests/master_hms-itest.cc
+++ b/src/kudu/integration-tests/master_hms-itest.cc
@@ -716,11 +716,13 @@ TEST_F(MasterHmsUpgradeTest, TestConflictingNormalizedNames) {
   // itself leader, in which case ExternalMiniCluster::Restart() can succeed. In
   // this situation a fallback to a leader-only API will deterministically fail.
   Status s = cluster_->Restart();
-  if (s.ok()) {
+  if (!s.ok()) {
+    ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+  } else {
     vector<string> tables;
     s = client_->ListTables(&tables);
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
   }
-  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
 
   // Disable the metastore integration and rename one of the conflicting tables.
   cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY);
diff --git a/src/kudu/integration-tests/txn_commit-itest.cc b/src/kudu/integration-tests/txn_commit-itest.cc
index de7d6fc..db2e913 100644
--- a/src/kudu/integration-tests/txn_commit-itest.cc
+++ b/src/kudu/integration-tests/txn_commit-itest.cc
@@ -640,7 +640,7 @@ TEST_F(TxnCommitITest, TestLoadTxnStatusManagerWhenNoMasters) {
 
   // While the master is down, we can't contact the TxnManager.
   Status s = BeginTransaction(&txn, &txn_session);
-  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
 
   // Once restarted, it should be business as usual.
   ASSERT_OK(cluster_->mini_master()->Restart());
diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc b/src/kudu/integration-tests/txn_status_table-itest.cc
index a766dda..8195e81 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -450,6 +450,44 @@ TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) {
   ASSERT_OK(txn_sys_client_->AbortTransaction(100, kUser));
 }
 
+TEST_F(TxnStatusTableITest, TestSystemClientMasterDown) {
+  ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
+  ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
+
+  cluster_->mini_master(0)->Shutdown();
+
+  // When the only server is down, the system client should keep trying until
+  // it times out.
+  {
+    int64_t highest_seen_txn_id = -1;
+    auto s = txn_sys_client_->BeginTransaction(
+        1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id,
+        MonoDelta::FromMilliseconds(100));
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+    // The 'highest_seen_txn_id' should be left untouched.
+    ASSERT_EQ(-1, highest_seen_txn_id);
+  }
+
+  // Now try with a longer timeout and ensure that if the server comes back up,
+  // the system client will succeed.
+  thread t([&] {
+    // Wait a bit to give some time for the system client to make its request
+    // and retry some.
+    SleepFor(MonoDelta::FromMilliseconds(500));
+    CHECK_OK(cluster_->mini_master(0)->Restart());
+  });
+  SCOPED_CLEANUP({
+    t.join();
+  });
+
+  int64_t highest_seen_txn_id = -1;
+  ASSERT_OK(txn_sys_client_->BeginTransaction(
+      1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id,
+      MonoDelta::FromSeconds(10)));
+  // Make sure the highest txn ID we've seen matches the one we just started.
+  ASSERT_EQ(1, highest_seen_txn_id);
+}
+
 TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) {
   ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
   ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
@@ -474,7 +512,7 @@ TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) {
     // Wait a bit to give some time for the system client to make its request
     // and retry some.
     SleepFor(MonoDelta::FromMilliseconds(500));
-    ASSERT_OK(cluster_->mini_tablet_server(0)->Restart());
+    CHECK_OK(cluster_->mini_tablet_server(0)->Restart());
   });
   SCOPED_CLEANUP({
     t.join();
@@ -483,8 +521,9 @@ TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) {
   int64_t highest_seen_txn_id = -1;
   ASSERT_OK(txn_sys_client_->BeginTransaction(
       1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id,
-      MonoDelta::FromSeconds(3)));
-  ASSERT_EQ(highest_seen_txn_id, 1);
+      MonoDelta::FromSeconds(10)));
+  // Make sure the highest txn ID we've seen matches the one we just started.
+  ASSERT_EQ(1, highest_seen_txn_id);
 }
 
 TEST_F(TxnStatusTableITest, TestSystemClientBeginTransactionErrors) {