You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/11/26 02:15:11 UTC

[kudu] branch master updated: KUDU-3214 Parallelize DownloadWALs in tablet copy operation

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 6459227  KUDU-3214 Parallelize DownloadWALs in tablet copy operation
6459227 is described below

commit 64592274ac4ee5bff4c01807602e6e39714a0f80
Author: ningw <19...@gmail.com>
AuthorDate: Mon Nov 16 00:13:18 2020 +0800

    KUDU-3214 Parallelize DownloadWALs in tablet copy operation
    
    Make the DownloadWALs operation run in parallel.
    
    Add tests:
      - Correctly collect status of parallelized DownloadWALs.
      - Randomly injected latency with heavy inserts.
      - Randomly injected latency with heavy updates.
    
    Perf: time to download a tablet without any block data.
    WAL size (In MB)         575     1149     2183
    WAL count                87      174      348
    In seq:                  3.417s  6.404s   12.371s
    In parallel(4threads):   2.543s  4.858s   8.761s
    
    About a 25% improvement.
    
    Change-Id: Ib513118d233efdf4be7cbb0b790354c4c3e1bd84
    Reviewed-on: http://gerrit.cloudera.org:8080/16726
    Tested-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 .../integration-tests/mini_cluster_fs_inspector.cc |   1 +
 src/kudu/integration-tests/tablet_copy-itest.cc    | 202 ++++++++++++++++++++-
 src/kudu/tserver/tablet_copy_client-test.cc        |  15 ++
 src/kudu/tserver/tablet_copy_client.cc             |  81 +++++++--
 src/kudu/tserver/tablet_copy_client.h              |  14 +-
 5 files changed, 290 insertions(+), 23 deletions(-)

diff --git a/src/kudu/integration-tests/mini_cluster_fs_inspector.cc b/src/kudu/integration-tests/mini_cluster_fs_inspector.cc
index b2cbd42..8b8a2be 100644
--- a/src/kudu/integration-tests/mini_cluster_fs_inspector.cc
+++ b/src/kudu/integration-tests/mini_cluster_fs_inspector.cc
@@ -216,6 +216,7 @@ Status MiniClusterFsInspector::CheckTabletDataStateOnTS(
   }
 
   vector<string> state_names;
+  state_names.reserve(allowed_states.size());
   for (auto state : allowed_states) {
     state_names.push_back(TabletDataState_Name(state));
   }
diff --git a/src/kudu/integration-tests/tablet_copy-itest.cc b/src/kudu/integration-tests/tablet_copy-itest.cc
index 746c012..68521a6 100644
--- a/src/kudu/integration-tests/tablet_copy-itest.cc
+++ b/src/kudu/integration-tests/tablet_copy-itest.cc
@@ -36,6 +36,7 @@
 #include <gtest/gtest.h>
 
 #include "kudu/client/client.h"
+#include "kudu/client/scan_batch.h"
 #include "kudu/client/schema.h"
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
 #include "kudu/common/partial_row.h"
