You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/09/14 19:51:07 UTC

[2/2] kudu git commit: KUDU-2125: Tablet copy client does not retry on failures

KUDU-2125: Tablet copy client does not retry on failures

Previously, the tablet copy client failed the tablet copy as soon as it
encountered any RPC error, even a transient one. This commit adds retry
logic to the tablet copy RPCs so that calls failing with the retriable
ERROR_SERVER_TOO_BUSY or ERROR_UNAVAILABLE errors are retried.

Change-Id: I7c8454fc600a841bd15306a2b3b06ddf53130be6
Reviewed-on: http://gerrit.cloudera.org:8080/8016
Reviewed-by: Dan Burkert <da...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e2c0e1db
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e2c0e1db
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e2c0e1db

Branch: refs/heads/master
Commit: e2c0e1dbdc4fa15a6477a82fda5495928a26a508
Parents: 449b5f5
Author: Dan Burkert <da...@apache.org>
Authored: Wed Sep 6 20:24:54 2017 -0700
Committer: Dan Burkert <da...@apache.org>
Committed: Thu Sep 14 19:50:43 2017 +0000

----------------------------------------------------------------------
 .../integration-tests/cluster_itest_util.cc     | 12 ++-
 src/kudu/integration-tests/cluster_itest_util.h | 14 ++--
 .../tablet_copy_client_session-itest.cc         | 53 ++++++++++++
 src/kudu/tserver/tablet_copy_client.cc          | 85 ++++++++++++++------
 src/kudu/tserver/tablet_copy_client.h           |  8 ++
 5 files changed, 139 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e2c0e1db/src/kudu/integration-tests/cluster_itest_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc b/src/kudu/integration-tests/cluster_itest_util.cc
