You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/11/26 02:15:11 UTC
[kudu] branch master updated: KUDU-3214 Parallelize DownloadWALs in
tablet copy operation
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 6459227 KUDU-3214 Parallelize DownloadWALs in tablet copy operation
6459227 is described below
commit 64592274ac4ee5bff4c01807602e6e39714a0f80
Author: ningw <19...@gmail.com>
AuthorDate: Mon Nov 16 00:13:18 2020 +0800
KUDU-3214 Parallelize DownloadWALs in tablet copy operation
Make the DownloadWALs operation download WAL segments in parallel.
Add tests:
- Correctly collect status of parallelized DownloadWALs.
- Random inject latency and heavy insert.
- Random inject latency and heavy update.
Perf: time to download a tablet without any block data.
WAL size (In MB) 575 1149 2183
WAL count 87 174 348
In seq: 3.417s 6.404s 12.371s
In parallel(4threads): 2.543s 4.858s 8.761s
About a 25% improvement in download time.
Change-Id: Ib513118d233efdf4be7cbb0b790354c4c3e1bd84
Reviewed-on: http://gerrit.cloudera.org:8080/16726
Tested-by: Andrew Wong <aw...@cloudera.com>
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
.../integration-tests/mini_cluster_fs_inspector.cc | 1 +
src/kudu/integration-tests/tablet_copy-itest.cc | 202 ++++++++++++++++++++-
src/kudu/tserver/tablet_copy_client-test.cc | 15 ++
src/kudu/tserver/tablet_copy_client.cc | 81 +++++++--
src/kudu/tserver/tablet_copy_client.h | 14 +-
5 files changed, 290 insertions(+), 23 deletions(-)
diff --git a/src/kudu/integration-tests/mini_cluster_fs_inspector.cc b/src/kudu/integration-tests/mini_cluster_fs_inspector.cc
index b2cbd42..8b8a2be 100644
--- a/src/kudu/integration-tests/mini_cluster_fs_inspector.cc
+++ b/src/kudu/integration-tests/mini_cluster_fs_inspector.cc
@@ -216,6 +216,7 @@ Status MiniClusterFsInspector::CheckTabletDataStateOnTS(
}
vector<string> state_names;
+ state_names.reserve(allowed_states.size());
for (auto state : allowed_states) {
state_names.push_back(TabletDataState_Name(state));
}
diff --git a/src/kudu/integration-tests/tablet_copy-itest.cc b/src/kudu/integration-tests/tablet_copy-itest.cc
index 746c012..68521a6 100644
--- a/src/kudu/integration-tests/tablet_copy-itest.cc
+++ b/src/kudu/integration-tests/tablet_copy-itest.cc
@@ -36,6 +36,7 @@
#include <gtest/gtest.h>
#include "kudu/client/client.h"
+#include "kudu/client/scan_batch.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/common/partial_row.h"
@@ -93,8 +94,14 @@ DEFINE_int32(test_delete_leader_payload_bytes, 16 * 1024,
DEFINE_int32(test_delete_leader_num_writer_threads, 1,
"Number of writer threads in TestDeleteLeaderDuringTabletCopyStressTest.");
+DECLARE_bool(tablet_copy_download_wal_inject_latency);
+DECLARE_int32(log_segment_size_mb);
+
using boost::none;
+using kudu::client::KuduScanner;
+using kudu::client::KuduScanBatch;
using kudu::client::KuduSchema;
+using kudu::client::KuduTable;
using kudu::client::KuduTableCreator;
using kudu::cluster::ExternalMiniClusterOptions;
using kudu::cluster::ExternalTabletServer;
@@ -147,7 +154,6 @@ METRIC_DECLARE_gauge_int32(tablet_copy_open_source_sessions);
METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management);
namespace kudu {
-
class TabletCopyITest : public ExternalMiniClusterITestBase {
};
@@ -340,6 +346,7 @@ TEST_F(TabletCopyITest, TestListTabletsDuringTabletCopy) {
follower_ts = ts_map_[cluster_->tablet_server(follower_index)->uuid()];
vector<std::thread> threads;
+ threads.reserve(FLAGS_test_num_threads);
std::atomic<bool> finish(false);
Barrier barrier(FLAGS_test_num_threads + 1); // include main thread
@@ -453,7 +460,7 @@ TEST_F(TabletCopyITest, TestCopyAfterFailedCopy) {
// Regression condition for KUDU-2293: the above second attempt at a copy
// should not crash the destination server.
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
- const auto tserver = cluster_->tablet_server(i);
+ const auto* tserver = cluster_->tablet_server(i);
ASSERT_TRUE(tserver->IsProcessAlive())
<< Substitute("Tablet server $0 ($1) has crashed...", i, tserver->uuid());
}
@@ -660,6 +667,7 @@ TEST_F(TabletCopyITest, TestConcurrentTabletCopys) {
ASSERT_OK(WaitForNumTabletsOnTS(target_ts, kNumTablets, timeout, &tablets));
vector<string> tablet_ids;
+ tablet_ids.reserve(tablets.size());
for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
tablet_ids.push_back(t.tablet_status().tablet_id());
}
@@ -1884,7 +1892,7 @@ TEST_F(TabletCopyITest, TestBeginTabletCopySessionConcurrency) {
workload.set_num_tablets(kNumTablets);
workload.Setup();
- auto ts = ts_map_[cluster_->tablet_server(0)->uuid()];
+ auto* ts = ts_map_[cluster_->tablet_server(0)->uuid()];
vector<string> tablet_ids;
ASSERT_EVENTUALLY([&] {
ASSERT_OK(itest::ListRunningTabletIds(ts, kTimeout, &tablet_ids));
@@ -1938,4 +1946,192 @@ TEST_F(TabletCopyITest, TestBeginTabletCopySessionConcurrency) {
}
}
+// Test that WAL segments can be downloaded correctly via TabletCopyClient.
+// When downloading WAL segments, random latency is injected so that the
+// segments may be downloaded in random order.
+// In this case, the WAL consists of heavy inserts; all insert operations are
+// kept in the WAL by disabling the memrowset and deltamemstore flushes.
+//
+// The copied replica is verified with ClusterVerifier once the copy completes.
+TEST_F(TabletCopyITest, TestDownloadWalInParallelWithHeavyInsert) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  const int kNumTablets = 1;
+  const int kNumThreads = 8;
+  // The number of WAL segments should be greater than the number of threads
+  // used to download the tablet (--tablet_copy_download_threads_nums_per_session
+  // below), so that several segments are downloaded concurrently.
+  const int kNumWalSegments = 6;
+
+  // Make the original tablet keep everything in the WAL, and inject random
+  // latency into every WAL segment download. These settings must be passed as
+  // tserver flags: TabletCopyClient runs inside the external tablet servers,
+  // so assigning FLAGS_* in the test process would have no effect on them.
+  vector<string> ts_flags;
+  ts_flags.emplace_back("--enable_flush_memrowset=false");
+  ts_flags.emplace_back("--enable_flush_deltamemstores=false");
+  ts_flags.emplace_back("--tablet_copy_download_threads_nums_per_session=4");
+  ts_flags.emplace_back("--log_segment_size_mb=1");
+  ts_flags.emplace_back("--tablet_copy_download_wal_inject_latency=true");
+  NO_FATALS(StartCluster(ts_flags, {}, /*num_tablet_servers=*/3));
+
+  // Init workload in INSERT_SEQUENTIAL_ROWS mode.
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(3);
+  workload.set_num_tablets(kNumTablets);
+  workload.set_num_write_threads(kNumThreads);
+  workload.set_write_pattern(TestWorkload::WritePattern::INSERT_SEQUENTIAL_ROWS);
+  workload.Setup();
+
+  // Use ts0 as the first leader and ts1 as the second leader.
+  const int first_leader_index = 0;
+  const int second_leader_index = 1;
+  TServerDetails* first_leader = ts_map_[cluster_->tablet_server(first_leader_index)->uuid()];
+  TServerDetails* second_leader = ts_map_[cluster_->tablet_server(second_leader_index)->uuid()];
+
+  // Figure out the tablet id of the created tablet.
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(WaitForNumTabletsOnTS(first_leader, kNumTablets, kTimeout, &tablets));
+  const string tablet_id = tablets[0].tablet_status().tablet_id();
+
+  // Wait until all replicas are up and running.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    ASSERT_OK(WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+                                     tablet_id, kTimeout));
+  }
+
+  // Elect a leader for term 1, then generate enough data to span several
+  // WAL segments.
+  ASSERT_OK(StartElection(first_leader, tablet_id, kTimeout));
+
+  workload.Start();
+  ASSERT_OK(inspect_->WaitForMinFilesInTabletWalDirOnTS(first_leader_index, tablet_id,
+                                                        kNumWalSegments));
+  workload.StopAndJoin();
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, 1));
+
+  // Tombstone the tablet on the second tserver and wait until the replica is
+  // recovered from the first leader via tablet copy.
+  ASSERT_OK(DeleteTablet(second_leader, tablet_id, TABLET_DATA_TOMBSTONED, kTimeout));
+  ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(second_leader_index, tablet_id,
+                                                 { tablet::TABLET_DATA_READY },
+                                                 kTimeout));
+
+  ASSERT_OK(WaitUntilTabletRunning(second_leader, tablet_id, kTimeout));
+
+  // Make the second tserver the leader.
+  ASSERT_OK(StartElection(second_leader, tablet_id, kTimeout));
+
+  // Verify the replicas, including the copied one, with ClusterVerifier.
+  // CheckCluster() reports problems via gtest assertions, so propagate them.
+  {
+    ClusterVerifier v(cluster_.get());
+    NO_FATALS(v.CheckCluster());
+  }
+}
+
+// Test that WAL segments can be downloaded correctly via TabletCopyClient.
+// When downloading WAL segments, random latency is injected so that the
+// segments may be downloaded in random order.
+// In this case, the WAL consists of heavy updates; all update operations are
+// kept in the WAL by disabling the memrowset and deltamemstore flushes.
+//
+// WAL contents are verified by comparing the row read before the copy with
+// the row read from the copied replica once it becomes leader.
+TEST_F(TabletCopyITest, TestDownloadWalInParallelWithHeavyUpdate) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  const int kNumTablets = 1;
+  const int kNumThreads = 8;
+  // The number of WAL segments should be greater than the number of threads
+  // used to download the tablet (--tablet_copy_download_threads_nums_per_session
+  // below), so that several segments are downloaded concurrently.
+  const int kNumWalSegments = 6;
+
+  // Make the original tablet keep everything in the WAL, and inject random
+  // latency into every WAL segment download. These settings must be passed as
+  // tserver flags: TabletCopyClient runs inside the external tablet servers,
+  // so assigning FLAGS_* in the test process would have no effect on them.
+  vector<string> ts_flags;
+  ts_flags.emplace_back("--enable_flush_memrowset=false");
+  ts_flags.emplace_back("--enable_flush_deltamemstores=false");
+  ts_flags.emplace_back("--tablet_copy_download_threads_nums_per_session=4");
+  ts_flags.emplace_back("--log_segment_size_mb=1");
+  ts_flags.emplace_back("--tablet_copy_download_wal_inject_latency=true");
+  NO_FATALS(StartCluster(ts_flags, {}, /*num_tablet_servers=*/3));
+
+  // Init workload in UPDATE_ONE_ROW mode.
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(3);
+  workload.set_num_tablets(kNumTablets);
+  workload.set_num_write_threads(kNumThreads);
+  workload.set_write_pattern(TestWorkload::WritePattern::UPDATE_ONE_ROW);
+  workload.Setup();
+
+  // Use ts0 as the first leader and ts1 as the second leader.
+  const int first_leader_index = 0;
+  const int second_leader_index = 1;
+  TServerDetails* first_leader = ts_map_[cluster_->tablet_server(first_leader_index)->uuid()];
+  TServerDetails* second_leader = ts_map_[cluster_->tablet_server(second_leader_index)->uuid()];
+
+  // Figure out the tablet id of the created tablet.
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(WaitForNumTabletsOnTS(first_leader, kNumTablets, kTimeout, &tablets));
+  const string tablet_id = tablets[0].tablet_status().tablet_id();
+
+  // Wait until all replicas are up and running.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    ASSERT_OK(WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+                                     tablet_id, kTimeout));
+  }
+
+  // Elect a leader for term 1, then generate enough data to span several
+  // WAL segments.
+  ASSERT_OK(StartElection(first_leader, tablet_id, kTimeout));
+
+  workload.Start();
+  ASSERT_OK(inspect_->WaitForMinFilesInTabletWalDirOnTS(first_leader_index, tablet_id,
+                                                        kNumWalSegments));
+  workload.StopAndJoin();
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, 1));
+
+  // Scan rows (from the leader replica) of the table with the given name, and
+  // append the stringified rows to result_collector. Failures are reported via
+  // gtest assertions, so every call must be wrapped in NO_FATALS().
+  auto scan_func = [](const decltype(client_)& client,
+                      const string& table_name,
+                      vector<string>* result_collector) {
+    client::sp::shared_ptr<KuduTable> table;
+    ASSERT_OK(client->OpenTable(table_name, &table));
+
+    KuduScanner s(table.get());
+    ASSERT_OK(s.SetSelection(client::KuduClient::LEADER_ONLY));
+    ASSERT_OK(s.SetReadMode(KuduScanner::READ_LATEST));
+    ASSERT_OK(s.Open());
+    KuduScanBatch batch;
+    while (s.HasMoreRows()) {
+      ASSERT_OK(s.NextBatch(&batch));
+      for (const auto& row : batch) {
+        result_collector->emplace_back(row.ToString());
+      }
+    }
+    s.Close();
+  };
+
+  // Scan the row through the first leader and collect the result.
+  vector<string> result_ts0;
+  NO_FATALS(scan_func(client_, workload.table_name(), &result_ts0));
+  ASSERT_EQ(1, result_ts0.size());
+
+  // Tombstone the tablet on the second tserver and wait until the replica is
+  // recovered from the first leader via tablet copy.
+  ASSERT_OK(DeleteTablet(second_leader, tablet_id, TABLET_DATA_TOMBSTONED, kTimeout));
+  ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(second_leader_index, tablet_id,
+                                                 { tablet::TABLET_DATA_READY },
+                                                 kTimeout));
+  ASSERT_OK(WaitUntilTabletRunning(second_leader, tablet_id, kTimeout));
+
+  // Make the second tserver the leader, and wait until it actually is, so that
+  // the LEADER_ONLY scan below reads from the copied replica.
+  ASSERT_OK(StartElection(second_leader, tablet_id, kTimeout));
+  ASSERT_OK(itest::WaitUntilLeader(second_leader, tablet_id, kTimeout));
+
+  // Verify the replicas, including the copied one, with ClusterVerifier.
+  // CheckCluster() reports problems via gtest assertions, so propagate them.
+  {
+    ClusterVerifier v(cluster_.get());
+    NO_FATALS(v.CheckCluster());
+  }
+
+  // Scan the row through the second leader and collect the result.
+  vector<string> result_ts1;
+  NO_FATALS(scan_func(client_, workload.table_name(), &result_ts1));
+  ASSERT_EQ(1, result_ts1.size());
+
+  // The row content must be the same before and after the copy.
+  ASSERT_EQ(result_ts0[0], result_ts1[0]);
+}
+
} // namespace kudu
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc b/src/kudu/tserver/tablet_copy_client-test.cc
index efd2940..f50fa52 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -76,6 +76,7 @@ using std::vector;
DECLARE_double(env_inject_eio);
DECLARE_double(tablet_copy_fault_crash_during_download_block);
+DECLARE_double(tablet_copy_fault_crash_during_download_wal);
DECLARE_int32(tablet_copy_download_threads_nums_per_session);
DECLARE_string(block_manager);
DECLARE_string(env_inject_eio_globs);
@@ -297,12 +298,26 @@ TEST_F(TabletCopyClientTest, TestDownloadBlockMayFail) {
FLAGS_tablet_copy_fault_crash_during_download_block = 0.5;
FLAGS_tablet_copy_download_threads_nums_per_session = 16;
+ ASSERT_OK(ResetTabletCopyClient());
ASSERT_OK(StartCopy());
Status s = client_->DownloadBlocks();
ASSERT_TRUE(s.IsIOError());
ASSERT_STR_CONTAINS(s.ToString(), "Injected failure on downloading block");
}
+// Verifies that DownloadWALs() surfaces the error when any one of the
+// parallel threads downloading the tablet's WAL segments fails.
+TEST_F(TabletCopyClientTest, TestDownloadWalMayFail) {
+  // Several download threads, and every DownloadWAL() call fails with an
+  // injected IOError.
+  FLAGS_tablet_copy_download_threads_nums_per_session = 4;
+  FLAGS_tablet_copy_fault_crash_during_download_wal = 1.0;
+
+  // NOTE(review): presumably re-creates client_ so its download pool is sized
+  // from the thread-count flag set above; confirm against the fixture.
+  ASSERT_OK(ResetTabletCopyClient());
+  ASSERT_OK(StartCopy());
+
+  const Status wal_status = client_->DownloadWALs();
+  ASSERT_TRUE(wal_status.IsIOError());
+  ASSERT_STR_CONTAINS(wal_status.ToString(), "Injected failure on downloading wal");
+}
+
// Basic WAL segment download unit test.
TEST_F(TabletCopyClientTest, TestDownloadWalSegment) {
ASSERT_OK(StartCopy());
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index 64c3a0f..dee2afe 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -91,6 +91,24 @@ DEFINE_int32(tablet_copy_download_file_inject_latency_ms, 0,
"to take much longer. For use in tests only.");
TAG_FLAG(tablet_copy_download_file_inject_latency_ms, hidden);
+// Test-only knobs for injecting artificial latency into DownloadWAL(). When
+// enabled, each call samples a normally-distributed delay (in milliseconds)
+// from the mean/stddev below and sleeps only if the sample is positive.
+DEFINE_bool(tablet_copy_download_wal_inject_latency, false,
+            "If true, injects artificial latency in DownloadWAL() operations. "
+            "For use in test only.");
+TAG_FLAG(tablet_copy_download_wal_inject_latency, unsafe);
+TAG_FLAG(tablet_copy_download_wal_inject_latency, runtime);
+
+DEFINE_int32(tablet_copy_download_wal_inject_latency_ms_mean, 200,
+             "The number of milliseconds of latency to inject, on average. "
+             "Only takes effect if --tablet_copy_download_wal_inject_latency is true");
+TAG_FLAG(tablet_copy_download_wal_inject_latency_ms_mean, unsafe);
+TAG_FLAG(tablet_copy_download_wal_inject_latency_ms_mean, runtime);
+
+// The separator after "DownloadWAL()." matters: adjacent string literals are
+// concatenated verbatim.
+DEFINE_int32(tablet_copy_download_wal_inject_latency_ms_stddev, 200,
+             "The standard deviation, in milliseconds, of latency to inject on "
+             "DownloadWAL(). "
+             "Only takes effect if --tablet_copy_download_wal_inject_latency is true");
+TAG_FLAG(tablet_copy_download_wal_inject_latency_ms_stddev, unsafe);
+TAG_FLAG(tablet_copy_download_wal_inject_latency_ms_stddev, runtime);
+
DEFINE_double(tablet_copy_fault_crash_on_fetch_all, 0.0,
"Fraction of the time that the server will crash when FetchAll() "
"is called on the TabletCopyClient. (For testing only!)");
@@ -110,6 +128,12 @@ DEFINE_double(tablet_copy_fault_crash_during_download_block, 0.0,
TAG_FLAG(tablet_copy_fault_crash_during_download_block, unsafe);
TAG_FLAG(tablet_copy_fault_crash_during_download_block, runtime);
+// Despite the "crash" in its name, this flag does not crash the process:
+// DownloadWAL() returns an injected IOError with this probability.
+DEFINE_double(tablet_copy_fault_crash_during_download_wal, 0.0,
+              "Fraction of the time that DownloadWAL() will fail. "
+              "For use in test only.");
+TAG_FLAG(tablet_copy_fault_crash_during_download_wal, unsafe);
+TAG_FLAG(tablet_copy_fault_crash_during_download_wal, runtime);
+
DEFINE_int32(tablet_copy_download_threads_nums_per_session, 4,
"Number of threads per tablet copy session for downloading tablet data blocks.");
DEFINE_validator(tablet_copy_download_threads_nums_per_session,
@@ -185,10 +209,10 @@ TabletCopyClient::TabletCopyClient(
if (tablet_copy_metrics_) {
tablet_copy_metrics_->open_client_sessions->Increment();
}
- ThreadPoolBuilder("blocks-download-pool-" + tablet_id)
+ ThreadPoolBuilder("tablet-download-pool-" + tablet_id)
.set_max_threads(FLAGS_tablet_copy_download_threads_nums_per_session)
.set_min_threads(FLAGS_tablet_copy_download_threads_nums_per_session)
- .Build(&blocks_download_pool_);
+ .Build(&tablet_download_pool_);
}
TabletCopyClient::~TabletCopyClient() {
@@ -197,7 +221,7 @@ TabletCopyClient::~TabletCopyClient() {
LogPrefix()));
WARN_NOT_OK(Abort(), Substitute("$0Failed to fully clean up tablet after aborted copy",
LogPrefix()));
- blocks_download_pool_->Shutdown();
+ tablet_download_pool_->Shutdown();
if (tablet_copy_metrics_) {
tablet_copy_metrics_->open_client_sessions->IncrementBy(-1);
}
@@ -565,17 +589,35 @@ Status TabletCopyClient::DownloadWALs() {
RETURN_NOT_OK(fs_manager_->env()->SyncDir(DirName(path))); // fsync() parent dir.
// Download the WAL segments.
- int num_segments = wal_seqnos_.size();
+ const int num_segments = wal_seqnos_.size();
LOG_WITH_PREFIX(INFO) << "Starting download of " << num_segments << " WAL segments...";
- uint64_t counter = 0;
+
+ atomic<uint64_t> counter(0);
+ Status end_status;
for (uint64_t seg_seqno : wal_seqnos_) {
- SetStatusMessage(Substitute("Downloading WAL segment with seq. number $0 ($1/$2)",
- seg_seqno, counter + 1, num_segments));
- RETURN_NOT_OK(DownloadWAL(seg_seqno));
- ++counter;
+ RETURN_NOT_OK(tablet_download_pool_->Submit([&, seg_seqno]() {
+ SetStatusMessage(Substitute("Downloading WAL segment with seq. number $0 ($1/$2)",
+ seg_seqno, counter.load() + 1, num_segments));
+ {
+ std::lock_guard<simple_spinlock> l(simple_lock_);
+ if (!end_status.ok()) {
+ return;
+ }
+ }
+ Status s = DownloadWAL(seg_seqno);
+ if (!s.ok()) {
+ std::lock_guard<simple_spinlock> l(simple_lock_);
+ if (end_status.ok()) {
+ end_status = s;
+ }
+ return;
+ }
+ counter++;
+ }));
}
+ tablet_download_pool_->Wait();
- return Status::OK();
+ return end_status;
}
int TabletCopyClient::CountRemoteBlocks() const {
@@ -690,17 +732,32 @@ Status TabletCopyClient::DownloadBlocks() {
// Download rowsets in parallel.
// Each task downloads the blocks of the corresponding rowset sequentially.
for (const RowSetDataPB& src_rowset : remote_superblock_->rowsets()) {
- RETURN_NOT_OK(blocks_download_pool_->Submit([&]() mutable {
+ RETURN_NOT_OK(tablet_download_pool_->Submit([&]() mutable {
DownloadRowset(src_rowset, num_remote_blocks, &block_count, &end_status);
}));
}
- blocks_download_pool_->Wait();
+ tablet_download_pool_->Wait();
return end_status;
}
Status TabletCopyClient::DownloadWAL(uint64_t wal_segment_seqno) {
VLOG_WITH_PREFIX(1) << "Downloading WAL segment with seqno " << wal_segment_seqno;
+
+ MAYBE_RETURN_FAILURE(FLAGS_tablet_copy_fault_crash_during_download_wal,
+ Status::IOError("Injected failure on downloading wal"));
+ if (PREDICT_FALSE(FLAGS_tablet_copy_download_wal_inject_latency)) {
+ Random r(GetCurrentTimeMicros());
+ int sleep_ms = static_cast<int>(
+ r.Normal(FLAGS_tablet_copy_download_wal_inject_latency_ms_mean,
+ FLAGS_tablet_copy_download_wal_inject_latency_ms_stddev));
+ if (sleep_ms > 0) {
+ LOG_WITH_PREFIX(WARNING) << "Injecting " << sleep_ms
+ << "ms of latency in TabletCopyClient::DownloadWAL()";
+ SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+ }
+ }
+
RETURN_NOT_OK_PREPEND(CheckHealthyDirGroup(), "Not downloading WAL for replica");
DataIdPB data_id;
diff --git a/src/kudu/tserver/tablet_copy_client.h b/src/kudu/tserver/tablet_copy_client.h
index 3d5395e..3452d6e 100644
--- a/src/kudu/tserver/tablet_copy_client.h
+++ b/src/kudu/tserver/tablet_copy_client.h
@@ -77,9 +77,6 @@ struct TabletCopyClientMetrics {
// Client class for using tablet copy to copy a tablet from another host.
// This class is not thread-safe.
//
-// TODO:
-// * Parallelize download of blocks and WAL segments.
-//
class TabletCopyClient {
public:
@@ -140,6 +137,7 @@ class TabletCopyClient {
FRIEND_TEST(TabletCopyClientTest, TestLifeCycle);
FRIEND_TEST(TabletCopyClientTest, TestVerifyData);
FRIEND_TEST(TabletCopyClientTest, TestDownloadBlockMayFail);
+ FRIEND_TEST(TabletCopyClientTest, TestDownloadWalMayFail);
FRIEND_TEST(TabletCopyClientTest, TestDownloadWalSegment);
FRIEND_TEST(TabletCopyClientTest, TestDownloadAllBlocks);
FRIEND_TEST(TabletCopyClientAbortTest, TestAbort);
@@ -183,7 +181,7 @@ class TabletCopyClient {
// End the tablet copy session.
Status EndRemoteSession();
- // Download all WAL files sequentially.
+ // Download all WAL files in parallel.
Status DownloadWALs();
// Download a single WAL file.
@@ -256,7 +254,7 @@ class TabletCopyClient {
template<class Appendable>
Status DownloadFile(const DataIdPB& data_id, Appendable* appendable);
- Status VerifyData(uint64_t offset, const DataChunkPB& resp);
+ Status VerifyData(uint64_t offset, const DataChunkPB& chunk);
// Runs the provided functor, which must send an RPC and return the result
// status, until it succeeds, times out, or fails with a non-retriable error.
@@ -303,11 +301,11 @@ class TabletCopyClient {
// Block transaction for the tablet copy.
std::unique_ptr<fs::BlockCreationTransaction> transaction_;
- // Thread pool for downloading all data blocks in parallel.
- std::unique_ptr<ThreadPool> blocks_download_pool_;
+ // Thread pool for downloading all data blocks and wals in parallel.
+ std::unique_ptr<ThreadPool> tablet_download_pool_;
// Protects adding/creating blocks, adding a rowset,
- // reading/updating rowset download status.
+ // reading/updating rowset/wal download status.
simple_spinlock simple_lock_;
DISALLOW_COPY_AND_ASSIGN(TabletCopyClient);