You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/12/08 03:43:55 UTC

[3/4] kudu git commit: KUDU-921. tablet copy: Make the StartTabletCopy() RPC async

KUDU-921. tablet copy: Make the StartTabletCopy() RPC async

This patch changes tablet copy to execute on its own thread pool.
This frees up threads in the Consensus service pool for other tasks.

A new ThreadPool called tablet_copy_pool_ was added to TSTabletManager
to run TabletCopy operations. Its max_threads parameter is tunable via
a new gflag, --num_tablets_to_copy_simultaneously (default 10), and its
max_queue_size is hard-coded to 0 in order to provide backpressure: when
no thread is free to start copying a new tablet immediately, the request
is rejected instead of being queued.

This patch changes the semantics of StartTabletCopy() to return as soon
as the tablet copy has started -- it no longer waits until the process
has completed. Clients can follow the progress of the tablet copy using
the ListTablets() RPC, waiting for the tablet to reach the RUNNING state.
If downloading or finalizing the copy fails after StartTabletCopy() has
returned, the replica is tombstoned and the caller is not notified.

A test was added in tablet_copy-itest to check that the backpressure
mechanism works: submitting too many StartTabletCopy() requests at once
results in a ServiceUnavailable error.

Some additional tests were added in tablet_copy_client_session-itest to
improve test coverage of the StartTabletCopy() code path.

Change-Id: I95c63f2bfd67624844447862efbdba9cb3676112
Reviewed-on: http://gerrit.cloudera.org:8080/5045
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bec5f9b3
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bec5f9b3
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bec5f9b3

Branch: refs/heads/master
Commit: bec5f9b34fb773a230c72f0a7fe2b988078933d8
Parents: 2acf1ba
Author: Mike Percy <mp...@apache.org>
Authored: Mon Dec 5 16:10:42 2016 +0000
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Dec 8 03:41:20 2016 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/cluster_itest_util.h |  13 +-
 src/kudu/integration-tests/tablet_copy-itest.cc |  75 ++++++-
 .../tablet_copy_client_session-itest.cc         |  73 +++++++
 src/kudu/master/catalog_manager.cc              |   9 +-
 src/kudu/master/catalog_manager.h               |  10 +-
 src/kudu/tserver/tablet_copy_service.cc         |   4 +-
 src/kudu/tserver/tablet_peer_lookup.h           |   6 +-
 src/kudu/tserver/tablet_service.cc              |  21 +-
 src/kudu/tserver/ts_tablet_manager.cc           | 196 ++++++++++++++-----
 src/kudu/tserver/ts_tablet_manager.h            |  23 ++-
 10 files changed, 341 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/bec5f9b3/src/kudu/integration-tests/cluster_itest_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.h b/src/kudu/integration-tests/cluster_itest_util.h
index 2810186..2d73661 100644
--- a/src/kudu/integration-tests/cluster_itest_util.h
+++ b/src/kudu/integration-tests/cluster_itest_util.h
@@ -302,7 +302,8 @@ Status DeleteTablet(const TServerDetails* ts,
 // Repeatedly try to delete the tablet, retrying on failure up to the
 // specified timeout. Deletion can fail when other operations, such as
 // bootstrap or tablet copy, are running.
-void DeleteTabletWithRetries(const TServerDetails* ts, const std::string& tablet_id,
+void DeleteTabletWithRetries(const TServerDetails* ts,
+                             const std::string& tablet_id,
                              tablet::TabletDataState delete_type,
                              const boost::optional<int64_t>& config_opid_index,
                              const MonoDelta& timeout);
@@ -310,11 +311,11 @@ void DeleteTabletWithRetries(const TServerDetails* ts, const std::string& tablet
 // Cause the remote to initiate tablet copy using the specified host as a
 // source.
 Status StartTabletCopy(const TServerDetails* ts,
-                            const std::string& tablet_id,
-                            const std::string& copy_source_uuid,
-                            const HostPort& copy_source_addr,
-                            int64_t caller_term,
-                            const MonoDelta& timeout);
+                       const std::string& tablet_id,
+                       const std::string& copy_source_uuid,
+                       const HostPort& copy_source_addr,
+                       int64_t caller_term,
+                       const MonoDelta& timeout);
 
 } // namespace itest
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/bec5f9b3/src/kudu/integration-tests/tablet_copy-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_copy-itest.cc b/src/kudu/integration-tests/tablet_copy-itest.cc
index 071f18c..497fa63 100644
--- a/src/kudu/integration-tests/tablet_copy-itest.cc
+++ b/src/kudu/integration-tests/tablet_copy-itest.cc
@@ -112,8 +112,8 @@ class TabletCopyITest : public KuduTest {
 };
 
 void TabletCopyITest::StartCluster(const vector<string>& extra_tserver_flags,
-                                        const vector<string>& extra_master_flags,
-                                        int num_tablet_servers) {
+                                   const vector<string>& extra_master_flags,
+                                   int num_tablet_servers) {
   ExternalMiniClusterOptions opts;
   opts.num_tablet_servers = num_tablet_servers;
   opts.extra_tserver_flags = extra_tserver_flags;
@@ -186,7 +186,7 @@ TEST_F(TabletCopyITest, TestRejectRogueLeader) {
                                          0, // Say I'm from term 0.
                                          timeout);
   ASSERT_TRUE(s.IsInvalidArgument());
-  ASSERT_STR_CONTAINS(s.ToString(), "term 0 lower than last logged term 1");
+  ASSERT_STR_CONTAINS(s.ToString(), "term 0, which is lower than last-logged term 1");
 
   // Now pause the actual leader so we can bring him back as a zombie later.
   ASSERT_OK(cluster_->tablet_server(zombie_leader_index)->Pause());
@@ -895,4 +895,73 @@ TEST_F(TabletCopyITest, TestTabletCopyingDeletedTabletFails) {
   ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, 1));
 }
 