@@ -93,8 +94,14 @@ DEFINE_int32(test_delete_leader_payload_bytes, 16 * 1024,
 DEFINE_int32(test_delete_leader_num_writer_threads, 1,
              "Number of writer threads in TestDeleteLeaderDuringTabletCopyStressTest.");
 
+DECLARE_bool(tablet_copy_download_wal_inject_latency);
+DECLARE_int32(log_segment_size_mb);
+
 using boost::none;
+using kudu::client::KuduScanner;
+using kudu::client::KuduScanBatch;
 using kudu::client::KuduSchema;
+using kudu::client::KuduTable;
 using kudu::client::KuduTableCreator;
 using kudu::cluster::ExternalMiniClusterOptions;
 using kudu::cluster::ExternalTabletServer;
@@ -147,7 +154,6 @@ METRIC_DECLARE_gauge_int32(tablet_copy_open_source_sessions);
 METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management);
 
 namespace kudu {
-
 class TabletCopyITest : public ExternalMiniClusterITestBase {
 };
 
@@ -340,6 +346,7 @@ TEST_F(TabletCopyITest, TestListTabletsDuringTabletCopy) {
   follower_ts = ts_map_[cluster_->tablet_server(follower_index)->uuid()];
 
   vector<std::thread> threads;
+  threads.reserve(FLAGS_test_num_threads);
   std::atomic<bool> finish(false);
   Barrier barrier(FLAGS_test_num_threads + 1); // include main thread
 
@@ -453,7 +460,7 @@ TEST_F(TabletCopyITest, TestCopyAfterFailedCopy) {
   // Regression condition for KUDU-2293: the above second attempt at a copy
   // should not crash the destination server.
   for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-    const auto tserver = cluster_->tablet_server(i);
+    const auto* tserver = cluster_->tablet_server(i);
     ASSERT_TRUE(tserver->IsProcessAlive())
         << Substitute("Tablet server $0 ($1) has crashed...", i, tserver->uuid());
   }
@@ -660,6 +667,7 @@ TEST_F(TabletCopyITest, TestConcurrentTabletCopys) {
   ASSERT_OK(WaitForNumTabletsOnTS(target_ts, kNumTablets, timeout, &tablets));
 
   vector<string> tablet_ids;
+  tablet_ids.reserve(tablets.size());
   for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
     tablet_ids.push_back(t.tablet_status().tablet_id());
   }
@@ -1884,7 +1892,7 @@ TEST_F(TabletCopyITest, TestBeginTabletCopySessionConcurrency) {
   workload.set_num_tablets(kNumTablets);
   workload.Setup();
 
-  auto ts = ts_map_[cluster_->tablet_server(0)->uuid()];
+  auto* ts = ts_map_[cluster_->tablet_server(0)->uuid()];
   vector<string> tablet_ids;
   ASSERT_EVENTUALLY([&] {
     ASSERT_OK(itest::ListRunningTabletIds(ts, kTimeout, &tablet_ids));
@@ -1938,4 +1946,192 @@ TEST_F(TabletCopyITest, TestBeginTabletCopySessionConcurrency) {
   }
 }
 
+// Test that WAL segments can be downloaded correctly via TabletCopyClient.
+// While downloading WAL segments, random latency is injected so that the
+// segments may be downloaded out of order.
+// In this case, the WAL consists of heavy inserts; all insert operations are
+// kept in the WAL by disabling the 'Flush' maintenance operations.
+//
+// Verify the copied contents by checking that the replicas agree (ClusterVerifier).
+TEST_F(TabletCopyITest, TestDownloadWalInParallelWithHeavyInsert) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  const int kNumTablets = 1;
+  const int kNumThreads = 8;
+  // The number of WAL segments should be greater than the number of threads
+  // used to download the tablet.
+  const int kNumWalSegments = 6;
+
+  // Keep everything in the original tablet's WAL and inject latency into WAL
+  // downloads. The cluster is external, so these must be tserver flags.
+  vector<string> ts_flags = {
+    "--enable_flush_memrowset=false",
+    "--enable_flush_deltamemstores=false",
+    "--tablet_copy_download_threads_nums_per_session=4",
+    "--log_segment_size_mb=1",
+    "--tablet_copy_download_wal_inject_latency=true",
+  };
+  NO_FATALS(StartCluster(ts_flags, {}, /*num_tablet_servers=*/3));
+
+  // Init workload in INSERT_SEQUENTIAL_ROWS mode.
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(3);
+  workload.set_num_tablets(kNumTablets);
+  workload.set_num_write_threads(kNumThreads);
+  workload.set_write_pattern(TestWorkload::WritePattern::INSERT_SEQUENTIAL_ROWS);
+  workload.Setup();
+
+  const int first_leader_index = 0;
+  const int second_leader_index = 1;
+  TServerDetails* first_leader = ts_map_[cluster_->tablet_server(first_leader_index)->uuid()];
+  TServerDetails* second_leader = ts_map_[cluster_->tablet_server(second_leader_index)->uuid()];
+
+  // Figure out the tablet id of the created tablet.
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(WaitForNumTabletsOnTS(first_leader, kNumTablets, kTimeout, &tablets));
+  const string tablet_id = tablets[0].tablet_status().tablet_id();
+
+  // Wait until all replicas are up and running.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    ASSERT_OK(WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+                                     tablet_id, kTimeout));
+  }
+
+  // Elect a leader for term 1, then generate some data to copy.
+  ASSERT_OK(StartElection(first_leader, tablet_id, kTimeout));
+  workload.Start();
+  ASSERT_OK(inspect_->WaitForMinFilesInTabletWalDirOnTS(first_leader_index, tablet_id,
+                                                        kNumWalSegments));
+  workload.StopAndJoin();
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, 1));
+
+  // Delete the tablet from the second tserver and wait until the tablet is
+  // recovered from the first leader via tablet copy.
+  ASSERT_OK(DeleteTablet(second_leader, tablet_id, TABLET_DATA_TOMBSTONED, kTimeout));
+  ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(second_leader_index, tablet_id,
+                                                 { tablet::TABLET_DATA_READY },
+                                                 kTimeout));
+
+  ASSERT_OK(WaitUntilTabletRunning(second_leader, tablet_id, kTimeout));
+
+  // Make the second tserver the leader.
+  ASSERT_OK(StartElection(second_leader, tablet_id, kTimeout));
+
+  // Verify the replicas' contents agree across the tablet servers.
+  {
+    ClusterVerifier v(cluster_.get());
+    NO_FATALS(v.CheckCluster());
+  }
+}
+
+// Test that WAL segments can be downloaded correctly via TabletCopyClient.
+// While downloading WAL segments, random latency is injected so that the
+// segments may be downloaded out of order.
+// In this case, the WAL consists of heavy updates to a single row; all updates
+// are kept in the WAL by disabling the 'Flush' maintenance operations.
+//
+// Verify the WAL contents by comparing the row scanned before and after the copy.
+TEST_F(TabletCopyITest, TestDownloadWalInParallelWithHeavyUpdate) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  const int kNumTablets = 1;
+  const int kNumThreads = 8;
+  const int kNumWalSegments = 6;
+
+  // Keep everything in the original tablet's WAL and inject latency into WAL
+  // downloads. The cluster is external, so these must be tserver flags.
+  vector<string> ts_flags = {
+    "--enable_flush_memrowset=false",
+    "--enable_flush_deltamemstores=false",
+    "--tablet_copy_download_threads_nums_per_session=4",
+    "--log_segment_size_mb=1",
+    "--tablet_copy_download_wal_inject_latency=true",
+  };
+  NO_FATALS(StartCluster(ts_flags, {}, /*num_tablet_servers=*/3));
+
+  // Init workload in UPDATE_ONE_ROW mode.
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(3);
+  workload.set_num_tablets(kNumTablets);
+  workload.set_num_write_threads(kNumThreads);
+  workload.set_write_pattern(TestWorkload::WritePattern::UPDATE_ONE_ROW);
+  workload.Setup();
+
+  // Use ts0 as the first leader and ts1 as the second leader.
+  const int first_leader_index = 0;
+  const int second_leader_index = 1;
+  TServerDetails* first_leader = ts_map_[cluster_->tablet_server(first_leader_index)->uuid()];
+  TServerDetails* second_leader = ts_map_[cluster_->tablet_server(second_leader_index)->uuid()];
+
+  // Figure out the tablet id of the created tablet.
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(WaitForNumTabletsOnTS(first_leader, kNumTablets, kTimeout, &tablets));
+  const string tablet_id = tablets[0].tablet_status().tablet_id();
+
+  // Wait until all replicas are up and running.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    ASSERT_OK(WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+                                     tablet_id, kTimeout));
+  }
+
+  // Elect a leader for term 1, then generate some data to copy.
+  ASSERT_OK(StartElection(first_leader, tablet_id, kTimeout));
+  workload.Start();
+  ASSERT_OK(inspect_->WaitForMinFilesInTabletWalDirOnTS(first_leader_index, tablet_id,
+                                                        kNumWalSegments));
+  workload.StopAndJoin();
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, 1));
+
+  // Scan all rows of the given table from the tablet leaders and append their
+  // stringified form to 'result_collector'. Call it under NO_FATALS.
+  auto scan_func = [](const decltype(client_)& client,
+                      const string& table_name,
+                      vector<string>* result_collector) {
+    client::sp::shared_ptr<KuduTable> table;
+    ASSERT_OK(client->OpenTable(table_name, &table));
+
+    KuduScanner s(table.get());
+    ASSERT_OK(s.SetSelection(client::KuduClient::LEADER_ONLY));
+    ASSERT_OK(s.SetReadMode(KuduScanner::READ_LATEST));
+    ASSERT_OK(s.Open());
+    KuduScanBatch batch;
+    while (s.HasMoreRows()) {
+      ASSERT_OK(s.NextBatch(&batch));
+      for (const auto& row : batch) {
+        result_collector->emplace_back(row.ToString());
+      }
+    }
+    s.Close();
+  };
+
+  // Scan the row from the first leader and collect the result.
+  vector<string> result_ts0;
+  NO_FATALS(scan_func(client_, workload.table_name(), &result_ts0));
+
+  ASSERT_EQ(1, result_ts0.size());
+
+  // Delete the tablet from the second tserver and wait until the tablet is
+  // recovered from the first leader via tablet copy.
+  ASSERT_OK(DeleteTablet(second_leader, tablet_id, TABLET_DATA_TOMBSTONED, kTimeout));
+  ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(second_leader_index, tablet_id,
+                                                 { tablet::TABLET_DATA_READY },
+                                                 kTimeout));
+  ASSERT_OK(WaitUntilTabletRunning(second_leader, tablet_id, kTimeout));
+
+  // Make the second tserver the leader.
+  ASSERT_OK(StartElection(second_leader, tablet_id, kTimeout));
+
+  // Verify the replicas' contents agree across the tablet servers.
+  {
+    ClusterVerifier v(cluster_.get());
+    NO_FATALS(v.CheckCluster());
+  }
+
+  // Scan the row from the second leader and collect the result.
+  vector<string> result_ts1;
+  NO_FATALS(scan_func(client_, workload.table_name(), &result_ts1));
+  ASSERT_EQ(1, result_ts1.size());
+
+  // The row contents should be the same before and after the copy.
+  ASSERT_EQ(result_ts0[0], result_ts1[0]);
+}
+
 } // namespace kudu
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc b/src/kudu/tserver/tablet_copy_client-test.cc
index efd2940..f50fa52 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -76,6 +76,7 @@ using std::vector;
 
 DECLARE_double(env_inject_eio);
 DECLARE_double(tablet_copy_fault_crash_during_download_block);