index ffe978a..cb5a880 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -840,7 +840,8 @@ Status WaitForNumVotersInConfigOnMaster(const shared_ptr<MasterServiceProxy>& ma
 Status WaitForNumTabletsOnTS(TServerDetails* ts,
                              int count,
                              const MonoDelta& timeout,
-                             vector<ListTabletsResponsePB::StatusAndSchemaPB>* tablets) {
+                             vector<ListTabletsResponsePB::StatusAndSchemaPB>* tablets,
+                             boost::optional<tablet::TabletStatePB> state) {
   // If the user doesn't care about collecting the resulting tablets, collect into a local
   // vector.
   vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets_local;
@@ -850,6 +851,15 @@ Status WaitForNumTabletsOnTS(TServerDetails* ts,
   MonoTime deadline = MonoTime::Now() + timeout;
   while (true) {
     s = ListTablets(ts, MonoDelta::FromSeconds(10), tablets);
+    if (s.ok() && state) {
+      tablets->erase(
+          std::remove_if(tablets->begin(), tablets->end(),
+                         [&] (const ListTabletsResponsePB::StatusAndSchemaPB& t) {
+                           return t.tablet_status().state() != state;
+                         }),
+          tablets->end());
+    }
+
     if (s.ok() && tablets->size() == count) break;
     if (MonoTime::Now() > deadline) break;
     SleepFor(MonoDelta::FromMilliseconds(10));

http://git-wip-us.apache.org/repos/asf/kudu/blob/e2c0e1db/src/kudu/integration-tests/cluster_itest_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.h b/src/kudu/integration-tests/cluster_itest_util.h
index f5e7d27..59276dd 100644
--- a/src/kudu/integration-tests/cluster_itest_util.h
+++ b/src/kudu/integration-tests/cluster_itest_util.h
@@ -33,6 +33,8 @@
 #include <unordered_map>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
+
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus.proxy.h"
@@ -46,11 +48,6 @@
 #include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/tserver/tserver_service.proxy.h"
 
-namespace boost {
-template <class T>
-class optional;
-}
-
 namespace kudu {
 class HostPort;
 class MonoDelta;
@@ -320,12 +317,15 @@ Status WaitForNumVotersInConfigOnMaster(
     const MonoDelta& timeout);
 
 // Repeatedly invoke ListTablets(), waiting for up to 'timeout' time for the
-// specified 'count' number of replicas.
+// specified 'count' number of replicas. If 'state' is provided, the replicas
+// must also be in the specified state for the wait to be considered
+// successful.
 Status WaitForNumTabletsOnTS(
     TServerDetails* ts,
     int count,
     const MonoDelta& timeout,
-    std::vector<tserver::ListTabletsResponsePB::StatusAndSchemaPB>* tablets);
+    std::vector<tserver::ListTabletsResponsePB::StatusAndSchemaPB>* tablets,
+    boost::optional<tablet::TabletStatePB> state = boost::none);
 
 // Check if the tablet is in the specified state.
 Status CheckIfTabletInState(TServerDetails* ts,

http://git-wip-us.apache.org/repos/asf/kudu/blob/e2c0e1db/src/kudu/integration-tests/tablet_copy_client_session-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_copy_client_session-itest.cc b/src/kudu/integration-tests/tablet_copy_client_session-itest.cc
index a4a1a03..0625905 100644
--- a/src/kudu/integration-tests/tablet_copy_client_session-itest.cc
+++ b/src/kudu/integration-tests/tablet_copy_client_session-itest.cc
@@ -21,6 +21,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <thread>
 #include <unordered_map>
 #include <vector>
 
@@ -31,6 +32,7 @@
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/external_mini_cluster-itest-base.h"
 #include "kudu/integration-tests/external_mini_cluster.h"
@@ -52,7 +54,9 @@ using kudu::itest::WaitUntilTabletRunning;
 using kudu::tablet::TabletDataState;
 using kudu::tserver::ListTabletsResponsePB;
 using std::string;
+using std::thread;
 using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 
@@ -265,4 +269,53 @@ TEST_F(TabletCopyClientSessionITest, TestCopyFromCrashedSource) {
   ASSERT_OK(WaitUntilTabletRunning(ts1, tablet_id, kDefaultTimeout));
 }
 
+// Regression test for KUDU-2125: ensure that a heavily loaded source server
+// can satisfy many concurrent tablet copies.
+TEST_F(TabletCopyClientSessionITest, TestTabletCopyWithBusySource) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+  const int kNumTablets = 20;
+
+  NO_FATALS(PrepareClusterForTabletCopy({ Substitute("--num_tablets_to_copy_simultaneously=$0",
+                                                     kNumTablets) },
+                                        {},
+                                        kNumTablets));
+
+  // Tune down the RPC capacity on the source server to ensure
+  // ERROR_SERVER_TOO_BUSY errors occur.
+  cluster_->tablet_server(0)->mutable_flags()->emplace_back("--rpc_service_queue_length=1");
+  cluster_->tablet_server(0)->mutable_flags()->emplace_back("--rpc_num_service_threads=1");
+
+  // Restart the TS for the new flags to take effect.
+  cluster_->tablet_server(0)->Shutdown();
+  ASSERT_OK(cluster_->tablet_server(0)->Restart());
+
+  TServerDetails* ts0 = ts_map_[cluster_->tablet_server(0)->uuid()];
+  TServerDetails* ts1 = ts_map_[cluster_->tablet_server(1)->uuid()];
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(WaitForNumTabletsOnTS(ts0, kNumTablets, kDefaultTimeout,
+                                  &tablets, tablet::TabletStatePB::RUNNING));
+  ASSERT_EQ(kNumTablets, tablets.size());
+
+  HostPort src_addr;
+  ASSERT_OK(HostPortFromPB(ts0->registration.rpc_addresses(0), &src_addr));
+
+  vector<thread> threads;
+  for (const auto& tablet : tablets) {
+    threads.emplace_back(thread([&] {
+      const string& tablet_id = tablet.tablet_status().tablet_id();
+      // Run tablet copy.
+      CHECK_OK(StartTabletCopy(ts1, tablet_id, ts0->uuid(), src_addr,
+                               std::numeric_limits<int64_t>::max(), kDefaultTimeout));
+      CHECK_OK(WaitUntilTabletRunning(ts1, tablet_id, kDefaultTimeout));
+    }));
+  }
+
+  for (auto& thread : threads) {
+    thread.join();
+  }
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e2c0e1db/src/kudu/tserver/tablet_copy_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index b8aa6a9..f60cc46 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -59,9 +59,11 @@
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
-#include "kudu/util/pb_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/path_util.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
 
 DEFINE_int32(tablet_copy_begin_session_timeout_ms, 3000,
              "Tablet server RPC client timeout for BeginTabletCopySession calls. "
@@ -151,8 +153,9 @@ TabletCopyClient::TabletCopyClient(std::string tablet_id,
       state_(kInitialized),
       replace_tombstoned_tablet_(false),
       tablet_replica_(nullptr),
-      session_idle_timeout_millis_(0),
+      session_idle_timeout_millis_(FLAGS_tablet_copy_begin_session_timeout_ms),
       start_time_micros_(0),
+      rng_(GetRandomSeed32()),
       tablet_copy_metrics_(tablet_copy_metrics) {
   if (tablet_copy_metrics_) {
     tablet_copy_metrics_->open_client_sessions->Increment();
@@ -239,14 +242,13 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr,
   req.set_tablet_id(tablet_id_);
 
   rpc::RpcController controller;
-  controller.set_timeout(MonoDelta::FromMilliseconds(
-      FLAGS_tablet_copy_begin_session_timeout_ms));
 
   // Begin the tablet copy session with the remote peer.
   BeginTabletCopySessionResponsePB resp;
-  RETURN_NOT_OK_UNWIND_PREPEND(proxy_->BeginTabletCopySession(req, &resp, &controller),
-                               controller,
-                               "Unable to begin tablet copy session");
+  RETURN_NOT_OK_PREPEND(SendRpcWithRetry(&controller, [&] {
+    return proxy_->BeginTabletCopySession(req, &resp, &controller);
+  }), "unable to begin tablet copy session");
+
   string copy_peer_uuid = resp.has_responder_uuid()
       ? resp.responder_uuid() : "(unknown uuid)";
   if (resp.superblock().tablet_data_state() != tablet::TABLET_DATA_READY) {
@@ -456,16 +458,15 @@ Status TabletCopyClient::EndRemoteSession() {
     return Status::OK();
   }
 
-  rpc::RpcController controller;
-  controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_tablet_copy_begin_session_timeout_ms));
-
   EndTabletCopySessionRequestPB req;
   req.set_session_id(session_id_);
   req.set_is_success(true);
   EndTabletCopySessionResponsePB resp;
-  RETURN_NOT_OK_UNWIND_PREPEND(proxy_->EndTabletCopySession(req, &resp, &controller),
-                               controller,
-                               "Failure ending tablet copy session");
+
+  rpc::RpcController controller;
+  RETURN_NOT_OK_PREPEND(SendRpcWithRetry(&controller, [&] {
+    return proxy_->EndTabletCopySession(req, &resp, &controller);
+  }), "failure ending tablet copy session");
 
   return Status::OK();
 }
@@ -669,19 +670,19 @@ Status TabletCopyClient::DownloadFile(const DataIdPB& data_id,
   rpc::RpcController controller;
   controller.set_timeout(MonoDelta::FromMilliseconds(session_idle_timeout_millis_));
   FetchDataRequestPB req;
+  req.set_session_id(session_id_);
+  req.mutable_data_id()->CopyFrom(data_id);
+  req.set_max_length(FLAGS_tablet_copy_transfer_chunk_size_bytes);
 
   bool done = false;
   while (!done) {
-    controller.Reset();
-    req.set_session_id(session_id_);
-    req.mutable_data_id()->CopyFrom(data_id);
     req.set_offset(offset);
-    req.set_max_length(FLAGS_tablet_copy_transfer_chunk_size_bytes);
 
+    // Request the next data chunk.
     FetchDataResponsePB resp;
-    RETURN_NOT_OK_UNWIND_PREPEND(proxy_->FetchData(req, &resp, &controller),
-                                controller,
-                                "Unable to fetch data from remote");
+    RETURN_NOT_OK_PREPEND(SendRpcWithRetry(&controller, [&] {
+          return proxy_->FetchData(req, &resp, &controller);
+    }), "unable to fetch data from remote");
 
     // Sanity-check for corruption.
     RETURN_NOT_OK_PREPEND(VerifyData(offset, resp.chunk()),
@@ -697,12 +698,11 @@ Status TabletCopyClient::DownloadFile(const DataIdPB& data_id,
       SleepFor(MonoDelta::FromMilliseconds(FLAGS_tablet_copy_download_file_inject_latency_ms));
     }
 
-    if (offset + resp.chunk().data().size() == resp.chunk().total_data_length()) {
-      done = true;
-    }
-    offset += resp.chunk().data().size();
+    auto chunk_size = resp.chunk().data().size();
+    done = offset + chunk_size == resp.chunk().total_data_length();
+    offset += chunk_size;
     if (tablet_copy_metrics_) {
-      tablet_copy_metrics_->bytes_fetched->IncrementBy(resp.chunk().data().size());
+      tablet_copy_metrics_->bytes_fetched->IncrementBy(chunk_size);
     }
   }
 
@@ -716,6 +716,12 @@ Status TabletCopyClient::VerifyData(uint64_t offset, const DataChunkPB& chunk) {
         Substitute("$0 vs $1", offset, chunk.offset()));
   }
 
+  // Verify that the chunk does not overflow the total data length.
+  if (offset + chunk.data().length() > chunk.total_data_length()) {
+    return Status::InvalidArgument("Chunk exceeds total block data length",
+        Substitute("$0 vs $1", offset + chunk.data().length(), chunk.total_data_length()));
+  }
+
   // Verify the checksum.
   uint32_t crc32 = crc::Crc32c(chunk.data().data(), chunk.data().length());
   if (PREDICT_FALSE(crc32 != chunk.crc32())) {
@@ -731,5 +737,34 @@ string TabletCopyClient::LogPrefix() {
                     tablet_id_, fs_manager_->uuid());
 }
 
+template<typename F>
+Status TabletCopyClient::SendRpcWithRetry(rpc::RpcController* controller, F f) {
+  MonoTime deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(session_idle_timeout_millis_);
+  for (int attempt = 1;; attempt++) {
+    controller->Reset();
+    controller->set_deadline(deadline);
+    Status s = UnwindRemoteError(f(), *controller);
+
+    // Retry after a backoff period if the error is retriable.
+    const rpc::ErrorStatusPB* err = controller->error_response();
+    if (!s.ok() && err && (err->code() == rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY ||
+                           err->code() == rpc::ErrorStatusPB::ERROR_UNAVAILABLE)) {
+
+      // Polynomial backoff with 50% jitter.
+      double kJitterPct = 0.5;
+      int32_t kBackoffBaseMs = 10;
+      MonoDelta backoff = MonoDelta::FromMilliseconds(
+          (1 - kJitterPct + (kJitterPct * rng_.NextDoubleFraction()))
+          * kBackoffBaseMs * attempt * attempt);
+      if (MonoTime::Now() + backoff > deadline) {
+        return Status::TimedOut("unable to fetch data from remote");
+      }
+      SleepFor(backoff);
+      continue;
+    }
+    return s;
+  }
+}
+
 } // namespace tserver
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e2c0e1db/src/kudu/tserver/tablet_copy_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.h b/src/kudu/tserver/tablet_copy_client.h
index bdbea1e..68a3cc4 100644
--- a/src/kudu/tserver/tablet_copy_client.h
+++ b/src/kudu/tserver/tablet_copy_client.h
@@ -29,6 +29,7 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/metrics.h"
+#include "kudu/util/random.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -204,6 +205,11 @@ class TabletCopyClient {
 
   Status VerifyData(uint64_t offset, const DataChunkPB& resp);
 
+  // Runs the provided functor, which must send an RPC and return the result
+  // status, until it succeeds, times out, or fails with a non-retriable error.
+  template<typename F>
+  Status SendRpcWithRetry(rpc::RpcController* controller, F f);
+
   // Return standard log prefix.
   std::string LogPrefix();
 
@@ -237,6 +243,8 @@ class TabletCopyClient {
   std::vector<uint64_t> wal_seqnos_;
   int64_t start_time_micros_;
 
+  Random rng_;
+
   TabletCopyClientMetrics* tablet_copy_metrics_;
 
   // Block transaction for the tablet copy.