+// Test that the tablet copy thread pool being full results in throttling and
+// backpressure on the callers.
+TEST_F(TabletCopyITest, TestTabletCopyThrottling) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  const int kNumTablets = 4;
+  // We want 2 tablet servers and we don't want the master to interfere when we
+  // forcibly make copies of tablets onto servers it doesn't know about.
+  // We also want to make sure only one tablet copy is possible at a given time
+  // in order to test the throttling.
+  NO_FATALS(StartCluster({"--num_tablets_to_copy_simultaneously=1"},
+                         {"--master_tombstone_evicted_tablet_replicas=false"},
+                         2));
+  // Shut down the 2nd tablet server; we'll create tablets on the first one.
+  cluster_->tablet_server(1)->Shutdown();
+
+  // Restart the Master so it doesn't try to assign tablets to the dead TS.
+  cluster_->master()->Shutdown();
+  ASSERT_OK(cluster_->master()->Restart());
+  ASSERT_OK(cluster_->WaitForTabletServerCount(1, kTimeout));
+
+  // Write a bunch of data to the first tablet server.
+  TestWorkload workload(cluster_.get());
+  workload.set_num_write_threads(8);
+  workload.set_num_replicas(1);
+  workload.set_num_tablets(kNumTablets);
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 10000) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  TServerDetails* ts0 = ts_map_[cluster_->tablet_server(0)->uuid()];
+  TServerDetails* ts1 = ts_map_[cluster_->tablet_server(1)->uuid()];
+
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(WaitForNumTabletsOnTS(ts0, kNumTablets, kTimeout, &tablets));
+
+  workload.StopAndJoin();
+
+  // Now we attempt to copy all of that data over to the 2nd tablet server.
+  // We will attempt to copy 4 tablets simultaneously, but because we have tuned
+  // the number of tablet copy threads down to 1, we should get at least one
+  // ServiceUnavailable error.
+  ASSERT_OK(cluster_->tablet_server(1)->Restart());
+
+  // Attempt to copy all of the tablets from TS0 to TS1, collecting the statuses.
+  HostPort src_addr;
+  ASSERT_OK(HostPortFromPB(ts0->registration.rpc_addresses(0), &src_addr));
+  vector<Status> statuses;
+  statuses.reserve(tablets.size());
+  for (const auto& t : tablets) {
+    statuses.push_back(StartTabletCopy(ts1, t.tablet_status().tablet_id(), ts0->uuid(),
+                                       src_addr, 0, kTimeout));
+  }
+
+  // The "Service unavailable" messages are serialized as RemoteError type.
+  // Ensure that we got at least one.
+  int num_service_unavailable = 0;
+  for (const Status& s : statuses) {
+    if (!s.ok()) {
+      ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(), "Service unavailable: Thread pool is at capacity");
+      num_service_unavailable++;
+    }
+  }
+  LOG(INFO) << "Number of Service unavailable responses: " << num_service_unavailable;
+  ASSERT_GT(num_service_unavailable, 0);
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/bec5f9b3/src/kudu/integration-tests/tablet_copy_client_session-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_copy_client_session-itest.cc b/src/kudu/integration-tests/tablet_copy_client_session-itest.cc
index 0f7fd4a..4a2935f 100644
--- a/src/kudu/integration-tests/tablet_copy_client_session-itest.cc
+++ b/src/kudu/integration-tests/tablet_copy_client_session-itest.cc
@@ -164,4 +164,77 @@ TEST_F(TabletCopyClientSessionITest, TestStartTabletCopyWhileSourceBootstrapping
   }
 }
 
