You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/08/07 03:56:52 UTC

[4/5] kudu git commit: Rename remote bootstrap files to 'tablet copy'

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap.proto b/src/kudu/tserver/remote_bootstrap.proto
deleted file mode 100644
index 1e89919..0000000
--- a/src/kudu/tserver/remote_bootstrap.proto
+++ /dev/null
@@ -1,204 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package kudu.tserver;
-
-option java_package = "org.apache.kudu.tserver";
-
-import "kudu/common/wire_protocol.proto";
-import "kudu/consensus/metadata.proto";
-import "kudu/fs/fs.proto";
-import "kudu/rpc/rpc_header.proto";
-import "kudu/tablet/metadata.proto";
-
-// RPC calls used by a peer to copy a tablet from a remote server.
-service TabletCopyService {
-  // Establish a tablet copy session.
-  rpc BeginTabletCopySession(BeginTabletCopySessionRequestPB)
-      returns (BeginTabletCopySessionResponsePB);
-
-  // Check whether the specified session is active.
-  rpc CheckSessionActive(CheckTabletCopySessionActiveRequestPB)
-      returns (CheckTabletCopySessionActiveResponsePB);
-
-  // Fetch data (blocks, logs) from the server.
-  rpc FetchData(FetchDataRequestPB)
-      returns (FetchDataResponsePB);
-
-  // End a tablet copy session, allow server to release resources.
-  rpc EndTabletCopySession(EndTabletCopySessionRequestPB)
-      returns (EndTabletCopySessionResponsePB);
-}
-
-// Tablet Copy-specific errors use this protobuf.
-message TabletCopyErrorPB {
-  extend kudu.rpc.ErrorStatusPB {
-    optional TabletCopyErrorPB tablet_copy_error_ext = 102;
-  }
-
-  enum Code {
-    // An error which has no more specific error code.
-    // The code and message in 'status' may reveal more details.
-    //
-    // RPCs should avoid returning this, since callers will not be
-    // able to easily parse the error.
-    UNKNOWN_ERROR = 1;
-
-    // The specified tablet copy session either never existed or has expired.
-    NO_SESSION = 2;
-
-    // Unknown tablet.
-    TABLET_NOT_FOUND = 3;
-
-    // Unknown data block.
-    BLOCK_NOT_FOUND = 4;
-
-    // Unknown WAL segment.
-    WAL_SEGMENT_NOT_FOUND = 5;
-
-    // Invalid request. Possibly missing parameters.
-    INVALID_TABLET_COPY_REQUEST = 6;
-
-    // Error reading or transferring data.
-    IO_ERROR = 7;
-  }
-
-  // The error code.
-  required Code code = 1 [ default = UNKNOWN_ERROR ];
-
-  // The Status object for the error. This will include a textual
-  // message that may be more useful to present in log messages, etc,
-  // though its error code is less specific.
-  required AppStatusPB status = 2;
-}
-
-message BeginTabletCopySessionRequestPB {
-  // permanent_uuid of the requesting peer.
-  required bytes requestor_uuid = 1;
-
-  // tablet_id of the tablet the requestor desires to copy.
-  required bytes tablet_id = 2;
-}
-
-message BeginTabletCopySessionResponsePB {
-  // Opaque session id assigned by the server.
-  // No guarantees are made as to the format of the session id.
-  required bytes session_id = 1;
-
-  // Maximum session idle timeout between requests.
-  // Learners will have to start over again if they reach this timeout.
-  // A value of 0 means there is no timeout.
-  required uint64 session_idle_timeout_millis = 2;
-
-  // Active superblock at the time of the request.
-  required tablet.TabletSuperBlockPB superblock = 3;
-
-  // Identifiers for the WAL segments available for download.
-  // Each WAL segment is keyed by its sequence number.
-  repeated uint64 wal_segment_seqnos = 4;
-
-  // A snapshot of the committed Consensus state at the time that the
-  // tablet copy session was started.
-  required consensus.ConsensusStatePB initial_committed_cstate = 5;
-
-  // permanent_uuid of the responding peer.
-  optional bytes responder_uuid = 6;
-}
-
-message CheckTabletCopySessionActiveRequestPB {
-  // Valid Session ID returned by a BeginTabletCopySession() RPC call.
-  required bytes session_id = 1;
-
-  // Set keepalive to true to reset the session timeout timer.
-  optional bool keepalive = 2 [default = false];
-}
-
-message CheckTabletCopySessionActiveResponsePB {
-  // Whether the given session id represents an active tablet copy session.
-  required bool session_is_active = 1;
-}
-
-// A "union" type that allows the same RPC call to fetch different types of
-// data (data blocks or log files).
-message DataIdPB {
-  enum IdType {
-    UNKNOWN = 0;
-    BLOCK = 1;
-    LOG_SEGMENT = 2;
-  }
-
-  // Indicator whether it's a block or log segment id.
-  required IdType type = 1;
-
-  // Exactly one of these must be set.
-  optional BlockIdPB block_id = 2;          // To fetch a block.
-  optional uint64 wal_segment_seqno = 3;    // To fetch a log segment.
-}
-
-message FetchDataRequestPB {
-  // Valid Session ID returned by a BeginTabletCopySession() RPC call.
-  required bytes session_id = 1;
-
-  // The server will use this ID to determine the key and type of data
-  // that was requested.
-  required DataIdPB data_id = 2;
-
-  // Offset into data to start reading from.
-  // If not specified, the server will send the data from offset 0.
-  optional uint64 offset = 3 [default = 0];
-
-  // Maximum length of the chunk of data to return.
-  // If max_length is not specified, or if the server's max is less than the
-  // requested max, the server will use its own max.
-  optional int64 max_length = 4 [default = 0];
-}
-
-// A chunk of data (a slice of a block, file, etc).
-message DataChunkPB {
-  // Offset into the complete data block or file that 'data' starts at.
-  required uint64 offset = 1;
-
-  // Actual bytes of data from the data block, starting at 'offset'.
-  required bytes data = 2;
-
-  // CRC32C of the bytes contained in 'data'.
-  required fixed32 crc32 = 3;
-
-  // Full length, in bytes, of the complete data block or file on the server.
-  // The number of bytes returned in 'data' can certainly be less than this.
-  required int64 total_data_length = 4;
-}
-
-message FetchDataResponsePB {
-  // The server will automatically release the resources (i.e. close file, free
-  // read buffers) for a given data resource after the last byte is read.
-  // So, per-resource, chunks are optimized to be fetched in-order.
-  required DataChunkPB chunk = 1;
-}
-
-message EndTabletCopySessionRequestPB {
-  required bytes session_id = 1;
-
-  // Set to true if the tablet copy was successful.
-  required bool is_success = 2;
-
-  // Client-provided error message. The server will log this error so that an
-  // admin can identify when bad things are happening with tablet copy.
-  optional AppStatusPB error = 3;
-}
-
-message EndTabletCopySessionResponsePB {
-}

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap_client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap_client-test.cc b/src/kudu/tserver/remote_bootstrap_client-test.cc
deleted file mode 100644
index d1bfc16..0000000
--- a/src/kudu/tserver/remote_bootstrap_client-test.cc
+++ /dev/null
@@ -1,241 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "kudu/tserver/remote_bootstrap-test-base.h"
-
-#include "kudu/consensus/quorum_util.h"
-#include "kudu/gutil/strings/fastmem.h"
-#include "kudu/tablet/tablet_bootstrap.h"
-#include "kudu/tserver/remote_bootstrap_client.h"
-#include "kudu/util/env_util.h"
-
-using std::shared_ptr;
-
-namespace kudu {
-namespace tserver {
-
-using consensus::GetRaftConfigLeader;
-using consensus::RaftPeerPB;
-using tablet::TabletMetadata;
-using tablet::TabletStatusListener;
-
-class TabletCopyClientTest : public TabletCopyTest {
- public:
-  virtual void SetUp() OVERRIDE {
-    NO_FATALS(TabletCopyTest::SetUp());
-
-    fs_manager_.reset(new FsManager(Env::Default(), GetTestPath("client_tablet")));
-    ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
-    ASSERT_OK(fs_manager_->Open());
-
-    tablet_peer_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10.0));
-    rpc::MessengerBuilder(CURRENT_TEST_NAME()).Build(&messenger_);
-    client_.reset(new TabletCopyClient(GetTabletId(),
-                                            fs_manager_.get(),
-                                            messenger_));
-    ASSERT_OK(GetRaftConfigLeader(tablet_peer_->consensus()
-        ->ConsensusState(consensus::CONSENSUS_CONFIG_COMMITTED), &leader_));
-
-    HostPort host_port;
-    ASSERT_OK(HostPortFromPB(leader_.last_known_addr(), &host_port));
-    ASSERT_OK(client_->Start(host_port, &meta_));
-  }
-
- protected:
-  Status CompareFileContents(const string& path1, const string& path2);
-
-  gscoped_ptr<FsManager> fs_manager_;
-  shared_ptr<rpc::Messenger> messenger_;
-  gscoped_ptr<TabletCopyClient> client_;
-  scoped_refptr<TabletMetadata> meta_;
-  RaftPeerPB leader_;
-};
-
-Status TabletCopyClientTest::CompareFileContents(const string& path1, const string& path2) {
-  shared_ptr<RandomAccessFile> file1, file2;
-  RETURN_NOT_OK(env_util::OpenFileForRandom(fs_manager_->env(), path1, &file1));
-  RETURN_NOT_OK(env_util::OpenFileForRandom(fs_manager_->env(), path2, &file2));
-
-  uint64_t size1, size2;
-  RETURN_NOT_OK(file1->Size(&size1));
-  RETURN_NOT_OK(file2->Size(&size2));
-  if (size1 != size2) {
-    return Status::Corruption("Sizes of files don't match",
-                              strings::Substitute("$0 vs $1 bytes", size1, size2));
-  }
-
-  Slice slice1, slice2;
-  faststring scratch1, scratch2;
-  scratch1.resize(size1);
-  scratch2.resize(size2);
-  RETURN_NOT_OK(env_util::ReadFully(file1.get(), 0, size1, &slice1, scratch1.data()));
-  RETURN_NOT_OK(env_util::ReadFully(file2.get(), 0, size2, &slice2, scratch2.data()));
-  int result = strings::fastmemcmp_inlined(slice1.data(), slice2.data(), size1);
-  if (result != 0) {
-    return Status::Corruption("Files do not match");
-  }
-  return Status::OK();
-}
-
-// Basic begin / end tablet copy session.
-TEST_F(TabletCopyClientTest, TestBeginEndSession) {
-  TabletStatusListener listener(meta_);
-  ASSERT_OK(client_->FetchAll(&listener));
-  ASSERT_OK(client_->Finish());
-}
-
-// Basic data block download unit test.
-TEST_F(TabletCopyClientTest, TestDownloadBlock) {
-  TabletStatusListener listener(meta_);
-  BlockId block_id = FirstColumnBlockId(*client_->superblock_);
-  Slice slice;
-  faststring scratch;
-
-  // Ensure the block wasn't there before (it shouldn't be, we use our own FsManager dir).
-  Status s;
-  s = ReadLocalBlockFile(fs_manager_.get(), block_id, &scratch, &slice);
-  ASSERT_TRUE(s.IsNotFound()) << "Expected block not found: " << s.ToString();
-
-  // Check that the client downloaded the block and verification passed.
-  BlockId new_block_id;
-  ASSERT_OK(client_->DownloadBlock(block_id, &new_block_id));
-
-  // Ensure it placed the block where we expected it to.
-  s = ReadLocalBlockFile(fs_manager_.get(), block_id, &scratch, &slice);
-  ASSERT_TRUE(s.IsNotFound()) << "Expected block not found: " << s.ToString();
-  ASSERT_OK(ReadLocalBlockFile(fs_manager_.get(), new_block_id, &scratch, &slice));
-}
-
-// Basic WAL segment download unit test.
-TEST_F(TabletCopyClientTest, TestDownloadWalSegment) {
-  ASSERT_OK(fs_manager_->CreateDirIfMissing(fs_manager_->GetTabletWalDir(GetTabletId())));
-
-  uint64_t seqno = client_->wal_seqnos_[0];
-  string path = fs_manager_->GetWalSegmentFileName(GetTabletId(), seqno);
-
-  ASSERT_FALSE(fs_manager_->Exists(path));
-  ASSERT_OK(client_->DownloadWAL(seqno));
-  ASSERT_TRUE(fs_manager_->Exists(path));
-
-  log::SegmentSequence local_segments;
-  ASSERT_OK(tablet_peer_->log()->reader()->GetSegmentsSnapshot(&local_segments));
-  const scoped_refptr<log::ReadableLogSegment>& segment = local_segments[0];
-  string server_path = segment->path();
-
-  // Compare the downloaded file with the source file.
-  ASSERT_OK(CompareFileContents(path, server_path));
-}
-
-// Ensure that we detect data corruption at the per-transfer level.
-TEST_F(TabletCopyClientTest, TestVerifyData) {
-  string good = "This is a known good string";
-  string bad = "This is a known bad! string";
-  const int kGoodOffset = 0;
-  const int kBadOffset = 1;
-  const int64_t kDataTotalLen = std::numeric_limits<int64_t>::max(); // Ignored.
-
-  // Create a known-good PB.
-  DataChunkPB valid_chunk;
-  valid_chunk.set_offset(0);
-  valid_chunk.set_data(good);
-  valid_chunk.set_crc32(crc::Crc32c(good.data(), good.length()));
-  valid_chunk.set_total_data_length(kDataTotalLen);
-
-  // Make sure we work on the happy case.
-  ASSERT_OK(client_->VerifyData(kGoodOffset, valid_chunk));
-
-  // Test unexpected offset.
-  DataChunkPB bad_offset = valid_chunk;
-  bad_offset.set_offset(kBadOffset);
-  Status s;
-  s = client_->VerifyData(kGoodOffset, bad_offset);
-  ASSERT_TRUE(s.IsInvalidArgument()) << "Bad offset expected: " << s.ToString();
-  ASSERT_STR_CONTAINS(s.ToString(), "Offset did not match");
-  LOG(INFO) << "Expected error returned: " << s.ToString();
-
-  // Test bad checksum.
-  DataChunkPB bad_checksum = valid_chunk;
-  bad_checksum.set_data(bad);
-  s = client_->VerifyData(kGoodOffset, bad_checksum);
-  ASSERT_TRUE(s.IsCorruption()) << "Invalid checksum expected: " << s.ToString();
-  ASSERT_STR_CONTAINS(s.ToString(), "CRC32 does not match");
-  LOG(INFO) << "Expected error returned: " << s.ToString();
-}
-
-namespace {
-
-vector<BlockId> GetAllSortedBlocks(const tablet::TabletSuperBlockPB& sb) {
-  vector<BlockId> data_blocks;
-
-  for (const tablet::RowSetDataPB& rowset : sb.rowsets()) {
-    for (const tablet::DeltaDataPB& redo : rowset.redo_deltas()) {
-      data_blocks.push_back(BlockId::FromPB(redo.block()));
-    }
-    for (const tablet::DeltaDataPB& undo : rowset.undo_deltas()) {
-      data_blocks.push_back(BlockId::FromPB(undo.block()));
-    }
-    for (const tablet::ColumnDataPB& column : rowset.columns()) {
-      data_blocks.push_back(BlockId::FromPB(column.block()));
-    }
-    if (rowset.has_bloom_block()) {
-      data_blocks.push_back(BlockId::FromPB(rowset.bloom_block()));
-    }
-    if (rowset.has_adhoc_index_block()) {
-      data_blocks.push_back(BlockId::FromPB(rowset.adhoc_index_block()));
-    }
-  }
-
-  std::sort(data_blocks.begin(), data_blocks.end(), BlockIdCompare());
-  return data_blocks;
-}
-
-} // anonymous namespace
-
-TEST_F(TabletCopyClientTest, TestDownloadAllBlocks) {
-  // Download all the blocks.
-  ASSERT_OK(client_->DownloadBlocks());
-
-  // Verify that the new superblock reflects the changes in block IDs.
-  //
-  // As long as block IDs are generated with UUIDs or something equally
-  // unique, there's no danger of a block in the new superblock somehow
-  // being assigned the same ID as a block in the existing superblock.
-  vector<BlockId> old_data_blocks = GetAllSortedBlocks(*client_->superblock_.get());
-  vector<BlockId> new_data_blocks = GetAllSortedBlocks(*client_->new_superblock_.get());
-  vector<BlockId> result;
-  std::set_intersection(old_data_blocks.begin(), old_data_blocks.end(),
-                        new_data_blocks.begin(), new_data_blocks.end(),
-                        std::back_inserter(result), BlockIdCompare());
-  ASSERT_TRUE(result.empty());
-  ASSERT_EQ(old_data_blocks.size(), new_data_blocks.size());
-
-  // Verify that the old blocks aren't found. We're using a different
-  // FsManager than 'tablet_peer', so the only way an old block could end
-  // up in ours is due to a tablet copy client bug.
-  for (const BlockId& block_id : old_data_blocks) {
-    gscoped_ptr<fs::ReadableBlock> block;
-    Status s = fs_manager_->OpenBlock(block_id, &block);
-    ASSERT_TRUE(s.IsNotFound()) << "Expected block not found: " << s.ToString();
-  }
-  // And the new blocks are all present.
-  for (const BlockId& block_id : new_data_blocks) {
-    gscoped_ptr<fs::ReadableBlock> block;
-    ASSERT_OK(fs_manager_->OpenBlock(block_id, &block));
-  }
-}
-
-} // namespace tserver
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap_client.cc b/src/kudu/tserver/remote_bootstrap_client.cc
deleted file mode 100644
index a7cdde1..0000000
--- a/src/kudu/tserver/remote_bootstrap_client.cc
+++ /dev/null
@@ -1,563 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/tserver/remote_bootstrap_client.h"
-
-#include <gflags/gflags.h>
-#include <glog/logging.h>
-
-#include "kudu/common/wire_protocol.h"
-#include "kudu/consensus/consensus_meta.h"
-#include "kudu/consensus/metadata.pb.h"
-#include "kudu/fs/block_id.h"
-#include "kudu/fs/block_manager.h"
-#include "kudu/fs/fs_manager.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/strings/util.h"
-#include "kudu/gutil/walltime.h"
-#include "kudu/rpc/messenger.h"
-#include "kudu/rpc/transfer.h"
-#include "kudu/tablet/tablet.pb.h"
-#include "kudu/tablet/tablet_bootstrap.h"
-#include "kudu/tablet/tablet_peer.h"
-#include "kudu/tserver/remote_bootstrap.pb.h"
-#include "kudu/tserver/remote_bootstrap.proxy.h"
-#include "kudu/tserver/tablet_server.h"
-#include "kudu/util/crc.h"
-#include "kudu/util/env.h"
-#include "kudu/util/env_util.h"
-#include "kudu/util/flag_tags.h"
-#include "kudu/util/logging.h"
-#include "kudu/util/net/net_util.h"
-
-DEFINE_int32(tablet_copy_begin_session_timeout_ms, 3000,
-             "Tablet server RPC client timeout for BeginTabletCopySession calls. "
-             "Also used for EndTabletCopySession calls.");
-TAG_FLAG(tablet_copy_begin_session_timeout_ms, hidden);
-
-DEFINE_bool(tablet_copy_save_downloaded_metadata, false,
-            "Save copies of the downloaded tablet copy files for debugging purposes. "
-            "Note: This is only intended for debugging and should not be normally used!");
-TAG_FLAG(tablet_copy_save_downloaded_metadata, advanced);
-TAG_FLAG(tablet_copy_save_downloaded_metadata, hidden);
-TAG_FLAG(tablet_copy_save_downloaded_metadata, runtime);
-
-DEFINE_int32(tablet_copy_dowload_file_inject_latency_ms, 0,
-             "Injects latency into the loop that downloads files, causing tablet copy "
-             "to take much longer. For use in tests only.");
-TAG_FLAG(tablet_copy_dowload_file_inject_latency_ms, hidden);
-
-DECLARE_int32(tablet_copy_transfer_chunk_size_bytes);
-
-// RETURN_NOT_OK_PREPEND() with a remote-error unwinding step.
-#define RETURN_NOT_OK_UNWIND_PREPEND(status, controller, msg) \
-  RETURN_NOT_OK_PREPEND(UnwindRemoteError(status, controller), msg)
-
-namespace kudu {
-namespace tserver {
-
-using consensus::ConsensusMetadata;
-using consensus::ConsensusStatePB;
-using consensus::OpId;
-using consensus::RaftConfigPB;
-using consensus::RaftPeerPB;
-using env_util::CopyFile;
-using fs::WritableBlock;
-using rpc::Messenger;
-using std::shared_ptr;
-using std::string;
-using std::vector;
-using strings::Substitute;
-using tablet::ColumnDataPB;
-using tablet::DeltaDataPB;
-using tablet::RowSetDataPB;
-using tablet::TabletDataState;
-using tablet::TabletDataState_Name;
-using tablet::TabletMetadata;
-using tablet::TabletStatusListener;
-using tablet::TabletSuperBlockPB;
-
-TabletCopyClient::TabletCopyClient(std::string tablet_id,
-                                             FsManager* fs_manager,
-                                             shared_ptr<Messenger> messenger)
-    : tablet_id_(std::move(tablet_id)),
-      fs_manager_(fs_manager),
-      messenger_(std::move(messenger)),
-      started_(false),
-      downloaded_wal_(false),
-      downloaded_blocks_(false),
-      replace_tombstoned_tablet_(false),
-      status_listener_(nullptr),
-      session_idle_timeout_millis_(0),
-      start_time_micros_(0) {}
-
-TabletCopyClient::~TabletCopyClient() {
-  // Note: Ending the tablet copy session releases anchors on the remote.
-  WARN_NOT_OK(EndRemoteSession(), "Unable to close tablet copy session");
-}
-
-Status TabletCopyClient::SetTabletToReplace(const scoped_refptr<TabletMetadata>& meta,
-                                                 int64_t caller_term) {
-  CHECK_EQ(tablet_id_, meta->tablet_id());
-  TabletDataState data_state = meta->tablet_data_state();
-  if (data_state != tablet::TABLET_DATA_TOMBSTONED) {
-    return Status::IllegalState(Substitute("Tablet $0 not in tombstoned state: $1 ($2)",
-                                           tablet_id_,
-                                           TabletDataState_Name(data_state),
-                                           data_state));
-  }
-
-  replace_tombstoned_tablet_ = true;
-  meta_ = meta;
-
-  int64_t last_logged_term = meta->tombstone_last_logged_opid().term();
-  if (last_logged_term > caller_term) {
-    return Status::InvalidArgument(
-        Substitute("Leader has term $0 but the last log entry written by the tombstoned replica "
-                   "for tablet $1 has higher term $2. Refusing tablet copy from leader",
-                   caller_term, tablet_id_, last_logged_term));
-  }
-
-  // Load the old consensus metadata, if it exists.
-  gscoped_ptr<ConsensusMetadata> cmeta;
-  Status s = ConsensusMetadata::Load(fs_manager_, tablet_id_,
-                                     fs_manager_->uuid(), &cmeta);
-  if (s.IsNotFound()) {
-    // The consensus metadata was not written to disk, possibly due to a failed
-    // tablet copy.
-    return Status::OK();
-  }
-  RETURN_NOT_OK(s);
-  cmeta_.swap(cmeta);
-  return Status::OK();
-}
-
-Status TabletCopyClient::Start(const HostPort& copy_source_addr,
-                                    scoped_refptr<TabletMetadata>* meta) {
-  CHECK(!started_);
-  start_time_micros_ = GetCurrentTimeMicros();
-
-  Sockaddr addr;
-  RETURN_NOT_OK(SockaddrFromHostPort(copy_source_addr, &addr));
-  if (addr.IsWildcard()) {
-    return Status::InvalidArgument("Invalid wildcard address to tablet copy from",
-                                   Substitute("$0 (resolved to $1)",
-                                              copy_source_addr.host(), addr.host()));
-  }
-  LOG_WITH_PREFIX(INFO) << "Beginning tablet copy session"
-                        << " from remote peer at address " << copy_source_addr.ToString();
-
-  // Set up an RPC proxy for the TabletCopyService.
-  proxy_.reset(new TabletCopyServiceProxy(messenger_, addr));
-
-  BeginTabletCopySessionRequestPB req;
-  req.set_requestor_uuid(fs_manager_->uuid());
-  req.set_tablet_id(tablet_id_);
-
-  rpc::RpcController controller;
-  controller.set_timeout(MonoDelta::FromMilliseconds(
-      FLAGS_tablet_copy_begin_session_timeout_ms));
-
-  // Begin the tablet copy session with the remote peer.
-  BeginTabletCopySessionResponsePB resp;
-  RETURN_NOT_OK_UNWIND_PREPEND(proxy_->BeginTabletCopySession(req, &resp, &controller),
-                               controller,
-                               "Unable to begin tablet copy session");
-  string copy_peer_uuid = resp.has_responder_uuid()
-      ? resp.responder_uuid() : "(unknown uuid)";
-  if (resp.superblock().tablet_data_state() != tablet::TABLET_DATA_READY) {
-    Status s = Status::IllegalState("Remote peer (" + copy_peer_uuid + ")" +
-                                    " is currently copying itself!",
-                                    resp.superblock().ShortDebugString());
-    LOG_WITH_PREFIX(WARNING) << s.ToString();
-    return s;
-  }
-
-  session_id_ = resp.session_id();
-  session_idle_timeout_millis_ = resp.session_idle_timeout_millis();
-  superblock_.reset(resp.release_superblock());
-  superblock_->set_tablet_data_state(tablet::TABLET_DATA_COPYING);
-  wal_seqnos_.assign(resp.wal_segment_seqnos().begin(), resp.wal_segment_seqnos().end());
-  remote_committed_cstate_.reset(resp.release_initial_committed_cstate());
-
-  Schema schema;
-  RETURN_NOT_OK_PREPEND(SchemaFromPB(superblock_->schema(), &schema),
-                        "Cannot deserialize schema from remote superblock");
-
-  if (replace_tombstoned_tablet_) {
-    // Also validate the term of the source peer, in case they are
-    // different. This is a sanity check that protects us in case a bug or
-    // misconfiguration causes us to attempt to copy from an out-of-date
-    // source peer, even after passing the term check from the caller in
-    // SetTabletToReplace().
-    int64_t last_logged_term = meta_->tombstone_last_logged_opid().term();
-    if (last_logged_term > remote_committed_cstate_->current_term()) {
-      return Status::InvalidArgument(
-          Substitute("Tablet $0: source peer has term $1 but "
-                     "tombstoned replica has last-logged opid with higher term $2. "
-                      "Refusing tablet copy from source peer $3",
-                      tablet_id_,
-                      remote_committed_cstate_->current_term(),
-                      last_logged_term,
-                      copy_peer_uuid));
-    }
-
-    // This will flush to disk, but we set the data state to COPYING above.
-    RETURN_NOT_OK_PREPEND(meta_->ReplaceSuperBlock(*superblock_),
-                          "Tablet Copy unable to replace superblock on tablet " +
-                          tablet_id_);
-  } else {
-
-    Partition partition;
-    Partition::FromPB(superblock_->partition(), &partition);
-    PartitionSchema partition_schema;
-    RETURN_NOT_OK(PartitionSchema::FromPB(superblock_->partition_schema(),
-                                          schema, &partition_schema));
-
-    // Create the superblock on disk.
-    RETURN_NOT_OK(TabletMetadata::CreateNew(fs_manager_, tablet_id_,
-                                            superblock_->table_name(),
-                                            superblock_->table_id(),
-                                            schema,
-                                            partition_schema,
-                                            partition,
-                                            tablet::TABLET_DATA_COPYING,
-                                            &meta_));
-  }
-
-  started_ = true;
-  if (meta) {
-    *meta = meta_;
-  }
-  return Status::OK();
-}
-
-Status TabletCopyClient::FetchAll(TabletStatusListener* status_listener) {
-  CHECK(started_);
-  status_listener_ = status_listener;
-
-  // Download all the files (serially, for now, but in parallel in the future).
-  RETURN_NOT_OK(DownloadBlocks());
-  RETURN_NOT_OK(DownloadWALs());
-
-  return Status::OK();
-}
-
-Status TabletCopyClient::Finish() {
-  CHECK(meta_);
-  CHECK(started_);
-  CHECK(downloaded_wal_);
-  CHECK(downloaded_blocks_);
-
-  RETURN_NOT_OK(WriteConsensusMetadata());
-
-  // Replace tablet metadata superblock. This will set the tablet metadata state
-  // to TABLET_DATA_READY, since we checked above that the response
-  // superblock is in a valid state to bootstrap from.
-  LOG_WITH_PREFIX(INFO) << "Tablet Copy complete. Replacing tablet superblock.";
-  UpdateStatusMessage("Replacing tablet superblock");
-  new_superblock_->set_tablet_data_state(tablet::TABLET_DATA_READY);
-  RETURN_NOT_OK(meta_->ReplaceSuperBlock(*new_superblock_));
-
-  if (FLAGS_tablet_copy_save_downloaded_metadata) {
-    string meta_path = fs_manager_->GetTabletMetadataPath(tablet_id_);
-    string meta_copy_path = Substitute("$0.copy.$1.tmp", meta_path, start_time_micros_);
-    RETURN_NOT_OK_PREPEND(CopyFile(Env::Default(), meta_path, meta_copy_path,
-                                   WritableFileOptions()),
-                          "Unable to make copy of tablet metadata");
-  }
-
-  return Status::OK();
-}
-
-// Decode the remote error into a human-readable Status object.
-Status TabletCopyClient::ExtractRemoteError(const rpc::ErrorStatusPB& remote_error) {
-  if (PREDICT_TRUE(remote_error.HasExtension(TabletCopyErrorPB::tablet_copy_error_ext))) {
-    const TabletCopyErrorPB& error =
-        remote_error.GetExtension(TabletCopyErrorPB::tablet_copy_error_ext);
-    return StatusFromPB(error.status()).CloneAndPrepend("Received error code " +
-              TabletCopyErrorPB::Code_Name(error.code()) + " from remote service");
-  } else {
-    return Status::InvalidArgument("Unable to decode tablet copy RPC error message",
-                                   remote_error.ShortDebugString());
-  }
-}
-
-// Enhance a RemoteError Status message with additional details from the remote.
-Status TabletCopyClient::UnwindRemoteError(const Status& status,
-                                                const rpc::RpcController& controller) {
-  if (!status.IsRemoteError()) {
-    return status;
-  }
-  Status extension_status = ExtractRemoteError(*controller.error_response());
-  return status.CloneAndAppend(extension_status.ToString());
-}
-
-void TabletCopyClient::UpdateStatusMessage(const string& message) {
-  if (status_listener_ != nullptr) {
-    status_listener_->StatusMessage("TabletCopy: " + message);
-  }
-}
-
-// Tell the remote that this tablet copy session is finished.
-// Does nothing (and returns OK) unless a session was started.
-Status TabletCopyClient::EndRemoteSession() {
-  if (!started_) {
-    return Status::OK();
-  }
-
-  EndTabletCopySessionRequestPB req;
-  req.set_session_id(session_id_);
-  req.set_is_success(true);
-
-  // NOTE(review): the begin-session timeout flag is reused for this RPC;
-  // confirm that sharing the flag is intentional.
-  const MonoDelta rpc_timeout =
-      MonoDelta::FromMilliseconds(FLAGS_tablet_copy_begin_session_timeout_ms);
-  rpc::RpcController controller;
-  controller.set_timeout(rpc_timeout);
-
-  EndTabletCopySessionResponsePB resp;
-  RETURN_NOT_OK_UNWIND_PREPEND(proxy_->EndTabletCopySession(req, &resp, &controller),
-                               controller,
-                               "Failure ending tablet copy session");
-  return Status::OK();
-}
-
-// Download every WAL segment listed in 'wal_seqnos_' into a freshly created
-// WAL directory for this tablet. The session must already be started.
-// Sets 'downloaded_wal_' once all segments have been fetched.
-Status TabletCopyClient::DownloadWALs() {
-  CHECK(started_);
-
-  // Begin with an empty WAL directory so that no stray files survive from
-  // previous copies and runs.
-  const string wal_dir = fs_manager_->GetTabletWalDir(tablet_id_);
-  if (fs_manager_->env()->FileExists(wal_dir)) {
-    RETURN_NOT_OK(fs_manager_->env()->DeleteRecursively(wal_dir));
-  }
-  RETURN_NOT_OK(fs_manager_->env()->CreateDir(wal_dir));
-  // fsync() the parent dir.
-  RETURN_NOT_OK(fs_manager_->env()->SyncDir(DirName(wal_dir)));
-
-  // Fetch the segments one at a time, reporting 1-based progress.
-  const int num_segments = wal_seqnos_.size();
-  LOG_WITH_PREFIX(INFO) << "Starting download of " << num_segments << " WAL segments...";
-  uint64_t position = 1;
-  for (uint64_t seg_seqno : wal_seqnos_) {
-    UpdateStatusMessage(Substitute("Downloading WAL segment with seq. number $0 ($1/$2)",
-                                   seg_seqno, position, num_segments));
-    RETURN_NOT_OK(DownloadWAL(seg_seqno));
-    ++position;
-  }
-
-  downloaded_wal_ = true;
-  return Status::OK();
-}
-
-// Download every block referenced by the remote superblock and build
-// 'new_superblock_', a copy of the remote superblock whose block IDs have been
-// rewritten to the IDs of the locally created blocks. The session must already
-// be started. Sets 'downloaded_blocks_' on success.
-Status TabletCopyClient::DownloadBlocks() {
-  CHECK(started_);
-
-  // Tally the blocks up front so that progress can be reported as "n of total".
-  int num_blocks = 0;
-  for (const RowSetDataPB& rs : superblock_->rowsets()) {
-    num_blocks += rs.columns_size() + rs.redo_deltas_size() + rs.undo_deltas_size();
-    num_blocks += rs.has_bloom_block() ? 1 : 0;
-    num_blocks += rs.has_adhoc_index_block() ? 1 : 0;
-  }
-
-  // Work on a copy of the remote superblock; each block ID in the copy is
-  // overwritten with its local replacement as soon as that block downloads.
-  gscoped_ptr<TabletSuperBlockPB> rewritten_sb(new TabletSuperBlockPB());
-  rewritten_sb->CopyFrom(*superblock_);
-  int downloaded = 0;
-  LOG_WITH_PREFIX(INFO) << "Starting download of " << num_blocks << " data blocks...";
-  for (RowSetDataPB& rs : *rewritten_sb->mutable_rowsets()) {
-    for (ColumnDataPB& column : *rs.mutable_columns()) {
-      RETURN_NOT_OK(DownloadAndRewriteBlock(column.mutable_block(), &downloaded, num_blocks));
-    }
-    for (DeltaDataPB& redo_delta : *rs.mutable_redo_deltas()) {
-      RETURN_NOT_OK(DownloadAndRewriteBlock(redo_delta.mutable_block(), &downloaded, num_blocks));
-    }
-    for (DeltaDataPB& undo_delta : *rs.mutable_undo_deltas()) {
-      RETURN_NOT_OK(DownloadAndRewriteBlock(undo_delta.mutable_block(), &downloaded, num_blocks));
-    }
-    if (rs.has_bloom_block()) {
-      RETURN_NOT_OK(DownloadAndRewriteBlock(rs.mutable_bloom_block(), &downloaded, num_blocks));
-    }
-    if (rs.has_adhoc_index_block()) {
-      RETURN_NOT_OK(DownloadAndRewriteBlock(rs.mutable_adhoc_index_block(),
-                                            &downloaded, num_blocks));
-    }
-  }
-
-  // The orphaned physical block ids at the remote have no meaning to us.
-  rewritten_sb->clear_orphaned_blocks();
-  new_superblock_.swap(rewritten_sb);
-
-  downloaded_blocks_ = true;
-  return Status::OK();
-}
-
-// Download the WAL segment with sequence number 'wal_segment_seqno' into this
-// tablet's WAL segment file. The destination file is opened with
-// sync_on_close enabled.
-Status TabletCopyClient::DownloadWAL(uint64_t wal_segment_seqno) {
-  VLOG_WITH_PREFIX(1) << "Downloading WAL segment with seqno " << wal_segment_seqno;
-
-  // Open the local destination file first.
-  const string segment_path =
-      fs_manager_->GetWalSegmentFileName(tablet_id_, wal_segment_seqno);
-  WritableFileOptions file_opts;
-  file_opts.sync_on_close = true;
-  gscoped_ptr<WritableFile> segment_file;
-  RETURN_NOT_OK_PREPEND(
-      fs_manager_->env()->NewWritableFile(file_opts, segment_path, &segment_file),
-      "Unable to open file for writing");
-
-  // Then stream the remote segment into it.
-  DataIdPB segment_id;
-  segment_id.set_type(DataIdPB::LOG_SEGMENT);
-  segment_id.set_wal_segment_seqno(wal_segment_seqno);
-  RETURN_NOT_OK_PREPEND(DownloadFile(segment_id, segment_file.get()),
-                        Substitute("Unable to download WAL segment with seq. number $0",
-                                   wal_segment_seqno));
-  return Status::OK();
-}
-
-// Persist consensus metadata reflecting the committed consensus state sent by
-// the tablet copy source.
-//
-// If no local consensus metadata was loaded, a new file is created from the
-// remote config and term. Otherwise the existing metadata is merged with the
-// remote state and flushed, and, when
-// FLAGS_tablet_copy_save_downloaded_metadata is set, a timestamped copy of
-// the flushed file is written next to it.
-Status TabletCopyClient::WriteConsensusMetadata() {
-  if (!cmeta_) {
-    // No previous consensus meta file was found: create one.
-    gscoped_ptr<ConsensusMetadata> created_cmeta;
-    return ConsensusMetadata::Create(fs_manager_, tablet_id_, fs_manager_->uuid(),
-                                     remote_committed_cstate_->config(),
-                                     remote_committed_cstate_->current_term(),
-                                     &created_cmeta);
-  }
-
-  // Bring the existing metadata up to date with the config and term sent by
-  // the tablet copy source.
-  cmeta_->MergeCommittedConsensusStatePB(*remote_committed_cstate_);
-  RETURN_NOT_OK(cmeta_->Flush());
-
-  if (!FLAGS_tablet_copy_save_downloaded_metadata) {
-    return Status::OK();
-  }
-
-  const string cmeta_path = fs_manager_->GetConsensusMetadataPath(tablet_id_);
-  const string backup_path = Substitute("$0.copy.$1.tmp", cmeta_path, start_time_micros_);
-  RETURN_NOT_OK_PREPEND(CopyFile(Env::Default(), cmeta_path, backup_path,
-                                 WritableFileOptions()),
-                        "Unable to make copy of consensus metadata");
-  return Status::OK();
-}
-
-// Download the block identified by 'block_id' and overwrite 'block_id' with
-// the ID of the newly created local block.
-//
-// 'block_count' is the number of blocks downloaded so far and is incremented
-// on success; 'num_blocks' is the total number of blocks expected. Both are
-// used only for progress reporting. On failure 'block_id' and 'block_count'
-// are left untouched.
-Status TabletCopyClient::DownloadAndRewriteBlock(BlockIdPB* block_id,
-                                                 int* block_count, int num_blocks) {
-  BlockId old_block_id(BlockId::FromPB(*block_id));
-  // Report 1-based progress ("1/N" .. "N/N"), matching the WAL segment
-  // progress message; '*block_count' itself is zero-based until incremented.
-  UpdateStatusMessage(Substitute("Downloading block $0 ($1/$2)",
-                                 old_block_id.ToString(), *block_count + 1,
-                                 num_blocks));
-  BlockId new_block_id;
-  RETURN_NOT_OK_PREPEND(DownloadBlock(old_block_id, &new_block_id),
-      "Unable to download block with id " + old_block_id.ToString());
-
-  new_block_id.CopyToPB(block_id);
-  (*block_count)++;
-  return Status::OK();
-}
-
-// Download the remote block 'old_block_id' into a newly created local block.
-// 'new_block_id' receives the ID of the local block before it is closed.
-Status TabletCopyClient::DownloadBlock(const BlockId& old_block_id,
-                                       BlockId* new_block_id) {
-  VLOG_WITH_PREFIX(1) << "Downloading block with block_id " << old_block_id.ToString();
-
-  gscoped_ptr<WritableBlock> local_block;
-  RETURN_NOT_OK_PREPEND(fs_manager_->CreateNewBlock(&local_block),
-                        "Unable to create new block");
-
-  // Describe the remote block to fetch.
-  DataIdPB remote_id;
-  remote_id.set_type(DataIdPB::BLOCK);
-  old_block_id.CopyToPB(remote_id.mutable_block_id());
-
-  RETURN_NOT_OK_PREPEND(DownloadFile(remote_id, local_block.get()),
-                        Substitute("Unable to download block $0",
-                                   old_block_id.ToString()));
-
-  *new_block_id = local_block->id();
-  RETURN_NOT_OK_PREPEND(local_block->Close(), "Unable to close block");
-  return Status::OK();
-}
-
-// Download the remote data item identified by 'data_id' in chunks, appending
-// each verified chunk to 'appendable'.
-//
-// Chunks are requested sequentially starting at offset 0 until the bytes
-// received add up to the total length reported by the remote. Returns an
-// error if an RPC fails, a chunk fails verification, an append fails, or the
-// remote sends an empty chunk before the end of the data.
-template<class Appendable>
-Status TabletCopyClient::DownloadFile(const DataIdPB& data_id,
-                                      Appendable* appendable) {
-  uint64_t offset = 0;
-  rpc::RpcController controller;
-  controller.set_timeout(MonoDelta::FromMilliseconds(session_idle_timeout_millis_));
-
-  // The session and the data item are the same for every chunk; only the
-  // offset (and the flag-controlled chunk size) is refreshed per request.
-  FetchDataRequestPB req;
-  req.set_session_id(session_id_);
-  req.mutable_data_id()->CopyFrom(data_id);
-
-  bool done = false;
-  while (!done) {
-    controller.Reset();
-    req.set_offset(offset);
-    req.set_max_length(FLAGS_tablet_copy_transfer_chunk_size_bytes);
-
-    FetchDataResponsePB resp;
-    RETURN_NOT_OK_UNWIND_PREPEND(proxy_->FetchData(req, &resp, &controller),
-                                controller,
-                                "Unable to fetch data from remote");
-
-    // Sanity-check for corruption.
-    RETURN_NOT_OK_PREPEND(VerifyData(offset, resp.chunk()),
-                          Substitute("Error validating data item $0", data_id.ShortDebugString()));
-
-    // Write the data.
-    RETURN_NOT_OK(appendable->Append(resp.chunk().data()));
-
-    if (PREDICT_FALSE(FLAGS_tablet_copy_dowload_file_inject_latency_ms > 0)) {
-      LOG_WITH_PREFIX(INFO) << "Injecting latency into file download: " <<
-          FLAGS_tablet_copy_dowload_file_inject_latency_ms;
-      SleepFor(MonoDelta::FromMilliseconds(FLAGS_tablet_copy_dowload_file_inject_latency_ms));
-    }
-
-    const uint64_t chunk_size = resp.chunk().data().size();
-    if (offset + chunk_size == resp.chunk().total_data_length()) {
-      done = true;
-    } else if (PREDICT_FALSE(chunk_size == 0)) {
-      // An empty chunk that does not complete the item would leave 'offset'
-      // unchanged and make this loop re-request the same range forever.
-      return Status::IllegalState(
-          Substitute("Remote sent an empty chunk at offset $0 before the end of data item $1",
-                     offset, data_id.ShortDebugString()));
-    }
-    offset += chunk_size;
-  }
-
-  return Status::OK();
-}
-
-// Check that 'chunk' starts at the expected 'offset' and that the CRC32C of
-// its payload matches the checksum carried in the chunk.
-// Returns InvalidArgument on an offset mismatch and Corruption on a checksum
-// mismatch.
-Status TabletCopyClient::VerifyData(uint64_t offset, const DataChunkPB& chunk) {
-  // The chunk must begin exactly where we asked.
-  if (offset != chunk.offset()) {
-    return Status::InvalidArgument("Offset did not match what was asked for",
-        Substitute("$0 vs $1", offset, chunk.offset()));
-  }
-
-  // The payload must hash to the advertised checksum.
-  const string& payload = chunk.data();
-  const uint32_t computed_crc = crc::Crc32c(payload.data(), payload.length());
-  if (PREDICT_TRUE(computed_crc == chunk.crc32())) {
-    return Status::OK();
-  }
-  return Status::Corruption(
-      Substitute("CRC32 does not match at offset $0 size $1: $2 vs $3",
-        offset, payload.size(), computed_crc, chunk.crc32()));
-}
-
-// Build the prefix used by the *_WITH_PREFIX logging macros: the tablet ID
-// followed by the UUID reported by the FsManager.
-string TabletCopyClient::LogPrefix() {
-  const string& local_uuid = fs_manager_->uuid();
-  return Substitute("T $0 P $1: Tablet Copy client: ", tablet_id_, local_uuid);
-}
-
-} // namespace tserver
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap_client.h b/src/kudu/tserver/remote_bootstrap_client.h
deleted file mode 100644
index 01253ee..0000000
--- a/src/kudu/tserver/remote_bootstrap_client.h
+++ /dev/null
@@ -1,210 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#ifndef KUDU_TSERVER_TABLET_COPY_CLIENT_H
-#define KUDU_TSERVER_TABLET_COPY_CLIENT_H
-
-#include <string>
-#include <memory>
-#include <vector>
-
-#include <gtest/gtest_prod.h>
-
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/macros.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/util/status.h"
-
-namespace kudu {
-
-class BlockId;
-class BlockIdPB;
-class FsManager;
-class HostPort;
-
-namespace consensus {
-class ConsensusMetadata;
-class ConsensusStatePB;
-class RaftConfigPB;
-class RaftPeerPB;
-} // namespace consensus
-
-namespace rpc {
-class ErrorStatusPB;
-class Messenger;
-class RpcController;
-} // namespace rpc
-
-namespace tablet {
-class TabletMetadata;
-class TabletPeer;
-class TabletStatusListener;
-class TabletSuperBlockPB;
-} // namespace tablet
-
-namespace tserver {
-class DataIdPB;
-class DataChunkPB;
-class TabletCopyServiceProxy;
-
-// Client class for using tablet copy to copy a tablet from another host.
-// This class is not thread-safe.
-//
-// TODO:
-// * Parallelize download of blocks and WAL segments.
-//
-class TabletCopyClient {
- public:
-
-  // Construct the tablet copy client.
-  // 'fs_manager' and 'messenger' must remain valid until this object is destroyed.
-  TabletCopyClient(std::string tablet_id, FsManager* fs_manager,
-                        std::shared_ptr<rpc::Messenger> messenger);
-
-  // Attempt to clean up resources on the remote end by sending an
-  // EndTabletCopySession() RPC.
-  ~TabletCopyClient();
-
-  // Pass in the existing metadata for a tombstoned tablet, which will be
-  // replaced if validation checks pass in Start().
-  // 'meta' is the metadata for the tombstoned tablet and 'caller_term' is the
-  // term provided by the caller (assumed to be the current leader of the
-  // consensus config) for validation purposes.
-  // If the consensus metadata exists on disk for this tablet, and if
-  // 'caller_term' is lower than the current term stored in that consensus
-  // metadata, then this method will fail with a Status::InvalidArgument error.
-  Status SetTabletToReplace(const scoped_refptr<tablet::TabletMetadata>& meta,
-                            int64_t caller_term);
-
-  // Start up a tablet copy session to copy the tablet from the peer at
-  // 'copy_source_addr'. Place a new superblock indicating that tablet copy is
-  // in progress. If the 'metadata' pointer is passed as NULL, it is ignored,
-  // otherwise the TabletMetadata object resulting from the initial tablet
-  // copy response is returned.
-  Status Start(const HostPort& copy_source_addr,
-               scoped_refptr<tablet::TabletMetadata>* metadata);
-
-  // Runs a "full" tablet copy, copying the physical layout of a tablet
-  // from the leader of the specified consensus configuration.
-  Status FetchAll(tablet::TabletStatusListener* status_listener);
-
-  // After downloading all files successfully, write out the completed
-  // replacement superblock.
-  Status Finish();
-
- private:
-  FRIEND_TEST(TabletCopyClientTest, TestBeginEndSession);
-  FRIEND_TEST(TabletCopyClientTest, TestDownloadBlock);
-  FRIEND_TEST(TabletCopyClientTest, TestVerifyData);
-  FRIEND_TEST(TabletCopyClientTest, TestDownloadWalSegment);
-  FRIEND_TEST(TabletCopyClientTest, TestDownloadAllBlocks);
-
-  // Extract the embedded Status message from the given ErrorStatusPB.
-  // The given ErrorStatusPB must extend TabletCopyErrorPB.
-  static Status ExtractRemoteError(const rpc::ErrorStatusPB& remote_error);
-
-  // Enhance a RemoteError Status message with additional details from the
-  // remote. Any other kind of status is returned unchanged.
-  static Status UnwindRemoteError(const Status& status, const rpc::RpcController& controller);
-
-  // Update the tablet copy StatusListener with a message.
-  // The string "TabletCopy: " will be prepended to each message.
-  void UpdateStatusMessage(const std::string& message);
-
-  // End the tablet copy session.
-  Status EndRemoteSession();
-
-  // Download all WAL files sequentially.
-  Status DownloadWALs();
-
-  // Download a single WAL file.
-  // Assumes the WAL directories have already been created.
-  // WAL file is opened with options so that it will fsync() on close.
-  Status DownloadWAL(uint64_t wal_segment_seqno);
-
-  // Write out the Consensus Metadata file based on the ConsensusStatePB
-  // downloaded as part of initiating the tablet copy session.
-  Status WriteConsensusMetadata();
-
-  // Download all blocks belonging to a tablet sequentially.
-  //
-  // Blocks are given new IDs upon creation. On success, 'new_superblock_'
-  // is populated to reflect the new block IDs and should be used in lieu
-  // of 'superblock_' henceforth.
-  Status DownloadBlocks();
-
-  // Download the block specified by 'block_id'.
-  //
-  // 'num_blocks' is the total number of blocks being downloaded and is used,
-  // together with 'block_count', for progress reporting.
-  //
-  // On success:
-  // - 'block_id' is set to the new ID of the downloaded block.
-  // - 'block_count' is incremented.
-  Status DownloadAndRewriteBlock(BlockIdPB* block_id, int* block_count, int num_blocks);
-
-  // Download a single block.
-  // Data block is opened with options so that it will fsync() on close.
-  //
-  // On success, 'new_block_id' is set to the new ID of the downloaded block.
-  Status DownloadBlock(const BlockId& old_block_id, BlockId* new_block_id);
-
-  // Download a single remote file. The block and WAL implementations delegate
-  // to this method when downloading files.
-  //
-  // An Appendable is typically a WritableBlock (block) or WritableFile (WAL).
-  //
-  // Only used in one compilation unit, otherwise the implementation would
-  // need to be in the header.
-  template<class Appendable>
-  Status DownloadFile(const DataIdPB& data_id, Appendable* appendable);
-
-  // Verify that the given data chunk starts at 'offset' and that its CRC32
-  // checksum matches its data.
-  Status VerifyData(uint64_t offset, const DataChunkPB& resp);
-
-  // Return standard log prefix.
-  std::string LogPrefix();
-
-  // Set-once members.
-  const std::string tablet_id_;
-  FsManager* const fs_manager_;
-  const std::shared_ptr<rpc::Messenger> messenger_;
-
-  // State flags that enforce the progress of tablet copy.
-  bool started_;            // Session started.
-  bool downloaded_wal_;     // WAL segments downloaded.
-  bool downloaded_blocks_;  // Data blocks downloaded.
-
-  // Session-specific data items.
-  bool replace_tombstoned_tablet_;
-
-  // Local tablet metadata file.
-  scoped_refptr<tablet::TabletMetadata> meta_;
-
-  // Local Consensus metadata file. This may initially be NULL if this is
-  // copying a new replica (rather than replacing an old one).
-  gscoped_ptr<consensus::ConsensusMetadata> cmeta_;
-
-  // Receives progress messages; may be null, in which case messages are dropped.
-  tablet::TabletStatusListener* status_listener_;
-  // Proxy used to issue tablet copy RPCs to the remote.
-  std::shared_ptr<TabletCopyServiceProxy> proxy_;
-  // ID of the current session, sent with every fetch and end-session request.
-  std::string session_id_;
-  // Used as the RPC timeout for each chunk fetch.
-  uint64_t session_idle_timeout_millis_;
-  // Superblock as received from the remote.
-  gscoped_ptr<tablet::TabletSuperBlockPB> superblock_;
-  // Copy of 'superblock_' with block IDs rewritten to the local block IDs.
-  gscoped_ptr<tablet::TabletSuperBlockPB> new_superblock_;
-  // Committed consensus state sent by the tablet copy source.
-  gscoped_ptr<consensus::ConsensusStatePB> remote_committed_cstate_;
-  // Sequence numbers of the WAL segments to download.
-  std::vector<uint64_t> wal_seqnos_;
-  // Used to build the file names of saved metadata copies.
-  int64_t start_time_micros_;
-
-  DISALLOW_COPY_AND_ASSIGN(TabletCopyClient);
-};
-
-} // namespace tserver
-} // namespace kudu
-#endif /* KUDU_TSERVER_TABLET_COPY_CLIENT_H */

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap_service-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap_service-test.cc b/src/kudu/tserver/remote_bootstrap_service-test.cc
deleted file mode 100644
index 1b5baf2..0000000
--- a/src/kudu/tserver/remote_bootstrap_service-test.cc
+++ /dev/null
@@ -1,491 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "kudu/tserver/remote_bootstrap-test-base.h"
-
-#include <gflags/gflags.h>
-#include <limits>
-#include <thread>
-#include <vector>
-
-#include "kudu/consensus/log.h"
-#include "kudu/consensus/log_anchor_registry.h"
-#include "kudu/consensus/log_util.h"
-#include "kudu/consensus/metadata.pb.h"
-#include "kudu/consensus/opid_util.h"
-#include "kudu/rpc/rpc_header.pb.h"
-#include "kudu/rpc/transfer.h"
-#include "kudu/tserver/remote_bootstrap.pb.h"
-#include "kudu/tserver/tserver_service.pb.h"
-#include "kudu/tserver/tserver_service.proxy.h"
-#include "kudu/util/crc.h"
-#include "kudu/util/env_util.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/stopwatch.h"
-#include "kudu/util/test_util.h"
-
-#define ASSERT_REMOTE_ERROR(status, err, code, str) \
-    ASSERT_NO_FATAL_FAILURE(AssertRemoteError(status, err, code, str))
-
-DECLARE_uint64(tablet_copy_idle_timeout_ms);
-DECLARE_uint64(tablet_copy_timeout_poll_period_ms);
-
-namespace kudu {
-namespace tserver {
-
-using consensus::MaximumOpId;
-using consensus::MinimumOpId;
-using consensus::OpIdEquals;
-using env_util::ReadFully;
-using log::ReadableLogSegment;
-using rpc::ErrorStatusPB;
-using rpc::RpcController;
-using std::thread;
-using std::vector;
-
-// Test fixture that talks to the tablet copy service through a
-// TabletCopyServiceProxy and provides thin wrappers around each RPC.
-class TabletCopyServiceTest : public TabletCopyTest {
- public:
-  TabletCopyServiceTest() {
-    // Poll for session expiration every 10 ms for the session timeout test.
-    FLAGS_tablet_copy_timeout_poll_period_ms = 10;
-  }
-
- protected:
-  // Run the base class setup, then create the proxy pointed at the mini
-  // server's bound RPC address.
-  void SetUp() OVERRIDE {
-    TabletCopyTest::SetUp();
-    tablet_copy_proxy_.reset(
-        new TabletCopyServiceProxy(client_messenger_, mini_server_->bound_rpc_addr()));
-  }
-
-  // Send a BeginTabletCopySession RPC for 'tablet_id' on behalf of
-  // 'requestor_uuid' with a one-second timeout. Remote application errors are
-  // folded into the returned Status via UnwindRemoteError().
-  Status DoBeginTabletCopySession(const string& tablet_id,
-                                       const string& requestor_uuid,
-                                       BeginTabletCopySessionResponsePB* resp,
-                                       RpcController* controller) {
-    controller->set_timeout(MonoDelta::FromSeconds(1.0));
-    BeginTabletCopySessionRequestPB req;
-    req.set_tablet_id(tablet_id);
-    req.set_requestor_uuid(requestor_uuid);
-    return UnwindRemoteError(
-        tablet_copy_proxy_->BeginTabletCopySession(req, resp, controller), controller);
-  }
-
-  // Begin a session for the test tablet using the local UUID as requestor.
-  // 'session_id' is always filled in on success; the remaining out-parameters
-  // are optional and are only filled in when non-null.
-  Status DoBeginValidTabletCopySession(string* session_id,
-                                            tablet::TabletSuperBlockPB* superblock = nullptr,
-                                            uint64_t* idle_timeout_millis = nullptr,
-                                            vector<uint64_t>* sequence_numbers = nullptr) {
-    BeginTabletCopySessionResponsePB resp;
-    RpcController controller;
-    RETURN_NOT_OK(DoBeginTabletCopySession(GetTabletId(), GetLocalUUID(), &resp, &controller));
-    *session_id = resp.session_id();
-    if (superblock) {
-      *superblock = resp.superblock();
-    }
-    if (idle_timeout_millis) {
-      *idle_timeout_millis = resp.session_idle_timeout_millis();
-    }
-    if (sequence_numbers) {
-      sequence_numbers->assign(resp.wal_segment_seqnos().begin(), resp.wal_segment_seqnos().end());
-    }
-    return Status::OK();
-  }
-
-  // Send a CheckSessionActive RPC for 'session_id' with a one-second timeout.
-  Status DoCheckSessionActive(const string& session_id,
-                              CheckTabletCopySessionActiveResponsePB* resp,
-                              RpcController* controller) {
-    controller->set_timeout(MonoDelta::FromSeconds(1.0));
-    CheckTabletCopySessionActiveRequestPB req;
-    req.set_session_id(session_id);
-    return UnwindRemoteError(
-        tablet_copy_proxy_->CheckSessionActive(req, resp, controller), controller);
-  }
-
-  // Send a FetchData RPC for 'data_id' within 'session_id' with a one-second
-  // timeout. 'offset' and 'max_length' are optional: each is only set on the
-  // request when the corresponding pointer is non-null.
-  Status DoFetchData(const string& session_id, const DataIdPB& data_id,
-                     uint64_t* offset, int64_t* max_length,
-                     FetchDataResponsePB* resp,
-                     RpcController* controller) {
-    controller->set_timeout(MonoDelta::FromSeconds(1.0));
-    FetchDataRequestPB req;
-    req.set_session_id(session_id);
-    req.mutable_data_id()->CopyFrom(data_id);
-    if (offset) {
-      req.set_offset(*offset);
-    }
-    if (max_length) {
-      req.set_max_length(*max_length);
-    }
-    return UnwindRemoteError(
-        tablet_copy_proxy_->FetchData(req, resp, controller), controller);
-  }
-
-  // Send an EndTabletCopySession RPC for 'session_id' with a one-second
-  // timeout. 'error_msg' is optional and is only attached to the request when
-  // non-null.
-  Status DoEndTabletCopySession(const string& session_id, bool is_success,
-                                     const Status* error_msg,
-                                     EndTabletCopySessionResponsePB* resp,
-                                     RpcController* controller) {
-    controller->set_timeout(MonoDelta::FromSeconds(1.0));
-    EndTabletCopySessionRequestPB req;
-    req.set_session_id(session_id);
-    req.set_is_success(is_success);
-    if (error_msg) {
-      StatusToPB(*error_msg, req.mutable_error());
-    }
-    return UnwindRemoteError(
-        tablet_copy_proxy_->EndTabletCopySession(req, resp, controller), controller);
-  }
-
-  // Decode the remote error into a Status object.
-  // Note that the presence of the TabletCopyErrorPB extension is not checked
-  // here.
-  Status ExtractRemoteError(const ErrorStatusPB* remote_error) {
-    const TabletCopyErrorPB& error =
-        remote_error->GetExtension(TabletCopyErrorPB::tablet_copy_error_ext);
-    return StatusFromPB(error.status());
-  }
-
-  // Enhance a RemoteError Status message with additional details from the remote.
-  // Statuses that are not RemoteErrors, or whose error response is not an
-  // application error, are returned unchanged.
-  Status UnwindRemoteError(Status status, const RpcController* controller) {
-    if (!status.IsRemoteError() ||
-        controller->error_response()->code() != ErrorStatusPB::ERROR_APPLICATION) {
-      return status;
-    }
-    Status remote_error = ExtractRemoteError(controller->error_response());
-    return status.CloneAndPrepend(remote_error.ToString());
-  }
-
-  // Assert that 'status' is a RemoteError whose embedded tablet copy error
-  // has application code 'app_code' and whose embedded status has the code
-  // string 'status_code_string'. Intended to be used through the
-  // ASSERT_REMOTE_ERROR macro.
-  void AssertRemoteError(Status status, const ErrorStatusPB* remote_error,
-                         const TabletCopyErrorPB::Code app_code,
-                         const string& status_code_string) {
-    ASSERT_TRUE(status.IsRemoteError()) << "Unexpected status code: " << status.ToString()
-                                        << ", app code: "
-                                        << TabletCopyErrorPB::Code_Name(app_code)
-                                        << ", status code string: " << status_code_string;
-    const Status app_status = ExtractRemoteError(remote_error);
-    const TabletCopyErrorPB& error =
-        remote_error->GetExtension(TabletCopyErrorPB::tablet_copy_error_ext);
-    ASSERT_EQ(app_code, error.code()) << error.ShortDebugString();
-    ASSERT_EQ(status_code_string, app_status.CodeAsString()) << app_status.ToString();
-    LOG(INFO) << app_status.ToString();
-  }
-
-  // Return BlockId in format suitable for a FetchData() call.
-  static DataIdPB AsDataTypeId(const BlockId& block_id) {
-    DataIdPB data_id;
-    data_id.set_type(DataIdPB::BLOCK);
-    block_id.CopyToPB(data_id.mutable_block_id());
-    return data_id;
-  }
-
-  // Proxy used by all of the Do*() helpers above; created in SetUp().
-  gscoped_ptr<TabletCopyServiceProxy> tablet_copy_proxy_;
-};
-
-// Open a tablet copy session, sanity-check everything the server returned,
-// then close the session again.
-TEST_F(TabletCopyServiceTest, TestSimpleBeginEndSession) {
-  string session_id;
-  tablet::TabletSuperBlockPB sblock;
-  uint64_t idle_timeout_ms;
-  vector<uint64_t> wal_seqnos;
-  ASSERT_OK(DoBeginValidTabletCopySession(&session_id, &sblock, &idle_timeout_ms, &wal_seqnos));
-
-  // Basic validation of returned params.
-  ASSERT_FALSE(session_id.empty());
-  ASSERT_EQ(FLAGS_tablet_copy_idle_timeout_ms, idle_timeout_ms);
-  ASSERT_TRUE(sblock.IsInitialized());
-  // One segment per log roll, plus the active segment.
-  ASSERT_EQ(kNumLogRolls + 1, wal_seqnos.size());
-
-  RpcController end_controller;
-  EndTabletCopySessionResponsePB end_resp;
-  ASSERT_OK(DoEndTabletCopySession(session_id, true, nullptr, &end_resp, &end_controller));
-}
-
-// Begin the same session twice in a row. The current implementation silently
-// creates only one session, so the second attempt must also succeed.
-TEST_F(TabletCopyServiceTest, TestBeginTwice) {
-  for (int attempt = 0; attempt < 2; ++attempt) {
-    string id;
-    ASSERT_OK(DoBeginValidTabletCopySession(&id));
-    ASSERT_FALSE(id.empty());
-  }
-}
-
-// Regression test for KUDU-1436: race conditions if multiple requests
-// to begin the same tablet copy session arrive at more or less the
-// same time.
-//
-// Each thread begins a session and records the superblock it was given; once
-// all threads have finished, every superblock must match the first one.
-TEST_F(TabletCopyServiceTest, TestBeginConcurrently) {
-  const int kNumThreads = 5;
-  vector<thread> threads;
-  vector<tablet::TabletSuperBlockPB> sblocks(kNumThreads);
-  for (int i = 0 ; i < kNumThreads; i++) {
-    threads.emplace_back([this, &sblocks, i]{
-        string session_id;
-        CHECK_OK(DoBeginValidTabletCopySession(&session_id, &sblocks[i]));
-        CHECK(!session_id.empty());
-      });
-  }
-  for (auto& t : threads) {
-    t.join();
-  }
-  // Verify that all threads got the same result. Bound the loop by
-  // kNumThreads (the number of threads created above) so that the signed
-  // index is not compared against the unsigned threads.size().
-  for (int i = 1; i < kNumThreads; i++) {
-    ASSERT_EQ(sblocks[i].DebugString(), sblocks[0].DebugString());
-  }
-}
-
-// Requests that name a session which does not exist must be rejected with
-// a NO_SESSION application error carrying a NotFound status.
-TEST_F(TabletCopyServiceTest, TestInvalidSessionId) {
-  const vector<string> bogus_ids = { "hodor", GetLocalUUID() };
-  const string not_found = Status::NotFound("").CodeAsString();
-
-  // Fetch a block for a non-existent session.
-  for (const string& bogus_id : bogus_ids) {
-    DataIdPB block;
-    block.set_type(DataIdPB::BLOCK);
-    block.mutable_block_id()->set_id(1);
-
-    FetchDataResponsePB fetch_resp;
-    RpcController rpc;
-    Status s = DoFetchData(bogus_id, block, nullptr, nullptr, &fetch_resp, &rpc);
-    ASSERT_REMOTE_ERROR(s, rpc.error_response(), TabletCopyErrorPB::NO_SESSION, not_found);
-  }
-
-  // End a non-existent session.
-  for (const string& bogus_id : bogus_ids) {
-    EndTabletCopySessionResponsePB end_resp;
-    RpcController rpc;
-    Status s = DoEndTabletCopySession(bogus_id, true, nullptr, &end_resp, &rpc);
-    ASSERT_REMOTE_ERROR(s, rpc.error_response(), TabletCopyErrorPB::NO_SESSION, not_found);
-  }
-}
-
-// Beginning a session for a tablet the server does not know about must fail
-// with a TABLET_NOT_FOUND application error carrying a NotFound status.
-TEST_F(TabletCopyServiceTest, TestInvalidTabletId) {
-  RpcController rpc;
-  BeginTabletCopySessionResponsePB begin_resp;
-  Status s = DoBeginTabletCopySession("some-unknown-tablet", GetLocalUUID(), &begin_resp, &rpc);
-  ASSERT_REMOTE_ERROR(s, rpc.error_response(), TabletCopyErrorPB::TABLET_NOT_FOUND,
-                      Status::NotFound("").CodeAsString());
-}
-
-// Test DataIdPB validation.
-// A single valid session is opened and then a series of malformed or
-// unresolvable FetchData requests is sent within it; each must be rejected
-// with the expected error.
-TEST_F(TabletCopyServiceTest, TestInvalidBlockOrOpId) {
-  string session_id;
-  ASSERT_OK(DoBeginValidTabletCopySession(&session_id));
-
-  // Invalid BlockId.
-  {
-    FetchDataResponsePB resp;
-    RpcController controller;
-    DataIdPB data_id;
-    data_id.set_type(DataIdPB::BLOCK);
-    data_id.mutable_block_id()->set_id(1);
-    Status status = DoFetchData(session_id, data_id, nullptr, nullptr, &resp, &controller);
-    ASSERT_REMOTE_ERROR(status, controller.error_response(),
-                        TabletCopyErrorPB::BLOCK_NOT_FOUND,
-                        Status::NotFound("").CodeAsString());
-  }
-
-  // Invalid Segment Sequence Number for log fetch.
-  {
-    FetchDataResponsePB resp;
-    RpcController controller;
-    DataIdPB data_id;
-    data_id.set_type(DataIdPB::LOG_SEGMENT);
-    data_id.set_wal_segment_seqno(31337);
-    Status status = DoFetchData(session_id, data_id, nullptr, nullptr, &resp, &controller);
-    ASSERT_REMOTE_ERROR(status, controller.error_response(),
-                        TabletCopyErrorPB::WAL_SEGMENT_NOT_FOUND,
-                        Status::NotFound("").CodeAsString());
-  }
-
-  // Empty data type with BlockId.
-  // The server will reject the request since we are missing the required 'type' field.
-  {
-    FetchDataResponsePB resp;
-    RpcController controller;
-    DataIdPB data_id;
-    data_id.mutable_block_id()->set_id(1);
-    Status status = DoFetchData(session_id, data_id, nullptr, nullptr, &resp, &controller);
-    ASSERT_TRUE(status.IsRemoteError()) << status.ToString();
-    ASSERT_STR_CONTAINS(status.ToString(),
-                        "Invalid argument: invalid parameter for call "
-                        "kudu.tserver.TabletCopyService.FetchData: "
-                        "missing fields: data_id.type");
-  }
-
-  // Empty data type id (no BlockId, no Segment Sequence Number).
-  {
-    FetchDataResponsePB resp;
-    RpcController controller;
-    DataIdPB data_id;
-    data_id.set_type(DataIdPB::LOG_SEGMENT);
-    Status status = DoFetchData(session_id, data_id, nullptr, nullptr, &resp, &controller);
-    ASSERT_REMOTE_ERROR(status, controller.error_response(),
-                        TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST,
-                        Status::InvalidArgument("").CodeAsString());
-  }
-
-  // Both BlockId and Segment Sequence Number in the same "union" PB (illegal).
-  {
-    FetchDataResponsePB resp;
-    RpcController controller;
-    DataIdPB data_id;
-    data_id.set_type(DataIdPB::BLOCK);
-    data_id.mutable_block_id()->set_id(1);
-    data_id.set_wal_segment_seqno(0);
-    Status status = DoFetchData(session_id, data_id, nullptr, nullptr, &resp, &controller);
-    ASSERT_REMOTE_ERROR(status, controller.error_response(),
-                        TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST,
-                        Status::InvalidArgument("").CodeAsString());
-  }
-}
-
-// Verifies that FetchData rejects a request whose offset is the largest
-// representable value, reporting it as an invalid tablet copy request.
-TEST_F(TabletCopyServiceTest, TestFetchInvalidBlockOffset) {
-  tablet::TabletSuperBlockPB superblock;
-  string session_id;
-  ASSERT_OK(DoBeginValidTabletCopySession(&session_id, &superblock));
-
-  // No real block can contain data at this offset.
-  uint64_t bogus_offset = std::numeric_limits<uint64_t>::max();
-
-  RpcController controller;
-  FetchDataResponsePB resp;
-  Status status = DoFetchData(session_id,
-                              AsDataTypeId(FirstColumnBlockId(superblock)),
-                              &bogus_offset, nullptr, &resp, &controller);
-  ASSERT_REMOTE_ERROR(status, controller.error_response(),
-                      TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST,
-                      Status::InvalidArgument("").CodeAsString());
-}
-
-// Fetches a whole block with a single FetchData call and checks that the
-// returned chunk matches the same block read from the local filesystem.
-TEST_F(TabletCopyServiceTest, TestFetchBlockAtOnce) {
-  tablet::TabletSuperBlockPB superblock;
-  string session_id;
-  ASSERT_OK(DoBeginValidTabletCopySession(&session_id, &superblock));
-
-  BlockId block_id = FirstColumnBlockId(superblock);
-
-  // Expected bytes: read the block through the server's FsManager.
-  faststring local_buf;
-  Slice expected;
-  ASSERT_OK(ReadLocalBlockFile(mini_server_->server()->fs_manager(), block_id,
-                               &local_buf, &expected));
-
-  // Actual bytes: neither an offset nor a maximum length is passed to the RPC.
-  RpcController controller;
-  FetchDataResponsePB resp;
-  ASSERT_OK(DoFetchData(session_id, AsDataTypeId(block_id), nullptr, nullptr, &resp, &controller));
-
-  AssertDataEqual(expected.data(), expected.size(), resp.chunk());
-}
-
-// Test that we are able to incrementally fetch blocks: the block is requested
-// in pieces of at most one fifth of its size, and every returned chunk is
-// compared against the corresponding range of the locally-read block.
-TEST_F(TabletCopyServiceTest, TestFetchBlockIncrementally) {
-  string session_id;
-  tablet::TabletSuperBlockPB superblock;
-  ASSERT_OK(DoBeginValidTabletCopySession(&session_id, &superblock));
-
-  BlockId block_id = FirstColumnBlockId(superblock);
-  Slice local_data;
-  faststring scratch;
-  ASSERT_OK(ReadLocalBlockFile(mini_server_->server()->fs_manager(), block_id,
-                               &scratch, &local_data));
-
-  // Grab the remote data in several chunks.
-  int64_t block_size = local_data.size();
-  int64_t max_chunk_size = block_size / 5;
-  uint64_t offset = 0;
-  // 'block_size' comes from a size, so it is non-negative; the cast keeps the
-  // comparison between like-signed operands.
-  while (offset < static_cast<uint64_t>(block_size)) {
-    FetchDataResponsePB resp;
-    RpcController controller;
-    ASSERT_OK(DoFetchData(session_id, AsDataTypeId(block_id),
-                          &offset, &max_chunk_size, &resp, &controller));
-    int64_t returned_bytes = resp.chunk().data().size();
-    // Every round trip must make progress. Without this check an empty chunk
-    // would leave 'offset' unchanged and the loop would never terminate.
-    ASSERT_GT(returned_bytes, 0);
-    ASSERT_LE(returned_bytes, max_chunk_size);
-    AssertDataEqual(local_data.data() + offset, returned_bytes, resp.chunk());
-    offset += returned_bytes;
-  }
-}
-
-// Test that we are able to fetch log segments: the first WAL segment
-// advertised by the session is fetched over RPC and compared byte-for-byte
-// with the first segment in the tablet peer's local log reader snapshot.
-TEST_F(TabletCopyServiceTest, TestFetchLog) {
-  string session_id;
-  tablet::TabletSuperBlockPB superblock;
-  uint64_t idle_timeout_millis;
-  vector<uint64_t> segment_seqnos;
-  ASSERT_OK(DoBeginValidTabletCopySession(&session_id,
-                                          &superblock,
-                                          &idle_timeout_millis,
-                                          &segment_seqnos));
-
-  ASSERT_EQ(kNumLogRolls + 1, segment_seqnos.size());
-  uint64_t seg_seqno = *segment_seqnos.begin();
-
-  // Fetch the remote data.
-  FetchDataResponsePB resp;
-  RpcController controller;
-  DataIdPB data_id;
-  data_id.set_type(DataIdPB::LOG_SEGMENT);
-  data_id.set_wal_segment_seqno(seg_seqno);
-  ASSERT_OK(DoFetchData(session_id, data_id, nullptr, nullptr, &resp, &controller));
-
-  // Fetch the local data.
-  log::SegmentSequence local_segments;
-  ASSERT_OK(tablet_peer_->log()->reader()->GetSegmentsSnapshot(&local_segments));
-  // Fail cleanly instead of dereferencing the first element of an empty
-  // snapshot below.
-  ASSERT_FALSE(local_segments.empty());
-
-  const scoped_refptr<ReadableLogSegment>& segment = local_segments[0];
-  uint64_t first_seg_seqno = segment->header().sequence_number();
-
-  ASSERT_EQ(seg_seqno, first_seg_seqno)
-      << "Expected equal sequence numbers: " << seg_seqno
-      << " and " << first_seg_seqno;
-  faststring scratch;
-  int64_t size = segment->file_size();
-  scratch.resize(size);
-  Slice slice;
-  ASSERT_OK(ReadFully(segment->readable_file().get(), 0, size, &slice, scratch.data()));
-
-  AssertDataEqual(slice.data(), slice.size(), resp.chunk());
-}
-
-// Verifies that an idle tablet copy session is eventually reported as
-// inactive once its idle timeout has been set to a very small value.
-TEST_F(TabletCopyServiceTest, TestSessionTimeout) {
-  // Expire the session almost immediately. The flag is set before the session
-  // is started below.
-  // NOTE(review): the original comment says SetUp() also shortens the
-  // expiration poll period; SetUp() is not visible here -- confirm.
-  FLAGS_tablet_copy_idle_timeout_ms = 1;
-
-  // Start session.
-  string session_id;
-  ASSERT_OK(DoBeginValidTabletCopySession(&session_id));
-
-  CheckTabletCopySessionActiveResponsePB resp;
-  MonoTime start_time = MonoTime::Now(MonoTime::FINE);
-
-  // Poll roughly once per millisecond, giving up after 10 seconds.
-  for (;;) {
-    RpcController controller;
-    ASSERT_OK(DoCheckSessionActive(session_id, &resp, &controller));
-    if (!resp.session_is_active()) {
-      break;
-    }
-    SleepFor(MonoDelta::FromMilliseconds(1)); // 1 ms
-    if (MonoTime::Now(MonoTime::FINE).GetDeltaSince(start_time).ToSeconds() >= 10) {
-      break;
-    }
-  }
-
-  ASSERT_FALSE(resp.session_is_active()) << "Tablet Copy session did not time out!";
-}
-
-} // namespace tserver
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap_service.cc b/src/kudu/tserver/remote_bootstrap_service.cc
deleted file mode 100644
index 1d7104c..0000000
--- a/src/kudu/tserver/remote_bootstrap_service.cc
+++ /dev/null
@@ -1,357 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "kudu/tserver/remote_bootstrap_service.h"
-
-#include <algorithm>
-#include <gflags/gflags.h>
-#include <glog/logging.h>
-#include <string>
-#include <vector>
-
-#include "kudu/common/wire_protocol.h"
-#include "kudu/consensus/log.h"
-#include "kudu/fs/fs_manager.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/map-util.h"
-#include "kudu/rpc/rpc_context.h"
-#include "kudu/tserver/remote_bootstrap_session.h"
-#include "kudu/tserver/tablet_peer_lookup.h"
-#include "kudu/tablet/tablet_peer.h"
-#include "kudu/util/crc.h"
-#include "kudu/util/fault_injection.h"
-#include "kudu/util/flag_tags.h"
-
-// Responds to the in-flight RPC with an application error built from
-// 'app_err', 'message' and the Status 's' (via SetupErrorAndRespond()), and
-// then returns from the enclosing function. Because it expands to a bare
-// 'return;', it can only be used in functions returning void.
-//
-// Note, this macro assumes the existence of a local var named 'context'.
-#define RPC_RETURN_APP_ERROR(app_err, message, s) \
-  do { \
-    SetupErrorAndRespond(context, app_err, message, s); \
-    return; \
-  } while (false)
-
-// Evaluates 'expr' once and stores the result in a local Status. If that
-// Status is not OK, responds with an application error and returns from the
-// enclosing function (see RPC_RETURN_APP_ERROR). 'message' is only evaluated
-// on the failure path. Also relies on a local variable named 'context'.
-#define RPC_RETURN_NOT_OK(expr, app_err, message) \
-  do { \
-    Status s = (expr); \
-    if (!s.ok()) { \
-      RPC_RETURN_APP_ERROR(app_err, message, s); \
-    } \
-  } while (false)
-
-// Idle timeout for a session, in milliseconds (default 180000). It is added
-// to the current time whenever a session's expiration is reset, and it is
-// also reported to the client in the BeginTabletCopySession response.
-DEFINE_uint64(tablet_copy_idle_timeout_ms, 180000,
-              "Amount of time without activity before a tablet copy "
-              "session will expire, in millis");
-TAG_FLAG(tablet_copy_idle_timeout_ms, hidden);
-
-// How long the expiration thread waits between sweeps for expired sessions,
-// in milliseconds (default 10000).
-DEFINE_uint64(tablet_copy_timeout_poll_period_ms, 10000,
-              "How often the tablet_copy service polls for expired "
-              "tablet copy sessions, in millis");
-TAG_FLAG(tablet_copy_timeout_poll_period_ms, hidden);
-
-// Fault-injection knob passed to MAYBE_FAULT() in FetchData(); the default of
-// 0.0 disables it.
-DEFINE_double(fault_crash_on_handle_tc_fetch_data, 0.0,
-              "Fraction of the time when the tablet will crash while "
-              "servicing a TabletCopyService FetchData() RPC call. "
-              "(For testing only!)");
-TAG_FLAG(fault_crash_on_handle_tc_fetch_data, unsafe);
-
-namespace kudu {
-namespace tserver {
-
-using crc::Crc32c;
-using strings::Substitute;
-using tablet::TabletPeer;
-
-// Logs a warning about the failed request and answers the RPC with an
-// application error: a TabletCopyErrorPB carrying 'code' and the serialized
-// Status 's', sent together with the human-readable 'message'.
-static void SetupErrorAndRespond(rpc::RpcContext* context,
-                                 TabletCopyErrorPB::Code code,
-                                 const string& message,
-                                 const Status& s) {
-  LOG(WARNING) << "Error handling TabletCopyService RPC request from "
-               << context->requestor_string() << ": " << s.ToString();
-
-  TabletCopyErrorPB error_pb;
-  error_pb.set_code(code);
-  StatusToPB(s, error_pb.mutable_status());
-
-  context->RespondApplicationError(
-      TabletCopyErrorPB::tablet_copy_error_ext.number(), message, error_pb);
-}
-
-// Constructs the service and immediately starts the background thread that
-// ends expired sessions. 'fs_manager' and 'tablet_peer_lookup' must be
-// non-null (enforced with CHECK_NOTNULL); failure to create the thread is
-// fatal (CHECK_OK).
-TabletCopyServiceImpl::TabletCopyServiceImpl(
-    FsManager* fs_manager,
-    TabletPeerLookupIf* tablet_peer_lookup,
-    const scoped_refptr<MetricEntity>& metric_entity,
-    const scoped_refptr<rpc::ResultTracker>& result_tracker)
-    : TabletCopyServiceIf(metric_entity, result_tracker),
-      fs_manager_(CHECK_NOTNULL(fs_manager)),
-      tablet_peer_lookup_(CHECK_NOTNULL(tablet_peer_lookup)),
-      shutdown_latch_(1) {
-  // The thread runs EndExpiredSessions() until shutdown_latch_ is counted
-  // down in Shutdown().
-  CHECK_OK(Thread::Create("tablet-copy", "tc-session-exp",
-                          &TabletCopyServiceImpl::EndExpiredSessions, this,
-                          &session_expiration_thread_));
-}
-
-// Starts a tablet copy session for the requested tablet, or re-sends the
-// initialization info when a session with the same id already exists. On
-// success the response carries the responder uuid, session id, idle timeout,
-// tablet superblock, initial committed consensus state and the sequence
-// numbers of the session's WAL segments.
-void TabletCopyServiceImpl::BeginTabletCopySession(
-        const BeginTabletCopySessionRequestPB* req,
-        BeginTabletCopySessionResponsePB* resp,
-        rpc::RpcContext* context) {
-  const string& tablet_id = req->tablet_id();
-  const string& requestor_uuid = req->requestor_uuid();
-
-  // The session id is currently built from the requestor uuid and the tablet
-  // id, but there is no guarantee that this format will not change.
-  const string session_id = Substitute("$0-$1", requestor_uuid, tablet_id);
-
-  scoped_refptr<TabletPeer> tablet_peer;
-  RPC_RETURN_NOT_OK(tablet_peer_lookup_->GetTabletPeer(tablet_id, &tablet_peer),
-                    TabletCopyErrorPB::TABLET_NOT_FOUND,
-                    Substitute("Unable to find specified tablet: $0", tablet_id));
-
-  scoped_refptr<TabletCopySession> session;
-  {
-    MutexLock lock(sessions_lock_);
-    const bool session_exists = FindCopy(sessions_, session_id, &session);
-    if (session_exists) {
-      LOG(INFO) << Substitute(
-          "Re-sending initialization info for existing tablet copy session on tablet $0"
-          " from peer $1 at $2: session_id = $3",
-          tablet_id, requestor_uuid, context->requestor_string(), session_id);
-    } else {
-      LOG(INFO) << Substitute(
-          "Beginning new tablet copy session on tablet $0 from peer $1"
-          " at $2: session id = $3",
-          tablet_id, requestor_uuid, context->requestor_string(), session_id);
-      session.reset(new TabletCopySession(tablet_peer, session_id,
-                                          requestor_uuid, fs_manager_));
-      // A session that fails to initialize is never added to the map.
-      RPC_RETURN_NOT_OK(session->Init(),
-                        TabletCopyErrorPB::UNKNOWN_ERROR,
-                        Substitute("Error initializing tablet copy session for tablet $0",
-                                   tablet_id));
-      InsertOrDie(&sessions_, session_id, session);
-    }
-    // New and pre-existing sessions alike get a fresh idle deadline.
-    ResetSessionExpirationUnlocked(session_id);
-  }
-
-  resp->set_session_id(session_id);
-  resp->set_responder_uuid(fs_manager_->uuid());
-  resp->set_session_idle_timeout_millis(FLAGS_tablet_copy_idle_timeout_ms);
-  resp->mutable_superblock()->CopyFrom(session->tablet_superblock());
-  resp->mutable_initial_committed_cstate()->CopyFrom(session->initial_committed_cstate());
-
-  for (const auto& segment : session->log_segments()) {
-    resp->add_wal_segment_seqnos(segment->header().sequence_number());
-  }
-
-  context->RespondSuccess();
-}
-
-// Reports whether the given session is currently registered. When the session
-// exists and the request asks for a keepalive, its idle deadline is refreshed.
-// A missing session is not an error: the RPC succeeds with
-// session_is_active == false.
-void TabletCopyServiceImpl::CheckSessionActive(
-        const CheckTabletCopySessionActiveRequestPB* req,
-        CheckTabletCopySessionActiveResponsePB* resp,
-        rpc::RpcContext* context) {
-  const string& session_id = req->session_id();
-
-  // Look up and validate tablet copy session. The lock stays held until the
-  // function returns.
-  scoped_refptr<TabletCopySession> session;
-  MutexLock lock(sessions_lock_);
-  TabletCopyErrorPB::Code app_error;
-  Status lookup = FindSessionUnlocked(session_id, &app_error, &session);
-
-  if (!lookup.ok()) {
-    if (app_error == TabletCopyErrorPB::NO_SESSION) {
-      resp->set_session_is_active(false);
-      context->RespondSuccess();
-      return;
-    }
-    // Any other lookup failure is reported back as an application error.
-    RPC_RETURN_APP_ERROR(
-        app_error,
-        Substitute("Error trying to check whether session $0 is active", session_id),
-        lookup);
-  }
-
-  if (req->keepalive()) {
-    ResetSessionExpirationUnlocked(session_id);
-  }
-  resp->set_session_is_active(true);
-  context->RespondSuccess();
-}
-
-// Serves one chunk of a data block or WAL segment belonging to an existing
-// session. The response chunk records the requested offset, the total length
-// reported by the session, the data itself and a CRC32C of that data.
-void TabletCopyServiceImpl::FetchData(const FetchDataRequestPB* req,
-                                      FetchDataResponsePB* resp,
-                                      rpc::RpcContext* context) {
-  const string& session_id = req->session_id();
-
-  // Find the session and count this request as activity on it.
-  scoped_refptr<TabletCopySession> session;
-  {
-    MutexLock lock(sessions_lock_);
-    TabletCopyErrorPB::Code lookup_error;
-    RPC_RETURN_NOT_OK(FindSessionUnlocked(session_id, &lookup_error, &session),
-                      lookup_error, "No such session");
-    ResetSessionExpirationUnlocked(session_id);
-  }
-
-  MAYBE_FAULT(FLAGS_fault_crash_on_handle_tc_fetch_data);
-
-  const DataIdPB& data_id = req->data_id();
-  TabletCopyErrorPB::Code error_code = TabletCopyErrorPB::UNKNOWN_ERROR;
-  RPC_RETURN_NOT_OK(ValidateFetchRequestDataId(data_id, &error_code, session),
-                    error_code, "Invalid DataId");
-
-  const uint64_t offset = req->offset();
-  const int64_t client_maxlen = req->max_length();
-
-  DataChunkPB* chunk = resp->mutable_chunk();
-  string* payload = chunk->mutable_data();
-  int64_t total_data_length = 0;
-  const bool is_block = data_id.type() == DataIdPB::BLOCK;
-  if (!is_block) {
-    // Fetching a log segment chunk.
-    const uint64_t segment_seqno = data_id.wal_segment_seqno();
-    RPC_RETURN_NOT_OK(session->GetLogSegmentPiece(segment_seqno, offset, client_maxlen,
-                                                  payload, &total_data_length, &error_code),
-                      error_code, "Unable to get piece of log segment");
-  } else {
-    // Fetching a data block chunk.
-    const BlockId block_id = BlockId::FromPB(data_id.block_id());
-    RPC_RETURN_NOT_OK(session->GetBlockPiece(block_id, offset, client_maxlen,
-                                             payload, &total_data_length, &error_code),
-                      error_code, "Unable to get piece of data block");
-  }
-
-  chunk->set_total_data_length(total_data_length);
-  chunk->set_offset(offset);
-
-  // Checksum covers exactly the bytes placed in the chunk.
-  chunk->set_crc32(Crc32c(payload->data(), payload->length()));
-
-  context->RespondSuccess();
-}
-
-void TabletCopyServiceImpl::EndTabletCopySession(
-        const EndTabletCopySessionRequestPB* req,
-        EndTabletCopySessionResponsePB* resp,
-        rpc::RpcContext* context) {
-  {
-    MutexLock l(sessions_lock_);
-    TabletCopyErrorPB::Code app_error;
-    LOG(INFO) << "Request end of tablet copy session " << req->session_id()
-      << " received from " << context->requestor_string();
-    RPC_RETURN_NOT_OK(DoEndTabletCopySessionUnlocked(req->session_id(), &app_error),
-                      app_error, "No such session");
-  }
-  context->RespondSuccess();
-}
-
-// Stops the session-expiration thread and then destroys every remaining
-// tablet copy session.
-void TabletCopyServiceImpl::Shutdown() {
-  // Wake up and join the expiration thread first. That thread acquires
-  // sessions_lock_ itself, so it must be joined before the lock is taken
-  // below.
-  shutdown_latch_.CountDown();
-  session_expiration_thread_->Join();
-
-  // sessions_ and session_expirations_ are protected by sessions_lock_, and
-  // every other caller of DoEndTabletCopySessionUnlocked() holds that lock.
-  // Hold it here too so that a concurrent RPC handler cannot touch the maps
-  // while they are being torn down.
-  MutexLock l(sessions_lock_);
-
-  // Destroy all tablet copy sessions. The ids are copied out first because
-  // ending a session erases its entry from session_expirations_.
-  vector<string> session_ids;
-  for (const MonoTimeMap::value_type& entry : session_expirations_) {
-    session_ids.push_back(entry.first);
-  }
-  for (const string& session_id : session_ids) {
-    LOG(INFO) << "Destroying tablet copy session " << session_id << " due to service shutdown";
-    TabletCopyErrorPB::Code app_error;
-    CHECK_OK(DoEndTabletCopySessionUnlocked(session_id, &app_error));
-  }
-}
-
-// Looks up 'session_id' in sessions_ and copies the matching entry into
-// '*session'. Returns NotFound and sets '*app_error' to NO_SESSION when there
-// is no such session; '*app_error' is left untouched on success.
-Status TabletCopyServiceImpl::FindSessionUnlocked(
-        const string& session_id,
-        TabletCopyErrorPB::Code* app_error,
-        scoped_refptr<TabletCopySession>* session) const {
-  if (FindCopy(sessions_, session_id, session)) {
-    return Status::OK();
-  }
-  *app_error = TabletCopyErrorPB::NO_SESSION;
-  return Status::NotFound(
-      Substitute("Tablet Copy session with Session ID \"$0\" not found", session_id));
-}
-
-// Checks that 'data_id' identifies exactly one object (a block or a WAL
-// segment) and that the identifier present matches the declared type.
-//
-// Returns OK for a well-formed id. Otherwise returns InvalidArgument and sets
-// '*app_error' to INVALID_TABLET_COPY_REQUEST on every failure path, so that
-// the caller never reports its own default error code for a malformed
-// request. 'session' is currently unused.
-Status TabletCopyServiceImpl::ValidateFetchRequestDataId(
-        const DataIdPB& data_id,
-        TabletCopyErrorPB::Code* app_error,
-        const scoped_refptr<TabletCopySession>& session) const {
-  if (PREDICT_FALSE(data_id.has_block_id() && data_id.has_wal_segment_seqno())) {
-    *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
-    return Status::InvalidArgument(
-        Substitute("Only one of BlockId or segment sequence number are required, "
-            "but both were specified. DataTypeID: $0", data_id.ShortDebugString()));
-  } else if (PREDICT_FALSE(!data_id.has_block_id() && !data_id.has_wal_segment_seqno())) {
-    *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
-    return Status::InvalidArgument(
-        Substitute("Only one of BlockId or segment sequence number are required, "
-            "but neither were specified. DataTypeID: $0", data_id.ShortDebugString()));
-  }
-
-  // Exactly one identifier is present from here on; make sure it is the one
-  // that matches the declared type.
-  if (data_id.type() == DataIdPB::BLOCK) {
-    if (PREDICT_FALSE(!data_id.has_block_id())) {
-      *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
-      return Status::InvalidArgument("block_id must be specified for type == BLOCK",
-                                     data_id.ShortDebugString());
-    }
-  } else {
-    // NOTE(review): this tests the value rather than has_wal_segment_seqno(),
-    // so an explicitly supplied sequence number of 0 is rejected as well.
-    // Confirm that 0 is never a valid segment sequence number.
-    if (PREDICT_FALSE(!data_id.wal_segment_seqno())) {
-      *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
-      return Status::InvalidArgument(
-          "segment sequence number must be specified for type == LOG_SEGMENT",
-          data_id.ShortDebugString());
-    }
-  }
-
-  return Status::OK();
-}
-
-// Records activity on 'session_id' by moving its expiration to "now plus the
-// idle timeout flag", inserting the entry if it does not exist yet.
-void TabletCopyServiceImpl::ResetSessionExpirationUnlocked(const std::string& session_id) {
-  MonoTime deadline = MonoTime::Now(MonoTime::FINE);
-  const MonoDelta idle_timeout =
-      MonoDelta::FromMilliseconds(FLAGS_tablet_copy_idle_timeout_ms);
-  deadline.AddDelta(idle_timeout);
-  InsertOrUpdate(&session_expirations_, session_id, deadline);
-}
-
-// Removes 'session_id' from both the session map and the expiration map.
-// Returns the lookup error (with '*app_error' filled in) if the session is
-// unknown; in that case nothing is modified.
-Status TabletCopyServiceImpl::DoEndTabletCopySessionUnlocked(
-        const std::string& session_id,
-        TabletCopyErrorPB::Code* app_error) {
-  scoped_refptr<TabletCopySession> ending;
-  RETURN_NOT_OK(FindSessionUnlocked(session_id, app_error, &ending));
-
-  LOG(INFO) << "Ending tablet copy session " << session_id << " on tablet "
-            << ending->tablet_id() << " with peer " << ending->requestor_uuid();
-
-  // Erasing the map entry only drops the map's reference; the session object
-  // itself goes away once there are no outstanding refs.
-  CHECK_EQ(1, sessions_.erase(session_id));
-  CHECK_EQ(1, session_expirations_.erase(session_id));
-
-  return Status::OK();
-}
-
-// Body of the session-expiration thread. Sweeps the expiration map, ends every
-// session whose deadline has passed, then waits on the shutdown latch for one
-// poll period; the loop exits once the latch wait reports shutdown.
-void TabletCopyServiceImpl::EndExpiredSessions() {
-  for (;;) {
-    {
-      MutexLock lock(sessions_lock_);
-      MonoTime now = MonoTime::Now(MonoTime::FINE);
-
-      // Collect first and end afterwards: ending a session erases its entry
-      // from session_expirations_, which must not happen during iteration.
-      vector<string> expired_ids;
-      for (const MonoTimeMap::value_type& entry : session_expirations_) {
-        if (entry.second.ComesBefore(now)) {
-          expired_ids.push_back(entry.first);
-        }
-      }
-      for (const string& expired_id : expired_ids) {
-        LOG(INFO) << "Tablet Copy session " << expired_id
-                  << " has expired. Terminating session.";
-        TabletCopyErrorPB::Code app_error;
-        CHECK_OK(DoEndTabletCopySessionUnlocked(expired_id, &app_error));
-      }
-    }  // sessions_lock_ is released here, before waiting.
-
-    if (shutdown_latch_.WaitFor(MonoDelta::FromMilliseconds(
-            FLAGS_tablet_copy_timeout_poll_period_ms))) {
-      break;
-    }
-  }
-}
-
-} // namespace tserver
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap_service.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap_service.h b/src/kudu/tserver/remote_bootstrap_service.h
deleted file mode 100644
index a5eab8c..0000000
--- a/src/kudu/tserver/remote_bootstrap_service.h
+++ /dev/null
@@ -1,112 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#ifndef KUDU_TSERVER_TABLET_COPY_SERVICE_H_
-#define KUDU_TSERVER_TABLET_COPY_SERVICE_H_
-
-#include <string>
-#include <unordered_map>
-
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/tserver/remote_bootstrap.service.h"
-#include "kudu/util/countdown_latch.h"
-#include "kudu/util/locks.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/status.h"
-#include "kudu/util/thread.h"
-
-namespace kudu {
-class FsManager;
-
-namespace log {
-class ReadableLogSegment;
-} // namespace log
-
-namespace tserver {
-
-class TabletCopySession;
-class TabletPeerLookupIf;
-
-// RPC service implementation for tablet copy. It keeps a map of active
-// sessions keyed by session id together with a per-session expiration time,
-// and runs a background thread that ends sessions whose expiration has passed.
-class TabletCopyServiceImpl : public TabletCopyServiceIf {
- public:
-  // Starts the session-expiration thread as part of construction.
-  TabletCopyServiceImpl(FsManager* fs_manager,
-                             TabletPeerLookupIf* tablet_peer_lookup,
-                             const scoped_refptr<MetricEntity>& metric_entity,
-                             const scoped_refptr<rpc::ResultTracker>& result_tracker);
-
-  // Creates a session for the requested tablet, or re-sends the initialization
-  // info of an existing session with the same id.
-  virtual void BeginTabletCopySession(const BeginTabletCopySessionRequestPB* req,
-                                           BeginTabletCopySessionResponsePB* resp,
-                                           rpc::RpcContext* context) OVERRIDE;
-
-  // Reports whether a session exists; optionally refreshes its expiration when
-  // the request sets 'keepalive'.
-  virtual void CheckSessionActive(const CheckTabletCopySessionActiveRequestPB* req,
-                                  CheckTabletCopySessionActiveResponsePB* resp,
-                                  rpc::RpcContext* context) OVERRIDE;
-
-  // Returns one chunk of a data block or WAL segment for an existing session.
-  virtual void FetchData(const FetchDataRequestPB* req,
-                         FetchDataResponsePB* resp,
-                         rpc::RpcContext* context) OVERRIDE;
-
-  // Ends the session named in the request.
-  virtual void EndTabletCopySession(const EndTabletCopySessionRequestPB* req,
-                                         EndTabletCopySessionResponsePB* resp,
-                                         rpc::RpcContext* context) OVERRIDE;
-
-  // Stops the expiration thread and destroys all remaining sessions.
-  virtual void Shutdown() OVERRIDE;
-
- private:
-  // Session id -> session.
-  typedef std::unordered_map<std::string, scoped_refptr<TabletCopySession> > SessionMap;
-  // Session id -> time at which the session expires.
-  typedef std::unordered_map<std::string, MonoTime> MonoTimeMap;
-
-  // Look up session in session map.
-  // On failure, sets '*app_error' to NO_SESSION and returns NotFound.
-  Status FindSessionUnlocked(const std::string& session_id,
-                             TabletCopyErrorPB::Code* app_error,
-                             scoped_refptr<TabletCopySession>* session) const;
-
-  // Validate the data identifier in a FetchData request.
-  Status ValidateFetchRequestDataId(const DataIdPB& data_id,
-                                    TabletCopyErrorPB::Code* app_error,
-                                    const scoped_refptr<TabletCopySession>& session) const;
-
-  // Take note of session activity; Re-update the session timeout deadline.
-  void ResetSessionExpirationUnlocked(const std::string& session_id);
-
-  // Destroy the specified tablet copy session.
-  // Removes the session from both maps; fails if the session is not found.
-  Status DoEndTabletCopySessionUnlocked(const std::string& session_id,
-                                             TabletCopyErrorPB::Code* app_error);
-
-  // The timeout thread periodically checks whether sessions are expired and
-  // removes them from the map.
-  void EndExpiredSessions();
-
-  // Both pointers are checked to be non-null at construction.
-  FsManager* fs_manager_;
-  TabletPeerLookupIf* tablet_peer_lookup_;
-
-  // Protects sessions_ and session_expirations_ maps.
-  mutable Mutex sessions_lock_;
-  SessionMap sessions_;
-  MonoTimeMap session_expirations_;
-
-  // Session expiration thread.
-  // TODO: this is a hack, replace with some kind of timer impl. See KUDU-286.
-  // The latch is counted down in Shutdown() to make the thread exit.
-  CountDownLatch shutdown_latch_;
-  scoped_refptr<Thread> session_expiration_thread_;
-};
-
-} // namespace tserver
-} // namespace kudu
-
-#endif // KUDU_TSERVER_TABLET_COPY_SERVICE_H_