You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this text.)
Posted to commits@kudu.apache.org by da...@apache.org on 2017/09/14 19:51:06 UTC

[1/2] kudu git commit: KUDU-2124. Don't hold session lock while initializing a TabletCopySession

Repository: kudu
Updated Branches:
  refs/heads/master 0ed75067c -> e2c0e1dbd


KUDU-2124. Don't hold session lock while initializing a TabletCopySession

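This patch replaces the interior mutex in TabletCopySourceSession with a
dynamic once (KuduOnceDynamic). Concurrent calls to Init() are therefore
safely serialized without a session-wide lock. This allows the potentially
slow session initialization to happen outside of the sessions_lock_
critical section in the tablet copy service. As a result, one slow
initialization no longer blocks requests for other sessions.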

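Adds a regression test that injects latency into tablet copy session
initialization (via --tablet_copy_session_inject_latency_on_init_ms). The
test then issues concurrent BeginTabletCopySession() calls from many threads.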

Change-Id: If8bd295a59ade8c89fdf1853dd64c4bceca8da91
Reviewed-on: http://gerrit.cloudera.org:8080/7985
Reviewed-by: Dan Burkert <da...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/449b5f59
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/449b5f59
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/449b5f59

Branch: refs/heads/master
Commit: 449b5f59c5e5c19f7e9136c5bbf76200a90f23c4
Parents: 0ed7506
Author: Mike Percy <mp...@apache.org>
Authored: Wed Aug 30 22:45:08 2017 -0700
Committer: Dan Burkert <da...@apache.org>
Committed: Thu Sep 14 19:50:39 2017 +0000

----------------------------------------------------------------------
 .../integration-tests/cluster_itest_util.cc     | 31 +++++++-
 src/kudu/integration-tests/cluster_itest_util.h | 10 +++
 src/kudu/integration-tests/tablet_copy-itest.cc | 79 ++++++++++++++++++++
 src/kudu/tserver/tablet_copy_service-test.cc    | 20 +++--
 src/kudu/tserver/tablet_copy_service.cc         | 45 +++++++++--
 src/kudu/tserver/tablet_copy_source_session.cc  | 46 +++++++-----
 src/kudu/tserver/tablet_copy_source_session.h   | 43 ++++++-----
 src/kudu/tserver/tablet_server-test-base.h      | 17 +++--
 src/kudu/tserver/tablet_server-test.cc          |  3 +-
 src/kudu/tserver/tablet_server_test_util.cc     |  7 +-
 src/kudu/tserver/tablet_server_test_util.h      |  4 +-
 11 files changed, 243 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/449b5f59/src/kudu/integration-tests/cluster_itest_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc b/src/kudu/integration-tests/cluster_itest_util.cc
index 7d6f55a..ffe978a 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -41,7 +41,9 @@
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master.proxy.h"
 #include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/tablet/tablet.pb.h"
+#include "kudu/tserver/tablet_copy.proxy.h"
 #include "kudu/tserver/tablet_server_test_util.h"
 #include "kudu/tserver/tserver_admin.pb.h"
 #include "kudu/tserver/tserver_admin.proxy.h"
@@ -95,9 +97,10 @@ using tserver::CreateTsClientProxies;
 using tserver::ListTabletsResponsePB;
 using tserver::DeleteTabletRequestPB;
 using tserver::DeleteTabletResponsePB;
-using tserver::TabletServerAdminServiceProxy;
+using tserver::BeginTabletCopySessionRequestPB;
+using tserver::BeginTabletCopySessionResponsePB;
+using tserver::TabletCopyErrorPB;
 using tserver::TabletServerErrorPB;
-using tserver::TabletServerServiceProxy;
 using tserver::WriteRequestPB;
 using tserver::WriteResponsePB;
 
@@ -296,6 +299,7 @@ Status CreateTabletServerMap(const shared_ptr<MasterServiceProxy>& master_proxy,
 
     CreateTsClientProxies(addresses[0],
                           messenger,
+                          &peer->tablet_copy_proxy,
                           &peer->tserver_proxy,
                           &peer->tserver_admin_proxy,
                           &peer->consensus_proxy,
@@ -995,5 +999,28 @@ Status StartTabletCopy(const TServerDetails* ts,
   return Status::OK();
 }
 
+Status BeginTabletCopySession(const TServerDetails* ts,
+                              const string& tablet_id,
+                              const string& caller_uuid,
+                              const MonoDelta& timeout,
+                              TabletCopyErrorPB::Code* error_code) {
+  BeginTabletCopySessionRequestPB req;
+  BeginTabletCopySessionResponsePB resp;
+  req.set_tablet_id(tablet_id);
+  req.set_requestor_uuid(caller_uuid);
+
+  RpcController rpc;
+  rpc.set_timeout(timeout);
+
+  RETURN_NOT_OK(ts->tablet_copy_proxy->BeginTabletCopySession(req, &resp, &rpc));
+  if (rpc.error_response()) {
+    const TabletCopyErrorPB& error =
+      rpc.error_response()->GetExtension(TabletCopyErrorPB::tablet_copy_error_ext);
+    if (error_code) *error_code = error.code();
+    return StatusFromPB(error.status());
+  }
+  return Status::OK();
+}
+
 } // namespace itest
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/449b5f59/src/kudu/integration-tests/cluster_itest_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.h b/src/kudu/integration-tests/cluster_itest_util.h
index 49bf0ff..f5e7d27 100644
--- a/src/kudu/integration-tests/cluster_itest_util.h
+++ b/src/kudu/integration-tests/cluster_itest_util.h
@@ -40,6 +40,8 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/server/server_base.proxy.h"
 #include "kudu/tablet/metadata.pb.h"
+#include "kudu/tserver/tablet_copy.pb.h"
+#include "kudu/tserver/tablet_copy.proxy.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/tserver/tserver_service.proxy.h"
@@ -78,6 +80,7 @@ namespace itest {
 struct TServerDetails {
   NodeInstancePB instance_id;
   ServerRegistrationPB registration;
+  gscoped_ptr<tserver::TabletCopyServiceProxy> tablet_copy_proxy;
   gscoped_ptr<tserver::TabletServerServiceProxy> tserver_proxy;
   gscoped_ptr<tserver::TabletServerAdminServiceProxy> tserver_admin_proxy;
   gscoped_ptr<consensus::ConsensusServiceProxy> consensus_proxy;
@@ -373,6 +376,13 @@ Status StartTabletCopy(const TServerDetails* ts,
                        const MonoDelta& timeout,
                        tserver::TabletServerErrorPB::Code* error_code = nullptr);
 
+// Begin a tablet copy session on the remote.
+Status BeginTabletCopySession(const TServerDetails* ts,
+                              const std::string& tablet_id,
+                              const std::string& caller_uuid,
+                              const MonoDelta& timeout,
+                              tserver::TabletCopyErrorPB::Code* error_code = nullptr);
+
 } // namespace itest
 } // namespace kudu
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/449b5f59/src/kudu/integration-tests/tablet_copy-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_copy-itest.cc b/src/kudu/integration-tests/tablet_copy-itest.cc
index 5612a2e..03accae 100644
--- a/src/kudu/integration-tests/tablet_copy-itest.cc
+++ b/src/kudu/integration-tests/tablet_copy-itest.cc
@@ -19,6 +19,7 @@
 #include <cstdint>
 #include <limits>
 #include <memory>
+#include <mutex>
 #include <ostream>
 #include <set>
 #include <string>
@@ -113,8 +114,12 @@ using kudu::tablet::TabletSuperBlockPB;
 using kudu::tserver::ListTabletsResponsePB;
 using kudu::tserver::ListTabletsResponsePB_StatusAndSchemaPB;
 using kudu::tserver::TabletCopyClient;
+using std::atomic;
+using std::lock_guard;
+using std::mutex;
 using std::set;
 using std::string;
+using std::thread;
 using std::unordered_map;
 using std::vector;
 using strings::Substitute;
@@ -1684,4 +1689,78 @@ TEST_F(TabletCopyITest, TestTabletStateMetricsDuringTabletCopy) {
   });
 }
 
+// Test that tablet copy session initialization can handle concurrency when
+// there is latency during session initialization. This is a regression test
+// for KUDU-2124.
+TEST_F(TabletCopyITest, TestBeginTabletCopySessionConcurrency) {
+  MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  MonoDelta kInjectedLatency = MonoDelta::FromMilliseconds(1000);
+
+  // Inject latency during session initialization. This should show up in
+  // TabletCopyService::BeginTabletCopySession() RPC calls.
+  NO_FATALS(StartCluster({Substitute("--tablet_copy_session_inject_latency_on_init_ms=$0",
+                                     kInjectedLatency.ToMilliseconds())},
+                         {}, /*num_tablet_servers=*/ 1));
+
+  // Create a bunch of tablets to operate on at once.
+  const int kNumTablets = 10;
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(1);
+  workload.set_num_tablets(kNumTablets);
+  workload.Setup();
+
+  auto ts = ts_map_[cluster_->tablet_server(0)->uuid()];
+  vector<string> tablet_ids;
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(itest::ListRunningTabletIds(ts, kTimeout, &tablet_ids));
+    ASSERT_EQ(kNumTablets, tablet_ids.size());
+  });
+
+  // Spin up a bunch of threads simultaneously trying to begin tablet copy
+  // sessions on all of the tablets.
+  const int kNumThreads = kNumTablets * 5;
+  mutex m;
+  vector<MonoDelta> success_latencies; // Latency of successful opens.
+  vector<MonoDelta> failure_latencies; // Latency of failed opens.
+  vector<thread> threads;
+  for (int i = 0; i < kNumThreads; i++) {
+    string tablet_id = tablet_ids[i % kNumTablets];
+    threads.emplace_back([&, ts, tablet_id] {
+      while (true) {
+        MonoTime start = MonoTime::Now();
+        Status s = itest::BeginTabletCopySession(ts, tablet_id, "dummy-uuid", kTimeout);
+        MonoDelta duration = MonoTime::Now() - start;
+        lock_guard<mutex> l(m);
+        if (s.ok()) {
+          success_latencies.push_back(duration);
+          return;
+        }
+        failure_latencies.push_back(duration);
+        VLOG(1) << tablet_id << ": " << s.ToString();
+      }
+    });
+  }
+
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  // All of the successfully initialized tablet sessions should have taken at
+  // least the amount of time that we slept for.
+  int num_slower_than_latency = 0;
+  for (const auto& lat : success_latencies) {
+    if (lat >= kInjectedLatency) num_slower_than_latency++;
+  }
+  // We should have had exactly # tablets sessions that were slow due to the
+  // latency injection.
+  ASSERT_EQ(kNumTablets, num_slower_than_latency);
+
+  // All of the failed tablet session initialization sessions should have been
+  // relatively quicker than our injected latency, demonstrating that they did
+  // not wait on the session lock.
+  for (const auto& lat : failure_latencies) {
+    ASSERT_LT(lat, kInjectedLatency);
+  }
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/449b5f59/src/kudu/tserver/tablet_copy_service-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_service-test.cc b/src/kudu/tserver/tablet_copy_service-test.cc
index 0ff01ae..6149e6d 100644
--- a/src/kudu/tserver/tablet_copy_service-test.cc
+++ b/src/kudu/tserver/tablet_copy_service-test.cc
@@ -16,6 +16,7 @@
 // under the License.
 #include "kudu/tserver/tablet_copy-test-base.h"
 
+#include <atomic>
 #include <cstdint>
 #include <limits>
 #include <memory>
@@ -74,6 +75,7 @@ using pb_util::SecureDebugString;
 using pb_util::SecureShortDebugString;
 using rpc::ErrorStatusPB;
 using rpc::RpcController;
+using std::atomic;
 using std::thread;
 using std::vector;
 
@@ -250,17 +252,25 @@ TEST_F(TabletCopyServiceTest, TestBeginConcurrently) {
   const int kNumThreads = 5;
   vector<thread> threads;
   vector<tablet::TabletSuperBlockPB> sblocks(kNumThreads);
+  atomic<int> num_successful(0);
   for (int i = 0 ; i < kNumThreads; i++) {
-    threads.emplace_back([this, &sblocks, i]{
+    threads.emplace_back([this, &num_successful, &sblocks, i] {
+      while (true) {
         string session_id;
-        CHECK_OK(DoBeginValidTabletCopySession(&session_id, &sblocks[i]));
-        CHECK(!session_id.empty());
-      });
+        Status s = DoBeginValidTabletCopySession(&session_id, &sblocks[i]);
+        if (s.ok()) {
+          ++num_successful;
+          CHECK(!session_id.empty());
+          return;
+        }
+      }
+    });
   }
   for (auto& t : threads) {
     t.join();
   }
-  // Verify that all threads got the same result.
+  // Verify that all threads eventually got the same result.
+  ASSERT_EQ(kNumThreads, num_successful);
   for (int i = 1; i < threads.size(); i++) {
     ASSERT_EQ(SecureDebugString(sblocks[i]), SecureDebugString(sblocks[0]));
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/449b5f59/src/kudu/tserver/tablet_copy_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_service.cc b/src/kudu/tserver/tablet_copy_service.cc
index cbbf8da..6cfb9e7 100644
--- a/src/kudu/tserver/tablet_copy_service.cc
+++ b/src/kudu/tserver/tablet_copy_service.cc
@@ -144,10 +144,12 @@ void TabletCopyServiceImpl::BeginTabletCopySession(
                     context);
 
   scoped_refptr<TabletCopySourceSession> session;
+  bool new_session;
   {
     MutexLock l(sessions_lock_);
     const SessionEntry* session_entry = FindOrNull(sessions_, session_id);
-    if (!session_entry) {
+    new_session = session_entry == nullptr;
+    if (new_session) {
       LOG_WITH_PREFIX(INFO) << Substitute(
           "Beginning new tablet copy session on tablet $0 from peer $1"
           " at $2: session id = $3",
@@ -155,10 +157,6 @@ void TabletCopyServiceImpl::BeginTabletCopySession(
       session.reset(new TabletCopySourceSession(tablet_replica, session_id,
                                                 requestor_uuid, fs_manager_,
                                                 &tablet_copy_metrics_));
-      RPC_RETURN_NOT_OK(session->Init(),
-                        TabletCopyErrorPB::UNKNOWN_ERROR,
-                        Substitute("Error beginning tablet copy session for tablet $0", tablet_id),
-                        context);
       InsertOrDie(&sessions_, session_id, { session, GetNewExpireTime() });
     } else {
       session = session_entry->session;
@@ -170,6 +168,32 @@ void TabletCopyServiceImpl::BeginTabletCopySession(
     ResetSessionExpirationUnlocked(session_id);
   }
 
+  if (!new_session && !session->IsInitialized()) {
+    RPC_RETURN_NOT_OK(
+        Status::ServiceUnavailable("tablet copy session for tablet $0 is initializing",
+                                   tablet_id),
+        TabletCopyErrorPB::UNKNOWN_ERROR,
+        "try again later",
+        context);
+  }
+
+  // Ensure that the session initialization succeeds. If the initialization
+  // fails, then remove it from the sessions map.
+  Status status = session->Init();
+  if (!status.ok()) {
+    MutexLock l(sessions_lock_);
+    SessionEntry* session_entry = FindOrNull(sessions_, session_id);
+    // Identity check the session to ensure that other interleaved threads have
+    // not already removed the failed session, and replaced it with a new one.
+    if (session_entry && session == session_entry->session) {
+      sessions_.erase(session_id);
+    }
+  }
+  RPC_RETURN_NOT_OK(status,
+                    TabletCopyErrorPB::UNKNOWN_ERROR,
+                    Substitute("Error beginning tablet copy session for tablet $0", tablet_id),
+                    context);
+
   resp->set_responder_uuid(fs_manager_->uuid());
   resp->set_session_id(session_id);
   resp->set_session_idle_timeout_millis(FLAGS_tablet_copy_idle_timeout_sec * 1000);
@@ -240,6 +264,15 @@ void TabletCopyServiceImpl::FetchData(const FetchDataRequestPB* req,
     ResetSessionExpirationUnlocked(session_id);
   }
 
+  if (!session->IsInitialized()) {
+    RPC_RETURN_NOT_OK(
+        Status::ServiceUnavailable("tablet copy session for tablet $0 is initializing",
+                                   session->tablet_id()),
+        TabletCopyErrorPB::UNKNOWN_ERROR,
+        "try again later",
+        context);
+  }
+
   MAYBE_FAULT(FLAGS_fault_crash_on_handle_tc_fetch_data);
 
   uint64_t offset = req->offset();
@@ -281,7 +314,7 @@ void TabletCopyServiceImpl::FetchData(const FetchDataRequestPB* req,
 
 void TabletCopyServiceImpl::EndTabletCopySession(
         const EndTabletCopySessionRequestPB* req,
-        EndTabletCopySessionResponsePB* resp,
+        EndTabletCopySessionResponsePB* /*resp*/,
         rpc::RpcContext* context) {
   {
     MutexLock l(sessions_lock_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/449b5f59/src/kudu/tserver/tablet_copy_source_session.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session.cc b/src/kudu/tserver/tablet_copy_source_session.cc
index abd1c66..62ba827 100644
--- a/src/kudu/tserver/tablet_copy_source_session.cc
+++ b/src/kudu/tserver/tablet_copy_source_session.cc
@@ -45,7 +45,7 @@
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/util/flag_tags.h"
-#include "kudu/util/mutex.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/stopwatch.h"
@@ -66,6 +66,12 @@ METRIC_DEFINE_gauge_int32(server, tablet_copy_open_source_sessions,
                           kudu::MetricUnit::kSessions,
                           "Number of currently open tablet copy source sessions on this server");
 
+DEFINE_int32(tablet_copy_session_inject_latency_on_init_ms, 0,
+             "How much latency (in ms) to inject when a tablet copy session is initialized. "
+             "(For testing only!)");
+TAG_FLAG(tablet_copy_session_inject_latency_on_init_ms, unsafe);
+TAG_FLAG(tablet_copy_session_inject_latency_on_init_ms, hidden);
+
 namespace kudu {
 namespace tserver {
 
@@ -111,8 +117,16 @@ TabletCopySourceSession::~TabletCopySourceSession() {
 }
 
 Status TabletCopySourceSession::Init() {
-  MutexLock l(session_lock_);
-  CHECK(!initted_);
+  return init_once_.Init(&TabletCopySourceSession::InitOnce, this);
+}
+
+Status TabletCopySourceSession::InitOnce() {
+  // Inject latency during Init() for testing purposes.
+  if (PREDICT_FALSE(FLAGS_tablet_copy_session_inject_latency_on_init_ms > 0)) {
+    TRACE("Injecting $0ms of latency due to --tablet_copy_session_inject_latency_on_init_ms",
+          FLAGS_tablet_copy_session_inject_latency_on_init_ms);
+    SleepFor(MonoDelta::FromMilliseconds(FLAGS_tablet_copy_session_inject_latency_on_init_ms));
+  }
 
   RETURN_NOT_OK(tablet_replica_->CheckRunning());
 
@@ -136,7 +150,7 @@ Status TabletCopySourceSession::Init() {
       TabletMetadata::CollectBlockIdPBs(tablet_superblock_);
   for (const BlockIdPB& block_id : data_blocks) {
     VLOG(1) << "Opening block " << pb_util::SecureDebugString(block_id);
-    RETURN_NOT_OK(OpenBlockUnlocked(BlockId::FromPB(block_id)));
+    RETURN_NOT_OK(OpenBlock(BlockId::FromPB(block_id)));
   }
 
   // Get the latest opid in the log at this point in time so we can re-anchor.
@@ -158,7 +172,7 @@ Status TabletCopySourceSession::Init() {
   }
   reader->GetSegmentsSnapshot(&log_segments_);
   for (const scoped_refptr<ReadableLogSegment>& segment : log_segments_) {
-    RETURN_NOT_OK(OpenLogSegmentUnlocked(segment->header().sequence_number()));
+    RETURN_NOT_OK(OpenLogSegment(segment->header().sequence_number()));
   }
 
   // Look up the committed consensus state.
@@ -186,17 +200,14 @@ Status TabletCopySourceSession::Init() {
   LOG(INFO) << Substitute(
       "T $0 P $1: Tablet Copy: opened $2 blocks and $3 log segments",
       tablet_id, consensus->peer_uuid(), data_blocks.size(), log_segments_.size());
-  initted_ = true;
   return Status::OK();
 }
 
 const std::string& TabletCopySourceSession::tablet_id() const {
-  DCHECK(initted_);
   return tablet_replica_->tablet_id();
 }
 
 const std::string& TabletCopySourceSession::requestor_uuid() const {
-  DCHECK(initted_);
   return requestor_uuid_;
 }
 
@@ -288,6 +299,7 @@ Status TabletCopySourceSession::GetBlockPiece(const BlockId& block_id,
                                              uint64_t offset, int64_t client_maxlen,
                                              string* data, int64_t* block_file_size,
                                              TabletCopyErrorPB::Code* error_code) {
+  DCHECK(init_once_.initted());
   ImmutableReadableBlockInfo* block_info;
   RETURN_NOT_OK(FindBlock(block_id, &block_info, error_code));
 
@@ -307,6 +319,7 @@ Status TabletCopySourceSession::GetLogSegmentPiece(uint64_t segment_seqno,
                                                    uint64_t offset, int64_t client_maxlen,
                                                    std::string* data, int64_t* log_file_size,
                                                    TabletCopyErrorPB::Code* error_code) {
+  DCHECK(init_once_.initted());
   ImmutableRandomAccessFileInfo* file_info;
   RETURN_NOT_OK(FindLogSegment(segment_seqno, &file_info, error_code));
   RETURN_NOT_OK(ReadFileChunkToBuf(file_info, offset, client_maxlen,
@@ -319,7 +332,7 @@ Status TabletCopySourceSession::GetLogSegmentPiece(uint64_t segment_seqno,
 }
 
 bool TabletCopySourceSession::IsBlockOpenForTests(const BlockId& block_id) const {
-  MutexLock l(session_lock_);
+  DCHECK(init_once_.initted());
   return ContainsKey(blocks_, block_id);
 }
 
@@ -343,9 +356,7 @@ static Status AddImmutableFileToMap(Collection* const cache,
   return Status::OK();
 }
 
-Status TabletCopySourceSession::OpenBlockUnlocked(const BlockId& block_id) {
-  session_lock_.AssertAcquired();
-
+Status TabletCopySourceSession::OpenBlock(const BlockId& block_id) {
   unique_ptr<ReadableBlock> block;
   Status s = fs_manager_->OpenBlock(block_id, &block);
   if (PREDICT_FALSE(!s.ok())) {
@@ -374,18 +385,14 @@ Status TabletCopySourceSession::OpenBlockUnlocked(const BlockId& block_id) {
 Status TabletCopySourceSession::FindBlock(const BlockId& block_id,
                                          ImmutableReadableBlockInfo** block_info,
                                          TabletCopyErrorPB::Code* error_code) {
-  Status s;
-  MutexLock l(session_lock_);
   if (!FindCopy(blocks_, block_id, block_info)) {
     *error_code = TabletCopyErrorPB::BLOCK_NOT_FOUND;
-    s = Status::NotFound("Block not found", block_id.ToString());
+    return Status::NotFound("Block not found", block_id.ToString());
   }
-  return s;
+  return Status::OK();
 }
 
-Status TabletCopySourceSession::OpenLogSegmentUnlocked(uint64_t segment_seqno) {
-  session_lock_.AssertAcquired();
-
+Status TabletCopySourceSession::OpenLogSegment(uint64_t segment_seqno) {
   scoped_refptr<log::ReadableLogSegment> log_segment;
   int position = -1;
   if (!log_segments_.empty()) {
@@ -412,7 +419,6 @@ Status TabletCopySourceSession::OpenLogSegmentUnlocked(uint64_t segment_seqno) {
 Status TabletCopySourceSession::FindLogSegment(uint64_t segment_seqno,
                                               ImmutableRandomAccessFileInfo** file_info,
                                               TabletCopyErrorPB::Code* error_code) {
-  MutexLock l(session_lock_);
   if (!FindCopy(logs_, segment_seqno, file_info)) {
     *error_code = TabletCopyErrorPB::WAL_SEGMENT_NOT_FOUND;
     return Status::NotFound(Substitute("Segment with sequence number $0 not found",

http://git-wip-us.apache.org/repos/asf/kudu/blob/449b5f59/src/kudu/tserver/tablet_copy_source_session.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session.h b/src/kudu/tserver/tablet_copy_source_session.h
index b0f2b3a..fbd58fd 100644
--- a/src/kudu/tserver/tablet_copy_source_session.h
+++ b/src/kudu/tserver/tablet_copy_source_session.h
@@ -30,15 +30,15 @@
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
+#include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tserver/tablet_copy.pb.h"
 #include "kudu/util/env.h"
-#include "kudu/gutil/integral_types.h"
 #include "kudu/util/metrics.h"
-#include "kudu/util/mutex.h"
+#include "kudu/util/once.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -107,9 +107,14 @@ class TabletCopySourceSession : public RefCountedThreadSafe<TabletCopySourceSess
   // Initialize the session, including anchoring files (TODO) and fetching the
   // tablet superblock and list of WAL segments.
   //
-  // Must not be called more than once.
+  // Must be called before accessing block state.
   Status Init();
 
+  // Returns true if this session has been initialized.
+  bool IsInitialized() const {
+    return init_once_.initted();
+  }
+
   // Return ID of tablet corresponding to this session.
   const std::string& tablet_id() const;
 
@@ -138,17 +143,17 @@ class TabletCopySourceSession : public RefCountedThreadSafe<TabletCopySourceSess
                             TabletCopyErrorPB::Code* error_code);
 
   const tablet::TabletSuperBlockPB& tablet_superblock() const {
-    DCHECK(initted_);
+    DCHECK(init_once_.initted());
     return tablet_superblock_;
   }
 
   const consensus::ConsensusStatePB& initial_cstate() const {
-    DCHECK(initted_);
+    DCHECK(init_once_.initted());
     return initial_cstate_;
   }
 
   const log::SegmentSequence& log_segments() const {
-    DCHECK(initted_);
+    DCHECK(init_once_.initted());
     return log_segments_;
   }
 
@@ -167,8 +172,11 @@ class TabletCopySourceSession : public RefCountedThreadSafe<TabletCopySourceSess
 
   ~TabletCopySourceSession();
 
+  // Internal helper method for Init().
+  Status InitOnce();
+
   // Open the block and add it to the block map.
-  Status OpenBlockUnlocked(const BlockId& block_id);
+  Status OpenBlock(const BlockId& block_id);
 
   // Look up cached block information.
   Status FindBlock(const BlockId& block_id,
@@ -176,7 +184,7 @@ class TabletCopySourceSession : public RefCountedThreadSafe<TabletCopySourceSess
                    TabletCopyErrorPB::Code* error_code);
 
   // Snapshot the log segment's length and put it into segment map.
-  Status OpenLogSegmentUnlocked(uint64_t segment_seqno);
+  Status OpenLogSegment(uint64_t segment_seqno);
 
   // Look up log segment in cache or log segment map.
   Status FindLogSegment(uint64_t segment_seqno,
@@ -186,26 +194,23 @@ class TabletCopySourceSession : public RefCountedThreadSafe<TabletCopySourceSess
   // Unregister log anchor, if it's registered.
   Status UnregisterAnchorIfNeededUnlocked();
 
-  scoped_refptr<tablet::TabletReplica> tablet_replica_;
+  const scoped_refptr<tablet::TabletReplica> tablet_replica_;
   const std::string session_id_;
   const std::string requestor_uuid_;
   FsManager* const fs_manager_;
 
-  mutable Mutex session_lock_;
-  bool initted_ = false;
-  BlockMap blocks_; // Protected by session_lock_.
-  LogMap logs_;     // Protected by session_lock_.
+  // Protects concurrent access to Init().
+  KuduOnceDynamic init_once_;
+
+  // The following fields are initialized during Init():
+  BlockMap blocks_;
+  LogMap logs_;
   ValueDeleter blocks_deleter_;
   ValueDeleter logs_deleter_;
-
   tablet::TabletSuperBlockPB tablet_superblock_;
-
   consensus::ConsensusStatePB initial_cstate_;
-
-  // The sequence of log segments that will be sent in the course of this
-  // session.
+  // The sequence of log segments that will be sent in the course of this session.
   log::SegmentSequence log_segments_;
-
   log::LogAnchor log_anchor_;
 
   TabletCopySourceMetrics* tablet_copy_metrics_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/449b5f59/src/kudu/tserver/tablet_server-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h
index d4bff34..ec015dc 100644
--- a/src/kudu/tserver/tablet_server-test-base.h
+++ b/src/kudu/tserver/tablet_server-test-base.h
@@ -18,23 +18,26 @@
 #ifndef KUDU_TSERVER_TABLET_SERVER_TEST_BASE_H_
 #define KUDU_TSERVER_TABLET_SERVER_TEST_BASE_H_
 
-#include <algorithm>
 #include <assert.h>
-#include <gtest/gtest.h>
-#include <iostream>
-#include <memory>
 #include <signal.h>
 #include <stdint.h>
-#include <string>
 #include <sys/mman.h>
 #include <sys/types.h>
+
+#include <algorithm>
+#include <iostream>
+#include <memory>
+#include <string>
 #include <utility>
 #include <vector>
 
+#include <gtest/gtest.h>
+
 #include "kudu/common/wire_protocol-test-util.h"
 #include "kudu/consensus/consensus.proxy.h"
 #include "kudu/consensus/log_reader.h"
 #include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/messenger.h"
@@ -160,7 +163,8 @@ class TabletServerTestBase : public KuduTest {
   void ResetClientProxies() {
     CreateTsClientProxies(mini_server_->bound_rpc_addr(),
                           client_messenger_,
-                          &proxy_, &admin_proxy_, &consensus_proxy_, &generic_proxy_);
+                          &tablet_copy_proxy_, &proxy_, &admin_proxy_, &consensus_proxy_,
+                          &generic_proxy_);
   }
 
   // Inserts 'num_rows' test rows directly into the tablet (i.e not via RPC)
@@ -465,6 +469,7 @@ class TabletServerTestBase : public KuduTest {
 
   gscoped_ptr<MiniTabletServer> mini_server_;
   scoped_refptr<tablet::TabletReplica> tablet_replica_;
+  gscoped_ptr<TabletCopyServiceProxy> tablet_copy_proxy_;
   gscoped_ptr<TabletServerServiceProxy> proxy_;
   gscoped_ptr<TabletServerAdminServiceProxy> admin_proxy_;
   gscoped_ptr<consensus::ConsensusServiceProxy> consensus_proxy_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/449b5f59/src/kudu/tserver/tablet_server-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index d254e40..47112eb 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -1130,7 +1130,8 @@ TEST_F(TabletServerTest, TestClientGetsErrorBackWhenRecoveryFailed) {
   // Connect to it.
   CreateTsClientProxies(mini_server_->bound_rpc_addr(),
                         client_messenger_,
-                        &proxy_, &admin_proxy_, &consensus_proxy_, &generic_proxy_);
+                        &tablet_copy_proxy_, &proxy_, &admin_proxy_, &consensus_proxy_,
+                        &generic_proxy_);
 
   WriteRequestPB req;
   req.set_tablet_id(kTabletId);

http://git-wip-us.apache.org/repos/asf/kudu/blob/449b5f59/src/kudu/tserver/tablet_server_test_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server_test_util.cc b/src/kudu/tserver/tablet_server_test_util.cc
index f849549..7ac3bec 100644
--- a/src/kudu/tserver/tablet_server_test_util.cc
+++ b/src/kudu/tserver/tablet_server_test_util.cc
@@ -19,6 +19,7 @@
 
 #include "kudu/consensus/consensus.proxy.h"
 #include "kudu/server/server_base.proxy.h"
+#include "kudu/tserver/tablet_copy.proxy.h"
 #include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/util/net/sockaddr.h"
@@ -32,12 +33,14 @@ using std::shared_ptr;
 
 void CreateTsClientProxies(const Sockaddr& addr,
                            const shared_ptr<Messenger>& messenger,
-                           gscoped_ptr<TabletServerServiceProxy>* proxy,
+                           gscoped_ptr<TabletCopyServiceProxy>* tablet_copy_proxy,
+                           gscoped_ptr<TabletServerServiceProxy>* tablet_server_proxy,
                            gscoped_ptr<TabletServerAdminServiceProxy>* admin_proxy,
                            gscoped_ptr<ConsensusServiceProxy>* consensus_proxy,
                            gscoped_ptr<server::GenericServiceProxy>* generic_proxy) {
   const auto& host = addr.host();
-  proxy->reset(new TabletServerServiceProxy(messenger, addr, host));
+  tablet_copy_proxy->reset(new TabletCopyServiceProxy(messenger, addr, host));
+  tablet_server_proxy->reset(new TabletServerServiceProxy(messenger, addr, host));
   admin_proxy->reset(new TabletServerAdminServiceProxy(messenger, addr, host));
   consensus_proxy->reset(new ConsensusServiceProxy(messenger, addr, host));
   generic_proxy->reset(new server::GenericServiceProxy(messenger, addr, host));

http://git-wip-us.apache.org/repos/asf/kudu/blob/449b5f59/src/kudu/tserver/tablet_server_test_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server_test_util.h b/src/kudu/tserver/tablet_server_test_util.h
index 6901c1e..104e5ce 100644
--- a/src/kudu/tserver/tablet_server_test_util.h
+++ b/src/kudu/tserver/tablet_server_test_util.h
@@ -37,13 +37,15 @@ class GenericServiceProxy;
 }
 
 namespace tserver {
+class TabletCopyServiceProxy;
 class TabletServerAdminServiceProxy;
 class TabletServerServiceProxy;
 
 // Create tablet server client proxies for tests.
 void CreateTsClientProxies(const Sockaddr& addr,
                            const std::shared_ptr<rpc::Messenger>& messenger,
-                           gscoped_ptr<TabletServerServiceProxy>* proxy,
+                           gscoped_ptr<TabletCopyServiceProxy>* tablet_copy_proxy,
+                           gscoped_ptr<TabletServerServiceProxy>* tablet_server_proxy,
                            gscoped_ptr<TabletServerAdminServiceProxy>* admin_proxy,
                            gscoped_ptr<consensus::ConsensusServiceProxy>* consensus_proxy,
                            gscoped_ptr<server::GenericServiceProxy>* generic_proxy);


[2/2] kudu git commit: KUDU-2125: Tablet copy client does not retry on failures

Posted by da...@apache.org.
KUDU-2125: Tablet copy client does not retry on failures

The tablet copy client would fail the tablet copy after encountering an
RPC error. This commit adds retry logic for tablet copy operations when
encountering SERVER_TOO_BUSY or UNAVAILABLE errors, which are retriable.

Change-Id: I7c8454fc600a841bd15306a2b3b06ddf53130be6
Reviewed-on: http://gerrit.cloudera.org:8080/8016
Reviewed-by: Dan Burkert <da...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e2c0e1db
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e2c0e1db
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e2c0e1db

Branch: refs/heads/master
Commit: e2c0e1dbdc4fa15a6477a82fda5495928a26a508
Parents: 449b5f5
Author: Dan Burkert <da...@apache.org>
Authored: Wed Sep 6 20:24:54 2017 -0700
Committer: Dan Burkert <da...@apache.org>
Committed: Thu Sep 14 19:50:43 2017 +0000

----------------------------------------------------------------------
 .../integration-tests/cluster_itest_util.cc     | 12 ++-
 src/kudu/integration-tests/cluster_itest_util.h | 14 ++--
 .../tablet_copy_client_session-itest.cc         | 53 ++++++++++++
 src/kudu/tserver/tablet_copy_client.cc          | 85 ++++++++++++++------
 src/kudu/tserver/tablet_copy_client.h           |  8 ++
 5 files changed, 139 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e2c0e1db/src/kudu/integration-tests/cluster_itest_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc b/src/kudu/integration-tests/cluster_itest_util.cc
index ffe978a..cb5a880 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -840,7 +840,8 @@ Status WaitForNumVotersInConfigOnMaster(const shared_ptr<MasterServiceProxy>& ma
 Status WaitForNumTabletsOnTS(TServerDetails* ts,
                              int count,
                              const MonoDelta& timeout,
-                             vector<ListTabletsResponsePB::StatusAndSchemaPB>* tablets) {
+                             vector<ListTabletsResponsePB::StatusAndSchemaPB>* tablets,
+                             boost::optional<tablet::TabletStatePB> state) {
   // If the user doesn't care about collecting the resulting tablets, collect into a local
   // vector.
   vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets_local;
@@ -850,6 +851,15 @@ Status WaitForNumTabletsOnTS(TServerDetails* ts,
   MonoTime deadline = MonoTime::Now() + timeout;
   while (true) {
     s = ListTablets(ts, MonoDelta::FromSeconds(10), tablets);
+    if (s.ok() && state) {
+      tablets->erase(
+          std::remove_if(tablets->begin(), tablets->end(),
+                         [&] (const ListTabletsResponsePB::StatusAndSchemaPB& t) {
+                           return t.tablet_status().state() != state;
+                         }),
+          tablets->end());
+    }
+
     if (s.ok() && tablets->size() == count) break;
     if (MonoTime::Now() > deadline) break;
     SleepFor(MonoDelta::FromMilliseconds(10));

http://git-wip-us.apache.org/repos/asf/kudu/blob/e2c0e1db/src/kudu/integration-tests/cluster_itest_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.h b/src/kudu/integration-tests/cluster_itest_util.h
index f5e7d27..59276dd 100644
--- a/src/kudu/integration-tests/cluster_itest_util.h
+++ b/src/kudu/integration-tests/cluster_itest_util.h
@@ -33,6 +33,8 @@
 #include <unordered_map>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
+
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus.proxy.h"
@@ -46,11 +48,6 @@
 #include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/tserver/tserver_service.proxy.h"
 
-namespace boost {
-template <class T>
-class optional;
-}
-
 namespace kudu {
 class HostPort;
 class MonoDelta;
@@ -320,12 +317,15 @@ Status WaitForNumVotersInConfigOnMaster(
     const MonoDelta& timeout);
 
 // Repeatedly invoke ListTablets(), waiting for up to 'timeout' time for the
-// specified 'count' number of replicas.
+// specified 'count' number of replicas. If 'state' is provided, the replicas
+// must also be in the specified state for the wait to be considered
+// successful.
 Status WaitForNumTabletsOnTS(
     TServerDetails* ts,
     int count,
     const MonoDelta& timeout,
-    std::vector<tserver::ListTabletsResponsePB::StatusAndSchemaPB>* tablets);
+    std::vector<tserver::ListTabletsResponsePB::StatusAndSchemaPB>* tablets,
+    boost::optional<tablet::TabletStatePB> state = boost::none);
 
 // Check if the tablet is in the specified state.
 Status CheckIfTabletInState(TServerDetails* ts,

http://git-wip-us.apache.org/repos/asf/kudu/blob/e2c0e1db/src/kudu/integration-tests/tablet_copy_client_session-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_copy_client_session-itest.cc b/src/kudu/integration-tests/tablet_copy_client_session-itest.cc
index a4a1a03..0625905 100644
--- a/src/kudu/integration-tests/tablet_copy_client_session-itest.cc
+++ b/src/kudu/integration-tests/tablet_copy_client_session-itest.cc
@@ -21,6 +21,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <thread>
 #include <unordered_map>
 #include <vector>
 
@@ -31,6 +32,7 @@
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/external_mini_cluster-itest-base.h"
 #include "kudu/integration-tests/external_mini_cluster.h"
@@ -52,7 +54,9 @@ using kudu::itest::WaitUntilTabletRunning;
 using kudu::tablet::TabletDataState;
 using kudu::tserver::ListTabletsResponsePB;
 using std::string;
+using std::thread;
 using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 
@@ -265,4 +269,53 @@ TEST_F(TabletCopyClientSessionITest, TestCopyFromCrashedSource) {
   ASSERT_OK(WaitUntilTabletRunning(ts1, tablet_id, kDefaultTimeout));
 }
 
+// Regression for KUDU-2125: ensure that a heavily loaded source cluster can
+// satisfy many concurrent tablet copies.
+TEST_F(TabletCopyClientSessionITest, TestTabletCopyWithBusySource) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+  const int kNumTablets = 20;
+
+  NO_FATALS(PrepareClusterForTabletCopy({ Substitute("--num_tablets_to_copy_simultaneously=$0",
+                                                     kNumTablets) },
+                                        {},
+                                        kNumTablets));
+
+  // Tune down the RPC capacity on the source server to ensure
+  // ERROR_SERVER_TOO_BUSY errors occur.
+  cluster_->tablet_server(0)->mutable_flags()->emplace_back("--rpc_service_queue_length=1");
+  cluster_->tablet_server(0)->mutable_flags()->emplace_back("--rpc_num_service_threads=1");
+
+  // Restart the TS for the new flags to take effect.
+  cluster_->tablet_server(0)->Shutdown();
+  ASSERT_OK(cluster_->tablet_server(0)->Restart());
+
+  TServerDetails* ts0 = ts_map_[cluster_->tablet_server(0)->uuid()];
+  TServerDetails* ts1 = ts_map_[cluster_->tablet_server(1)->uuid()];
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(WaitForNumTabletsOnTS(ts0, kNumTablets, kDefaultTimeout,
+                                  &tablets, tablet::TabletStatePB::RUNNING));
+  ASSERT_EQ(kNumTablets, tablets.size());
+
+  HostPort src_addr;
+  ASSERT_OK(HostPortFromPB(ts0->registration.rpc_addresses(0), &src_addr));
+
+  vector<thread> threads;
+  for (const auto& tablet : tablets) {
+    threads.emplace_back(thread([&] {
+      const string& tablet_id = tablet.tablet_status().tablet_id();
+      // Run tablet copy.
+      CHECK_OK(StartTabletCopy(ts1, tablet_id, ts0->uuid(), src_addr,
+                               std::numeric_limits<int64_t>::max(), kDefaultTimeout));
+      CHECK_OK(WaitUntilTabletRunning(ts1, tablet_id, kDefaultTimeout));
+    }));
+  }
+
+  for (auto& thread : threads) {
+    thread.join();
+  }
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e2c0e1db/src/kudu/tserver/tablet_copy_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index b8aa6a9..f60cc46 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -59,9 +59,11 @@
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
-#include "kudu/util/pb_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/path_util.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
 
 DEFINE_int32(tablet_copy_begin_session_timeout_ms, 3000,
              "Tablet server RPC client timeout for BeginTabletCopySession calls. "
@@ -151,8 +153,9 @@ TabletCopyClient::TabletCopyClient(std::string tablet_id,
       state_(kInitialized),
       replace_tombstoned_tablet_(false),
       tablet_replica_(nullptr),
-      session_idle_timeout_millis_(0),
+      session_idle_timeout_millis_(FLAGS_tablet_copy_begin_session_timeout_ms),
       start_time_micros_(0),
+      rng_(GetRandomSeed32()),
       tablet_copy_metrics_(tablet_copy_metrics) {
   if (tablet_copy_metrics_) {
     tablet_copy_metrics_->open_client_sessions->Increment();
@@ -239,14 +242,13 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr,
   req.set_tablet_id(tablet_id_);
 
   rpc::RpcController controller;
-  controller.set_timeout(MonoDelta::FromMilliseconds(
-      FLAGS_tablet_copy_begin_session_timeout_ms));
 
   // Begin the tablet copy session with the remote peer.
   BeginTabletCopySessionResponsePB resp;
-  RETURN_NOT_OK_UNWIND_PREPEND(proxy_->BeginTabletCopySession(req, &resp, &controller),
-                               controller,
-                               "Unable to begin tablet copy session");
+  RETURN_NOT_OK_PREPEND(SendRpcWithRetry(&controller, [&] {
+    return proxy_->BeginTabletCopySession(req, &resp, &controller);
+  }), "unable to begin tablet copy session");
+
   string copy_peer_uuid = resp.has_responder_uuid()
       ? resp.responder_uuid() : "(unknown uuid)";
   if (resp.superblock().tablet_data_state() != tablet::TABLET_DATA_READY) {
@@ -456,16 +458,15 @@ Status TabletCopyClient::EndRemoteSession() {
     return Status::OK();
   }
 
-  rpc::RpcController controller;
-  controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_tablet_copy_begin_session_timeout_ms));
-
   EndTabletCopySessionRequestPB req;
   req.set_session_id(session_id_);
   req.set_is_success(true);
   EndTabletCopySessionResponsePB resp;
-  RETURN_NOT_OK_UNWIND_PREPEND(proxy_->EndTabletCopySession(req, &resp, &controller),
-                               controller,
-                               "Failure ending tablet copy session");
+
+  rpc::RpcController controller;
+  RETURN_NOT_OK_PREPEND(SendRpcWithRetry(&controller, [&] {
+    return proxy_->EndTabletCopySession(req, &resp, &controller);
+  }), "failure ending tablet copy session");
 
   return Status::OK();
 }
@@ -669,19 +670,19 @@ Status TabletCopyClient::DownloadFile(const DataIdPB& data_id,
   rpc::RpcController controller;
   controller.set_timeout(MonoDelta::FromMilliseconds(session_idle_timeout_millis_));
   FetchDataRequestPB req;
+  req.set_session_id(session_id_);
+  req.mutable_data_id()->CopyFrom(data_id);
+  req.set_max_length(FLAGS_tablet_copy_transfer_chunk_size_bytes);
 
   bool done = false;
   while (!done) {
-    controller.Reset();
-    req.set_session_id(session_id_);
-    req.mutable_data_id()->CopyFrom(data_id);
     req.set_offset(offset);
-    req.set_max_length(FLAGS_tablet_copy_transfer_chunk_size_bytes);
 
+    // Request the next data chunk.
     FetchDataResponsePB resp;
-    RETURN_NOT_OK_UNWIND_PREPEND(proxy_->FetchData(req, &resp, &controller),
-                                controller,
-                                "Unable to fetch data from remote");
+    RETURN_NOT_OK_PREPEND(SendRpcWithRetry(&controller, [&] {
+          return proxy_->FetchData(req, &resp, &controller);
+    }), "unable to fetch data from remote");
 
     // Sanity-check for corruption.
     RETURN_NOT_OK_PREPEND(VerifyData(offset, resp.chunk()),
@@ -697,12 +698,11 @@ Status TabletCopyClient::DownloadFile(const DataIdPB& data_id,
       SleepFor(MonoDelta::FromMilliseconds(FLAGS_tablet_copy_download_file_inject_latency_ms));
     }
 
-    if (offset + resp.chunk().data().size() == resp.chunk().total_data_length()) {
-      done = true;
-    }
-    offset += resp.chunk().data().size();
+    auto chunk_size = resp.chunk().data().size();
+    done = offset + chunk_size == resp.chunk().total_data_length();
+    offset += chunk_size;
     if (tablet_copy_metrics_) {
-      tablet_copy_metrics_->bytes_fetched->IncrementBy(resp.chunk().data().size());
+      tablet_copy_metrics_->bytes_fetched->IncrementBy(chunk_size);
     }
   }
 
@@ -716,6 +716,12 @@ Status TabletCopyClient::VerifyData(uint64_t offset, const DataChunkPB& chunk) {
         Substitute("$0 vs $1", offset, chunk.offset()));
   }
 
+  // Verify that the chunk does not overflow the total data length.
+  if (offset + chunk.data().length() > chunk.total_data_length()) {
+    return Status::InvalidArgument("Chunk exceeds total block data length",
+        Substitute("$0 vs $1", offset + chunk.data().length(), chunk.total_data_length()));
+  }
+
   // Verify the checksum.
   uint32_t crc32 = crc::Crc32c(chunk.data().data(), chunk.data().length());
   if (PREDICT_FALSE(crc32 != chunk.crc32())) {
@@ -731,5 +737,34 @@ string TabletCopyClient::LogPrefix() {
                     tablet_id_, fs_manager_->uuid());
 }
 
+template<typename F>
+Status TabletCopyClient::SendRpcWithRetry(rpc::RpcController* controller, F f) {
+  MonoTime deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(session_idle_timeout_millis_);
+  for (int attempt = 1;; attempt++) {
+    controller->Reset();
+    controller->set_deadline(deadline);
+    Status s = UnwindRemoteError(f(), *controller);
+
+    // Retry after a backoff period if the error is retriable.
+    const rpc::ErrorStatusPB* err = controller->error_response();
+    if (!s.ok() && err && (err->code() == rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY ||
+                           err->code() == rpc::ErrorStatusPB::ERROR_UNAVAILABLE)) {
+
+      // Polynomial backoff with 50% jitter.
+      double kJitterPct = 0.5;
+      int32_t kBackoffBaseMs = 10;
+      MonoDelta backoff = MonoDelta::FromMilliseconds(
+          (1 - kJitterPct + (kJitterPct * rng_.NextDoubleFraction()))
+          * kBackoffBaseMs * attempt * attempt);
+      if (MonoTime::Now() + backoff > deadline) {
+        return Status::TimedOut("unable to fetch data from remote");
+      }
+      SleepFor(backoff);
+      continue;
+    }
+    return s;
+  }
+}
+
 } // namespace tserver
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e2c0e1db/src/kudu/tserver/tablet_copy_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.h b/src/kudu/tserver/tablet_copy_client.h
index bdbea1e..68a3cc4 100644
--- a/src/kudu/tserver/tablet_copy_client.h
+++ b/src/kudu/tserver/tablet_copy_client.h
@@ -29,6 +29,7 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/metrics.h"
+#include "kudu/util/random.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -204,6 +205,11 @@ class TabletCopyClient {
 
   Status VerifyData(uint64_t offset, const DataChunkPB& resp);
 
+  // Runs the provided functor, which must send an RPC and return the result
+  // status, until it succeeds, times out, or fails with a non-retriable error.
+  template<typename F>
+  Status SendRpcWithRetry(rpc::RpcController* controller, F f);
+
   // Return standard log prefix.
   std::string LogPrefix();
 
@@ -237,6 +243,8 @@ class TabletCopyClient {
   std::vector<uint64_t> wal_seqnos_;
   int64_t start_time_micros_;
 
+  Random rng_;
+
   TabletCopyClientMetrics* tablet_copy_metrics_;
 
   // Block transaction for the tablet copy.