+// Test that StartTabletCopy() works in different scenarios.
+TEST_F(TabletCopyClientSessionITest, TestStartTabletCopy) {
+  NO_FATALS(PrepareClusterForTabletCopy());
+
+  TServerDetails* ts0 = ts_map_[cluster_->tablet_server(0)->uuid()];
+  TServerDetails* ts1 = ts_map_[cluster_->tablet_server(1)->uuid()];
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(WaitForNumTabletsOnTS(ts0, kDefaultNumTablets, kDefaultTimeout, &tablets));
+  ASSERT_EQ(kDefaultNumTablets, tablets.size());
+  const string& tablet_id = tablets[0].tablet_status().tablet_id();
+
+  // Scenarios to run tablet copy on top of:
+  enum Scenarios {
+    kPristine,    // No tablets.
+    kTombstoned,  // A tombstoned tablet.
+    kLast
+  };
+  for (int scenario = 0; scenario < kLast; scenario++) {
+    if (scenario == kTombstoned) {
+      NO_FATALS(DeleteTabletWithRetries(ts1, tablet_id,
+                                        TabletDataState::TABLET_DATA_TOMBSTONED,
+                                        boost::none, kDefaultTimeout));
+    }
+
+    // Run tablet copy.
+    HostPort src_addr;
+    ASSERT_OK(HostPortFromPB(ts0->registration.rpc_addresses(0), &src_addr));
+    ASSERT_OK(StartTabletCopy(ts1, tablet_id, ts0->uuid(), src_addr,
+                              std::numeric_limits<int64_t>::max(), kDefaultTimeout));
+    ASSERT_OK(WaitUntilTabletRunning(ts1, tablet_id, kDefaultTimeout));
+  }
+}
+
+// Test that a tablet copy session will tombstone the tablet if the source
+// server crashes in the middle of the tablet copy.
+TEST_F(TabletCopyClientSessionITest, TestCopyFromCrashedSource) {
+  NO_FATALS(PrepareClusterForTabletCopy());
+
+  TServerDetails* ts0 = ts_map_[cluster_->tablet_server(0)->uuid()];
+  TServerDetails* ts1 = ts_map_[cluster_->tablet_server(1)->uuid()];
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(WaitForNumTabletsOnTS(ts0, kDefaultNumTablets, kDefaultTimeout, &tablets));
+  ASSERT_EQ(kDefaultNumTablets, tablets.size());
+  const string& tablet_id = tablets[0].tablet_status().tablet_id();
+
+  // Crash when serving tablet copy.
+  ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(0),
+                              "fault_crash_on_handle_tc_fetch_data",
+                              "1.0"));
+
+  HostPort src_addr;
+  ASSERT_OK(HostPortFromPB(ts0->registration.rpc_addresses(0), &src_addr));
+  ASSERT_OK(StartTabletCopy(ts1, tablet_id, ts0->uuid(), src_addr,
+                            std::numeric_limits<int64_t>::max(), kDefaultTimeout));
+
+  ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(1, tablet_id,
+                                                 { TabletDataState::TABLET_DATA_TOMBSTONED },
+                                                 kDefaultTimeout));
+
+  // The source server will crash.
+  ASSERT_OK(cluster_->tablet_server(0)->WaitForInjectedCrash(kDefaultTimeout));
+  cluster_->tablet_server(0)->Shutdown();
+
+  // It will restart without the fault flag set.
+  ASSERT_OK(cluster_->tablet_server(0)->Restart());
+
+  // Attempt the copy again. This time it should succeed.
+  ASSERT_OK(WaitUntilTabletRunning(ts0, tablet_id, kDefaultTimeout));
+  ASSERT_OK(StartTabletCopy(ts1, tablet_id, ts0->uuid(), src_addr,
+                            std::numeric_limits<int64_t>::max(), kDefaultTimeout));
+  ASSERT_OK(WaitUntilTabletRunning(ts1, tablet_id, kDefaultTimeout));
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/bec5f9b3/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index ede90d6..2c7849d 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -2297,10 +2297,11 @@ const NodeInstancePB& CatalogManager::NodeInstance() const {
   return master_->instance_pb();
 }
 