+DECLARE_double(tablet_copy_fault_crash_during_download_wal);
 DECLARE_int32(tablet_copy_download_threads_nums_per_session);
 DECLARE_string(block_manager);
 DECLARE_string(env_inject_eio_globs);
@@ -297,12 +298,26 @@ TEST_F(TabletCopyClientTest, TestDownloadBlockMayFail) {
   FLAGS_tablet_copy_fault_crash_during_download_block = 0.5;
   FLAGS_tablet_copy_download_threads_nums_per_session = 16;
 
+  ASSERT_OK(ResetTabletCopyClient());
   ASSERT_OK(StartCopy());
   Status s = client_->DownloadBlocks();
   ASSERT_TRUE(s.IsIOError());
   ASSERT_STR_CONTAINS(s.ToString(), "Injected failure on downloading block");
 }
 
+// Test that an error status is properly reported if there is a failure in any
+// of the multiple threads downloading the tablet's WAL segments.
+TEST_F(TabletCopyClientTest, TestDownloadWalMayFail) {
+  FLAGS_tablet_copy_fault_crash_during_download_wal = 1;  // Fail every DownloadWAL() call.
+  FLAGS_tablet_copy_download_threads_nums_per_session = 4;
+
+  ASSERT_OK(ResetTabletCopyClient());  // Presumably rebuilds the pool with the new size.
+  ASSERT_OK(StartCopy());
+  Status s = client_->DownloadWALs();
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_STR_CONTAINS(s.ToString(), "Injected failure on downloading wal");
+}
+
 // Basic WAL segment download unit test.
 TEST_F(TabletCopyClientTest, TestDownloadWalSegment) {
   ASSERT_OK(StartCopy());
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index 64c3a0f..dee2afe 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -91,6 +91,24 @@ DEFINE_int32(tablet_copy_download_file_inject_latency_ms, 0,
              "to take much longer. For use in tests only.");
 TAG_FLAG(tablet_copy_download_file_inject_latency_ms, hidden);
 
+DEFINE_bool(tablet_copy_download_wal_inject_latency, false,
+            "If true, injects artificial latency in DownloadWAL() operations. "
+            "For use in tests only.");
+TAG_FLAG(tablet_copy_download_wal_inject_latency, unsafe);
+TAG_FLAG(tablet_copy_download_wal_inject_latency, runtime);
+
+DEFINE_int32(tablet_copy_download_wal_inject_latency_ms_mean, 200,
+             "The number of milliseconds of latency to inject, on average. "
+             "Only takes effect if --tablet_copy_download_wal_inject_latency is true.");
+TAG_FLAG(tablet_copy_download_wal_inject_latency_ms_mean, unsafe);
+TAG_FLAG(tablet_copy_download_wal_inject_latency_ms_mean, runtime);
+
+DEFINE_int32(tablet_copy_download_wal_inject_latency_ms_stddev, 200,
+             "The standard deviation, in milliseconds, of the latency injected into DownloadWAL(). "
+             "Only takes effect if --tablet_copy_download_wal_inject_latency is true.");
+TAG_FLAG(tablet_copy_download_wal_inject_latency_ms_stddev, unsafe);
+TAG_FLAG(tablet_copy_download_wal_inject_latency_ms_stddev, runtime);
+
 DEFINE_double(tablet_copy_fault_crash_on_fetch_all, 0.0,
               "Fraction of the time that the server will crash when FetchAll() "
               "is called on the TabletCopyClient. (For testing only!)");
@@ -110,6 +128,12 @@ DEFINE_double(tablet_copy_fault_crash_during_download_block, 0.0,
 TAG_FLAG(tablet_copy_fault_crash_during_download_block, unsafe);
 TAG_FLAG(tablet_copy_fault_crash_during_download_block, runtime);
 
+DEFINE_double(tablet_copy_fault_crash_during_download_wal, 0.0,
+              "Fraction of the time that DownloadWAL() will fail. "
+              "For use in tests only.");
+TAG_FLAG(tablet_copy_fault_crash_during_download_wal, unsafe);
+TAG_FLAG(tablet_copy_fault_crash_during_download_wal, runtime);
+
 DEFINE_int32(tablet_copy_download_threads_nums_per_session, 4,
              "Number of threads per tablet copy session for downloading tablet data blocks.");
 DEFINE_validator(tablet_copy_download_threads_nums_per_session,
@@ -185,10 +209,10 @@ TabletCopyClient::TabletCopyClient(
   if (tablet_copy_metrics_) {
     tablet_copy_metrics_->open_client_sessions->Increment();
   }
-  ThreadPoolBuilder("blocks-download-pool-" + tablet_id)
+  ThreadPoolBuilder("tablet-download-pool-" + tablet_id)
     .set_max_threads(FLAGS_tablet_copy_download_threads_nums_per_session)
     .set_min_threads(FLAGS_tablet_copy_download_threads_nums_per_session)
-    .Build(&blocks_download_pool_);
+    .Build(&tablet_download_pool_);
 }
 
 TabletCopyClient::~TabletCopyClient() {
@@ -197,7 +221,7 @@ TabletCopyClient::~TabletCopyClient() {
                                              LogPrefix()));
   WARN_NOT_OK(Abort(), Substitute("$0Failed to fully clean up tablet after aborted copy",
                                   LogPrefix()));
-  blocks_download_pool_->Shutdown();
+  tablet_download_pool_->Shutdown();
   if (tablet_copy_metrics_) {
     tablet_copy_metrics_->open_client_sessions->IncrementBy(-1);
   }
@@ -565,17 +589,35 @@ Status TabletCopyClient::DownloadWALs() {
   RETURN_NOT_OK(fs_manager_->env()->SyncDir(DirName(path))); // fsync() parent dir.
 
   // Download the WAL segments.
-  int num_segments = wal_seqnos_.size();
+  const int num_segments = wal_seqnos_.size();
   LOG_WITH_PREFIX(INFO) << "Starting download of " << num_segments << " WAL segments...";
-  uint64_t counter = 0;
+
+  atomic<uint64_t> counter(0);
+  Status end_status;
   for (uint64_t seg_seqno : wal_seqnos_) {
-    SetStatusMessage(Substitute("Downloading WAL segment with seq. number $0 ($1/$2)",
-                                seg_seqno, counter + 1, num_segments));
-    RETURN_NOT_OK(DownloadWAL(seg_seqno));
-    ++counter;
+    RETURN_NOT_OK(tablet_download_pool_->Submit([&, seg_seqno]() {
+      SetStatusMessage(Substitute("Downloading WAL segment with seq. number $0 ($1/$2)",
+                                  seg_seqno, counter.load() + 1, num_segments));
+      {
+        std::lock_guard<simple_spinlock> l(simple_lock_);
+        if (!end_status.ok()) {
+          return;
+        }
+      }
+      Status s = DownloadWAL(seg_seqno);
+      if (!s.ok()) {
+        std::lock_guard<simple_spinlock> l(simple_lock_);
+        if (end_status.ok()) {
+          end_status = s;
+        }
+        return;
+      }
+      counter++;
+    }));
   }
+  tablet_download_pool_->Wait();
 
-  return Status::OK();
+  return end_status;
 }
 
 int TabletCopyClient::CountRemoteBlocks() const {
@@ -690,17 +732,32 @@ Status TabletCopyClient::DownloadBlocks() {
   // Download rowsets in parallel.
   // Each task downloads the blocks of the corresponding rowset sequentially.
   for (const RowSetDataPB& src_rowset : remote_superblock_->rowsets()) {
-    RETURN_NOT_OK(blocks_download_pool_->Submit([&]() mutable {
+    RETURN_NOT_OK(tablet_download_pool_->Submit([&]() mutable {
       DownloadRowset(src_rowset, num_remote_blocks, &block_count, &end_status);
     }));
   }
-  blocks_download_pool_->Wait();
+  tablet_download_pool_->Wait();
 
   return end_status;
 }
 
 Status TabletCopyClient::DownloadWAL(uint64_t wal_segment_seqno) {
   VLOG_WITH_PREFIX(1) << "Downloading WAL segment with seqno " << wal_segment_seqno;
+
+  MAYBE_RETURN_FAILURE(FLAGS_tablet_copy_fault_crash_during_download_wal,
+                       Status::IOError("Injected failure on downloading wal"));
+  if (PREDICT_FALSE(FLAGS_tablet_copy_download_wal_inject_latency)) {
+    Random r(GetCurrentTimeMicros());
+    int sleep_ms = static_cast<int>(
+        r.Normal(FLAGS_tablet_copy_download_wal_inject_latency_ms_mean,
+                 FLAGS_tablet_copy_download_wal_inject_latency_ms_stddev));
+    if (sleep_ms > 0) {
+      LOG_WITH_PREFIX(WARNING) << "Injecting " << sleep_ms
+                               << "ms of latency in TabletCopyClient::DownloadWAL()";
+      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+    }
+  }
+
   RETURN_NOT_OK_PREPEND(CheckHealthyDirGroup(), "Not downloading WAL for replica");
 
   DataIdPB data_id;
diff --git a/src/kudu/tserver/tablet_copy_client.h b/src/kudu/tserver/tablet_copy_client.h
index 3d5395e..3452d6e 100644
--- a/src/kudu/tserver/tablet_copy_client.h
+++ b/src/kudu/tserver/tablet_copy_client.h
@@ -77,9 +77,6 @@ struct TabletCopyClientMetrics {
 // Client class for using tablet copy to copy a tablet from another host.
 // This class is not thread-safe.
 //
-// TODO:
-// * Parallelize download of blocks and WAL segments.
-//
 class TabletCopyClient {
  public:
 
@@ -140,6 +137,7 @@ class TabletCopyClient {
   FRIEND_TEST(TabletCopyClientTest, TestLifeCycle);
   FRIEND_TEST(TabletCopyClientTest, TestVerifyData);
   FRIEND_TEST(TabletCopyClientTest, TestDownloadBlockMayFail);
+  FRIEND_TEST(TabletCopyClientTest, TestDownloadWalMayFail);
   FRIEND_TEST(TabletCopyClientTest, TestDownloadWalSegment);
   FRIEND_TEST(TabletCopyClientTest, TestDownloadAllBlocks);
   FRIEND_TEST(TabletCopyClientAbortTest, TestAbort);
@@ -183,7 +181,7 @@ class TabletCopyClient {
   // End the tablet copy session.
   Status EndRemoteSession();
 
-  // Download all WAL files sequentially.
+  // Download all WAL files in parallel.
   Status DownloadWALs();
 
   // Download a single WAL file.
@@ -256,7 +254,7 @@ class TabletCopyClient {
   template<class Appendable>
   Status DownloadFile(const DataIdPB& data_id, Appendable* appendable);
 
-  Status VerifyData(uint64_t offset, const DataChunkPB& resp);
+  Status VerifyData(uint64_t offset, const DataChunkPB& chunk);
 
   // Runs the provided functor, which must send an RPC and return the result
   // status, until it succeeds, times out, or fails with a non-retriable error.
@@ -303,11 +301,11 @@ class TabletCopyClient {
   // Block transaction for the tablet copy.
   std::unique_ptr<fs::BlockCreationTransaction> transaction_;
 
-  // Thread pool for downloading all data blocks in parallel.
-  std::unique_ptr<ThreadPool> blocks_download_pool_;
+  // Thread pool for downloading all data blocks and wals in parallel.
+  std::unique_ptr<ThreadPool> tablet_download_pool_;
 
   // Protects adding/creating blocks, adding a rowset,
-  // reading/updating rowset download status.
+  // reading/updating rowset/wal download status.
   simple_spinlock simple_lock_;
 
   DISALLOW_COPY_AND_ASSIGN(TabletCopyClient);