You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/08/07 03:56:52 UTC
[4/5] kudu git commit: Rename remote bootstrap files to 'tablet copy'
http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap.proto b/src/kudu/tserver/remote_bootstrap.proto
deleted file mode 100644
index 1e89919..0000000
--- a/src/kudu/tserver/remote_bootstrap.proto
+++ /dev/null
@@ -1,204 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package kudu.tserver;
-
-option java_package = "org.apache.kudu.tserver";
-
-import "kudu/common/wire_protocol.proto";
-import "kudu/consensus/metadata.proto";
-import "kudu/fs/fs.proto";
-import "kudu/rpc/rpc_header.proto";
-import "kudu/tablet/metadata.proto";
-
-// RaftConfig tablet copy RPC calls.
-service TabletCopyService {
- // Establish a tablet copy session.
- rpc BeginTabletCopySession(BeginTabletCopySessionRequestPB)
- returns (BeginTabletCopySessionResponsePB);
-
- // Check whether the specified session is active.
- rpc CheckSessionActive(CheckTabletCopySessionActiveRequestPB)
- returns (CheckTabletCopySessionActiveResponsePB);
-
- // Fetch data (blocks, logs) from the server.
- rpc FetchData(FetchDataRequestPB)
- returns (FetchDataResponsePB);
-
- // End a tablet copy session, allow server to release resources.
- rpc EndTabletCopySession(EndTabletCopySessionRequestPB)
- returns (EndTabletCopySessionResponsePB);
-}
-
-// Tablet Copy-specific errors use this protobuf.
-message TabletCopyErrorPB {
- extend kudu.rpc.ErrorStatusPB {
- optional TabletCopyErrorPB tablet_copy_error_ext = 102;
- }
-
- enum Code {
- // An error which has no more specific error code.
- // The code and message in 'status' may reveal more details.
- //
- // RPCs should avoid returning this, since callers will not be
- // able to easily parse the error.
- UNKNOWN_ERROR = 1;
-
- // The specified tablet copy session either never existed or has expired.
- NO_SESSION = 2;
-
- // Unknown tablet.
- TABLET_NOT_FOUND = 3;
-
- // Unknown data block.
- BLOCK_NOT_FOUND = 4;
-
- // Unknown WAL segment.
- WAL_SEGMENT_NOT_FOUND = 5;
-
- // Invalid request. Possibly missing parameters.
- INVALID_TABLET_COPY_REQUEST = 6;
-
- // Error reading or transferring data.
- IO_ERROR = 7;
- }
-
- // The error code.
- required Code code = 1 [ default = UNKNOWN_ERROR ];
-
- // The Status object for the error. This will include a textual
- // message that may be more useful to present in log messages, etc,
- // though its error code is less specific.
- required AppStatusPB status = 2;
-}
-
-message BeginTabletCopySessionRequestPB {
- // permanent_uuid of the requesting peer.
- required bytes requestor_uuid = 1;
-
- // tablet_id of the tablet the requester desires to bootstrap from.
- required bytes tablet_id = 2;
-}
-
-message BeginTabletCopySessionResponsePB {
- // Opaque session id assigned by the server.
- // No guarantees are made as to the format of the session id.
- required bytes session_id = 1;
-
- // Maximum session idle timeout between requests.
- // Learners will have to start over again if they reach this timeout.
- // A value of 0 means there is no timeout.
- required uint64 session_idle_timeout_millis = 2;
-
- // Active superblock at the time of the request.
- required tablet.TabletSuperBlockPB superblock = 3;
-
- // Identifiers for the WAL segments available for download.
- // Each WAL segment is keyed by its sequence number.
- repeated uint64 wal_segment_seqnos = 4;
-
- // A snapshot of the committed Consensus state at the time that the
- // tablet copy session was started.
- required consensus.ConsensusStatePB initial_committed_cstate = 5;
-
- // permanent_uuid of the responding peer.
- optional bytes responder_uuid = 6;
-}
-
-message CheckTabletCopySessionActiveRequestPB {
- // Valid Session ID returned by a BeginTabletCopySession() RPC call.
- required bytes session_id = 1;
-
- // Set keepalive to true to reset the session timeout timer.
- optional bool keepalive = 2 [default = false];
-}
-
-message CheckTabletCopySessionActiveResponsePB {
- // Whether the given session id represents an active tablet copy session.
- required bool session_is_active = 1;
-}
-
-// A "union" type that allows the same RPC call to fetch different types of
-// data (data blocks or log files).
-message DataIdPB {
- enum IdType {
- UNKNOWN = 0;
- BLOCK = 1;
- LOG_SEGMENT = 2;
- }
-
- // Indicator whether it's a block or log segment id.
- required IdType type = 1;
-
- // Exactly one of these must be set.
- optional BlockIdPB block_id = 2; // To fetch a block.
- optional uint64 wal_segment_seqno = 3; // To fetch a log segment.
-}
-
-message FetchDataRequestPB {
- // Valid Session ID returned by a BeginTabletCopySession() RPC call.
- required bytes session_id = 1;
-
- // The server will use this ID to determine the key and type of data
- // that was requested.
- required DataIdPB data_id = 2;
-
- // Offset into data to start reading from.
- // If not specified, the server will send the data from offset 0.
- optional uint64 offset = 3 [default = 0];
-
- // Maximum length of the chunk of data to return.
- // If max_length is not specified, or if the server's max is less than the
- // requested max, the server will use its own max.
- optional int64 max_length = 4 [default = 0];
-}
-
-// A chunk of data (a slice of a block, file, etc).
-message DataChunkPB {
- // Offset into the complete data block or file that 'data' starts at.
- required uint64 offset = 1;
-
- // Actual bytes of data from the data block, starting at 'offset'.
- required bytes data = 2;
-
- // CRC32C of the bytes contained in 'data'.
- required fixed32 crc32 = 3;
-
- // Full length, in bytes, of the complete data block or file on the server.
- // The number of bytes returned in 'data' can certainly be less than this.
- required int64 total_data_length = 4;
-}
-
-message FetchDataResponsePB {
- // The server will automatically release the resources (i.e. close file, free
- // read buffers) for a given data resource after the last byte is read.
- // So, per-resource, chunks are optimized to be fetched in-order.
- required DataChunkPB chunk = 1;
-}
-
-message EndTabletCopySessionRequestPB {
- required bytes session_id = 1;
-
- // Set to true if bootstrap is successful.
- required bool is_success = 2;
-
- // Client-provided error message. The server will log this error so that an
- // admin can identify when bad things are happening with tablet copy.
- optional AppStatusPB error = 3;
-}
-
-message EndTabletCopySessionResponsePB {
-}
http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap_client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap_client-test.cc b/src/kudu/tserver/remote_bootstrap_client-test.cc
deleted file mode 100644
index d1bfc16..0000000
--- a/src/kudu/tserver/remote_bootstrap_client-test.cc
+++ /dev/null
@@ -1,241 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "kudu/tserver/remote_bootstrap-test-base.h"
-
-#include "kudu/consensus/quorum_util.h"
-#include "kudu/gutil/strings/fastmem.h"
-#include "kudu/tablet/tablet_bootstrap.h"
-#include "kudu/tserver/remote_bootstrap_client.h"
-#include "kudu/util/env_util.h"
-
-using std::shared_ptr;
-
-namespace kudu {
-namespace tserver {
-
-using consensus::GetRaftConfigLeader;
-using consensus::RaftPeerPB;
-using tablet::TabletMetadata;
-using tablet::TabletStatusListener;
-
-class TabletCopyClientTest : public TabletCopyTest {
- public:
- virtual void SetUp() OVERRIDE {
- NO_FATALS(TabletCopyTest::SetUp());
-
- fs_manager_.reset(new FsManager(Env::Default(), GetTestPath("client_tablet")));
- ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
- ASSERT_OK(fs_manager_->Open());
-
- tablet_peer_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10.0));
- rpc::MessengerBuilder(CURRENT_TEST_NAME()).Build(&messenger_);
- client_.reset(new TabletCopyClient(GetTabletId(),
- fs_manager_.get(),
- messenger_));
- ASSERT_OK(GetRaftConfigLeader(tablet_peer_->consensus()
- ->ConsensusState(consensus::CONSENSUS_CONFIG_COMMITTED), &leader_));
-
- HostPort host_port;
- ASSERT_OK(HostPortFromPB(leader_.last_known_addr(), &host_port));
- ASSERT_OK(client_->Start(host_port, &meta_));
- }
-
- protected:
- Status CompareFileContents(const string& path1, const string& path2);
-
- gscoped_ptr<FsManager> fs_manager_;
- shared_ptr<rpc::Messenger> messenger_;
- gscoped_ptr<TabletCopyClient> client_;
- scoped_refptr<TabletMetadata> meta_;
- RaftPeerPB leader_;
-};
-
-Status TabletCopyClientTest::CompareFileContents(const string& path1, const string& path2) {
- shared_ptr<RandomAccessFile> file1, file2;
- RETURN_NOT_OK(env_util::OpenFileForRandom(fs_manager_->env(), path1, &file1));
- RETURN_NOT_OK(env_util::OpenFileForRandom(fs_manager_->env(), path2, &file2));
-
- uint64_t size1, size2;
- RETURN_NOT_OK(file1->Size(&size1));
- RETURN_NOT_OK(file2->Size(&size2));
- if (size1 != size2) {
- return Status::Corruption("Sizes of files don't match",
- strings::Substitute("$0 vs $1 bytes", size1, size2));
- }
-
- Slice slice1, slice2;
- faststring scratch1, scratch2;
- scratch1.resize(size1);
- scratch2.resize(size2);
- RETURN_NOT_OK(env_util::ReadFully(file1.get(), 0, size1, &slice1, scratch1.data()));
- RETURN_NOT_OK(env_util::ReadFully(file2.get(), 0, size2, &slice2, scratch2.data()));
- int result = strings::fastmemcmp_inlined(slice1.data(), slice2.data(), size1);
- if (result != 0) {
- return Status::Corruption("Files do not match");
- }
- return Status::OK();
-}
-
-// Basic begin / end tablet copy session.
-TEST_F(TabletCopyClientTest, TestBeginEndSession) {
- TabletStatusListener listener(meta_);
- ASSERT_OK(client_->FetchAll(&listener));
- ASSERT_OK(client_->Finish());
-}
-
-// Basic data block download unit test.
-TEST_F(TabletCopyClientTest, TestDownloadBlock) {
- TabletStatusListener listener(meta_);
- BlockId block_id = FirstColumnBlockId(*client_->superblock_);
- Slice slice;
- faststring scratch;
-
- // Ensure the block wasn't there before (it shouldn't be, we use our own FsManager dir).
- Status s;
- s = ReadLocalBlockFile(fs_manager_.get(), block_id, &scratch, &slice);
- ASSERT_TRUE(s.IsNotFound()) << "Expected block not found: " << s.ToString();
-
- // Check that the client downloaded the block and verification passed.
- BlockId new_block_id;
- ASSERT_OK(client_->DownloadBlock(block_id, &new_block_id));
-
- // Ensure it placed the block where we expected it to.
- s = ReadLocalBlockFile(fs_manager_.get(), block_id, &scratch, &slice);
- ASSERT_TRUE(s.IsNotFound()) << "Expected block not found: " << s.ToString();
- ASSERT_OK(ReadLocalBlockFile(fs_manager_.get(), new_block_id, &scratch, &slice));
-}
-
-// Basic WAL segment download unit test.
-TEST_F(TabletCopyClientTest, TestDownloadWalSegment) {
- ASSERT_OK(fs_manager_->CreateDirIfMissing(fs_manager_->GetTabletWalDir(GetTabletId())));
-
- uint64_t seqno = client_->wal_seqnos_[0];
- string path = fs_manager_->GetWalSegmentFileName(GetTabletId(), seqno);
-
- ASSERT_FALSE(fs_manager_->Exists(path));
- ASSERT_OK(client_->DownloadWAL(seqno));
- ASSERT_TRUE(fs_manager_->Exists(path));
-
- log::SegmentSequence local_segments;
- ASSERT_OK(tablet_peer_->log()->reader()->GetSegmentsSnapshot(&local_segments));
- const scoped_refptr<log::ReadableLogSegment>& segment = local_segments[0];
- string server_path = segment->path();
-
- // Compare the downloaded file with the source file.
- ASSERT_OK(CompareFileContents(path, server_path));
-}
-
-// Ensure that we detect data corruption at the per-transfer level.
-TEST_F(TabletCopyClientTest, TestVerifyData) {
- string good = "This is a known good string";
- string bad = "This is a known bad! string";
- const int kGoodOffset = 0;
- const int kBadOffset = 1;
- const int64_t kDataTotalLen = std::numeric_limits<int64_t>::max(); // Ignored.
-
- // Create a known-good PB.
- DataChunkPB valid_chunk;
- valid_chunk.set_offset(0);
- valid_chunk.set_data(good);
- valid_chunk.set_crc32(crc::Crc32c(good.data(), good.length()));
- valid_chunk.set_total_data_length(kDataTotalLen);
-
- // Make sure we work on the happy case.
- ASSERT_OK(client_->VerifyData(kGoodOffset, valid_chunk));
-
- // Test unexpected offset.
- DataChunkPB bad_offset = valid_chunk;
- bad_offset.set_offset(kBadOffset);
- Status s;
- s = client_->VerifyData(kGoodOffset, bad_offset);
- ASSERT_TRUE(s.IsInvalidArgument()) << "Bad offset expected: " << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(), "Offset did not match");
- LOG(INFO) << "Expected error returned: " << s.ToString();
-
- // Test bad checksum.
- DataChunkPB bad_checksum = valid_chunk;
- bad_checksum.set_data(bad);
- s = client_->VerifyData(kGoodOffset, bad_checksum);
- ASSERT_TRUE(s.IsCorruption()) << "Invalid checksum expected: " << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(), "CRC32 does not match");
- LOG(INFO) << "Expected error returned: " << s.ToString();
-}
-
-namespace {
-
-vector<BlockId> GetAllSortedBlocks(const tablet::TabletSuperBlockPB& sb) {
- vector<BlockId> data_blocks;
-
- for (const tablet::RowSetDataPB& rowset : sb.rowsets()) {
- for (const tablet::DeltaDataPB& redo : rowset.redo_deltas()) {
- data_blocks.push_back(BlockId::FromPB(redo.block()));
- }
- for (const tablet::DeltaDataPB& undo : rowset.undo_deltas()) {
- data_blocks.push_back(BlockId::FromPB(undo.block()));
- }
- for (const tablet::ColumnDataPB& column : rowset.columns()) {
- data_blocks.push_back(BlockId::FromPB(column.block()));
- }
- if (rowset.has_bloom_block()) {
- data_blocks.push_back(BlockId::FromPB(rowset.bloom_block()));
- }
- if (rowset.has_adhoc_index_block()) {
- data_blocks.push_back(BlockId::FromPB(rowset.adhoc_index_block()));
- }
- }
-
- std::sort(data_blocks.begin(), data_blocks.end(), BlockIdCompare());
- return data_blocks;
-}
-
-} // anonymous namespace
-
-TEST_F(TabletCopyClientTest, TestDownloadAllBlocks) {
- // Download all the blocks.
- ASSERT_OK(client_->DownloadBlocks());
-
- // Verify that the new superblock reflects the changes in block IDs.
- //
- // As long as block IDs are generated with UUIDs or something equally
- // unique, there's no danger of a block in the new superblock somehow
- // being assigned the same ID as a block in the existing superblock.
- vector<BlockId> old_data_blocks = GetAllSortedBlocks(*client_->superblock_.get());
- vector<BlockId> new_data_blocks = GetAllSortedBlocks(*client_->new_superblock_.get());
- vector<BlockId> result;
- std::set_intersection(old_data_blocks.begin(), old_data_blocks.end(),
- new_data_blocks.begin(), new_data_blocks.end(),
- std::back_inserter(result), BlockIdCompare());
- ASSERT_TRUE(result.empty());
- ASSERT_EQ(old_data_blocks.size(), new_data_blocks.size());
-
- // Verify that the old blocks aren't found. We're using a different
- // FsManager than 'tablet_peer', so the only way an old block could end
- // up in ours is due to a tablet copy client bug.
- for (const BlockId& block_id : old_data_blocks) {
- gscoped_ptr<fs::ReadableBlock> block;
- Status s = fs_manager_->OpenBlock(block_id, &block);
- ASSERT_TRUE(s.IsNotFound()) << "Expected block not found: " << s.ToString();
- }
- // And the new blocks are all present.
- for (const BlockId& block_id : new_data_blocks) {
- gscoped_ptr<fs::ReadableBlock> block;
- ASSERT_OK(fs_manager_->OpenBlock(block_id, &block));
- }
-}
-
-} // namespace tserver
-} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap_client.cc b/src/kudu/tserver/remote_bootstrap_client.cc
deleted file mode 100644
index a7cdde1..0000000
--- a/src/kudu/tserver/remote_bootstrap_client.cc
+++ /dev/null
@@ -1,563 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/tserver/remote_bootstrap_client.h"
-
-#include <gflags/gflags.h>
-#include <glog/logging.h>
-
-#include "kudu/common/wire_protocol.h"
-#include "kudu/consensus/consensus_meta.h"
-#include "kudu/consensus/metadata.pb.h"
-#include "kudu/fs/block_id.h"
-#include "kudu/fs/block_manager.h"
-#include "kudu/fs/fs_manager.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/strings/util.h"
-#include "kudu/gutil/walltime.h"
-#include "kudu/rpc/messenger.h"
-#include "kudu/rpc/transfer.h"
-#include "kudu/tablet/tablet.pb.h"
-#include "kudu/tablet/tablet_bootstrap.h"
-#include "kudu/tablet/tablet_peer.h"
-#include "kudu/tserver/remote_bootstrap.pb.h"
-#include "kudu/tserver/remote_bootstrap.proxy.h"
-#include "kudu/tserver/tablet_server.h"
-#include "kudu/util/crc.h"
-#include "kudu/util/env.h"
-#include "kudu/util/env_util.h"
-#include "kudu/util/flag_tags.h"
-#include "kudu/util/logging.h"
-#include "kudu/util/net/net_util.h"
-
-DEFINE_int32(tablet_copy_begin_session_timeout_ms, 3000,
- "Tablet server RPC client timeout for BeginTabletCopySession calls. "
- "Also used for EndTabletCopySession calls.");
-TAG_FLAG(tablet_copy_begin_session_timeout_ms, hidden);
-
-DEFINE_bool(tablet_copy_save_downloaded_metadata, false,
- "Save copies of the downloaded tablet copy files for debugging purposes. "
- "Note: This is only intended for debugging and should not be normally used!");
-TAG_FLAG(tablet_copy_save_downloaded_metadata, advanced);
-TAG_FLAG(tablet_copy_save_downloaded_metadata, hidden);
-TAG_FLAG(tablet_copy_save_downloaded_metadata, runtime);
-
-DEFINE_int32(tablet_copy_dowload_file_inject_latency_ms, 0,
- "Injects latency into the loop that downloads files, causing tablet copy "
- "to take much longer. For use in tests only.");
-TAG_FLAG(tablet_copy_dowload_file_inject_latency_ms, hidden);
-
-DECLARE_int32(tablet_copy_transfer_chunk_size_bytes);
-
-// RETURN_NOT_OK_PREPEND() with a remote-error unwinding step.
-#define RETURN_NOT_OK_UNWIND_PREPEND(status, controller, msg) \
- RETURN_NOT_OK_PREPEND(UnwindRemoteError(status, controller), msg)
-
-namespace kudu {
-namespace tserver {
-
-using consensus::ConsensusMetadata;
-using consensus::ConsensusStatePB;
-using consensus::OpId;
-using consensus::RaftConfigPB;
-using consensus::RaftPeerPB;
-using env_util::CopyFile;
-using fs::WritableBlock;
-using rpc::Messenger;
-using std::shared_ptr;
-using std::string;
-using std::vector;
-using strings::Substitute;
-using tablet::ColumnDataPB;
-using tablet::DeltaDataPB;
-using tablet::RowSetDataPB;
-using tablet::TabletDataState;
-using tablet::TabletDataState_Name;
-using tablet::TabletMetadata;
-using tablet::TabletStatusListener;
-using tablet::TabletSuperBlockPB;
-
-TabletCopyClient::TabletCopyClient(std::string tablet_id,
- FsManager* fs_manager,
- shared_ptr<Messenger> messenger)
- : tablet_id_(std::move(tablet_id)),
- fs_manager_(fs_manager),
- messenger_(std::move(messenger)),
- started_(false),
- downloaded_wal_(false),
- downloaded_blocks_(false),
- replace_tombstoned_tablet_(false),
- status_listener_(nullptr),
- session_idle_timeout_millis_(0),
- start_time_micros_(0) {}
-
-TabletCopyClient::~TabletCopyClient() {
- // Note: Ending the tablet copy session releases anchors on the remote.
- WARN_NOT_OK(EndRemoteSession(), "Unable to close tablet copy session");
-}
-
-Status TabletCopyClient::SetTabletToReplace(const scoped_refptr<TabletMetadata>& meta,
- int64_t caller_term) {
- CHECK_EQ(tablet_id_, meta->tablet_id());
- TabletDataState data_state = meta->tablet_data_state();
- if (data_state != tablet::TABLET_DATA_TOMBSTONED) {
- return Status::IllegalState(Substitute("Tablet $0 not in tombstoned state: $1 ($2)",
- tablet_id_,
- TabletDataState_Name(data_state),
- data_state));
- }
-
- replace_tombstoned_tablet_ = true;
- meta_ = meta;
-
- int64_t last_logged_term = meta->tombstone_last_logged_opid().term();
- if (last_logged_term > caller_term) {
- return Status::InvalidArgument(
- Substitute("Leader has term $0 but the last log entry written by the tombstoned replica "
- "for tablet $1 has higher term $2. Refusing tablet copy from leader",
- caller_term, tablet_id_, last_logged_term));
- }
-
- // Load the old consensus metadata, if it exists.
- gscoped_ptr<ConsensusMetadata> cmeta;
- Status s = ConsensusMetadata::Load(fs_manager_, tablet_id_,
- fs_manager_->uuid(), &cmeta);
- if (s.IsNotFound()) {
- // The consensus metadata was not written to disk, possibly due to a failed
- // tablet copy.
- return Status::OK();
- }
- RETURN_NOT_OK(s);
- cmeta_.swap(cmeta);
- return Status::OK();
-}
-
-Status TabletCopyClient::Start(const HostPort& copy_source_addr,
- scoped_refptr<TabletMetadata>* meta) {
- CHECK(!started_);
- start_time_micros_ = GetCurrentTimeMicros();
-
- Sockaddr addr;
- RETURN_NOT_OK(SockaddrFromHostPort(copy_source_addr, &addr));
- if (addr.IsWildcard()) {
- return Status::InvalidArgument("Invalid wildcard address to tablet copy from",
- Substitute("$0 (resolved to $1)",
- copy_source_addr.host(), addr.host()));
- }
- LOG_WITH_PREFIX(INFO) << "Beginning tablet copy session"
- << " from remote peer at address " << copy_source_addr.ToString();
-
- // Set up an RPC proxy for the TabletCopyService.
- proxy_.reset(new TabletCopyServiceProxy(messenger_, addr));
-
- BeginTabletCopySessionRequestPB req;
- req.set_requestor_uuid(fs_manager_->uuid());
- req.set_tablet_id(tablet_id_);
-
- rpc::RpcController controller;
- controller.set_timeout(MonoDelta::FromMilliseconds(
- FLAGS_tablet_copy_begin_session_timeout_ms));
-
- // Begin the tablet copy session with the remote peer.
- BeginTabletCopySessionResponsePB resp;
- RETURN_NOT_OK_UNWIND_PREPEND(proxy_->BeginTabletCopySession(req, &resp, &controller),
- controller,
- "Unable to begin tablet copy session");
- string copy_peer_uuid = resp.has_responder_uuid()
- ? resp.responder_uuid() : "(unknown uuid)";
- if (resp.superblock().tablet_data_state() != tablet::TABLET_DATA_READY) {
- Status s = Status::IllegalState("Remote peer (" + copy_peer_uuid + ")" +
- " is currently copying itself!",
- resp.superblock().ShortDebugString());
- LOG_WITH_PREFIX(WARNING) << s.ToString();
- return s;
- }
-
- session_id_ = resp.session_id();
- session_idle_timeout_millis_ = resp.session_idle_timeout_millis();
- superblock_.reset(resp.release_superblock());
- superblock_->set_tablet_data_state(tablet::TABLET_DATA_COPYING);
- wal_seqnos_.assign(resp.wal_segment_seqnos().begin(), resp.wal_segment_seqnos().end());
- remote_committed_cstate_.reset(resp.release_initial_committed_cstate());
-
- Schema schema;
- RETURN_NOT_OK_PREPEND(SchemaFromPB(superblock_->schema(), &schema),
- "Cannot deserialize schema from remote superblock");
-
- if (replace_tombstoned_tablet_) {
- // Also validate the term of the source peer, in case they are
- // different. This is a sanity check that protects us in case a bug or
- // misconfiguration causes us to attempt to copy from an out-of-date
- // source peer, even after passing the term check from the caller in
- // SetTabletToReplace().
- int64_t last_logged_term = meta_->tombstone_last_logged_opid().term();
- if (last_logged_term > remote_committed_cstate_->current_term()) {
- return Status::InvalidArgument(
- Substitute("Tablet $0: source peer has term $1 but "
- "tombstoned replica has last-logged opid with higher term $2. "
- "Refusing tablet copy from source peer $3",
- tablet_id_,
- remote_committed_cstate_->current_term(),
- last_logged_term,
- copy_peer_uuid));
- }
-
- // This will flush to disk, but we set the data state to COPYING above.
- RETURN_NOT_OK_PREPEND(meta_->ReplaceSuperBlock(*superblock_),
- "Tablet Copy unable to replace superblock on tablet " +
- tablet_id_);
- } else {
-
- Partition partition;
- Partition::FromPB(superblock_->partition(), &partition);
- PartitionSchema partition_schema;
- RETURN_NOT_OK(PartitionSchema::FromPB(superblock_->partition_schema(),
- schema, &partition_schema));
-
- // Create the superblock on disk.
- RETURN_NOT_OK(TabletMetadata::CreateNew(fs_manager_, tablet_id_,
- superblock_->table_name(),
- superblock_->table_id(),
- schema,
- partition_schema,
- partition,
- tablet::TABLET_DATA_COPYING,
- &meta_));
- }
-
- started_ = true;
- if (meta) {
- *meta = meta_;
- }
- return Status::OK();
-}
-
-Status TabletCopyClient::FetchAll(TabletStatusListener* status_listener) {
- CHECK(started_);
- status_listener_ = status_listener;
-
- // Download all the files (serially, for now, but in parallel in the future).
- RETURN_NOT_OK(DownloadBlocks());
- RETURN_NOT_OK(DownloadWALs());
-
- return Status::OK();
-}
-
-Status TabletCopyClient::Finish() {
- CHECK(meta_);
- CHECK(started_);
- CHECK(downloaded_wal_);
- CHECK(downloaded_blocks_);
-
- RETURN_NOT_OK(WriteConsensusMetadata());
-
- // Replace tablet metadata superblock. This will set the tablet metadata state
- // to TABLET_DATA_READY, since we checked above that the response
- // superblock is in a valid state to bootstrap from.
- LOG_WITH_PREFIX(INFO) << "Tablet Copy complete. Replacing tablet superblock.";
- UpdateStatusMessage("Replacing tablet superblock");
- new_superblock_->set_tablet_data_state(tablet::TABLET_DATA_READY);
- RETURN_NOT_OK(meta_->ReplaceSuperBlock(*new_superblock_));
-
- if (FLAGS_tablet_copy_save_downloaded_metadata) {
- string meta_path = fs_manager_->GetTabletMetadataPath(tablet_id_);
- string meta_copy_path = Substitute("$0.copy.$1.tmp", meta_path, start_time_micros_);
- RETURN_NOT_OK_PREPEND(CopyFile(Env::Default(), meta_path, meta_copy_path,
- WritableFileOptions()),
- "Unable to make copy of tablet metadata");
- }
-
- return Status::OK();
-}
-
-// Decode the remote error into a human-readable Status object.
-Status TabletCopyClient::ExtractRemoteError(const rpc::ErrorStatusPB& remote_error) {
- if (PREDICT_TRUE(remote_error.HasExtension(TabletCopyErrorPB::tablet_copy_error_ext))) {
- const TabletCopyErrorPB& error =
- remote_error.GetExtension(TabletCopyErrorPB::tablet_copy_error_ext);
- return StatusFromPB(error.status()).CloneAndPrepend("Received error code " +
- TabletCopyErrorPB::Code_Name(error.code()) + " from remote service");
- } else {
- return Status::InvalidArgument("Unable to decode tablet copy RPC error message",
- remote_error.ShortDebugString());
- }
-}
-
-// Enhance a RemoteError Status message with additional details from the remote.
-Status TabletCopyClient::UnwindRemoteError(const Status& status,
- const rpc::RpcController& controller) {
- if (!status.IsRemoteError()) {
- return status;
- }
- Status extension_status = ExtractRemoteError(*controller.error_response());
- return status.CloneAndAppend(extension_status.ToString());
-}
-
-void TabletCopyClient::UpdateStatusMessage(const string& message) {
- if (status_listener_ != nullptr) {
- status_listener_->StatusMessage("TabletCopy: " + message);
- }
-}
-
-Status TabletCopyClient::EndRemoteSession() {
- if (!started_) {
- return Status::OK();
- }
-
- rpc::RpcController controller;
- controller.set_timeout(MonoDelta::FromMilliseconds(
- FLAGS_tablet_copy_begin_session_timeout_ms));
-
- EndTabletCopySessionRequestPB req;
- req.set_session_id(session_id_);
- req.set_is_success(true);
- EndTabletCopySessionResponsePB resp;
- RETURN_NOT_OK_UNWIND_PREPEND(proxy_->EndTabletCopySession(req, &resp, &controller),
- controller,
- "Failure ending tablet copy session");
-
- return Status::OK();
-}
-
-Status TabletCopyClient::DownloadWALs() {
- CHECK(started_);
-
- // Delete and recreate WAL dir if it already exists, to ensure stray files are
- // not kept from previous copies and runs.
- string path = fs_manager_->GetTabletWalDir(tablet_id_);
- if (fs_manager_->env()->FileExists(path)) {
- RETURN_NOT_OK(fs_manager_->env()->DeleteRecursively(path));
- }
- RETURN_NOT_OK(fs_manager_->env()->CreateDir(path));
- RETURN_NOT_OK(fs_manager_->env()->SyncDir(DirName(path))); // fsync() parent dir.
-
- // Download the WAL segments.
- int num_segments = wal_seqnos_.size();
- LOG_WITH_PREFIX(INFO) << "Starting download of " << num_segments << " WAL segments...";
- uint64_t counter = 0;
- for (uint64_t seg_seqno : wal_seqnos_) {
- UpdateStatusMessage(Substitute("Downloading WAL segment with seq. number $0 ($1/$2)",
- seg_seqno, counter + 1, num_segments));
- RETURN_NOT_OK(DownloadWAL(seg_seqno));
- ++counter;
- }
-
- downloaded_wal_ = true;
- return Status::OK();
-}
-
-Status TabletCopyClient::DownloadBlocks() {
-  CHECK(started_);
-
-  // Tally every block referenced by the remote superblock so that progress
-  // can be reported against a known total while downloading.
-  int total_blocks = 0;
-  for (const RowSetDataPB& rs : superblock_->rowsets()) {
-    total_blocks += rs.columns_size();
-    total_blocks += rs.redo_deltas_size();
-    total_blocks += rs.undo_deltas_size();
-    total_blocks += rs.has_bloom_block() ? 1 : 0;
-    total_blocks += rs.has_adhoc_index_block() ? 1 : 0;
-  }
-
-  // Work on a private copy of the remote superblock. Each block ID in the
-  // copy is overwritten with the ID of the locally created block as soon as
-  // that block has been downloaded.
-  gscoped_ptr<TabletSuperBlockPB> rewritten_sb(new TabletSuperBlockPB());
-  rewritten_sb->CopyFrom(*superblock_);
-  int blocks_done = 0;
-  LOG_WITH_PREFIX(INFO) << "Starting download of " << total_blocks << " data blocks...";
-  for (RowSetDataPB& rs : *rewritten_sb->mutable_rowsets()) {
-    // Column blocks first, then REDO and UNDO delta blocks.
-    for (ColumnDataPB& column : *rs.mutable_columns()) {
-      RETURN_NOT_OK(DownloadAndRewriteBlock(column.mutable_block(),
-                                            &blocks_done, total_blocks));
-    }
-    for (DeltaDataPB& redo_delta : *rs.mutable_redo_deltas()) {
-      RETURN_NOT_OK(DownloadAndRewriteBlock(redo_delta.mutable_block(),
-                                            &blocks_done, total_blocks));
-    }
-    for (DeltaDataPB& undo_delta : *rs.mutable_undo_deltas()) {
-      RETURN_NOT_OK(DownloadAndRewriteBlock(undo_delta.mutable_block(),
-                                            &blocks_done, total_blocks));
-    }
-    // The bloom filter and ad hoc index blocks are optional per rowset.
-    if (rs.has_bloom_block()) {
-      RETURN_NOT_OK(DownloadAndRewriteBlock(rs.mutable_bloom_block(),
-                                            &blocks_done, total_blocks));
-    }
-    if (rs.has_adhoc_index_block()) {
-      RETURN_NOT_OK(DownloadAndRewriteBlock(rs.mutable_adhoc_index_block(),
-                                            &blocks_done, total_blocks));
-    }
-  }
-
-  // Orphaned block IDs refer to physical blocks on the remote host and
-  // carry no meaning locally, so they are dropped from the new superblock.
-  rewritten_sb->clear_orphaned_blocks();
-  new_superblock_.swap(rewritten_sb);
-
-  downloaded_blocks_ = true;
-
-  return Status::OK();
-}
-
-// Download the WAL segment with sequence number 'wal_segment_seqno' from the
-// tablet copy source into the local WAL segment file for this tablet.
-//
-// The destination file is opened with 'sync_on_close' and is closed
-// explicitly before returning, so that a failure to close (and therefore to
-// sync) the segment is reported to the caller rather than only happening
-// implicitly when the writer goes out of scope.
-Status TabletCopyClient::DownloadWAL(uint64_t wal_segment_seqno) {
-  VLOG_WITH_PREFIX(1) << "Downloading WAL segment with seqno " << wal_segment_seqno;
-  DataIdPB data_id;
-  data_id.set_type(DataIdPB::LOG_SEGMENT);
-  data_id.set_wal_segment_seqno(wal_segment_seqno);
-  string dest_path = fs_manager_->GetWalSegmentFileName(tablet_id_, wal_segment_seqno);
-
-  WritableFileOptions opts;
-  opts.sync_on_close = true;
-  gscoped_ptr<WritableFile> writer;
-  RETURN_NOT_OK_PREPEND(fs_manager_->env()->NewWritableFile(opts, dest_path, &writer),
-                        "Unable to open file for writing");
-  RETURN_NOT_OK_PREPEND(DownloadFile(data_id, writer.get()),
-                        Substitute("Unable to download WAL segment with seq. number $0",
-                                   wal_segment_seqno));
-  // Close explicitly, mirroring DownloadBlock(), so a close/sync error fails
-  // the download instead of being dropped.
-  RETURN_NOT_OK_PREPEND(writer->Close(),
-                        Substitute("Unable to close WAL segment with seq. number $0",
-                                   wal_segment_seqno));
-  return Status::OK();
-}
-
-Status TabletCopyClient::WriteConsensusMetadata() {
-  if (cmeta_) {
-    // A consensus metadata file was found earlier: fold in the config and
-    // term sent by the tablet copy source, then persist the result.
-    cmeta_->MergeCommittedConsensusStatePB(*remote_committed_cstate_);
-    RETURN_NOT_OK(cmeta_->Flush());
-
-    // Optionally keep a side copy of the flushed file. Note that the copy is
-    // only made on this (update) path, not when the file is newly created.
-    if (FLAGS_tablet_copy_save_downloaded_metadata) {
-      string src_path = fs_manager_->GetConsensusMetadataPath(tablet_id_);
-      string saved_path = Substitute("$0.copy.$1.tmp", src_path, start_time_micros_);
-      RETURN_NOT_OK_PREPEND(CopyFile(Env::Default(), src_path, saved_path,
-                                     WritableFileOptions()),
-                            "Unable to make copy of consensus metadata");
-    }
-
-    return Status::OK();
-  }
-
-  // No previous consensus metadata file: create one from the committed
-  // config and current term reported by the tablet copy source.
-  gscoped_ptr<ConsensusMetadata> created_cmeta;
-  return ConsensusMetadata::Create(fs_manager_, tablet_id_, fs_manager_->uuid(),
-                                   remote_committed_cstate_->config(),
-                                   remote_committed_cstate_->current_term(),
-                                   &created_cmeta);
-}
-
-// Download the remote block identified by 'block_id' and rewrite 'block_id'
-// in place with the ID of the newly created local block.
-//
-// 'block_count' is the number of blocks downloaded so far and is incremented
-// on success; 'num_blocks' is the total number of blocks to download. Both
-// are only used for progress reporting.
-Status TabletCopyClient::DownloadAndRewriteBlock(BlockIdPB* block_id,
-                                                 int* block_count, int num_blocks) {
-  BlockId old_block_id(BlockId::FromPB(*block_id));
-  // Report 1-based progress ("1/N" .. "N/N"), consistent with the WAL segment
-  // progress message. '*block_count' itself is only bumped after the block
-  // has been downloaded successfully.
-  UpdateStatusMessage(Substitute("Downloading block $0 ($1/$2)",
-                                 old_block_id.ToString(), *block_count + 1,
-                                 num_blocks));
-  BlockId new_block_id;
-  RETURN_NOT_OK_PREPEND(DownloadBlock(old_block_id, &new_block_id),
-                        "Unable to download block with id " + old_block_id.ToString());
-
-  new_block_id.CopyToPB(block_id);
-  (*block_count)++;
-  return Status::OK();
-}
-
-Status TabletCopyClient::DownloadBlock(const BlockId& old_block_id,
-                                       BlockId* new_block_id) {
-  VLOG_WITH_PREFIX(1) << "Downloading block with block_id " << old_block_id.ToString();
-
-  // Create the local block that will receive the remote block's contents.
-  gscoped_ptr<WritableBlock> local_block;
-  RETURN_NOT_OK_PREPEND(fs_manager_->CreateNewBlock(&local_block),
-                        "Unable to create new block");
-
-  // Identify the remote data item by the block's ID on the source.
-  DataIdPB remote_item;
-  remote_item.set_type(DataIdPB::BLOCK);
-  old_block_id.CopyToPB(remote_item.mutable_block_id());
-  RETURN_NOT_OK_PREPEND(DownloadFile(remote_item, local_block.get()),
-                        Substitute("Unable to download block $0",
-                                   old_block_id.ToString()));
-
-  // The new ID is handed back before the block is closed, so the out
-  // parameter is set even if closing the block fails.
-  *new_block_id = local_block->id();
-  RETURN_NOT_OK_PREPEND(local_block->Close(), "Unable to close block");
-  return Status::OK();
-}
-
-// Download the remote data item 'data_id' chunk by chunk, appending each
-// verified chunk to 'appendable'.
-//
-// Each chunk is fetched with a FetchData() RPC starting at the current
-// offset, checked with VerifyData(), and then appended. The loop ends once
-// the running offset reaches the total length reported by the remote.
-// Returns a non-OK status if an RPC fails, a chunk fails verification, an
-// append fails, or the remote returns an empty chunk before the end of the
-// data item (which would otherwise re-request the same offset indefinitely).
-template<class Appendable>
-Status TabletCopyClient::DownloadFile(const DataIdPB& data_id,
-                                      Appendable* appendable) {
-  uint64_t offset = 0;
-  rpc::RpcController controller;
-  controller.set_timeout(MonoDelta::FromMilliseconds(session_idle_timeout_millis_));
-
-  // The session and the data item are the same for every chunk; only the
-  // offset (and the flag-controlled maximum length) are set per request.
-  FetchDataRequestPB req;
-  req.set_session_id(session_id_);
-  req.mutable_data_id()->CopyFrom(data_id);
-
-  bool done = false;
-  while (!done) {
-    controller.Reset();
-    req.set_offset(offset);
-    req.set_max_length(FLAGS_tablet_copy_transfer_chunk_size_bytes);
-
-    FetchDataResponsePB resp;
-    RETURN_NOT_OK_UNWIND_PREPEND(proxy_->FetchData(req, &resp, &controller),
-                                 controller,
-                                 "Unable to fetch data from remote");
-
-    // Sanity-check for corruption.
-    RETURN_NOT_OK_PREPEND(VerifyData(offset, resp.chunk()),
-                          Substitute("Error validating data item $0", data_id.ShortDebugString()));
-
-    const uint64_t chunk_size = resp.chunk().data().size();
-    const uint64_t total_size = resp.chunk().total_data_length();
-
-    // An empty chunk short of the end makes no progress; fail rather than
-    // spin forever asking for the same offset.
-    if (PREDICT_FALSE(chunk_size == 0 && offset < total_size)) {
-      return Status::Corruption(
-          Substitute("Received empty chunk at offset $0 of $1 for data item $2",
-                     offset, total_size, data_id.ShortDebugString()));
-    }
-
-    // Write the data.
-    RETURN_NOT_OK(appendable->Append(resp.chunk().data()));
-
-    if (PREDICT_FALSE(FLAGS_tablet_copy_dowload_file_inject_latency_ms > 0)) {
-      LOG_WITH_PREFIX(INFO) << "Injecting latency into file download: " <<
-          FLAGS_tablet_copy_dowload_file_inject_latency_ms;
-      SleepFor(MonoDelta::FromMilliseconds(FLAGS_tablet_copy_dowload_file_inject_latency_ms));
-    }
-
-    // Use '>=' rather than '==' so that a chunk running past the reported
-    // total still terminates the loop.
-    offset += chunk_size;
-    done = offset >= total_size;
-  }
-
-  return Status::OK();
-}
-
-Status TabletCopyClient::VerifyData(uint64_t offset, const DataChunkPB& chunk) {
-  // The chunk must start exactly at the offset that was requested.
-  const bool offset_matches = (offset == chunk.offset());
-  if (!offset_matches) {
-    return Status::InvalidArgument("Offset did not match what was asked for",
-                                   Substitute("$0 vs $1", offset, chunk.offset()));
-  }
-
-  // Recompute the CRC32C of the payload and compare it with the checksum
-  // carried in the chunk.
-  const string& payload = chunk.data();
-  const uint32_t computed_crc = crc::Crc32c(payload.data(), payload.length());
-  if (PREDICT_TRUE(computed_crc == chunk.crc32())) {
-    return Status::OK();
-  }
-  return Status::Corruption(
-      Substitute("CRC32 does not match at offset $0 size $1: $2 vs $3",
-                 offset, payload.size(), computed_crc, chunk.crc32()));
-}
-
-string TabletCopyClient::LogPrefix() {
-  // Prefix format: "T <tablet id> P <local uuid>: Tablet Copy client: ".
-  const string& local_uuid = fs_manager_->uuid();
-  return Substitute("T $0 P $1: Tablet Copy client: ", tablet_id_, local_uuid);
-}
-
-} // namespace tserver
-} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap_client.h b/src/kudu/tserver/remote_bootstrap_client.h
deleted file mode 100644
index 01253ee..0000000
--- a/src/kudu/tserver/remote_bootstrap_client.h
+++ /dev/null
@@ -1,210 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#ifndef KUDU_TSERVER_TABLET_COPY_CLIENT_H
-#define KUDU_TSERVER_TABLET_COPY_CLIENT_H
-
-#include <string>
-#include <memory>
-#include <vector>
-
-#include <gtest/gtest_prod.h>
-
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/macros.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/util/status.h"
-
-namespace kudu {
-
-class BlockId;
-class BlockIdPB;
-class FsManager;
-class HostPort;
-
-namespace consensus {
-class ConsensusMetadata;
-class ConsensusStatePB;
-class RaftConfigPB;
-class RaftPeerPB;
-} // namespace consensus
-
-namespace rpc {
-class ErrorStatusPB;
-class Messenger;
-class RpcController;
-} // namespace rpc
-
-namespace tablet {
-class TabletMetadata;
-class TabletPeer;
-class TabletStatusListener;
-class TabletSuperBlockPB;
-} // namespace tablet
-
-namespace tserver {
-class DataIdPB;
-class DataChunkPB;
-class TabletCopyServiceProxy;
-
-// Client class for using tablet copy to copy a tablet from another host.
-// This class is not thread-safe.
-//
-// The public methods declared below are SetTabletToReplace() (optional, for
-// replacing a tombstoned tablet), Start(), FetchAll() and Finish().
-//
-// TODO:
-// * Parallelize download of blocks and WAL segments.
-//
-class TabletCopyClient {
- public:
-
- // Construct the tablet copy client.
- // 'fs_manager' and 'messenger' must remain valid until this object is destroyed.
- TabletCopyClient(std::string tablet_id, FsManager* fs_manager,
- std::shared_ptr<rpc::Messenger> messenger);
-
- // Attempt to clean up resources on the remote end by sending an
- // EndTabletCopySession() RPC.
- ~TabletCopyClient();
-
- // Pass in the existing metadata for a tombstoned tablet, which will be
- // replaced if validation checks pass in Start().
- // 'meta' is the metadata for the tombstoned tablet and 'caller_term' is the
- // term provided by the caller (assumed to be the current leader of the
- // consensus config) for validation purposes.
- // If the consensus metadata exists on disk for this tablet, and if
- // 'caller_term' is lower than the current term stored in that consensus
- // metadata, then this method will fail with a Status::InvalidArgument error.
- Status SetTabletToReplace(const scoped_refptr<tablet::TabletMetadata>& meta,
- int64_t caller_term);
-
- // Start up a tablet copy session to copy from the source peer at
- // 'copy_source_addr'. Place a new superblock indicating that tablet copy is
- // in progress. If the 'metadata' pointer is passed as NULL, it is ignored,
- // otherwise the TabletMetadata object resulting from the initial tablet
- // copy response is returned.
- Status Start(const HostPort& copy_source_addr,
- scoped_refptr<tablet::TabletMetadata>* metadata);
-
- // Runs a "full" tablet copy, copying the physical layout of a tablet
- // from the leader of the specified consensus configuration.
- Status FetchAll(tablet::TabletStatusListener* status_listener);
-
- // After downloading all files successfully, write out the completed
- // replacement superblock.
- Status Finish();
-
- private:
- FRIEND_TEST(TabletCopyClientTest, TestBeginEndSession);
- FRIEND_TEST(TabletCopyClientTest, TestDownloadBlock);
- FRIEND_TEST(TabletCopyClientTest, TestVerifyData);
- FRIEND_TEST(TabletCopyClientTest, TestDownloadWalSegment);
- FRIEND_TEST(TabletCopyClientTest, TestDownloadAllBlocks);
-
- // Extract the embedded Status message from the given ErrorStatusPB.
- // The given ErrorStatusPB must extend TabletCopyErrorPB.
- static Status ExtractRemoteError(const rpc::ErrorStatusPB& remote_error);
-
- // NOTE(review): presumably augments 'status' with the application-level
- // error carried by 'controller' (see ExtractRemoteError()) -- confirm
- // against the implementation.
- static Status UnwindRemoteError(const Status& status, const rpc::RpcController& controller);
-
- // Update the tablet copy StatusListener with a message.
- // The string "TabletCopy: " will be prepended to each message.
- void UpdateStatusMessage(const std::string& message);
-
- // End the tablet copy session.
- Status EndRemoteSession();
-
- // Download all WAL files sequentially.
- Status DownloadWALs();
-
- // Download a single WAL file.
- // Assumes the WAL directories have already been created.
- // WAL file is opened with options so that it will fsync() on close.
- Status DownloadWAL(uint64_t wal_segment_seqno);
-
- // Write out the Consensus Metadata file based on the ConsensusStatePB
- // downloaded as part of initiating the tablet copy session.
- Status WriteConsensusMetadata();
-
- // Download all blocks belonging to a tablet sequentially.
- //
- // Blocks are given new IDs upon creation. On success, 'new_superblock_'
- // is populated to reflect the new block IDs and should be used in lieu
- // of 'superblock_' henceforth.
- Status DownloadBlocks();
-
- // Download the block specified by 'block_id'.
- //
- // 'block_count' and 'num_blocks' are the number of blocks downloaded so far
- // and the total number of blocks; they are used for progress reporting.
- //
- // On success:
- // - 'block_id' is set to the new ID of the downloaded block.
- // - 'block_count' is incremented.
- Status DownloadAndRewriteBlock(BlockIdPB* block_id, int* block_count, int num_blocks);
-
- // Download a single block.
- // Data block is opened with options so that it will fsync() on close.
- //
- // On success, 'new_block_id' is set to the new ID of the downloaded block.
- Status DownloadBlock(const BlockId& old_block_id, BlockId* new_block_id);
-
- // Download a single remote file. The block and WAL implementations delegate
- // to this method when downloading files.
- //
- // An Appendable is typically a WritableBlock (block) or WritableFile (WAL).
- //
- // Only used in one compilation unit, otherwise the implementation would
- // need to be in the header.
- template<class Appendable>
- Status DownloadFile(const DataIdPB& data_id, Appendable* appendable);
-
- // Verify a received data chunk: the chunk's offset must equal 'offset' (the
- // offset that was requested) and the CRC32C computed over the chunk's data
- // must equal the checksum carried in the chunk. Returns InvalidArgument on
- // an offset mismatch and Corruption on a checksum mismatch.
- Status VerifyData(uint64_t offset, const DataChunkPB& resp);
-
- // Return standard log prefix.
- std::string LogPrefix();
-
- // Set-once members.
- const std::string tablet_id_;
- FsManager* const fs_manager_;
- const std::shared_ptr<rpc::Messenger> messenger_;
-
- // State flags that enforce the progress of tablet copy.
- bool started_; // Session started.
- bool downloaded_wal_; // WAL segments downloaded.
- bool downloaded_blocks_; // Data blocks downloaded.
-
- // Session-specific data items.
- // NOTE(review): presumably set by SetTabletToReplace() -- confirm.
- bool replace_tombstoned_tablet_;
-
- // Local tablet metadata file.
- scoped_refptr<tablet::TabletMetadata> meta_;
-
- // Local Consensus metadata file. This may initially be NULL if this is
- // bootstrapping a new replica (rather than replacing an old one).
- gscoped_ptr<consensus::ConsensusMetadata> cmeta_;
-
- // Listener that receives progress messages (see UpdateStatusMessage()).
- tablet::TabletStatusListener* status_listener_;
- // Proxy used to issue tablet copy RPCs to the copy source.
- std::shared_ptr<TabletCopyServiceProxy> proxy_;
- // ID of the tablet copy session; sent with each FetchData() request.
- std::string session_id_;
- // Used as the RPC timeout when fetching data.
- uint64_t session_idle_timeout_millis_;
- // Superblock as received from the copy source (remote block IDs).
- gscoped_ptr<tablet::TabletSuperBlockPB> superblock_;
- // Copy of 'superblock_' rewritten with local block IDs by DownloadBlocks().
- gscoped_ptr<tablet::TabletSuperBlockPB> new_superblock_;
- // Committed consensus state (config and term) sent by the copy source.
- gscoped_ptr<consensus::ConsensusStatePB> remote_committed_cstate_;
- // Sequence numbers of the WAL segments to download.
- std::vector<uint64_t> wal_seqnos_;
- // Used in the file name of the saved copy of the consensus metadata.
- int64_t start_time_micros_;
-
- DISALLOW_COPY_AND_ASSIGN(TabletCopyClient);
-};
-
-} // namespace tserver
-} // namespace kudu
-#endif /* KUDU_TSERVER_TABLET_COPY_CLIENT_H */
http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap_service-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap_service-test.cc b/src/kudu/tserver/remote_bootstrap_service-test.cc
deleted file mode 100644
index 1b5baf2..0000000
--- a/src/kudu/tserver/remote_bootstrap_service-test.cc
+++ /dev/null
@@ -1,491 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "kudu/tserver/remote_bootstrap-test-base.h"
-
-#include <gflags/gflags.h>
-#include <limits>
-#include <thread>
-#include <vector>
-
-#include "kudu/consensus/log.h"
-#include "kudu/consensus/log_anchor_registry.h"
-#include "kudu/consensus/log_util.h"
-#include "kudu/consensus/metadata.pb.h"
-#include "kudu/consensus/opid_util.h"
-#include "kudu/rpc/rpc_header.pb.h"
-#include "kudu/rpc/transfer.h"
-#include "kudu/tserver/remote_bootstrap.pb.h"
-#include "kudu/tserver/tserver_service.pb.h"
-#include "kudu/tserver/tserver_service.proxy.h"
-#include "kudu/util/crc.h"
-#include "kudu/util/env_util.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/stopwatch.h"
-#include "kudu/util/test_util.h"
-
-// Runs AssertRemoteError() and aborts the calling test if any fatal
-// assertion fails inside it.
-#define ASSERT_REMOTE_ERROR(status, err, code, str) \
- ASSERT_NO_FATAL_FAILURE(AssertRemoteError(status, err, code, str))
-
-DECLARE_uint64(tablet_copy_idle_timeout_ms);
-DECLARE_uint64(tablet_copy_timeout_poll_period_ms);
-
-namespace kudu {
-namespace tserver {
-
-using consensus::MaximumOpId;
-using consensus::MinimumOpId;
-using consensus::OpIdEquals;
-using env_util::ReadFully;
-using log::ReadableLogSegment;
-using rpc::ErrorStatusPB;
-using rpc::RpcController;
-using std::thread;
-using std::vector;
-
-// Test fixture that talks to the tablet copy service of the mini server
-// through a TabletCopyServiceProxy. The Do*() helpers each issue one RPC with
-// a one-second timeout and unwind application-level remote errors.
-class TabletCopyServiceTest : public TabletCopyTest {
- public:
- TabletCopyServiceTest() {
- // Poll for session expiration every 10 ms for the session timeout test.
- FLAGS_tablet_copy_timeout_poll_period_ms = 10;
- }
-
- protected:
- // Run the base class setup, then create the proxy bound to the mini
- // server's RPC address.
- void SetUp() OVERRIDE {
- TabletCopyTest::SetUp();
- tablet_copy_proxy_.reset(
- new TabletCopyServiceProxy(client_messenger_, mini_server_->bound_rpc_addr()));
- }
-
- // Issue BeginTabletCopySession() for 'tablet_id' on behalf of
- // 'requestor_uuid'. The response is stored in 'resp'.
- Status DoBeginTabletCopySession(const string& tablet_id,
- const string& requestor_uuid,
- BeginTabletCopySessionResponsePB* resp,
- RpcController* controller) {
- controller->set_timeout(MonoDelta::FromSeconds(1.0));
- BeginTabletCopySessionRequestPB req;
- req.set_tablet_id(tablet_id);
- req.set_requestor_uuid(requestor_uuid);
- return UnwindRemoteError(
- tablet_copy_proxy_->BeginTabletCopySession(req, resp, controller), controller);
- }
-
- // Begin a session for the test tablet using the local UUID as requestor.
- // 'session_id' is always filled in on success; the remaining out
- // parameters are optional and are only filled in when non-null.
- Status DoBeginValidTabletCopySession(string* session_id,
- tablet::TabletSuperBlockPB* superblock = nullptr,
- uint64_t* idle_timeout_millis = nullptr,
- vector<uint64_t>* sequence_numbers = nullptr) {
- BeginTabletCopySessionResponsePB resp;
- RpcController controller;
- RETURN_NOT_OK(DoBeginTabletCopySession(GetTabletId(), GetLocalUUID(), &resp, &controller));
- *session_id = resp.session_id();
- if (superblock) {
- *superblock = resp.superblock();
- }
- if (idle_timeout_millis) {
- *idle_timeout_millis = resp.session_idle_timeout_millis();
- }
- if (sequence_numbers) {
- sequence_numbers->assign(resp.wal_segment_seqnos().begin(), resp.wal_segment_seqnos().end());
- }
- return Status::OK();
- }
-
- // Issue CheckSessionActive() for 'session_id'.
- Status DoCheckSessionActive(const string& session_id,
- CheckTabletCopySessionActiveResponsePB* resp,
- RpcController* controller) {
- controller->set_timeout(MonoDelta::FromSeconds(1.0));
- CheckTabletCopySessionActiveRequestPB req;
- req.set_session_id(session_id);
- return UnwindRemoteError(
- tablet_copy_proxy_->CheckSessionActive(req, resp, controller), controller);
- }
-
- // Issue FetchData() for 'data_id' within 'session_id'. 'offset' and
- // 'max_length' are optional: when null, the corresponding request field is
- // left unset.
- Status DoFetchData(const string& session_id, const DataIdPB& data_id,
- uint64_t* offset, int64_t* max_length,
- FetchDataResponsePB* resp,
- RpcController* controller) {
- controller->set_timeout(MonoDelta::FromSeconds(1.0));
- FetchDataRequestPB req;
- req.set_session_id(session_id);
- req.mutable_data_id()->CopyFrom(data_id);
- if (offset) {
- req.set_offset(*offset);
- }
- if (max_length) {
- req.set_max_length(*max_length);
- }
- return UnwindRemoteError(
- tablet_copy_proxy_->FetchData(req, resp, controller), controller);
- }
-
- // Issue EndTabletCopySession() for 'session_id'. 'error_msg' is optional:
- // when non-null it is serialized into the request's error field.
- Status DoEndTabletCopySession(const string& session_id, bool is_success,
- const Status* error_msg,
- EndTabletCopySessionResponsePB* resp,
- RpcController* controller) {
- controller->set_timeout(MonoDelta::FromSeconds(1.0));
- EndTabletCopySessionRequestPB req;
- req.set_session_id(session_id);
- req.set_is_success(is_success);
- if (error_msg) {
- StatusToPB(*error_msg, req.mutable_error());
- }
- return UnwindRemoteError(
- tablet_copy_proxy_->EndTabletCopySession(req, resp, controller), controller);
- }
-
- // Decode the remote error into a Status object.
- Status ExtractRemoteError(const ErrorStatusPB* remote_error) {
- const TabletCopyErrorPB& error =
- remote_error->GetExtension(TabletCopyErrorPB::tablet_copy_error_ext);
- return StatusFromPB(error.status());
- }
-
- // Enhance a RemoteError Status message with additional details from the remote.
- // Any status that is not an application-level remote error is returned as is.
- Status UnwindRemoteError(Status status, const RpcController* controller) {
- if (!status.IsRemoteError() ||
- controller->error_response()->code() != ErrorStatusPB::ERROR_APPLICATION) {
- return status;
- }
- Status remote_error = ExtractRemoteError(controller->error_response());
- return status.CloneAndPrepend(remote_error.ToString());
- }
-
- // Assert that 'status' is a remote error whose TabletCopyErrorPB extension
- // carries 'app_code' and whose embedded status has the code string
- // 'status_code_string'.
- void AssertRemoteError(Status status, const ErrorStatusPB* remote_error,
- const TabletCopyErrorPB::Code app_code,
- const string& status_code_string) {
- ASSERT_TRUE(status.IsRemoteError()) << "Unexpected status code: " << status.ToString()
- << ", app code: "
- << TabletCopyErrorPB::Code_Name(app_code)
- << ", status code string: " << status_code_string;
- const Status app_status = ExtractRemoteError(remote_error);
- const TabletCopyErrorPB& error =
- remote_error->GetExtension(TabletCopyErrorPB::tablet_copy_error_ext);
- ASSERT_EQ(app_code, error.code()) << error.ShortDebugString();
- ASSERT_EQ(status_code_string, app_status.CodeAsString()) << app_status.ToString();
- LOG(INFO) << app_status.ToString();
- }
-
- // Return BlockId in format suitable for a FetchData() call.
- static DataIdPB AsDataTypeId(const BlockId& block_id) {
- DataIdPB data_id;
- data_id.set_type(DataIdPB::BLOCK);
- block_id.CopyToPB(data_id.mutable_block_id());
- return data_id;
- }
-
- // Proxy to the tablet copy service under test; created in SetUp().
- gscoped_ptr<TabletCopyServiceProxy> tablet_copy_proxy_;
-};
-
-// Test beginning and ending a tablet copy session.
-TEST_F(TabletCopyServiceTest, TestSimpleBeginEndSession) {
-  // Begin a session, collecting every optional output.
-  string sid;
-  tablet::TabletSuperBlockPB sb;
-  uint64_t idle_timeout_ms;
-  vector<uint64_t> wal_seqnos;
-  ASSERT_OK(DoBeginValidTabletCopySession(&sid, &sb, &idle_timeout_ms, &wal_seqnos));
-
-  // Sanity-check what the server handed back.
-  ASSERT_FALSE(sid.empty());
-  ASSERT_EQ(FLAGS_tablet_copy_idle_timeout_ms, idle_timeout_ms);
-  ASSERT_TRUE(sb.IsInitialized());
-  // One segment per log roll, plus the currently active segment.
-  ASSERT_EQ(kNumLogRolls + 1, wal_seqnos.size());
-
-  // End the session, reporting success and no error status.
-  EndTabletCopySessionResponsePB end_resp;
-  RpcController end_controller;
-  ASSERT_OK(DoEndTabletCopySession(sid, true, nullptr, &end_resp, &end_controller));
-}
-
-// Test starting two sessions. The current implementation will silently only create one.
-TEST_F(TabletCopyServiceTest, TestBeginTwice) {
-  // Beginning a session for the same tablet a second time is expected to
-  // succeed quietly as well.
-  for (int attempt = 0; attempt < 2; ++attempt) {
-    string sid;
-    ASSERT_OK(DoBeginValidTabletCopySession(&sid));
-    ASSERT_FALSE(sid.empty());
-  }
-}
-
-// Regression test for KUDU-1436: race conditions if multiple requests
-// to begin the same tablet copy session arrive at more or less the
-// same time.
-TEST_F(TabletCopyServiceTest, TestBeginConcurrently) {
-  const int kNumThreads = 5;
-  vector<thread> threads;
-  // One result slot per thread; each thread writes only its own slot.
-  vector<tablet::TabletSuperBlockPB> sblocks(kNumThreads);
-  for (int i = 0 ; i < kNumThreads; i++) {
-    threads.emplace_back([this, &sblocks, i]{
-        string session_id;
-        // CHECK rather than ASSERT: gtest fatal assertions only return from
-        // the function they appear in, which here is the thread lambda.
-        CHECK_OK(DoBeginValidTabletCopySession(&session_id, &sblocks[i]));
-        CHECK(!session_id.empty());
-      });
-  }
-  for (auto& t : threads) {
-    t.join();
-  }
-  // Verify that all threads got the same result.
-  // Bound the loop by kNumThreads (an int) instead of threads.size() to
-  // avoid comparing a signed index against an unsigned size.
-  for (int i = 1; i < kNumThreads; i++) {
-    ASSERT_EQ(sblocks[i].DebugString(), sblocks[0].DebugString());
-  }
-}
-
-// Test bad session id error condition.
-TEST_F(TabletCopyServiceTest, TestInvalidSessionId) {
-  // Neither of these IDs names a session that was ever started.
-  vector<string> unknown_sessions;
-  unknown_sessions.push_back("hodor");
-  unknown_sessions.push_back(GetLocalUUID());
-
-  for (const string& sid : unknown_sessions) {
-    // Fetching a block must fail with NO_SESSION / NotFound.
-    FetchDataResponsePB fetch_resp;
-    RpcController fetch_controller;
-    DataIdPB block_item;
-    block_item.set_type(DataIdPB::BLOCK);
-    block_item.mutable_block_id()->set_id(1);
-    Status fetch_status = DoFetchData(sid, block_item, nullptr, nullptr,
-                                      &fetch_resp, &fetch_controller);
-    ASSERT_REMOTE_ERROR(fetch_status, fetch_controller.error_response(),
-                        TabletCopyErrorPB::NO_SESSION,
-                        Status::NotFound("").CodeAsString());
-  }
-
-  for (const string& sid : unknown_sessions) {
-    // Ending the session must fail the same way.
-    EndTabletCopySessionResponsePB end_resp;
-    RpcController end_controller;
-    Status end_status = DoEndTabletCopySession(sid, true, nullptr,
-                                               &end_resp, &end_controller);
-    ASSERT_REMOTE_ERROR(end_status, end_controller.error_response(),
-                        TabletCopyErrorPB::NO_SESSION,
-                        Status::NotFound("").CodeAsString());
-  }
-}
-
-// Test bad tablet id error condition.
-TEST_F(TabletCopyServiceTest, TestInvalidTabletId) {
-  // Beginning a session for a tablet the server does not know about must
-  // fail with TABLET_NOT_FOUND / NotFound.
-  BeginTabletCopySessionResponsePB begin_resp;
-  RpcController begin_controller;
-  Status begin_status = DoBeginTabletCopySession("some-unknown-tablet", GetLocalUUID(),
-                                                 &begin_resp, &begin_controller);
-  ASSERT_REMOTE_ERROR(begin_status, begin_controller.error_response(),
-                      TabletCopyErrorPB::TABLET_NOT_FOUND,
-                      Status::NotFound("").CodeAsString());
-}
-
-// Test DataIdPB validation.
-TEST_F(TabletCopyServiceTest, TestInvalidBlockOrOpId) {
-  // All cases below run inside one valid session.
-  string sid;
-  ASSERT_OK(DoBeginValidTabletCopySession(&sid));
-
-  // Case 1: a block ID that does not exist.
-  {
-    DataIdPB item;
-    item.set_type(DataIdPB::BLOCK);
-    item.mutable_block_id()->set_id(1);
-    FetchDataResponsePB fetch_resp;
-    RpcController rpc;
-    Status s = DoFetchData(sid, item, nullptr, nullptr, &fetch_resp, &rpc);
-    ASSERT_REMOTE_ERROR(s, rpc.error_response(),
-                        TabletCopyErrorPB::BLOCK_NOT_FOUND,
-                        Status::NotFound("").CodeAsString());
-  }
-
-  // Case 2: a WAL segment sequence number that does not exist.
-  {
-    DataIdPB item;
-    item.set_type(DataIdPB::LOG_SEGMENT);
-    item.set_wal_segment_seqno(31337);
-    FetchDataResponsePB fetch_resp;
-    RpcController rpc;
-    Status s = DoFetchData(sid, item, nullptr, nullptr, &fetch_resp, &rpc);
-    ASSERT_REMOTE_ERROR(s, rpc.error_response(),
-                        TabletCopyErrorPB::WAL_SEGMENT_NOT_FOUND,
-                        Status::NotFound("").CodeAsString());
-  }
-
-  // Case 3: a block ID without the required 'type' field. The request is
-  // rejected before it reaches the handler because a required field is
-  // missing.
-  {
-    DataIdPB item;
-    item.mutable_block_id()->set_id(1);
-    FetchDataResponsePB fetch_resp;
-    RpcController rpc;
-    Status s = DoFetchData(sid, item, nullptr, nullptr, &fetch_resp, &rpc);
-    ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
-    ASSERT_STR_CONTAINS(s.ToString(),
-                        "Invalid argument: invalid parameter for call "
-                        "kudu.tserver.TabletCopyService.FetchData: "
-                        "missing fields: data_id.type");
-  }
-
-  // Case 4: a type but no payload (neither block ID nor sequence number).
-  {
-    DataIdPB item;
-    item.set_type(DataIdPB::LOG_SEGMENT);
-    FetchDataResponsePB fetch_resp;
-    RpcController rpc;
-    Status s = DoFetchData(sid, item, nullptr, nullptr, &fetch_resp, &rpc);
-    ASSERT_REMOTE_ERROR(s, rpc.error_response(),
-                        TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST,
-                        Status::InvalidArgument("").CodeAsString());
-  }
-
-  // Case 5: both a block ID and a sequence number set in the same "union"
-  // PB, which is illegal.
-  {
-    DataIdPB item;
-    item.set_type(DataIdPB::BLOCK);
-    item.mutable_block_id()->set_id(1);
-    item.set_wal_segment_seqno(0);
-    FetchDataResponsePB fetch_resp;
-    RpcController rpc;
-    Status s = DoFetchData(sid, item, nullptr, nullptr, &fetch_resp, &rpc);
-    ASSERT_REMOTE_ERROR(s, rpc.error_response(),
-                        TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST,
-                        Status::InvalidArgument("").CodeAsString());
-  }
-}
-
-// Test invalid file offset error condition.
-TEST_F(TabletCopyServiceTest, TestFetchInvalidBlockOffset) {
-  string sid;
-  tablet::TabletSuperBlockPB sb;
-  ASSERT_OK(DoBeginValidTabletCopySession(&sid, &sb));
-
-  // Request the first column block starting at the largest representable
-  // offset, which cannot be a valid position within the block.
-  uint64_t impossible_offset = std::numeric_limits<uint64_t>::max();
-  const DataIdPB block_item = AsDataTypeId(FirstColumnBlockId(sb));
-  FetchDataResponsePB fetch_resp;
-  RpcController rpc;
-  Status s = DoFetchData(sid, block_item, &impossible_offset, nullptr, &fetch_resp, &rpc);
-  ASSERT_REMOTE_ERROR(s, rpc.error_response(),
-                      TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST,
-                      Status::InvalidArgument("").CodeAsString());
-}
-
-// Test that we are able to fetch an entire block.
-TEST_F(TabletCopyServiceTest, TestFetchBlockAtOnce) {
-  string sid;
-  tablet::TabletSuperBlockPB sb;
-  ASSERT_OK(DoBeginValidTabletCopySession(&sid, &sb));
-
-  // Read the block straight from the server's own filesystem as the
-  // reference copy.
-  BlockId first_block = FirstColumnBlockId(sb);
-  Slice expected;
-  faststring expected_buf;
-  ASSERT_OK(ReadLocalBlockFile(mini_server_->server()->fs_manager(), first_block,
-                               &expected_buf, &expected));
-
-  // Fetch the same block over RPC with no offset and no length limit set.
-  FetchDataResponsePB fetch_resp;
-  RpcController rpc;
-  ASSERT_OK(DoFetchData(sid, AsDataTypeId(first_block), nullptr, nullptr,
-                        &fetch_resp, &rpc));
-
-  AssertDataEqual(expected.data(), expected.size(), fetch_resp.chunk());
-}
-
-// Test that we are able to incrementally fetch blocks.
-TEST_F(TabletCopyServiceTest, TestFetchBlockIncrementally) {
-  string session_id;
-  tablet::TabletSuperBlockPB superblock;
-  ASSERT_OK(DoBeginValidTabletCopySession(&session_id, &superblock));
-
-  // Read the block locally to serve as the reference data.
-  BlockId block_id = FirstColumnBlockId(superblock);
-  Slice local_data;
-  faststring scratch;
-  ASSERT_OK(ReadLocalBlockFile(mini_server_->server()->fs_manager(), block_id,
-                               &scratch, &local_data));
-
-  // Grab the remote data in several chunks.
-  const int64_t block_size = local_data.size();
-  // Aim for roughly five chunks, but never ask for less than one byte: for a
-  // block smaller than 5 bytes, block_size / 5 would be 0 and the ASSERT_LE
-  // below could not hold for any non-empty chunk.
-  int64_t max_chunk_size = block_size >= 5 ? block_size / 5 : 1;
-  uint64_t offset = 0;
-  while (offset < static_cast<uint64_t>(block_size)) {
-    FetchDataResponsePB resp;
-    RpcController controller;
-    ASSERT_OK(DoFetchData(session_id, AsDataTypeId(block_id),
-                          &offset, &max_chunk_size, &resp, &controller));
-    int64_t returned_bytes = resp.chunk().data().size();
-    // Every fetch must make progress; otherwise this loop would never end.
-    ASSERT_GT(returned_bytes, 0) << "No data returned at offset " << offset;
-    ASSERT_LE(returned_bytes, max_chunk_size);
-    AssertDataEqual(local_data.data() + offset, returned_bytes, resp.chunk());
-    offset += returned_bytes;
-  }
-}
-
-// Test that we are able to fetch log segments.
-TEST_F(TabletCopyServiceTest, TestFetchLog) {
-  string sid;
-  tablet::TabletSuperBlockPB sb;
-  uint64_t idle_timeout_ms;
-  vector<uint64_t> wal_seqnos;
-  ASSERT_OK(DoBeginValidTabletCopySession(&sid, &sb, &idle_timeout_ms, &wal_seqnos));
-
-  ASSERT_EQ(kNumLogRolls + 1, wal_seqnos.size());
-  uint64_t remote_seqno = wal_seqnos.front();
-
-  // Fetch the first advertised WAL segment over RPC.
-  DataIdPB segment_item;
-  segment_item.set_type(DataIdPB::LOG_SEGMENT);
-  segment_item.set_wal_segment_seqno(remote_seqno);
-  FetchDataResponsePB fetch_resp;
-  RpcController rpc;
-  ASSERT_OK(DoFetchData(sid, segment_item, nullptr, nullptr, &fetch_resp, &rpc));
-
-  // Take a snapshot of the tablet's own log segments for comparison.
-  log::SegmentSequence local_segments;
-  ASSERT_OK(tablet_peer_->log()->reader()->GetSegmentsSnapshot(&local_segments));
-
-  // The first local segment must be the one that was fetched remotely.
-  const scoped_refptr<ReadableLogSegment>& first_segment = local_segments[0];
-  uint64_t local_seqno = first_segment->header().sequence_number();
-  ASSERT_EQ(remote_seqno, local_seqno)
-      << "Expected equal sequence numbers: " << remote_seqno
-      << " and " << local_seqno;
-
-  // Read the whole local segment file and compare it with the fetched chunk.
-  int64_t segment_size = first_segment->file_size();
-  faststring segment_buf;
-  segment_buf.resize(segment_size);
-  Slice segment_data;
-  ASSERT_OK(ReadFully(first_segment->readable_file().get(), 0, segment_size,
-                      &segment_data, segment_buf.data()));
-
-  AssertDataEqual(segment_data.data(), segment_data.size(), fetch_resp.chunk());
-}
-
-// Test that the tablet copy session timeout works properly.
-TEST_F(TabletCopyServiceTest, TestSessionTimeout) {
-  // Make sessions expire almost immediately. The expiration poll period was
-  // already shortened in the fixture's constructor.
-  FLAGS_tablet_copy_idle_timeout_ms = 1;
-
-  string sid;
-  ASSERT_OK(DoBeginValidTabletCopySession(&sid));
-
-  // Poll the session roughly once per millisecond until the server reports
-  // it inactive, giving up once 10 seconds have elapsed.
-  MonoTime poll_start = MonoTime::Now(MonoTime::FINE);
-  CheckTabletCopySessionActiveResponsePB check_resp;
-  while (true) {
-    RpcController rpc;
-    ASSERT_OK(DoCheckSessionActive(sid, &check_resp, &rpc));
-    if (!check_resp.session_is_active()) {
-      break;
-    }
-    SleepFor(MonoDelta::FromMilliseconds(1));  // 1 ms
-    if (!(MonoTime::Now(MonoTime::FINE).GetDeltaSince(poll_start).ToSeconds() < 10)) {
-      break;
-    }
-  }
-
-  ASSERT_FALSE(check_resp.session_is_active()) << "Tablet Copy session did not time out!";
-}
-
-} // namespace tserver
-} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap_service.cc b/src/kudu/tserver/remote_bootstrap_service.cc
deleted file mode 100644
index 1d7104c..0000000
--- a/src/kudu/tserver/remote_bootstrap_service.cc
+++ /dev/null
@@ -1,357 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "kudu/tserver/remote_bootstrap_service.h"
-
-#include <algorithm>
-#include <gflags/gflags.h>
-#include <glog/logging.h>
-#include <string>
-#include <vector>
-
-#include "kudu/common/wire_protocol.h"
-#include "kudu/consensus/log.h"
-#include "kudu/fs/fs_manager.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/map-util.h"
-#include "kudu/rpc/rpc_context.h"
-#include "kudu/tserver/remote_bootstrap_session.h"
-#include "kudu/tserver/tablet_peer_lookup.h"
-#include "kudu/tablet/tablet_peer.h"
-#include "kudu/util/crc.h"
-#include "kudu/util/fault_injection.h"
-#include "kudu/util/flag_tags.h"
-
-// Note, this macro assumes the existence of a local var named 'context'.
-#define RPC_RETURN_APP_ERROR(app_err, message, s) \
- do { \
- SetupErrorAndRespond(context, app_err, message, s); \
- return; \
- } while (false)
-
-#define RPC_RETURN_NOT_OK(expr, app_err, message) \
- do { \
- Status s = (expr); \
- if (!s.ok()) { \
- RPC_RETURN_APP_ERROR(app_err, message, s); \
- } \
- } while (false)
-
-// Idle timeout for a tablet copy session (default 180000 ms). It is added to
-// the current time whenever a session's expiration is reset, and is reported
-// to the client in the BeginTabletCopySession response.
-DEFINE_uint64(tablet_copy_idle_timeout_ms, 180000,
- "Amount of time without activity before a tablet copy "
- "session will expire, in millis");
-TAG_FLAG(tablet_copy_idle_timeout_ms, hidden);
-
-// How long the session expiration loop waits on the shutdown latch between
-// two sweeps for expired sessions (default 10000 ms).
-DEFINE_uint64(tablet_copy_timeout_poll_period_ms, 10000,
- "How often the tablet_copy service polls for expired "
- "tablet copy sessions, in millis");
-TAG_FLAG(tablet_copy_timeout_poll_period_ms, hidden);
-
-// Fault-injection knob handed to MAYBE_FAULT in FetchData() after the session
-// lookup has succeeded. The default of 0.0 is expected to disable the fault.
-DEFINE_double(fault_crash_on_handle_tc_fetch_data, 0.0,
- "Fraction of the time when the tablet will crash while "
- "servicing a TabletCopyService FetchData() RPC call. "
- "(For testing only!)");
-TAG_FLAG(fault_crash_on_handle_tc_fetch_data, unsafe);
-
-namespace kudu {
-namespace tserver {
-
-using crc::Crc32c;
-using strings::Substitute;
-using tablet::TabletPeer;
-
-// Logs a warning naming the requestor and the failure 's', then answers the
-// RPC with a TabletCopyErrorPB application error that carries 'code' and the
-// protobuf form of 's'. 'message' is forwarded as the error message.
-static void SetupErrorAndRespond(rpc::RpcContext* context,
-                                 TabletCopyErrorPB::Code code,
-                                 const string& message,
-                                 const Status& s) {
-  LOG(WARNING) << "Error handling TabletCopyService RPC request from "
-               << context->requestor_string() << ": " << s.ToString();
-
-  // Assemble the error payload attached to the response.
-  TabletCopyErrorPB err_pb;
-  err_pb.set_code(code);
-  StatusToPB(s, err_pb.mutable_status());
-
-  const int ext_number = TabletCopyErrorPB::tablet_copy_error_ext.number();
-  context->RespondApplicationError(ext_number, message, err_pb);
-}
-
-// Stores the collaborators (both must be non-null), arms the shutdown latch
-// with a count of one, and starts the "tc-session-exp" thread that runs
-// EndExpiredSessions(). Thread creation failure is fatal.
-TabletCopyServiceImpl::TabletCopyServiceImpl(
-    FsManager* fs_manager,
-    TabletPeerLookupIf* tablet_peer_lookup,
-    const scoped_refptr<MetricEntity>& metric_entity,
-    const scoped_refptr<rpc::ResultTracker>& result_tracker)
-    : TabletCopyServiceIf(metric_entity, result_tracker),
-      fs_manager_(CHECK_NOTNULL(fs_manager)),
-      tablet_peer_lookup_(CHECK_NOTNULL(tablet_peer_lookup)),
-      shutdown_latch_(1) {
-  const Status thread_status = Thread::Create(
-      "tablet-copy", "tc-session-exp",
-      &TabletCopyServiceImpl::EndExpiredSessions, this,
-      &session_expiration_thread_);
-  CHECK_OK(thread_status);
-}
-
-// RPC handler: opens a tablet copy session, or re-sends the initialization
-// info of an existing one.
-//
-// Looks up the requested tablet, finds or creates the session keyed by
-// "<requestor_uuid>-<tablet_id>", refreshes its expiration, and fills the
-// response with the session id, idle timeout, superblock, initial committed
-// consensus state and WAL segment sequence numbers. Responds with
-// TABLET_NOT_FOUND if the tablet lookup fails, and UNKNOWN_ERROR if a new
-// session cannot be initialized.
-void TabletCopyServiceImpl::BeginTabletCopySession(
-    const BeginTabletCopySessionRequestPB* req,
-    BeginTabletCopySessionResponsePB* resp,
-    rpc::RpcContext* context) {
-  const string& tablet_id = req->tablet_id();
-  const string& requestor_uuid = req->requestor_uuid();
-
-  // For now, we use the requestor_uuid with the tablet id as the session id,
-  // but there is no guarantee this will not change in the future.
-  const string session_id = Substitute("$0-$1", requestor_uuid, tablet_id);
-
-  scoped_refptr<TabletPeer> tablet_peer;
-  RPC_RETURN_NOT_OK(tablet_peer_lookup_->GetTabletPeer(tablet_id, &tablet_peer),
-                    TabletCopyErrorPB::TABLET_NOT_FOUND,
-                    Substitute("Unable to find specified tablet: $0", tablet_id));
-
-  scoped_refptr<TabletCopySession> session;
-  {
-    MutexLock l(sessions_lock_);
-    const bool session_exists = FindCopy(sessions_, session_id, &session);
-    if (session_exists) {
-      LOG(INFO) << Substitute(
-          "Re-sending initialization info for existing tablet copy session on tablet $0"
-          " from peer $1 at $2: session_id = $3",
-          tablet_id, requestor_uuid, context->requestor_string(), session_id);
-    } else {
-      LOG(INFO) << Substitute(
-          "Beginning new tablet copy session on tablet $0 from peer $1"
-          " at $2: session id = $3",
-          tablet_id, requestor_uuid, context->requestor_string(), session_id);
-      session.reset(new TabletCopySession(tablet_peer, session_id,
-                                          requestor_uuid, fs_manager_));
-      // An early return here releases the lock and leaves the session
-      // unregistered.
-      RPC_RETURN_NOT_OK(session->Init(),
-                        TabletCopyErrorPB::UNKNOWN_ERROR,
-                        Substitute("Error initializing tablet copy session for tablet $0",
-                                   tablet_id));
-      InsertOrDie(&sessions_, session_id, session);
-    }
-    // New and reused sessions alike get a fresh expiration.
-    ResetSessionExpirationUnlocked(session_id);
-  }
-
-  // Session identity and timing.
-  resp->set_session_id(session_id);
-  resp->set_responder_uuid(fs_manager_->uuid());
-  resp->set_session_idle_timeout_millis(FLAGS_tablet_copy_idle_timeout_ms);
-
-  // Tablet metadata captured by the session.
-  resp->mutable_superblock()->CopyFrom(session->tablet_superblock());
-  resp->mutable_initial_committed_cstate()->CopyFrom(session->initial_committed_cstate());
-  for (const auto& segment : session->log_segments()) {
-    resp->add_wal_segment_seqnos(segment->header().sequence_number());
-  }
-
-  context->RespondSuccess();
-}
-
-// RPC handler: reports whether the given session currently exists.
-//
-// A missing session (NO_SESSION) is a successful response with
-// session_is_active == false. Any other lookup failure is returned as an
-// application error. When the session exists and the request asks for
-// keepalive, the session's expiration is refreshed.
-void TabletCopyServiceImpl::CheckSessionActive(
-    const CheckTabletCopySessionActiveRequestPB* req,
-    CheckTabletCopySessionActiveResponsePB* resp,
-    rpc::RpcContext* context) {
-  const string& session_id = req->session_id();
-
-  // Look up and validate tablet copy session.
-  scoped_refptr<TabletCopySession> session;
-  MutexLock l(sessions_lock_);
-  TabletCopyErrorPB::Code app_error;
-  const Status lookup = FindSessionUnlocked(session_id, &app_error, &session);
-
-  // 'app_error' is only inspected when the lookup failed, in which case
-  // FindSessionUnlocked() has assigned it.
-  if (!lookup.ok() && app_error != TabletCopyErrorPB::NO_SESSION) {
-    RPC_RETURN_APP_ERROR(
-        app_error,
-        Substitute("Error trying to check whether session $0 is active", session_id),
-        lookup);
-  }
-
-  const bool active = lookup.ok();
-  if (active && req->keepalive()) {
-    ResetSessionExpirationUnlocked(session_id);
-  }
-  resp->set_session_is_active(active);
-  context->RespondSuccess();
-}
-
-// RPC handler: returns one chunk of a data block or WAL segment belonging to
-// an active session.
-//
-// Refreshes the session's expiration, validates the data id, reads up to
-// max_length bytes starting at the requested offset, and responds with the
-// chunk, its offset, the total length of the underlying data and a CRC32C of
-// the chunk bytes. Any failure is returned as an application error.
-void TabletCopyServiceImpl::FetchData(const FetchDataRequestPB* req,
-                                      FetchDataResponsePB* resp,
-                                      rpc::RpcContext* context) {
-  const string& session_id = req->session_id();
-
-  // Look up and validate tablet copy session.
-  scoped_refptr<TabletCopySession> session;
-  {
-    MutexLock l(sessions_lock_);
-    TabletCopyErrorPB::Code lookup_error;
-    RPC_RETURN_NOT_OK(FindSessionUnlocked(session_id, &lookup_error, &session),
-                      lookup_error, "No such session");
-    ResetSessionExpirationUnlocked(session_id);
-  }
-
-  MAYBE_FAULT(FLAGS_fault_crash_on_handle_tc_fetch_data);
-
-  const DataIdPB& data_id = req->data_id();
-  TabletCopyErrorPB::Code error_code = TabletCopyErrorPB::UNKNOWN_ERROR;
-  RPC_RETURN_NOT_OK(ValidateFetchRequestDataId(data_id, &error_code, session),
-                    error_code, "Invalid DataId");
-
-  const uint64_t offset = req->offset();
-  const int64_t client_maxlen = req->max_length();
-
-  // The session writes the bytes directly into the response's chunk.
-  DataChunkPB* chunk = resp->mutable_chunk();
-  string* payload = chunk->mutable_data();
-  int64_t total_data_length = 0;
-  if (data_id.type() != DataIdPB::BLOCK) {
-    // Fetching a log segment chunk.
-    const uint64_t segment_seqno = data_id.wal_segment_seqno();
-    RPC_RETURN_NOT_OK(session->GetLogSegmentPiece(segment_seqno, offset, client_maxlen,
-                                                  payload, &total_data_length, &error_code),
-                      error_code, "Unable to get piece of log segment");
-  } else {
-    // Fetching a data block chunk.
-    const BlockId block_id = BlockId::FromPB(data_id.block_id());
-    RPC_RETURN_NOT_OK(session->GetBlockPiece(block_id, offset, client_maxlen,
-                                             payload, &total_data_length, &error_code),
-                      error_code, "Unable to get piece of data block");
-  }
-
-  chunk->set_offset(offset);
-  chunk->set_total_data_length(total_data_length);
-
-  // Checksum covers exactly the bytes being returned.
-  chunk->set_crc32(Crc32c(payload->data(), payload->length()));
-
-  context->RespondSuccess();
-}
-
-// RPC handler: ends the session named in the request. Responds with an
-// application error ("No such session") if the session cannot be found.
-void TabletCopyServiceImpl::EndTabletCopySession(
-    const EndTabletCopySessionRequestPB* req,
-    EndTabletCopySessionResponsePB* resp,
-    rpc::RpcContext* context) {
-  const string& session_id = req->session_id();
-  {
-    MutexLock l(sessions_lock_);
-    LOG(INFO) << "Request end of tablet copy session " << session_id
-              << " received from " << context->requestor_string();
-    TabletCopyErrorPB::Code app_error;
-    RPC_RETURN_NOT_OK(DoEndTabletCopySessionUnlocked(session_id, &app_error),
-                      app_error, "No such session");
-  }
-  context->RespondSuccess();
-}
-
-// Stops the session expiration thread and destroys every remaining tablet
-// copy session.
-void TabletCopyServiceImpl::Shutdown() {
-  // Signal the expiration thread and wait for it to exit. This must happen
-  // before sessions_lock_ is taken below: the thread acquires that lock on
-  // every sweep, so joining it while holding the lock could deadlock.
-  shutdown_latch_.CountDown();
-  session_expiration_thread_->Join();
-
-  // The session maps are guarded by sessions_lock_ and the *Unlocked helper
-  // expects its caller to hold it, as every other caller does.
-  MutexLock l(sessions_lock_);
-
-  // Destroy all tablet copy sessions. The ids are snapshotted first because
-  // ending a session erases its entry from the map being iterated.
-  vector<string> session_ids;
-  session_ids.reserve(session_expirations_.size());
-  for (const MonoTimeMap::value_type& entry : session_expirations_) {
-    session_ids.push_back(entry.first);
-  }
-  for (const string& session_id : session_ids) {
-    LOG(INFO) << "Destroying tablet copy session " << session_id << " due to service shutdown";
-    TabletCopyErrorPB::Code app_error;
-    CHECK_OK(DoEndTabletCopySessionUnlocked(session_id, &app_error));
-  }
-}
-
-// Looks up 'session_id' in the session map. On success stores the session in
-// '*session' and returns OK. Otherwise sets '*app_error' to NO_SESSION and
-// returns NotFound. '*app_error' is left untouched on success.
-Status TabletCopyServiceImpl::FindSessionUnlocked(
-    const string& session_id,
-    TabletCopyErrorPB::Code* app_error,
-    scoped_refptr<TabletCopySession>* session) const {
-  if (FindCopy(sessions_, session_id, session)) {
-    return Status::OK();
-  }
-  *app_error = TabletCopyErrorPB::NO_SESSION;
-  return Status::NotFound(
-      Substitute("Tablet Copy session with Session ID \"$0\" not found", session_id));
-}
-
-// Validates the data identifier of a FetchData request.
-//
-// Exactly one of block_id / wal_segment_seqno must be present, and the one
-// that is present must agree with data_id.type(). On any violation returns
-// InvalidArgument and sets '*app_error' to INVALID_TABLET_COPY_REQUEST.
-// '*app_error' is left untouched on success. 'session' is currently unused.
-//
-// The LOG_SEGMENT branch checks field presence rather than the field's
-// value, so a request is no longer rejected here merely because its
-// explicitly set sequence number is 0.
-Status TabletCopyServiceImpl::ValidateFetchRequestDataId(
-    const DataIdPB& data_id,
-    TabletCopyErrorPB::Code* app_error,
-    const scoped_refptr<TabletCopySession>& session) const {
-  const bool has_block = data_id.has_block_id();
-  const bool has_seqno = data_id.has_wal_segment_seqno();
-
-  if (PREDICT_FALSE(has_block && has_seqno)) {
-    *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
-    return Status::InvalidArgument(
-        Substitute("Only one of BlockId or segment sequence number are required, "
-                   "but both were specified. DataTypeID: $0", data_id.ShortDebugString()));
-  }
-  if (PREDICT_FALSE(!has_block && !has_seqno)) {
-    *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
-    return Status::InvalidArgument(
-        Substitute("Only one of BlockId or segment sequence number are required, "
-                   "but neither were specified. DataTypeID: $0", data_id.ShortDebugString()));
-  }
-
-  // Exactly one identifier is present from here on; it has to be the one
-  // that the declared type calls for.
-  if (data_id.type() == DataIdPB::BLOCK) {
-    if (PREDICT_FALSE(!has_block)) {
-      *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
-      return Status::InvalidArgument("block_id must be specified for type == BLOCK",
-                                     data_id.ShortDebugString());
-    }
-  } else {
-    if (PREDICT_FALSE(!has_seqno)) {
-      *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
-      return Status::InvalidArgument(
-          "segment sequence number must be specified for type == LOG_SEGMENT",
-          data_id.ShortDebugString());
-    }
-  }
-
-  return Status::OK();
-}
-
-// Records session activity: sets the expiration of 'session_id' to the
-// current time plus FLAGS_tablet_copy_idle_timeout_ms, inserting the entry
-// if it does not exist yet.
-void TabletCopyServiceImpl::ResetSessionExpirationUnlocked(const std::string& session_id) {
-  MonoTime deadline = MonoTime::Now(MonoTime::FINE);
-  const MonoDelta idle_timeout =
-      MonoDelta::FromMilliseconds(FLAGS_tablet_copy_idle_timeout_ms);
-  deadline.AddDelta(idle_timeout);
-  InsertOrUpdate(&session_expirations_, session_id, deadline);
-}
-
-// Ends the session 'session_id': removes it from both the session map and
-// the expiration map. Returns the lookup error (with '*app_error' set by
-// FindSessionUnlocked) if the session does not exist. It is fatal for either
-// map not to contain exactly one entry for an existing session.
-Status TabletCopyServiceImpl::DoEndTabletCopySessionUnlocked(
-    const std::string& session_id,
-    TabletCopyErrorPB::Code* app_error) {
-  scoped_refptr<TabletCopySession> session;
-  RETURN_NOT_OK(FindSessionUnlocked(session_id, app_error, &session));
-
-  LOG(INFO) << "Ending tablet copy session " << session_id << " on tablet "
-            << session->tablet_id() << " with peer " << session->requestor_uuid();
-
-  // Remove the session from the map.
-  // It will get destroyed once there are no outstanding refs.
-  const auto sessions_erased = sessions_.erase(session_id);
-  CHECK_EQ(1, sessions_erased);
-  const auto expirations_erased = session_expirations_.erase(session_id);
-  CHECK_EQ(1, expirations_erased);
-
-  return Status::OK();
-}
-
-// Body of the session expiration thread. Repeatedly ends every session whose
-// expiration lies before the current time, then waits on the shutdown latch
-// for FLAGS_tablet_copy_timeout_poll_period_ms. Returns once the latch has
-// been counted down. At least one sweep is always performed.
-void TabletCopyServiceImpl::EndExpiredSessions() {
-  while (true) {
-    {
-      // The lock is held for the sweep only, not while waiting below.
-      MutexLock l(sessions_lock_);
-      const MonoTime now = MonoTime::Now(MonoTime::FINE);
-
-      // Collect first: ending a session erases from the map being iterated.
-      vector<string> expired_session_ids;
-      for (const auto& entry : session_expirations_) {
-        if (entry.second.ComesBefore(now)) {
-          expired_session_ids.push_back(entry.first);
-        }
-      }
-      for (const string& session_id : expired_session_ids) {
-        LOG(INFO) << "Tablet Copy session " << session_id
-                  << " has expired. Terminating session.";
-        TabletCopyErrorPB::Code app_error;
-        CHECK_OK(DoEndTabletCopySessionUnlocked(session_id, &app_error));
-      }
-    }
-
-    const MonoDelta poll_period =
-        MonoDelta::FromMilliseconds(FLAGS_tablet_copy_timeout_poll_period_ms);
-    if (shutdown_latch_.WaitFor(poll_period)) {
-      return;
-    }
-  }
-}
-
-} // namespace tserver
-} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/647f904b/src/kudu/tserver/remote_bootstrap_service.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap_service.h b/src/kudu/tserver/remote_bootstrap_service.h
deleted file mode 100644
index a5eab8c..0000000
--- a/src/kudu/tserver/remote_bootstrap_service.h
+++ /dev/null
@@ -1,112 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#ifndef KUDU_TSERVER_TABLET_COPY_SERVICE_H_
-#define KUDU_TSERVER_TABLET_COPY_SERVICE_H_
-
-#include <string>
-#include <unordered_map>
-
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/tserver/remote_bootstrap.service.h"
-#include "kudu/util/countdown_latch.h"
-#include "kudu/util/locks.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/status.h"
-#include "kudu/util/thread.h"
-
-namespace kudu {
-class FsManager;
-
-namespace log {
-class ReadableLogSegment;
-} // namespace log
-
-namespace tserver {
-
-class TabletCopySession;
-class TabletPeerLookupIf;
-
-// Implementation of the tablet copy RPC service. Tracks one
-// TabletCopySession per session id together with an expiration time, and
-// owns a background thread that ends sessions whose expiration has passed.
-class TabletCopyServiceImpl : public TabletCopyServiceIf {
- public:
-  // 'fs_manager' and 'tablet_peer_lookup' must be non-null. Starts the
-  // session expiration thread.
-  TabletCopyServiceImpl(FsManager* fs_manager,
-                        TabletPeerLookupIf* tablet_peer_lookup,
-                        const scoped_refptr<MetricEntity>& metric_entity,
-                        const scoped_refptr<rpc::ResultTracker>& result_tracker);
-
-  // Opens a session for the requested tablet, or re-sends the initialization
-  // info of an already existing session for the same requestor and tablet.
-  virtual void BeginTabletCopySession(const BeginTabletCopySessionRequestPB* req,
-                                      BeginTabletCopySessionResponsePB* resp,
-                                      rpc::RpcContext* context) OVERRIDE;
-
-  // Reports whether a session exists, optionally refreshing its expiration.
-  virtual void CheckSessionActive(const CheckTabletCopySessionActiveRequestPB* req,
-                                  CheckTabletCopySessionActiveResponsePB* resp,
-                                  rpc::RpcContext* context) OVERRIDE;
-
-  // Returns a chunk of a data block or WAL segment of an existing session.
-  virtual void FetchData(const FetchDataRequestPB* req,
-                         FetchDataResponsePB* resp,
-                         rpc::RpcContext* context) OVERRIDE;
-
-  // Ends the session named in the request.
-  virtual void EndTabletCopySession(const EndTabletCopySessionRequestPB* req,
-                                    EndTabletCopySessionResponsePB* resp,
-                                    rpc::RpcContext* context) OVERRIDE;
-
-  // Stops the expiration thread and destroys all remaining sessions.
-  virtual void Shutdown() OVERRIDE;
-
- private:
-  // Session id -> session.
-  using SessionMap = std::unordered_map<std::string, scoped_refptr<TabletCopySession>>;
-  // Session id -> time at which the session expires.
-  using MonoTimeMap = std::unordered_map<std::string, MonoTime>;
-
-  // Look up session in session map. On failure sets '*app_error' and
-  // returns a non-OK status.
-  Status FindSessionUnlocked(const std::string& session_id,
-                             TabletCopyErrorPB::Code* app_error,
-                             scoped_refptr<TabletCopySession>* session) const;
-
-  // Validate the data identifier in a FetchData request.
-  Status ValidateFetchRequestDataId(const DataIdPB& data_id,
-                                    TabletCopyErrorPB::Code* app_error,
-                                    const scoped_refptr<TabletCopySession>& session) const;
-
-  // Take note of session activity; Re-update the session timeout deadline.
-  void ResetSessionExpirationUnlocked(const std::string& session_id);
-
-  // Destroy the specified tablet copy session, removing it from both maps.
-  Status DoEndTabletCopySessionUnlocked(const std::string& session_id,
-                                        TabletCopyErrorPB::Code* app_error);
-
-  // The timeout thread periodically checks whether sessions are expired and
-  // removes them from the map.
-  void EndExpiredSessions();
-
-  FsManager* fs_manager_;
-  TabletPeerLookupIf* tablet_peer_lookup_;
-
-  // Protects sessions_ and session_expirations_ maps.
-  mutable Mutex sessions_lock_;
-  SessionMap sessions_;
-  MonoTimeMap session_expirations_;
-
-  // Session expiration thread.
-  // TODO: this is a hack, replace with some kind of timer impl. See KUDU-286.
-  CountDownLatch shutdown_latch_;
-  scoped_refptr<Thread> session_expiration_thread_;
-};
-
-} // namespace tserver
-} // namespace kudu
-
-#endif // KUDU_TSERVER_TABLET_COPY_SERVICE_H_