-Status CatalogManager::StartTabletCopy(
-    const StartTabletCopyRequestPB& req,
-    boost::optional<kudu::tserver::TabletServerErrorPB::Code>* error_code) {
-  return Status::NotSupported("Tablet Copy not yet implemented for the master tablet");
+void CatalogManager::StartTabletCopy(
+    const StartTabletCopyRequestPB* /* req */,
+    std::function<void(const Status&, TabletServerErrorPB::Code)> cb) {
+  cb(Status::NotSupported("Tablet Copy not yet implemented for the master tablet"),
+     TabletServerErrorPB::UNKNOWN_ERROR);
 }
 
 // Interface used by RetryingTSRpcTask to pick the tablet server to

http://git-wip-us.apache.org/repos/asf/kudu/blob/bec5f9b3/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 36a8f0c..aa15c1b 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -494,15 +494,15 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   //
   // See also: TabletPeerLookupIf, ConsensusServiceImpl.
   virtual Status GetTabletPeer(const std::string& tablet_id,
-                               scoped_refptr<tablet::TabletPeer>* tablet_peer) const OVERRIDE;
+                               scoped_refptr<tablet::TabletPeer>* tablet_peer) const override;
 
-  virtual const NodeInstancePB& NodeInstance() const OVERRIDE;
+  virtual const NodeInstancePB& NodeInstance() const override;
 
   bool IsInitialized() const;
 
-  virtual Status StartTabletCopy(
-      const consensus::StartTabletCopyRequestPB& req,
-      boost::optional<kudu::tserver::TabletServerErrorPB::Code>* error_code) OVERRIDE;
+  virtual void StartTabletCopy(
+      const consensus::StartTabletCopyRequestPB* req,
+      std::function<void(const Status&, tserver::TabletServerErrorPB::Code)> cb) override;
 
   // Returns this CatalogManager's role in a consensus configuration. CatalogManager
   // must be initialized before calling this method.

http://git-wip-us.apache.org/repos/asf/kudu/blob/bec5f9b3/src/kudu/tserver/tablet_copy_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_service.cc b/src/kudu/tserver/tablet_copy_service.cc
index fc31259..78b0fec 100644
--- a/src/kudu/tserver/tablet_copy_service.cc
+++ b/src/kudu/tserver/tablet_copy_service.cc
@@ -183,8 +183,8 @@ void TabletCopyServiceImpl::CheckSessionActive(
 }
 
 void TabletCopyServiceImpl::FetchData(const FetchDataRequestPB* req,
-                                           FetchDataResponsePB* resp,
-                                           rpc::RpcContext* context) {
+                                      FetchDataResponsePB* resp,
+                                      rpc::RpcContext* context) {
   const string& session_id = req->session_id();
 
   // Look up and validate tablet copy session.

http://git-wip-us.apache.org/repos/asf/kudu/blob/bec5f9b3/src/kudu/tserver/tablet_peer_lookup.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_peer_lookup.h b/src/kudu/tserver/tablet_peer_lookup.h
index abc7861..70901fd 100644
--- a/src/kudu/tserver/tablet_peer_lookup.h
+++ b/src/kudu/tserver/tablet_peer_lookup.h
@@ -18,6 +18,7 @@
 #define KUDU_TSERVER_TABLET_PEER_LOOKUP_H_
 
 #include <boost/optional/optional_fwd.hpp>
+#include <functional>
 #include <memory>
 #include <string>
 
@@ -51,8 +52,9 @@ class TabletPeerLookupIf {
 
   virtual const NodeInstancePB& NodeInstance() const = 0;
 
-  virtual Status StartTabletCopy(const consensus::StartTabletCopyRequestPB& req,
-                                      boost::optional<TabletServerErrorPB::Code>* error_code) = 0;
+  virtual void StartTabletCopy(
+      const consensus::StartTabletCopyRequestPB* req,
+      std::function<void(const Status&, TabletServerErrorPB::Code)> cb) = 0;
 };
 
 } // namespace tserver

http://git-wip-us.apache.org/repos/asf/kudu/blob/bec5f9b3/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index d107bca..84312f3 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -988,20 +988,19 @@ void ConsensusServiceImpl::GetConsensusState(const consensus::GetConsensusStateR
 }
 
 void ConsensusServiceImpl::StartTabletCopy(const StartTabletCopyRequestPB* req,
-                                                StartTabletCopyResponsePB* resp,
-                                                rpc::RpcContext* context) {
+                                           StartTabletCopyResponsePB* resp,
+                                           rpc::RpcContext* context) {
   if (!CheckUuidMatchOrRespond(tablet_manager_, "StartTabletCopy", req, resp, context)) {
     return;
   }
-  boost::optional<TabletServerErrorPB::Code> error_code;
-  Status s = tablet_manager_->StartTabletCopy(*req, &error_code);
-  if (!s.ok()) {
-    SetupErrorAndRespond(resp->mutable_error(), s,
-                         error_code.get_value_or(TabletServerErrorPB::UNKNOWN_ERROR),
-                         context);
-    return;
-  }
-  context->RespondSuccess();
+  auto response_callback = [context, resp](const Status& s, TabletServerErrorPB::Code error_code) {
+    if (!s.ok()) {
+      SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
+      return;
+    }
+    context->RespondSuccess();
+  };
+  tablet_manager_->StartTabletCopy(req, response_callback);
 }
 
 void TabletServiceImpl::ScannerKeepAlive(const ScannerKeepAliveRequestPB *req,

http://git-wip-us.apache.org/repos/asf/kudu/blob/bec5f9b3/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 42ed06f..8214655 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -59,6 +59,10 @@
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/trace.h"
 
+DEFINE_int32(num_tablets_to_copy_simultaneously, 10,
+             "Number of threads available to copy tablets from remote servers.");
+TAG_FLAG(num_tablets_to_copy_simultaneously, advanced);
+
 DEFINE_int32(num_tablets_to_open_simultaneously, 0,
              "Number of threads available to open tablets during startup. If this "
              "is set to 0 (the default), then the number of bootstrap threads will "
@@ -171,16 +175,24 @@ TSTabletManager::~TSTabletManager() {
 Status TSTabletManager::Init() {
   CHECK_EQ(state(), MANAGER_INITIALIZING);
 
+  // Start the tablet copy thread pool. We set a max queue size of 0 so that if
+  // the number of requests exceeds the number of threads, a
+  // SERVICE_UNAVAILABLE error may be returned to the remote caller.
+  RETURN_NOT_OK(ThreadPoolBuilder("tablet-copy")
+                .set_max_queue_size(0)
+                .set_max_threads(FLAGS_num_tablets_to_copy_simultaneously)
+                .Build(&tablet_copy_pool_));
+
   // Start the threadpool we'll use to open tablets.
   // This has to be done in Init() instead of the constructor, since the
   // FsManager isn't initialized until this point.
-  int max_bootstrap_threads = FLAGS_num_tablets_to_open_simultaneously;
-  if (max_bootstrap_threads == 0) {
+  int max_open_threads = FLAGS_num_tablets_to_open_simultaneously;
+  if (max_open_threads == 0) {
     // Default to the number of disks.
-    max_bootstrap_threads = fs_manager_->GetDataRootDirs().size();
+    max_open_threads = fs_manager_->GetDataRootDirs().size();
   }
-  RETURN_NOT_OK(ThreadPoolBuilder("tablet-bootstrap")
-                .set_max_threads(max_bootstrap_threads)
+  RETURN_NOT_OK(ThreadPoolBuilder("tablet-open")
+                .set_max_threads(max_open_threads)
                 .Build(&open_tablet_pool_));
 
   // Search for tablets in the metadata dir.
@@ -305,14 +317,13 @@ Status TSTabletManager::CreateNewTablet(const string& table_id,
   return Status::OK();
 }
 
-// If 'expr' fails, log a message, tombstone the given tablet, and return the
-// error status.
+// If 'expr' fails, log a message, tombstone the given tablet, and return.
 #define TOMBSTONE_NOT_OK(expr, peer, msg) \
   do { \
-    Status _s = (expr); \
+    const Status& _s = (expr); \
     if (PREDICT_FALSE(!_s.ok())) { \
       LogAndTombstone((peer), (msg), _s); \
-      return _s; \
+      return; \
     } \
   } while (0)
 
@@ -321,27 +332,103 @@ Status TSTabletManager::CheckLeaderTermNotLower(const string& tablet_id,
                                                 int64_t last_logged_term) {
   if (PREDICT_FALSE(leader_term < last_logged_term)) {
     Status s = Status::InvalidArgument(
-        Substitute("Leader has replica of tablet $0 with term $1 "
-                    "lower than last logged term $2 on local replica. Rejecting "
-                    "tablet copy request",
-                    tablet_id,
-                    leader_term, last_logged_term));
+        Substitute("Leader has replica of tablet $0 with term $1, which "
+                   "is lower than last-logged term $2 on local replica. Rejecting "
+                   "tablet copy request",
+                   tablet_id, leader_term, last_logged_term));
     LOG(WARNING) << LogPrefix(tablet_id) << "Tablet Copy: " << s.ToString();
     return s;
   }
   return Status::OK();
 }
 
-Status TSTabletManager::StartTabletCopy(
-    const StartTabletCopyRequestPB& req,
-    boost::optional<TabletServerErrorPB::Code>* error_code) {
-  const string& tablet_id = req.tablet_id();
-  const string& copy_source_uuid = req.copy_peer_uuid();
-  HostPort copy_source_addr;
-  RETURN_NOT_OK(HostPortFromPB(req.copy_peer_addr(), &copy_source_addr));
-  int64_t leader_term = req.caller_term();
+// Tablet Copy runnable that will run on a ThreadPool.
+class TabletCopyRunnable : public Runnable {
+ public:
+  TabletCopyRunnable(TSTabletManager* ts_tablet_manager,
+                     const StartTabletCopyRequestPB* req,
+                     std::function<void(const Status&, TabletServerErrorPB::Code)> cb)
+      : ts_tablet_manager_(ts_tablet_manager),
+        req_(req),
+        cb_(std::move(cb)) {
+  }
+
+  virtual ~TabletCopyRunnable() {
+    // If the Runnable is destroyed without the Run() method being invoked, we
+    // must invoke the user callback ourselves in order to free request
+    // resources. This may happen when the ThreadPool is shut down while the
+    // Runnable is enqueued.
+    if (!cb_invoked_) {
+      cb_(Status::ServiceUnavailable("Tablet server shutting down"),
+          TabletServerErrorPB::THROTTLED);
+    }
+  }
+
+  virtual void Run() override {
+    ts_tablet_manager_->RunTabletCopy(req_, cb_);
+    cb_invoked_ = true;
+  }
+
+  // Disable automatic invocation of the callback by the destructor.
+  void DisableCallback() {
+    cb_invoked_ = true;
+  }
+
+ private:
+  TSTabletManager* const ts_tablet_manager_;
+  const StartTabletCopyRequestPB* const req_;
+  const std::function<void(const Status&, TabletServerErrorPB::Code)> cb_;
+  bool cb_invoked_ = false;
+
+  DISALLOW_COPY_AND_ASSIGN(TabletCopyRunnable);
+};
+
+void TSTabletManager::StartTabletCopy(
+    const StartTabletCopyRequestPB* req,
+    std::function<void(const Status&, TabletServerErrorPB::Code)> cb) {
+  shared_ptr<TabletCopyRunnable> runnable(new TabletCopyRunnable(this, req, cb));
+  Status s = tablet_copy_pool_->Submit(runnable);
+  if (PREDICT_TRUE(s.ok())) {
+    return;
+  }
+
+  // We were unable to submit the TabletCopyRunnable to the ThreadPool. We will
+  // invoke the callback ourselves, so disable the automatic callback mechanism.
+  runnable->DisableCallback();
+
+  // Thread pool is at capacity.
+  if (s.IsServiceUnavailable()) {
+    cb(s, TabletServerErrorPB::THROTTLED);
+    return;
+  }
+  cb(s, TabletServerErrorPB::UNKNOWN_ERROR);
+}
 
-  const string kLogPrefix = LogPrefix(tablet_id);
+#define CALLBACK_AND_RETURN(status) \
+  do { \
+    cb(status, error_code); \
+    return; \
+  } while (0)
+
+#define CALLBACK_RETURN_NOT_OK(expr) \
+  do { \
+    Status _s = (expr); \
+    if (PREDICT_FALSE(!_s.ok())) { \
+      CALLBACK_AND_RETURN(_s); \
+    } \
+  } while (0)
+
+void TSTabletManager::RunTabletCopy(
+    const StartTabletCopyRequestPB* req,
+    std::function<void(const Status&, TabletServerErrorPB::Code)> cb) {
+  // Reported through 'cb' by CALLBACK_AND_RETURN() and CALLBACK_RETURN_NOT_OK().
+  TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
+  // 'req' is owned by the RPC and may not outlive the 'cb' response, so copy these.
+  const string tablet_id = req->tablet_id();
+  const string copy_source_uuid = req->copy_peer_uuid();
+  HostPort copy_source_addr;
+  CALLBACK_RETURN_NOT_OK(HostPortFromPB(req->copy_peer_addr(), &copy_source_addr));
+  int64_t leader_term = req->caller_term();
 
   scoped_refptr<TabletPeer> old_tablet_peer;
   scoped_refptr<TabletMetadata> meta;
@@ -356,8 +443,8 @@ Status TSTabletManager::StartTabletCopy(
     Status ret = StartTabletStateTransitionUnlocked(tablet_id, "copying tablet",
                                                     &deleter);
     if (!ret.ok()) {
-      *error_code = TabletServerErrorPB::ALREADY_INPROGRESS;
-      return ret;
+      error_code = TabletServerErrorPB::ALREADY_INPROGRESS;
+      CALLBACK_AND_RETURN(ret);
     }
   }
 
@@ -371,18 +458,19 @@ Status TSTabletManager::StartTabletCopy(
                    << "Found tablet in TABLET_DATA_COPYING state during StartTabletCopy()";
       case TABLET_DATA_TOMBSTONED: {
         int64_t last_logged_term = meta->tombstone_last_logged_opid().term();
-        RETURN_NOT_OK(CheckLeaderTermNotLower(tablet_id, leader_term, last_logged_term));
+        CALLBACK_RETURN_NOT_OK(CheckLeaderTermNotLower(tablet_id, leader_term, last_logged_term));
         break;
       }
       case TABLET_DATA_READY: {
         Log* log = old_tablet_peer->log();
         if (!log) {
-          return Status::IllegalState("Log unavailable. Tablet is not running", tablet_id);
+          CALLBACK_AND_RETURN(
+              Status::IllegalState("Log unavailable. Tablet is not running", tablet_id));
         }
         OpId last_logged_opid;
         log->GetLatestEntryOpId(&last_logged_opid);
         int64_t last_logged_term = last_logged_opid.term();
-        RETURN_NOT_OK(CheckLeaderTermNotLower(tablet_id, leader_term, last_logged_term));
+        CALLBACK_RETURN_NOT_OK(CheckLeaderTermNotLower(tablet_id, leader_term, last_logged_term));
 
         // Tombstone the tablet and store the last-logged OpId.
         old_tablet_peer->Shutdown();
@@ -394,22 +482,25 @@ Status TSTabletManager::StartTabletCopy(
         // will simply tablet copy this replica again. We could try to
         // check again after calling Shutdown(), and if the check fails, try to
         // reopen the tablet. For now, we live with the (unlikely) race.
-        RETURN_NOT_OK_PREPEND(DeleteTabletData(meta, TABLET_DATA_TOMBSTONED, last_logged_opid),
-                              Substitute("Unable to delete on-disk data from tablet $0",
-                                         tablet_id));
+        Status s = DeleteTabletData(meta, TABLET_DATA_TOMBSTONED, last_logged_opid);
+        if (PREDICT_FALSE(!s.ok())) {
+          CALLBACK_AND_RETURN(
+              s.CloneAndPrepend(Substitute("Unable to delete on-disk data from tablet $0",
+                                           tablet_id)));
+        }
         break;
       }
       default:
-        return Status::IllegalState(
+        CALLBACK_AND_RETURN(Status::IllegalState(
             Substitute("Found tablet in unsupported state for tablet copy. "
                         "Tablet: $0, tablet data state: $1",
-                        tablet_id, TabletDataState_Name(data_state)));
+                        tablet_id, TabletDataState_Name(data_state))));
     }
   }
 
-  string init_msg = kLogPrefix + Substitute("Initiating tablet copy from Peer $0 ($1)",
-                                            copy_source_uuid,
-                                            copy_source_addr.ToString());
+  const string kSrcPeerInfo = Substitute("$0 ($1)", copy_source_uuid, copy_source_addr.ToString());
+  string init_msg = LogPrefix(tablet_id) +
+                    Substitute("Initiating tablet copy from peer $0", kSrcPeerInfo);
   LOG(INFO) << init_msg;
   TRACE(init_msg);
 
@@ -417,9 +508,9 @@ Status TSTabletManager::StartTabletCopy(
 
   // Download and persist the remote superblock in TABLET_DATA_COPYING state.
   if (replacing_tablet) {
-    RETURN_NOT_OK(tc_client.SetTabletToReplace(meta, leader_term));
+    CALLBACK_RETURN_NOT_OK(tc_client.SetTabletToReplace(meta, leader_term));
   }
-  RETURN_NOT_OK(tc_client.Start(copy_source_addr, &meta));
+  CALLBACK_RETURN_NOT_OK(tc_client.Start(copy_source_addr, &meta));
 
   // From this point onward, the superblock is persisted in TABLET_DATA_COPYING
   // state, and we need to tombtone the tablet if additional steps prior to
@@ -428,13 +519,21 @@ Status TSTabletManager::StartTabletCopy(
   // Registering a non-initialized TabletPeer offers visibility through the Web UI.
   RegisterTabletPeerMode mode = replacing_tablet ? REPLACEMENT_PEER : NEW_PEER;
   scoped_refptr<TabletPeer> tablet_peer = CreateAndRegisterTabletPeer(meta, mode);
-  string peer_str = copy_source_uuid + " (" + copy_source_addr.ToString() + ")";
+
+  // Now we invoke the StartTabletCopy callback and respond success to the
+  // remote caller. Then we proceed to do most of the actual tablet copying work.
+  cb(Status::OK(), TabletServerErrorPB::UNKNOWN_ERROR);
+  cb = [](const Status&, TabletServerErrorPB::Code) {
+    LOG(FATAL) << "Callback invoked twice from TSTabletManager::RunTabletCopy()";
+  };
+
+  // From this point onward, we do not notify the caller about progress or success.
 
   // Download all of the remote files.
   Status s = tc_client.FetchAll(implicit_cast<TabletStatusListener*>(tablet_peer.get()));
   TOMBSTONE_NOT_OK(s, tablet_peer,
-                   "Tablet Copy: Unable to fetch data from remote peer " +
-                   copy_source_uuid + " (" + copy_source_addr.ToString() + ")");
+                   Substitute("Tablet Copy: Unable to fetch data from remote peer $0",
+                              kSrcPeerInfo));
 
   MAYBE_FAULT(FLAGS_fault_crash_after_tc_files_fetched);
 
@@ -442,12 +541,9 @@ Status TSTabletManager::StartTabletCopy(
   // TabletDataState in the superblock to TABLET_DATA_READY.
   TOMBSTONE_NOT_OK(tc_client.Finish(), tablet_peer, "Tablet Copy: Failure calling Finish()");
 
-  // We run this asynchronously. We don't tombstone the tablet if this fails,
-  // because if we were to fail to open the tablet, on next startup, it's in a
-  // valid fully-copied state.
-  RETURN_NOT_OK(open_tablet_pool_->SubmitFunc(boost::bind(&TSTabletManager::OpenTablet,
-                                              this, meta, deleter)));
-  return Status::OK();
+  // We don't tombstone the tablet if opening the tablet fails, because on next
+  // startup it's still in a valid, fully-copied state.
+  OpenTablet(meta, deleter);
 }
 
 // Create and register a new TabletPeer, given tablet metadata.
@@ -720,7 +816,11 @@ void TSTabletManager::Shutdown() {
     }
   }
 
-  // Shut down the bootstrap pool, so new tablets are registered after this point.
+  // Stop copying tablets.
+  // TODO(mpercy): Cancel all outstanding tablet copy tasks (KUDU-1795).
+  tablet_copy_pool_->Shutdown();
+
+  // Shut down the bootstrap pool, so no new tablets are registered after this point.
   open_tablet_pool_->Shutdown();
 
   // Take a snapshot of the peers list -- that way we don't have to hold

http://git-wip-us.apache.org/repos/asf/kudu/blob/bec5f9b3/src/kudu/tserver/ts_tablet_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index b09bfe1..43f5396 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -140,17 +140,21 @@ class TSTabletManager : public tserver::TabletPeerLookupIf {
 
   virtual Status GetTabletPeer(const std::string& tablet_id,
                                scoped_refptr<tablet::TabletPeer>* tablet_peer) const
-                               OVERRIDE;
+                               override;
 
-  virtual const NodeInstancePB& NodeInstance() const OVERRIDE;
+  virtual const NodeInstancePB& NodeInstance() const override;
 
-  // Initiate tablet copy of the specified tablet.
+  // Initiate tablet copy of the specified tablet on the tablet_copy_pool_.
   // See the StartTabletCopy() RPC declaration in consensus.proto for details.
-  // Currently this runs the entire procedure synchronously.
-  // TODO: KUDU-921: Run this procedure on a background thread.
-  virtual Status StartTabletCopy(
-      const consensus::StartTabletCopyRequestPB& req,
-      boost::optional<TabletServerErrorPB::Code>* error_code) OVERRIDE;
+  // 'cb' is guaranteed to be invoked as a callback.
+  virtual void StartTabletCopy(
+      const consensus::StartTabletCopyRequestPB* req,
+      std::function<void(const Status&, TabletServerErrorPB::Code)> cb) override;
+
+  // Synchronously run the tablet copy procedure.
+  void RunTabletCopy(
+      const consensus::StartTabletCopyRequestPB* req,
+      std::function<void(const Status&, TabletServerErrorPB::Code)> cb);
 
   // Adds updated tablet information to 'report'.
   void PopulateFullTabletReport(master::TabletReportPB* report) const;
@@ -312,6 +316,9 @@ class TSTabletManager : public tserver::TabletPeerLookupIf {
 
   TSTabletManagerStatePB state_;
 
+  // Thread pool used to run tablet copy operations.
+  gscoped_ptr<ThreadPool> tablet_copy_pool_;
+
   // Thread pool used to open the tablets async, whether bootstrap is required or not.
   gscoped_ptr<ThreadPool> open_tablet_pool_;