You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/08/24 04:43:57 UTC
[1/2] kudu git commit: KUDU-871. Support tombstoned voting
Repository: kudu
Updated Branches:
refs/heads/master b37bde72c -> 5bca7d8ba
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tablet/tablet_metadata.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h
index 650ac15..9c34638 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -250,6 +250,9 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
RowSetMetadata *GetRowSetForTests(int64_t id);
+ // Return standard "T xxx P yyy" log prefix.
+ std::string LogPrefix() const;
+
private:
friend class RefCountedThreadSafe<TabletMetadata>;
friend class MetadataTest;
@@ -303,9 +306,6 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
// Failures are logged, but are not fatal.
void DeleteOrphanedBlocks(const std::vector<BlockId>& blocks);
- // Return standard "T xxx P yyy" log prefix.
- std::string LogPrefix() const;
-
enum State {
kNotLoadedYet,
kNotWrittenYet,
@@ -355,6 +355,7 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
// Record of the last opid logged by the tablet before it was last
// tombstoned. Has no meaning for non-tombstoned tablets.
+ // Protected by 'data_lock_'.
boost::optional<consensus::OpId> tombstone_last_logged_opid_;
// If this counter is > 0 then Flush() will not write any data to
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tablet/tablet_replica-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index b358b59..df07384 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -125,38 +125,37 @@ class TabletReplicaTest : public KuduTabletTest {
metric_entity_ = METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "test-tablet");
- RaftPeerPB config_peer;
- config_peer.set_permanent_uuid(tablet()->metadata()->fs_manager()->uuid());
- config_peer.mutable_last_known_addr()->set_host("0.0.0.0");
- config_peer.mutable_last_known_addr()->set_port(0);
- config_peer.set_member_type(RaftPeerPB::VOTER);
+ RaftConfigPB config;
+ config.set_opid_index(consensus::kInvalidOpIdIndex);
+
+ RaftPeerPB* config_peer = config.add_peers();
+ config_peer->set_permanent_uuid(tablet()->metadata()->fs_manager()->uuid());
+ config_peer->mutable_last_known_addr()->set_host("0.0.0.0");
+ config_peer->mutable_last_known_addr()->set_port(0);
+ config_peer->set_member_type(RaftPeerPB::VOTER);
scoped_refptr<ConsensusMetadataManager> cmeta_manager(
new ConsensusMetadataManager(tablet()->metadata()->fs_manager()));
+ scoped_refptr<ConsensusMetadata> cmeta;
+ ASSERT_OK(cmeta_manager->Create(tablet()->tablet_id(), config, consensus::kMinimumTerm,
+ &cmeta));
+
// "Bootstrap" and start the TabletReplica.
tablet_replica_.reset(
new TabletReplica(tablet()->shared_metadata(),
cmeta_manager,
- config_peer,
+ *config_peer,
apply_pool_.get(),
Bind(&TabletReplicaTest::TabletReplicaStateChangedCallback,
Unretained(this),
tablet()->tablet_id())));
-
+ ASSERT_OK(tablet_replica_->Init(raft_pool_.get()));
// Make TabletReplica use the same LogAnchorRegistry as the Tablet created by the harness.
// TODO(mpercy): Refactor TabletHarness to allow taking a
// LogAnchorRegistry, while also providing TabletMetadata for consumption
// by TabletReplica before Tablet is instantiated.
tablet_replica_->log_anchor_registry_ = tablet()->log_anchor_registry_;
-
- RaftConfigPB config;
- config.add_peers()->CopyFrom(config_peer);
- config.set_opid_index(consensus::kInvalidOpIdIndex);
-
- scoped_refptr<ConsensusMetadata> cmeta;
- ASSERT_OK(cmeta_manager->Create(tablet()->tablet_id(), config, consensus::kMinimumTerm,
- &cmeta));
}
Status StartReplica(const ConsensusBootstrapInfo& info) {
@@ -171,7 +170,6 @@ class TabletReplicaTest : public KuduTabletTest {
messenger_,
scoped_refptr<rpc::ResultTracker>(),
log,
- raft_pool_.get(),
prepare_pool_.get());
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index c1a0477..7f5a72e 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -115,7 +115,7 @@ TabletReplica::TabletReplica(
log_anchor_registry_(new LogAnchorRegistry()),
apply_pool_(apply_pool),
mark_dirty_clbk_(std::move(mark_dirty_clbk)),
- state_(NOT_STARTED),
+ state_(NOT_INITIALIZED),
last_status_("Tablet initializing...") {
}
@@ -127,13 +127,28 @@ TabletReplica::~TabletReplica() {
<< TabletStatePB_Name(state_);
}
+// Creates this replica's RaftConsensus instance and moves the replica from
+// NOT_INITIALIZED to INITIALIZED. Takes no lock: it must be called before the
+// TabletReplica is published to other threads.
+Status TabletReplica::Init(ThreadPool* raft_pool) {
+ CHECK_EQ(NOT_INITIALIZED, state_);
+ TRACE("Creating consensus instance");
+ ConsensusOptions raft_options;
+ raft_options.tablet_id = meta_->tablet_id();
+ shared_ptr<RaftConsensus> new_consensus;
+ RETURN_NOT_OK(RaftConsensus::Create(std::move(raft_options), local_peer_pb_,
+ cmeta_manager_, raft_pool, &new_consensus));
+ // Only install the instance (and change state) once creation succeeded, so
+ // a failed Init() leaves the replica NOT_INITIALIZED.
+ consensus_ = std::move(new_consensus);
+ set_state(INITIALIZED);
+ return Status::OK();
+}
+
Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
shared_ptr<Tablet> tablet,
scoped_refptr<clock::Clock> clock,
shared_ptr<Messenger> messenger,
scoped_refptr<ResultTracker> result_tracker,
scoped_refptr<Log> log,
- ThreadPool* raft_pool,
ThreadPool* prepare_pool) {
DCHECK(tablet) << "A TabletReplica must be provided with a Tablet";
DCHECK(log) << "A TabletReplica must be provided with a Log";
@@ -141,9 +156,11 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
{
std::lock_guard<simple_spinlock> state_change_guard(state_change_lock_);
+ scoped_refptr<MetricEntity> metric_entity;
+ gscoped_ptr<PeerProxyFactory> peer_proxy_factory;
+ scoped_refptr<TimeManager> time_manager;
{
std::lock_guard<simple_spinlock> l(lock_);
-
CHECK_EQ(BOOTSTRAPPING, state_);
tablet_ = DCHECK_NOTNULL(std::move(tablet));
@@ -151,24 +168,7 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
messenger_ = DCHECK_NOTNULL(std::move(messenger));
result_tracker_ = std::move(result_tracker); // Passed null in tablet_replica-test
log_ = DCHECK_NOTNULL(log); // Not moved because it's passed to RaftConsensus::Start() below.
- }
-
- // Unlock while we initialize RaftConsensus, which involves I/O.
- TRACE("Creating consensus instance");
- ConsensusOptions options;
- options.tablet_id = meta_->tablet_id();
- shared_ptr<RaftConsensus> consensus(std::make_shared<RaftConsensus>(std::move(options),
- local_peer_pb_,
- cmeta_manager_,
- raft_pool));
- RETURN_NOT_OK(consensus->Init());
- scoped_refptr<MetricEntity> metric_entity;
- gscoped_ptr<PeerProxyFactory> peer_proxy_factory;
- scoped_refptr<TimeManager> time_manager;
- {
- std::lock_guard<simple_spinlock> l(lock_);
- consensus_ = consensus;
metric_entity = tablet_->GetMetricEntity();
prepare_pool_token_ = prepare_pool->NewTokenWithMetrics(
ThreadPool::ExecutionMode::SERIAL,
@@ -196,7 +196,7 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
// may invoke TabletReplica::StartReplicaTransaction() during startup, causing
// a self-deadlock. We take a ref to members protected by 'lock_' before
// unlocking.
- RETURN_NOT_OK(consensus->Start(
+ RETURN_NOT_OK(consensus_->Start(
bootstrap_info,
std::move(peer_proxy_factory),
log,
@@ -208,7 +208,7 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
// Re-acquire 'lock_' to update our state variable.
std::lock_guard<simple_spinlock> l(lock_);
CHECK_EQ(BOOTSTRAPPING, state_); // We are still protected by 'state_change_lock_'.
- state_ = RUNNING;
+ set_state(RUNNING);
}
// Because we changed the tablet state, we need to re-report the tablet to the master.
@@ -222,22 +222,17 @@ const consensus::RaftConfigPB TabletReplica::RaftConfig() const {
return consensus_->CommittedConfig();
}
-void TabletReplica::Shutdown() {
-
- LOG(INFO) << "Initiating TabletReplica shutdown for tablet: " << tablet_id_;
-
- TabletStatePB shutdown_type = SHUTDOWN;
+void TabletReplica::Stop() {
{
std::unique_lock<simple_spinlock> lock(lock_);
- if (state_ == QUIESCING || state_ == SHUTDOWN || state_ == FAILED) {
+ if (state_ == STOPPING || state_ == STOPPED ||
+ state_ == SHUTDOWN || state_ == FAILED) {
lock.unlock();
- WaitUntilShutdown();
+ WaitUntilStopped();
return;
}
- if (!error_.ok()) {
- shutdown_type = FAILED;
- }
- state_ = QUIESCING;
+ LOG_WITH_PREFIX(INFO) << "stopping tablet replica";
+ set_state(STOPPING);
}
std::lock_guard<simple_spinlock> l(state_change_lock_);
@@ -248,7 +243,7 @@ void TabletReplica::Shutdown() {
if (tablet_) tablet_->UnregisterMaintenanceOps();
UnregisterMaintenanceOps();
- if (consensus_) consensus_->Shutdown();
+ if (consensus_) consensus_->Stop();
// TODO(KUDU-183): Keep track of the pending tasks and send an "abort" message.
LOG_SLOW_EXECUTION(WARNING, 1000,
@@ -272,21 +267,31 @@ void TabletReplica::Shutdown() {
tablet_->Shutdown();
}
- // Only update the replica state when all other components have shut down.
+ // Only mark the peer as STOPPED when all other components have shut down.
{
std::lock_guard<simple_spinlock> lock(lock_);
- // Release mem tracker resources.
- consensus_.reset();
tablet_.reset();
- state_ = shutdown_type;
+ set_state(STOPPED);
+ }
+}
+
+// Synchronously moves this replica to a terminal state. It first calls
+// Stop(), then shuts down RaftConsensus, then transitions to FAILED if
+// 'error_' is non-OK and to SHUTDOWN otherwise. If the state is already
+// SHUTDOWN or FAILED once 'lock_' is taken, it is left unchanged.
+void TabletReplica::Shutdown() {
+ Stop();
+ // Stop() only calls RaftConsensus::Stop(); finish shutting consensus down
+ // here. 'consensus_' is read without 'lock_' because it is set once in Init().
+ if (consensus_) consensus_->Shutdown();
+ std::lock_guard<simple_spinlock> lock(lock_);
+ // A previous (or concurrent) Shutdown() may already have reached a terminal state.
+ if (state_ == SHUTDOWN || state_ == FAILED) return;
+ if (!error_.ok()) {
+ set_state(FAILED);
+ return;
}
+ set_state(SHUTDOWN);
}
-void TabletReplica::WaitUntilShutdown() {
+void TabletReplica::WaitUntilStopped() {
while (true) {
{
std::lock_guard<simple_spinlock> lock(lock_);
- if (state_ == SHUTDOWN || state_ == FAILED) {
+ if (state_ == STOPPED || state_ == SHUTDOWN || state_ == FAILED) {
return;
}
}
@@ -294,6 +299,41 @@ void TabletReplica::WaitUntilShutdown() {
}
}
+// Standard "T xxx P yyy" log prefix, delegated to this replica's TabletMetadata.
+std::string TabletReplica::LogPrefix() const { return meta_->LogPrefix(); }
+
+// Transitions 'state_' to 'new_state', CHECK-failing on illegal transitions
+// (see tablet/metadata.proto). The caller must hold 'lock_' once this object
+// has been published to other threads.
+void TabletReplica::set_state(TabletStatePB new_state) {
+ switch (new_state) {
+ case NOT_INITIALIZED:
+ // NOT_INITIALIZED is only the initial state assigned by the constructor.
+ LOG(FATAL) << "Cannot transition to NOT_INITIALIZED state";
+ return;
+ case INITIALIZED:
+ CHECK_EQ(NOT_INITIALIZED, state_);
+ break;
+ case BOOTSTRAPPING:
+ CHECK_EQ(INITIALIZED, state_);
+ break;
+ case RUNNING:
+ CHECK_EQ(BOOTSTRAPPING, state_);
+ break;
+ case STOPPING:
+ // Mirror the early-return guard in Stop(): STOPPING may not be re-entered,
+ // and STOPPED, SHUTDOWN and FAILED must never go back to STOPPING.
+ CHECK_NE(STOPPING, state_);
+ CHECK_NE(STOPPED, state_);
+ CHECK_NE(SHUTDOWN, state_);
+ CHECK_NE(FAILED, state_);
+ break;
+ case STOPPED:
+ CHECK_EQ(STOPPING, state_);
+ break;
+ case SHUTDOWN: FALLTHROUGH_INTENDED;
+ case FAILED:
+ // Both terminal states are only reachable after a completed Stop().
+ CHECK_EQ(STOPPED, state_) << TabletStatePB_Name(state_);
+ break;
+ default:
+ break;
+ }
+ state_ = new_state;
+}
+
Status TabletReplica::CheckRunning() const {
{
std::lock_guard<simple_spinlock> lock(lock_);
@@ -320,7 +360,7 @@ Status TabletReplica::WaitUntilConsensusRunning(const MonoDelta& timeout) {
has_consensus = true; // consensus_ is a set-once object.
}
}
- if (cached_state == QUIESCING || cached_state == SHUTDOWN) {
+ if (cached_state == STOPPING || cached_state == STOPPED) {
return Status::IllegalState(
Substitute("The tablet is already shutting down or shutdown. State: $0",
TabletStatePB_Name(cached_state)));
@@ -390,6 +430,11 @@ Status TabletReplica::RunLogGC() {
return Status::OK();
}
+// Moves the replica from INITIALIZED to BOOTSTRAPPING. The transition is made
+// under 'lock_' because the replica may already be visible to other threads.
+void TabletReplica::SetBootstrapping() {
+ std::lock_guard<simple_spinlock> l(lock_);
+ set_state(BOOTSTRAPPING);
+}
+
void TabletReplica::SetStatusMessage(const std::string& status) {
std::lock_guard<simple_spinlock> lock(lock_);
last_status_ = status;
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tablet/tablet_replica.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index fd272ad..8b85889 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -93,6 +93,12 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
ThreadPool* apply_pool,
Callback<void(const std::string& reason)> mark_dirty_clbk);
+ // Initializes RaftConsensus.
+ // This must be called before publishing the instance to other threads.
+ // If this fails, the TabletReplica instance remains in a NOT_INITIALIZED
+ // state.
+ Status Init(ThreadPool* raft_pool);
+
// Starts the TabletReplica, making it available for Write()s. If this
// TabletReplica is part of a consensus configuration this will connect it to other replicas
// in the consensus configuration.
@@ -102,14 +108,19 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
std::shared_ptr<rpc::Messenger> messenger,
scoped_refptr<rpc::ResultTracker> result_tracker,
scoped_refptr<log::Log> log,
- ThreadPool* raft_pool,
ThreadPool* prepare_pool);
- // Shutdown this tablet replica. If a shutdown is already in progress,
- // blocks until that shutdown is complete.
+ // Synchronously transition this replica to STOPPED state from any other
+ // state. This also stops RaftConsensus. If a Stop() operation is already in
+ // progress, blocks until that operation is complete.
+ // See tablet/metadata.proto for a description of legal state transitions.
+ void Stop();
+
+ // Synchronously transition this replica to SHUTDOWN state from any other state.
+ // See tablet/metadata.proto for a description of legal state transitions.
//
// If 'error_' has been set to a non-OK status, the final state will be
- // FAILED, and SHUTDOWN otherwise.
+ // FAILED instead of SHUTDOWN.
void Shutdown();
// Check that the tablet is in a RUNNING state.
@@ -176,17 +187,8 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
// Returns the current Raft configuration.
const consensus::RaftConfigPB RaftConfig() const;
- // If any peers in the consensus configuration lack permanent uuids, get them via an
- // RPC call and update.
- // TODO: move this to raft_consensus.h.
- Status UpdatePermanentUuids();
-
// Sets the tablet to a BOOTSTRAPPING state, indicating it is starting up.
- void SetBootstrapping() {
- std::lock_guard<simple_spinlock> lock(lock_);
- CHECK_EQ(NOT_STARTED, state_);
- state_ = BOOTSTRAPPING;
- }
+ void SetBootstrapping();
// Set a user-readable status message about the tablet. This may appear on
// the Web UI, for example.
@@ -287,14 +289,15 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
~TabletReplica();
- // Wait until the TabletReplica is fully in a FAILED or SHUTDOWN state.
- void WaitUntilShutdown();
+ // Wait until the TabletReplica is fully in STOPPED, SHUTDOWN, or FAILED
+ // state.
+ void WaitUntilStopped();
- // After bootstrap is complete and consensus is setup this initiates the transactions
- // that were not complete on bootstrap.
- // Not implemented yet. See .cc file.
- Status StartPendingTransactions(consensus::RaftPeerPB::Role my_role,
- const consensus::ConsensusBootstrapInfo& bootstrap_info);
+ std::string LogPrefix() const;
+ // Transition to another state. Requires that the caller hold 'lock_' if the
+ // object has already been published to other threads. See
+ // tablet/metadata.proto for state descriptions and legal state transitions.
+ void set_state(TabletStatePB new_state);
const scoped_refptr<TabletMetadata> meta_;
const scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tools/kudu-admin-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index f2b6e5b..fbb1be4 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -377,8 +377,8 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) {
// we should see the tablet in TOMBSTONED state on these servers.
ASSERT_OK(cluster_->tablet_server_by_uuid(leader_ts->uuid())->Restart());
ASSERT_OK(cluster_->tablet_server_by_uuid(followers[1]->uuid())->Restart());
- ASSERT_OK(WaitUntilTabletInState(leader_ts, tablet_id, tablet::SHUTDOWN, kTimeout));
- ASSERT_OK(WaitUntilTabletInState(followers[1], tablet_id, tablet::SHUTDOWN, kTimeout));
+ ASSERT_OK(WaitUntilTabletInState(leader_ts, tablet_id, tablet::STOPPED, kTimeout));
+ ASSERT_OK(WaitUntilTabletInState(followers[1], tablet_id, tablet::STOPPED, kTimeout));
}
// Test unsafe config change when there is one leader survivor in the cluster.
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tools/kudu-ts-cli-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-ts-cli-test.cc b/src/kudu/tools/kudu-ts-cli-test.cc
index 7ae58ee..91163aa 100644
--- a/src/kudu/tools/kudu-ts-cli-test.cc
+++ b/src/kudu/tools/kudu-ts-cli-test.cc
@@ -86,7 +86,7 @@ TEST_F(KuduTsCliTest, TestDeleteTablet) {
ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(0, tablet_id, { tablet::TABLET_DATA_TOMBSTONED }));
TServerDetails* ts = ts_map_[cluster_->tablet_server(0)->uuid()];
- ASSERT_OK(itest::WaitUntilTabletInState(ts, tablet_id, tablet::SHUTDOWN, timeout));
+ ASSERT_OK(itest::WaitUntilTabletInState(ts, tablet_id, tablet::STOPPED, timeout));
}
// Test dumping a tablet using kudu-ts-cli tool.
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tserver/tablet_copy_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index a75750c..07b20ff 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -295,6 +295,10 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr,
CHECK(fs_manager_->dd_manager()->GetDataDirGroupPB(tablet_id_,
superblock_->mutable_data_dir_group()));
+ // Create the ConsensusMetadata before returning from Start() so that it's
+ // possible to vote while we are copying the replica for the first time.
+ RETURN_NOT_OK(WriteConsensusMetadata());
+
state_ = kStarted;
if (meta) {
*meta = meta_;
@@ -319,8 +323,6 @@ Status TabletCopyClient::Finish() {
CHECK_EQ(kStarted, state_);
state_ = kFinished;
- RETURN_NOT_OK(WriteConsensusMetadata());
-
// Replace tablet metadata superblock. This will set the tablet metadata state
// to TABLET_DATA_READY, since we checked above that the response
// superblock is in a valid state to bootstrap from.
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tserver/tablet_copy_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.h b/src/kudu/tserver/tablet_copy_client.h
index 7262ae5..429c05b 100644
--- a/src/kudu/tserver/tablet_copy_client.h
+++ b/src/kudu/tserver/tablet_copy_client.h
@@ -205,7 +205,8 @@ class TabletCopyClient {
scoped_refptr<tablet::TabletMetadata> meta_;
// Local Consensus metadata file. This may initially be NULL if this is
- // bootstrapping a new replica (rather than replacing an old one).
+ // bootstrapping a new replica (rather than replacing an old one) but it is
+ // guaranteed to be set after a successful call to Start().
scoped_refptr<consensus::ConsensusMetadata> cmeta_;
scoped_refptr<tablet::TabletReplica> tablet_replica_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tserver/tablet_copy_source_session-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index 05cb6e4..85d9340 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -119,14 +119,14 @@ class TabletCopyTest : public KuduTabletTest {
CHECK_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
}
- virtual void SetUp() OVERRIDE {
+ void SetUp() override {
NO_FATALS(KuduTabletTest::SetUp());
NO_FATALS(SetUpTabletReplica());
NO_FATALS(PopulateTablet());
NO_FATALS(InitSession());
}
- virtual void TearDown() OVERRIDE {
+ void TearDown() override {
session_.reset();
tablet_replica_->Shutdown();
KuduTabletTest::TearDown();
@@ -137,37 +137,37 @@ class TabletCopyTest : public KuduTabletTest {
scoped_refptr<Log> log;
ASSERT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(),
*tablet()->schema(),
- 0, // schema_version
- NULL, &log));
+ /*schema_version=*/ 0,
+ /*metric_entity=*/ nullptr,
+ &log));
scoped_refptr<MetricEntity> metric_entity =
METRIC_ENTITY_tablet.Instantiate(&metric_registry_, CURRENT_TEST_NAME());
- RaftPeerPB config_peer;
- config_peer.set_permanent_uuid(fs_manager()->uuid());
- config_peer.mutable_last_known_addr()->set_host("0.0.0.0");
- config_peer.mutable_last_known_addr()->set_port(0);
- config_peer.set_member_type(RaftPeerPB::VOTER);
+ // TODO(mpercy): Similar to code in tablet_replica-test, consider refactor.
+ RaftConfigPB config;
+ config.set_opid_index(consensus::kInvalidOpIdIndex);
+ RaftPeerPB* config_peer = config.add_peers();
+ config_peer->set_permanent_uuid(fs_manager()->uuid());
+ config_peer->mutable_last_known_addr()->set_host("0.0.0.0");
+ config_peer->mutable_last_known_addr()->set_port(0);
+ config_peer->set_member_type(RaftPeerPB::VOTER);
scoped_refptr<ConsensusMetadataManager> cmeta_manager(
new ConsensusMetadataManager(fs_manager()));
+ scoped_refptr<ConsensusMetadata> cmeta;
+ ASSERT_OK(cmeta_manager->Create(tablet()->tablet_id(),
+ config, consensus::kMinimumTerm, &cmeta));
tablet_replica_.reset(
new TabletReplica(tablet()->metadata(),
cmeta_manager,
- config_peer,
+ *config_peer,
apply_pool_.get(),
Bind(&TabletCopyTest::TabletReplicaStateChangedCallback,
Unretained(this),
tablet()->tablet_id())));
- // TODO(dralves) similar to code in tablet_replica-test, consider refactor.
- RaftConfigPB config;
- config.add_peers()->CopyFrom(config_peer);
- config.set_opid_index(consensus::kInvalidOpIdIndex);
-
- scoped_refptr<ConsensusMetadata> cmeta;
- ASSERT_OK(cmeta_manager->Create(tablet()->tablet_id(),
- config, consensus::kMinimumTerm, &cmeta));
+ ASSERT_OK(tablet_replica_->Init(raft_pool_.get()));
shared_ptr<Messenger> messenger;
MessengerBuilder mbuilder(CURRENT_TEST_NAME());
@@ -182,7 +182,6 @@ class TabletCopyTest : public KuduTabletTest {
messenger,
scoped_refptr<rpc::ResultTracker>(),
log,
- raft_pool_.get(),
prepare_pool_.get()));
ASSERT_OK(tablet_replica_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10)));
ASSERT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index aad6045..d0d8442 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -162,6 +162,9 @@ using kudu::rpc::RpcContext;
using kudu::rpc::RpcSidecar;
using kudu::server::ServerBase;
using kudu::tablet::AlterSchemaTransactionState;
+using kudu::tablet::TABLET_DATA_COPYING;
+using kudu::tablet::TABLET_DATA_DELETED;
+using kudu::tablet::TABLET_DATA_TOMBSTONED;
using kudu::tablet::Tablet;
using kudu::tablet::TabletReplica;
using kudu::tablet::TransactionCompletionCallback;
@@ -184,8 +187,8 @@ namespace tserver {
namespace {
-// Lookup the given tablet, ensuring that it both exists and is RUNNING.
-// If it is not, responds to the RPC associated with 'context' after setting
+// Look up the given tablet, ensuring only that it exists.
+// If it does not, responds to the RPC associated with 'context' after setting
// resp->mutable_error() to indicate the failure reason.
//
// Returns true if successful.
@@ -195,24 +198,65 @@ bool LookupTabletReplicaOrRespond(TabletReplicaLookupIf* tablet_manager,
RespClass* resp,
rpc::RpcContext* context,
scoped_refptr<TabletReplica>* replica) {
- if (PREDICT_FALSE(!tablet_manager->GetTabletReplica(tablet_id, replica).ok())) {
- SetupErrorAndRespond(resp->mutable_error(),
- Status::NotFound("Tablet not found"),
+ Status s = tablet_manager->GetTabletReplica(tablet_id, replica);
+ if (PREDICT_FALSE(!s.ok())) {
+ SetupErrorAndRespond(resp->mutable_error(), s,
TabletServerErrorPB::TABLET_NOT_FOUND, context);
return false;
}
+ return true;
+}
+
+// Responds to the RPC associated with 'context' with an error explaining why
+// 'replica' (observed in 'tablet_state') is not RUNNING. Tombstoned or deleted
+// replicas are reported as TABLET_NOT_FOUND, FAILED replicas as TABLET_FAILED
+// (with the replica's error appended), and anything else as TABLET_NOT_RUNNING.
+template<class RespClass>
+void RespondTabletNotRunning(const scoped_refptr<TabletReplica>& replica,
+ tablet::TabletStatePB tablet_state,
+ RespClass* resp,
+ rpc::RpcContext* context) {
+ Status s = Status::IllegalState("Tablet not RUNNING",
+ tablet::TabletStatePB_Name(tablet_state));
+ auto error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
+ // Read the data state once so both comparisons below see the same value;
+ // it can change concurrently (e.g. while the replica is being tombstoned).
+ const tablet::TabletDataState data_state = replica->tablet_metadata()->tablet_data_state();
+ if (data_state == TABLET_DATA_TOMBSTONED || data_state == TABLET_DATA_DELETED) {
+ // Treat tombstoned tablets as if they don't exist for most purposes.
+ // This takes precedence over failed, since we don't reset the failed
+ // status of a TabletReplica when deleting it. Only tablet copy does that.
+ error_code = TabletServerErrorPB::TABLET_NOT_FOUND;
+ } else if (tablet_state == tablet::FAILED) {
+ s = s.CloneAndAppend(replica->error().ToString());
+ error_code = TabletServerErrorPB::TABLET_FAILED;
+ }
+ SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
+}
+
+// Checks that 'replica' is in the RUNNING state. If it is not, responds to
+// the RPC associated with 'context' via RespondTabletNotRunning() and returns
+// false. Returns true if the replica is RUNNING.
+template<class RespClass>
+bool CheckTabletReplicaRunningOrRespond(const scoped_refptr<TabletReplica>& replica,
+ RespClass* resp,
+ rpc::RpcContext* context) {
// Check RUNNING state.
- tablet::TabletStatePB state = (*replica)->state();
+ tablet::TabletStatePB state = replica->state();
if (PREDICT_FALSE(state != tablet::RUNNING)) {
- Status s = Status::IllegalState("Tablet not RUNNING",
- tablet::TabletStatePB_Name(state));
- TabletServerErrorPB::Code code = TabletServerErrorPB::TABLET_NOT_RUNNING;
- if (state == tablet::FAILED) {
- s = s.CloneAndAppend((*replica)->error().ToString());
- code = TabletServerErrorPB::TABLET_FAILED;
- }
- SetupErrorAndRespond(resp->mutable_error(), s, code, context);
+ RespondTabletNotRunning(replica, state, resp, context);
+ return false;
+ }
+ return true;
+}
+
+// Look up the given tablet, ensuring that it both exists and is RUNNING.
+// If it is not, responds to the RPC associated with 'context' after setting
+// resp->mutable_error() to indicate the failure reason.
+//
+// Returns true if successful.
+template<class RespClass>
+bool LookupRunningTabletReplicaOrRespond(TabletReplicaLookupIf* tablet_manager,
+ const string& tablet_id,
+ RespClass* resp,
+ rpc::RpcContext* context,
+ scoped_refptr<TabletReplica>* replica) {
+ if (!LookupTabletReplicaOrRespond(tablet_manager, tablet_id, resp, context, replica)) {
+ return false;
+ }
+ if (!CheckTabletReplicaRunningOrRespond(*replica, resp, context)) {
return false;
}
return true;
@@ -253,14 +297,16 @@ template<class RespClass>
bool GetConsensusOrRespond(const scoped_refptr<TabletReplica>& replica,
RespClass* resp,
rpc::RpcContext* context,
- shared_ptr<RaftConsensus>* consensus) {
- *consensus = replica->shared_consensus();
- if (!*consensus) {
- Status s = Status::ServiceUnavailable("Raft Consensus unavailable. Tablet not running");
+ shared_ptr<RaftConsensus>* consensus_out) {
+ shared_ptr<RaftConsensus> tmp_consensus = replica->shared_consensus();
+ if (!tmp_consensus) {
+ Status s = Status::ServiceUnavailable("Raft Consensus unavailable",
+ "Tablet replica not initialized");
SetupErrorAndRespond(resp->mutable_error(), s,
TabletServerErrorPB::TABLET_NOT_RUNNING, context);
return false;
}
+ *consensus_out = std::move(tmp_consensus);
return true;
}
@@ -620,8 +666,8 @@ void TabletServiceAdminImpl::AlterSchema(const AlterSchemaRequestPB* req,
DVLOG(3) << "Received Alter Schema RPC: " << SecureDebugString(*req);
scoped_refptr<TabletReplica> replica;
- if (!LookupTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp, context,
- &replica)) {
+ if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp,
+ context, &replica)) {
return;
}
@@ -786,8 +832,8 @@ void TabletServiceImpl::Write(const WriteRequestPB* req,
DVLOG(3) << "Received Write RPC: " << SecureDebugString(*req);
scoped_refptr<TabletReplica> replica;
- if (!LookupTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp, context,
- &replica)) {
+ if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp,
+ context, &replica)) {
return;
}
@@ -895,7 +941,8 @@ void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req,
return;
}
scoped_refptr<TabletReplica> replica;
- if (!LookupTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, &replica)) {
+ if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
+ &replica)) {
return;
}
@@ -926,15 +973,51 @@ void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req,
if (!CheckUuidMatchOrRespond(tablet_manager_, "RequestConsensusVote", req, resp, context)) {
return;
}
+
+ // Because the last-logged opid is stored in the TabletMetadata we go through
+ // the following dance:
+ // 1. Get a reference to the currently-registered TabletReplica.
+ // 2. Fetch (non-atomically) the current data state and last-logged opid from
+ // the TabletMetadata.
+ // 3. If the data state is COPYING or TOMBSTONED, pass the last-logged opid
+ // from the TabletMetadata into RaftConsensus::RequestVote().
+ //
+ // This sequence is safe without holding a lock across all three steps
+ // because, if another TabletCopy operation comes between steps 1 and 3,
+ // the RaftConsensus object associated with the TabletReplica will be
+ // Shutdown() and thus unable to vote.
+ //
+ // TODO(mpercy): Move the last-logged opid into ConsensusMetadata to avoid
+ // this hacky plumbing. An additional benefit would be that we would be able
+ // to easily "tombstoned vote" while the tablet is bootstrapping.
+
scoped_refptr<TabletReplica> replica;
if (!LookupTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, &replica)) {
return;
}
+ boost::optional<OpId> last_logged_opid;
+ tablet::TabletDataState data_state = replica->tablet_metadata()->tablet_data_state();
+
+ LOG(INFO) << "Received RequestConsensusVote() RPC: " << SecureShortDebugString(*req);
+
+ // We cannot vote while DELETED. This check is not racy because DELETED is a
+ // terminal state; it is not possible to transition out of DELETED.
+ if (data_state == TABLET_DATA_DELETED) {
+ RespondTabletNotRunning(replica, replica->state(), resp, context);
+ return;
+ }
+
+ // Attempt to vote while copying or tombstoned.
+ if (data_state == TABLET_DATA_COPYING || data_state == TABLET_DATA_TOMBSTONED) {
+ last_logged_opid = replica->tablet_metadata()->tombstone_last_logged_opid();
+ }
+
// Submit the vote request directly to the consensus instance.
shared_ptr<RaftConsensus> consensus;
if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
- Status s = consensus->RequestVote(req, resp);
+
+ Status s = consensus->RequestVote(req, std::move(last_logged_opid), resp);
if (PREDICT_FALSE(!s.ok())) {
SetupErrorAndRespond(resp->mutable_error(), s,
TabletServerErrorPB::UNKNOWN_ERROR,
@@ -952,8 +1035,8 @@ void ConsensusServiceImpl::ChangeConfig(const ChangeConfigRequestPB* req,
return;
}
scoped_refptr<TabletReplica> replica;
- if (!LookupTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
- &replica)) {
+ if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
+ &replica)) {
return;
}
@@ -976,8 +1059,8 @@ void ConsensusServiceImpl::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB*
return;
}
scoped_refptr<TabletReplica> replica;
- if (!LookupTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
- &replica)) {
+ if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
+ &replica)) {
return;
}
@@ -1008,7 +1091,8 @@ void ConsensusServiceImpl::RunLeaderElection(const RunLeaderElectionRequestPB* r
return;
}
scoped_refptr<TabletReplica> replica;
- if (!LookupTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, &replica)) {
+ if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
+ &replica)) {
return;
}
@@ -1034,7 +1118,8 @@ void ConsensusServiceImpl::LeaderStepDown(const LeaderStepDownRequestPB* req,
return;
}
scoped_refptr<TabletReplica> replica;
- if (!LookupTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, &replica)) {
+ if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
+ &replica)) {
return;
}
@@ -1058,7 +1143,8 @@ void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *re
return;
}
scoped_refptr<TabletReplica> replica;
- if (!LookupTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, &replica)) {
+ if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
+ &replica)) {
return;
}
@@ -1186,8 +1272,8 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
if (req->has_new_scan_request()) {
const NewScanRequestPB& scan_pb = req->new_scan_request();
scoped_refptr<TabletReplica> replica;
- if (!LookupTabletReplicaOrRespond(server_->tablet_manager(), scan_pb.tablet_id(), resp, context,
- &replica)) {
+ if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), scan_pb.tablet_id(), resp,
+ context, &replica)) {
return;
}
string scanner_id;
@@ -1297,8 +1383,8 @@ void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
scan_req.mutable_new_scan_request()->CopyFrom(req->new_request());
const NewScanRequestPB& new_req = req->new_request();
scoped_refptr<TabletReplica> replica;
- if (!LookupTabletReplicaOrRespond(server_->tablet_manager(), new_req.tablet_id(), resp, context,
- &replica)) {
+ if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), new_req.tablet_id(), resp,
+ context, &replica)) {
return;
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 3b9b20e..0a60fd8 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -218,7 +218,8 @@ Status TSTabletManager::Init() {
CHECK_OK(StartTabletStateTransitionUnlocked(meta->tablet_id(), "opening tablet", &deleter));
}
- scoped_refptr<TabletReplica> replica = CreateAndRegisterTabletReplica(meta, NEW_REPLICA);
+ scoped_refptr<TabletReplica> replica;
+ RETURN_NOT_OK(CreateAndRegisterTabletReplica(meta, NEW_REPLICA, &replica));
RETURN_NOT_OK(open_tablet_pool_->SubmitFunc(boost::bind(&TSTabletManager::OpenTablet,
this, replica, deleter)));
}
@@ -297,7 +298,8 @@ Status TSTabletManager::CreateNewTablet(const string& table_id,
scoped_refptr<ConsensusMetadata> cmeta;
RETURN_NOT_OK_PREPEND(cmeta_manager_->Create(tablet_id, config, kMinimumTerm, &cmeta),
"Unable to create new ConsensusMetadata for tablet " + tablet_id);
- scoped_refptr<TabletReplica> new_replica = CreateAndRegisterTabletReplica(meta, NEW_REPLICA);
+ scoped_refptr<TabletReplica> new_replica;
+ RETURN_NOT_OK(CreateAndRegisterTabletReplica(meta, NEW_REPLICA, &new_replica));
// We can run this synchronously since there is nothing to bootstrap.
RETURN_NOT_OK(open_tablet_pool_->SubmitFunc(boost::bind(&TSTabletManager::OpenTablet,
@@ -499,12 +501,15 @@ void TSTabletManager::RunTabletCopy(
CheckLeaderTermNotLower(tablet_id, leader_term, opt_last_logged_opid->term()),
TabletServerErrorPB::INVALID_CONFIG);
- // Tombstone the tablet and store the last-logged OpId.
+ // Shut down the old TabletReplica so that it is no longer allowed to
+ // mutate the ConsensusMetadata.
old_replica->Shutdown();
+
// Note that this leaves the data dir manager without any references to
// tablet_id. This is okay because the tablet_copy_client should
// generate a new disk group during the call to Start().
- //
+
+ // Tombstone the tablet and store the last-logged OpId.
// TODO(mpercy): Because we begin shutdown of the tablet after we check our
// last-logged term against the leader's term, there may be operations
// in flight and it may be possible for the same check in the tablet
@@ -544,16 +549,21 @@ void TSTabletManager::RunTabletCopy(
}
CALLBACK_RETURN_NOT_OK(tc_client.Start(copy_source_addr, &meta));
- // From this point onward, the superblock is persisted in TABLET_DATA_COPYING
- // state, and we need to tombtone the tablet if additional steps prior to
- // getting to a TABLET_DATA_READY state fail.
+ // After calling TabletCopyClient::Start(), the superblock is persisted in
+ // TABLET_DATA_COPYING state. TabletCopyClient will automatically tombstone
+ // the tablet by implicitly calling Abort() on itself if it is destroyed
+ // prior to calling TabletCopyClient::Finish(), which if successful
+ // transitions the tablet into the TABLET_DATA_READY state.
- // Registering a non-initialized TabletReplica offers visibility through the Web UI.
+ // Registering an unstarted TabletReplica allows for tombstoned voting and
+ // offers visibility through the Web UI.
RegisterTabletReplicaMode mode = replacing_tablet ? REPLACEMENT_REPLICA : NEW_REPLICA;
- scoped_refptr<TabletReplica> replica = CreateAndRegisterTabletReplica(meta, mode);
+ scoped_refptr<TabletReplica> replica;
+ CALLBACK_RETURN_NOT_OK(CreateAndRegisterTabletReplica(meta, mode, &replica));
// Now we invoke the StartTabletCopy callback and respond success to the
- // remote caller. Then we proceed to do most of the actual tablet copying work.
+ // remote caller, since StartTabletCopy() is an asynchronous RPC call. Then
+ // we proceed with the Tablet Copy process.
cb(Status::OK(), TabletServerErrorPB::UNKNOWN_ERROR);
cb = [](const Status&, TabletServerErrorPB::Code) {
LOG(FATAL) << "Callback invoked twice from TSTabletManager::RunTabletCopy()";
@@ -561,7 +571,7 @@ void TSTabletManager::RunTabletCopy(
// From this point onward, we do not notify the caller about progress or success.
- // Download all of the remote files.
+ // Go through and synchronously download the remote blocks and WAL segments.
Status s = tc_client.FetchAll(replica);
if (!s.ok()) {
LOG(WARNING) << LogPrefix(tablet_id) << "Tablet Copy: Unable to fetch data from remote peer "
@@ -580,14 +590,15 @@ void TSTabletManager::RunTabletCopy(
return;
}
- // startup it's still in a valid, fully-copied state.
+ // Bootstrap and start the fully-copied tablet.
OpenTablet(replica, deleter);
}
// Create and register a new TabletReplica, given tablet metadata.
-scoped_refptr<TabletReplica> TSTabletManager::CreateAndRegisterTabletReplica(
+Status TSTabletManager::CreateAndRegisterTabletReplica(
scoped_refptr<TabletMetadata> meta,
- RegisterTabletReplicaMode mode) {
+ RegisterTabletReplicaMode mode,
+ scoped_refptr<TabletReplica>* replica_out) {
const string& tablet_id = meta->tablet_id();
scoped_refptr<TabletReplica> replica(
new TabletReplica(std::move(meta),
@@ -597,8 +608,14 @@ scoped_refptr<TabletReplica> TSTabletManager::CreateAndRegisterTabletReplica(
Bind(&TSTabletManager::MarkTabletDirty,
Unretained(this),
tablet_id)));
+ Status s = replica->Init(server_->raft_pool());
+ if (PREDICT_FALSE(!s.ok())) {
+ replica->SetError(s);
+ replica->Shutdown();
+ }
RegisterTablet(tablet_id, replica, mode);
- return replica;
+ *replica_out = std::move(replica);
+ return Status::OK();
}
Status TSTabletManager::DeleteTablet(
@@ -641,8 +658,7 @@ Status TSTabletManager::DeleteTablet(
// consensus and therefore the log is not available.
TabletDataState data_state = replica->tablet_metadata()->tablet_data_state();
bool tablet_deleted = (data_state == TABLET_DATA_DELETED ||
- data_state == TABLET_DATA_TOMBSTONED ||
- replica->state() == tablet::FAILED);
+ data_state == TABLET_DATA_TOMBSTONED);
// They specified an "atomic" delete. Check the committed config's opid_index.
// TODO(mpercy): There's actually a race here between the check and shutdown,
@@ -665,7 +681,7 @@ Status TSTabletManager::DeleteTablet(
}
}
- replica->Shutdown();
+ replica->Stop();
boost::optional<OpId> opt_last_logged_opid;
if (consensus) {
@@ -825,7 +841,6 @@ void TSTabletManager::OpenTablet(const scoped_refptr<TabletReplica>& replica,
server_->messenger(),
server_->result_tracker(),
log,
- server_->raft_pool(),
server_->tablet_prepare_pool());
if (!s.ok()) {
LOG(ERROR) << LogPrefix(tablet_id) << "Tablet failed to start: "
@@ -940,12 +955,6 @@ Status TSTabletManager::GetTabletReplica(const string& tablet_id,
if (!LookupTablet(tablet_id, replica)) {
return Status::NotFound("Tablet not found", tablet_id);
}
- TabletDataState data_state = (*replica)->tablet_metadata()->tablet_data_state();
- if (data_state != TABLET_DATA_READY) {
- return Status::IllegalState("Tablet data state not TABLET_DATA_READY: " +
- TabletDataState_Name(data_state),
- tablet_id);
- }
return Status::OK();
}
@@ -1062,7 +1071,8 @@ Status TSTabletManager::HandleNonReadyTabletOnStartup(const scoped_refptr<Tablet
// allows us to permanently delete replica tombstones when a table gets
// deleted.
if (data_state == TABLET_DATA_TOMBSTONED) {
- CreateAndRegisterTabletReplica(meta, NEW_REPLICA);
+ scoped_refptr<TabletReplica> dummy;
+ RETURN_NOT_OK(CreateAndRegisterTabletReplica(meta, NEW_REPLICA, &dummy));
}
return Status::OK();
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tserver/ts_tablet_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index b0420da..7257d84 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -260,9 +260,9 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
// Calls RegisterTablet() with the given 'mode' parameter after constructing
// the TablerPeer object. See RegisterTablet() for details about the
// semantics of 'mode' and the locking requirements.
- scoped_refptr<tablet::TabletReplica> CreateAndRegisterTabletReplica(
- scoped_refptr<tablet::TabletMetadata> meta,
- RegisterTabletReplicaMode mode);
+ Status CreateAndRegisterTabletReplica(scoped_refptr<tablet::TabletMetadata> meta,
+ RegisterTabletReplicaMode mode,
+ scoped_refptr<tablet::TabletReplica>* replica_out);
// Helper to generate the report for a single tablet.
void CreateReportedTabletPB(const std::string& tablet_id,
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tserver/tserver-path-handlers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver-path-handlers.cc b/src/kudu/tserver/tserver-path-handlers.cc
index 8578939..c348341 100644
--- a/src/kudu/tserver/tserver-path-handlers.cc
+++ b/src/kudu/tserver/tserver-path-handlers.cc
@@ -490,7 +490,7 @@ void TabletServerPathHandlers::HandleConsensusStatusPage(const Webserver::WebReq
if (!LoadTablet(tserver_, req, &id, &replica, output)) return;
shared_ptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
if (!consensus) {
- *output << "Tablet " << EscapeForHtmlToString(id) << " not running";
+ *output << "Tablet " << EscapeForHtmlToString(id) << " not initialized";
return;
}
consensus->DumpStatusHtml(*output);
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/util/make_shared.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/make_shared.h b/src/kudu/util/make_shared.h
index c534ebf..ddba52c 100644
--- a/src/kudu/util/make_shared.h
+++ b/src/kudu/util/make_shared.h
@@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef KUDU_UTIL_MAKE_SHARED_H_
-#define KUDU_UTIL_MAKE_SHARED_H_
+#pragma once
#ifdef __GLIBCXX__
#include <ext/alloc_traits.h> // IWYU pragma: export
@@ -64,5 +63,3 @@
#else
#error "Need to implement ALLOW_MAKE_SHARED for your platform!"
#endif
-
-#endif // KUDU_UTIL_MAKE_SHARED_H_
[2/2] kudu git commit: KUDU-871. Support tombstoned voting
Posted by to...@apache.org.
KUDU-871. Support tombstoned voting
This patch makes it possible for tombstoned tablet replicas to vote in
Raft elections.
Changes:
* Add a Stop() method to the TabletReplica and RaftConsensus lifecycles.
* Includes new STOPPED state.
* Tombstoning a replica should call Stop().
* Deleting a replica should call Shutdown().
* Persist ConsensusMetadata before returning from
TabletCopyClient::Start() because we need cmeta to Init()
RaftConsensus, which happens when registering the replica in
TSTabletManager.
* TSTabletManager::DeleteTablet() should no longer treat a FAILED replica as
deleted, since we no longer destroy RaftConsensus when tombstoning a replica.
* Add positive and negative tests for tombstoned voting.
* Add a stress test that induces lots of tombstoned voting
while running TabletCopy, TabletBootstrap, and DeleteTablet.
* Fix DeleteTableITest.TestMergeConsensusMetadata, which assumed that
tombstoned tablets would not vote; tombstoned voting invalidates that assumption.
* Fix several tests that expected tombstoned tablets to be SHUTDOWN;
tombstoned tablets are now STOPPED instead.
Change-Id: Ia19d75b185299443b27f41e468bbae20065e7570
Reviewed-on: http://gerrit.cloudera.org:8080/6960
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5bca7d8b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5bca7d8b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5bca7d8b
Branch: refs/heads/master
Commit: 5bca7d8ba185d62952fb3e3163cbe88d20453da0
Parents: b37bde7
Author: Mike Percy <mp...@apache.org>
Authored: Wed Aug 23 00:01:04 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Aug 24 04:43:25 2017 +0000
----------------------------------------------------------------------
src/kudu/consensus/consensus-test-util.h | 2 +-
src/kudu/consensus/raft_consensus.cc | 244 +++++++---
src/kudu/consensus/raft_consensus.h | 88 +++-
.../consensus/raft_consensus_quorum-test.cc | 29 +-
src/kudu/integration-tests/CMakeLists.txt | 2 +
.../integration-tests/cluster_itest_util.cc | 29 ++
src/kudu/integration-tests/cluster_itest_util.h | 12 +
.../integration-tests/delete_table-itest.cc | 19 +-
.../external_mini_cluster-itest-base.cc | 6 +-
.../tombstoned_voting-itest.cc | 461 +++++++++++++++++++
.../tombstoned_voting-stress-test.cc | 313 +++++++++++++
src/kudu/master/sys_catalog.cc | 7 +-
src/kudu/tablet/metadata.proto | 31 +-
src/kudu/tablet/tablet_metadata.h | 7 +-
src/kudu/tablet/tablet_replica-test.cc | 30 +-
src/kudu/tablet/tablet_replica.cc | 127 +++--
src/kudu/tablet/tablet_replica.h | 45 +-
src/kudu/tools/kudu-admin-test.cc | 4 +-
src/kudu/tools/kudu-ts-cli-test.cc | 2 +-
src/kudu/tserver/tablet_copy_client.cc | 6 +-
src/kudu/tserver/tablet_copy_client.h | 3 +-
.../tserver/tablet_copy_source_session-test.cc | 37 +-
src/kudu/tserver/tablet_service.cc | 156 +++++--
src/kudu/tserver/ts_tablet_manager.cc | 62 +--
src/kudu/tserver/ts_tablet_manager.h | 6 +-
src/kudu/tserver/tserver-path-handlers.cc | 2 +-
src/kudu/util/make_shared.h | 5 +-
27 files changed, 1439 insertions(+), 296 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index ac4408b..c73e50b 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -534,7 +534,7 @@ class LocalTestPeerProxy : public TestPeerProxy {
Status s = peers_->GetPeerByUuid(peer_uuid_, &peer);
if (s.ok()) {
- s = peer->RequestVote(&other_peer_req, &other_peer_resp);
+ s = peer->RequestVote(&other_peer_req, boost::none, &other_peer_resp);
}
if (!s.ok()) {
LOG(WARNING) << "Could not RequestVote from replica with request: "
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 042c5d1..11d1e40 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -118,6 +118,11 @@ DEFINE_bool(raft_enable_pre_election, true,
TAG_FLAG(raft_enable_pre_election, experimental);
TAG_FLAG(raft_enable_pre_election, runtime);
+DEFINE_bool(raft_enable_tombstoned_voting, true,
+ "When enabled, tombstoned tablets may vote in elections.");
+TAG_FLAG(raft_enable_tombstoned_voting, experimental);
+TAG_FLAG(raft_enable_tombstoned_voting, runtime);
+
DECLARE_int32(memory_limit_warn_threshold_percentage);
// Metrics
@@ -133,9 +138,11 @@ METRIC_DEFINE_gauge_int64(tablet, raft_term,
"Current Term of the Raft Consensus algorithm. This number increments "
"each time a leader election is started.");
+using boost::optional;
using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::PeriodicTimer;
using kudu::tserver::TabletServerErrorPB;
+using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::weak_ptr;
@@ -165,10 +172,8 @@ RaftConsensus::RaftConsensus(
Status RaftConsensus::Init() {
DCHECK_EQ(kNew, state_) << State_Name(state_);
-
RETURN_NOT_OK(cmeta_manager_->Load(options_.tablet_id, &cmeta_));
-
- state_ = kInitialized;
+ SetStateUnlocked(kInitialized);
return Status::OK();
}
@@ -176,6 +181,20 @@ RaftConsensus::~RaftConsensus() {
Shutdown();
}
+Status RaftConsensus::Create(ConsensusOptions options,
+ RaftPeerPB local_peer_pb,
+ scoped_refptr<ConsensusMetadataManager> cmeta_manager,
+ ThreadPool* raft_pool,
+ shared_ptr<RaftConsensus>* consensus_out) {
+ shared_ptr<RaftConsensus> consensus(std::make_shared<RaftConsensus>(std::move(options),
+ std::move(local_peer_pb),
+ std::move(cmeta_manager),
+ raft_pool));
+ RETURN_NOT_OK_PREPEND(consensus->Init(), "Unable to initialize Raft consensus");
+ *consensus_out = std::move(consensus);
+ return Status::OK();
+}
+
Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
scoped_refptr<log::Log> log,
@@ -257,13 +276,13 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
// Our last persisted term can be higher than the last persisted operation
// (i.e. if we called an election) but reverse should never happen.
- if (info.last_id.term() > GetCurrentTermUnlocked()) {
+ if (info.last_id.term() > CurrentTermUnlocked()) {
return Status::Corruption(Substitute("Unable to start RaftConsensus: "
"The last op in the WAL with id $0 has a term ($1) that is greater "
"than the latest recorded term, which is $2",
OpIdToString(info.last_id),
info.last_id.term(),
- GetCurrentTermUnlocked()));
+ CurrentTermUnlocked()));
}
// Append any uncommitted replicate messages found during log replay to the queue.
@@ -283,7 +302,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
// If this is the first term expire the FD immediately so that we have a
// fast first election, otherwise we just let the timer expire normally.
boost::optional<MonoDelta> initial_delta;
- if (GetCurrentTermUnlocked() == 0) {
+ if (CurrentTermUnlocked() == 0) {
// The failure detector is initialized to a low value to trigger an early
// election (unless someone else requested a vote from us first, which
// resets the election timer).
@@ -304,7 +323,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
// Now assume "follower" duties.
RETURN_NOT_OK(BecomeReplicaUnlocked());
- state_ = kRunning;
+ SetStateUnlocked(kRunning);
}
if (IsSingleVoterConfig() && FLAGS_enable_leader_failure_detection) {
@@ -336,7 +355,7 @@ Status RaftConsensus::EmulateElection() {
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Emulating election...";
// Assume leadership of new term.
- RETURN_NOT_OK(HandleTermAdvanceUnlocked(GetCurrentTermUnlocked() + 1));
+ RETURN_NOT_OK(HandleTermAdvanceUnlocked(CurrentTermUnlocked() + 1));
SetLeaderUuidUnlocked(peer_uuid());
return BecomeLeaderUnlocked();
}
@@ -408,7 +427,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
// We skip flushing the term to disk because setting the vote just below also
// flushes to disk, and the double fsync doesn't buy us anything.
- RETURN_NOT_OK(HandleTermAdvanceUnlocked(GetCurrentTermUnlocked() + 1,
+ RETURN_NOT_OK(HandleTermAdvanceUnlocked(CurrentTermUnlocked() + 1,
SKIP_FLUSH_TO_DISK));
RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(peer_uuid()));
}
@@ -427,7 +446,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
RETURN_NOT_OK(counter->RegisterVote(peer_uuid(), VOTE_GRANTED, &duplicate));
CHECK(!duplicate) << LogPrefixUnlocked()
<< "Inexplicable duplicate self-vote for term "
- << GetCurrentTermUnlocked();
+ << CurrentTermUnlocked();
VoteRequestPB request;
request.set_ignore_live_leader(mode == ELECT_EVEN_IF_LEADER_IS_ALIVE);
@@ -436,9 +455,9 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
// In a pre-election, we haven't bumped our own term yet, so we need to be
// asking for votes for the next term.
request.set_is_pre_election(true);
- request.set_candidate_term(GetCurrentTermUnlocked() + 1);
+ request.set_candidate_term(CurrentTermUnlocked() + 1);
} else {
- request.set_candidate_term(GetCurrentTermUnlocked());
+ request.set_candidate_term(CurrentTermUnlocked());
}
request.set_tablet_id(options_.tablet_id);
*request.mutable_candidate_status()->mutable_last_received() =
@@ -581,7 +600,7 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
- RETURN_NOT_OK(round->CheckBoundTerm(GetCurrentTermUnlocked()));
+ RETURN_NOT_OK(round->CheckBoundTerm(CurrentTermUnlocked()));
RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));
}
@@ -593,7 +612,7 @@ Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRo
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
- round->BindToTerm(GetCurrentTermUnlocked());
+ round->BindToTerm(CurrentTermUnlocked());
return Status::OK();
}
@@ -669,7 +688,7 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
// We will process commit notifications while shutting down because a replica
// which has initiated a Prepare() / Replicate() may eventually commit even if
// its state has changed after the initial Append() / Update().
- if (PREDICT_FALSE(state_ != kRunning && state_ != kShuttingDown)) {
+ if (PREDICT_FALSE(state_ != kRunning && state_ != kStopping)) {
LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to update committed index: "
<< "Replica not in running state: "
<< State_Name(state_);
@@ -716,7 +735,7 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
{
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
- int64_t current_term = GetCurrentTermUnlocked();
+ int64_t current_term = CurrentTermUnlocked();
if (current_term != term) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "Notified about a follower failure in "
<< "previous term " << term << ", but a leader election "
@@ -899,16 +918,16 @@ Status RaftConsensus::HandleLeaderRequestTermUnlocked(const ConsensusRequestPB*
ConsensusResponsePB* response) {
DCHECK(lock_.is_locked());
// Do term checks first:
- if (PREDICT_FALSE(request->caller_term() != GetCurrentTermUnlocked())) {
+ if (PREDICT_FALSE(request->caller_term() != CurrentTermUnlocked())) {
// If less, reject.
- if (request->caller_term() < GetCurrentTermUnlocked()) {
+ if (request->caller_term() < CurrentTermUnlocked()) {
string msg = Substitute("Rejecting Update request from peer $0 for earlier term $1. "
"Current term is $2. Ops: $3",
request->caller_uuid(),
request->caller_term(),
- GetCurrentTermUnlocked(),
+ CurrentTermUnlocked(),
OpsRangeString(*request));
LOG_WITH_PREFIX_UNLOCKED(INFO) << msg;
FillConsensusResponseError(response,
@@ -1371,7 +1390,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* response) {
DCHECK(lock_.is_locked());
TRACE("Filling consensus response to leader.");
- response->set_responder_term(GetCurrentTermUnlocked());
+ response->set_responder_term(CurrentTermUnlocked());
response->mutable_status()->mutable_last_received()->CopyFrom(
queue_->GetLastOpIdInLog());
response->mutable_status()->mutable_last_received_current_leader()->CopyFrom(
@@ -1388,7 +1407,9 @@ void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response,
StatusToPB(status, error->mutable_status());
}
-Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* response) {
+Status RaftConsensus::RequestVote(const VoteRequestPB* request,
+ optional<OpId> tombstone_last_logged_opid,
+ VoteResponsePB* response) {
TRACE_EVENT2("consensus", "RaftConsensus::RequestVote",
"peer", peer_uuid(),
"tablet", options_.tablet_id);
@@ -1414,14 +1435,41 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
// We still need to take the state lock in order to respond with term info, etc.
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
- RETURN_NOT_OK(CheckRunningUnlocked());
return RequestVoteRespondIsBusy(request, response);
}
// Acquire the replica state lock so we can read / modify the consensus state.
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
- RETURN_NOT_OK(CheckRunningUnlocked());
+
+ // Ensure our lifecycle state is compatible with voting.
+ // If RaftConsensus is running, we use the latest OpId from the WAL to vote.
+ // Otherwise, we must be voting while tombstoned.
+ OpId local_last_logged_opid;
+ switch (state_) {
+ case kShutdown:
+ return Status::IllegalState("cannot vote while shut down");
+ case kRunning:
+ // Note: it is (theoretically) possible for 'tombstone_last_logged_opid'
+ // to be passed in and by the time we reach here the state is kRunning.
+ // That may occur when a vote request comes in at the end of a tablet
+ // copy and then tablet bootstrap completes quickly. In that case, we
+ // ignore the passed-in value and use the latest OpId from our queue.
+ local_last_logged_opid = queue_->GetLastOpIdInLog();
+ break;
+ default:
+ if (!tombstone_last_logged_opid) {
+ return Status::IllegalState("must be running to vote when last-logged opid is not known");
+ }
+ if (!FLAGS_raft_enable_tombstoned_voting) {
+ return Status::IllegalState("must be running to vote when tombstoned voting is disabled");
+ }
+ local_last_logged_opid = *tombstone_last_logged_opid;
+ LOG_WITH_PREFIX_UNLOCKED(INFO) << "voting while tombstoned based on last-logged opid "
+ << local_last_logged_opid;
+ break;
+ }
+ DCHECK(local_last_logged_opid.IsInitialized());
// If the node is not in the configuration, allow the vote (this is required by Raft)
// but log an informational message anyway.
@@ -1452,12 +1500,12 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
}
// Candidate is running behind.
- if (request->candidate_term() < GetCurrentTermUnlocked()) {
+ if (request->candidate_term() < CurrentTermUnlocked()) {
return RequestVoteRespondInvalidTerm(request, response);
}
// We already voted this term.
- if (request->candidate_term() == GetCurrentTermUnlocked() &&
+ if (request->candidate_term() == CurrentTermUnlocked() &&
HasVotedCurrentTermUnlocked()) {
// Already voted for the same candidate in the current term.
@@ -1471,7 +1519,6 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
// Candidate must have last-logged OpId at least as large as our own to get
// our vote.
- OpId local_last_logged_opid = queue_->GetLastOpIdInLog();
bool vote_yes = !OpIdLessThan(request->candidate_status().last_received(),
local_last_logged_opid);
@@ -1480,14 +1527,14 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
// has actually now successfully become leader of the prior term, in which case
// bumping our term here would disrupt it.
if (!request->is_pre_election() &&
- request->candidate_term() > GetCurrentTermUnlocked()) {
+ request->candidate_term() > CurrentTermUnlocked()) {
// If we are going to vote for this peer, then we will flush the consensus metadata
// to disk below when we record the vote, and we can skip flushing the term advancement
// to disk here.
auto flush = vote_yes ? SKIP_FLUSH_TO_DISK : FLUSH_TO_DISK;
RETURN_NOT_OK_PREPEND(HandleTermAdvanceUnlocked(request->candidate_term(), flush),
Substitute("Could not step down in RequestVote. Current term: $0, candidate term: $1",
- GetCurrentTermUnlocked(), request->candidate_term()));
+ CurrentTermUnlocked(), request->candidate_term()));
}
if (!vote_yes) {
@@ -1629,7 +1676,7 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
// we can stick them in the consensus update request later.
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
- current_term = GetCurrentTermUnlocked();
+ current_term = CurrentTermUnlocked();
committed_config = cmeta_->CommittedConfig();
if (cmeta_->has_pending_config()) {
LOG_WITH_PREFIX_UNLOCKED(WARNING)
@@ -1746,23 +1793,17 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
return s;
}
-void RaftConsensus::Shutdown() {
+void RaftConsensus::Stop() {
TRACE_EVENT2("consensus", "RaftConsensus::Shutdown",
"peer", peer_uuid(),
"tablet", options_.tablet_id);
- // Avoid taking locks if already shut down so we don't violate
- // ThreadRestrictions assertions in the case where the RaftConsensus
- // destructor runs on the reactor thread due to an election callback being
- // the last outstanding reference.
- if (shutdown_.Load(kMemOrderAcquire)) return;
-
{
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
- // Transition to kShuttingDown state.
- CHECK_NE(kShutDown, state_) << State_Name(state_); // We are protected here by 'shutdown_'.
- state_ = kShuttingDown;
+ if (state_ == kStopping || state_ == kStopped || state_ == kShutdown) return;
+ // Transition to kStopping state.
+ SetStateUnlocked(kStopping);
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus shutting down.";
}
@@ -1776,15 +1817,40 @@ void RaftConsensus::Shutdown() {
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
if (pending_) CHECK_OK(pending_->CancelPendingTransactions());
- CHECK_EQ(kShuttingDown, state_) << State_Name(state_);
- state_ = kShutDown;
+ SetStateUnlocked(kStopped);
+
+ // Clear leader status on Stop(), in case this replica was the leader. If
+ // we don't do this, the log messages still show this node as the leader.
+ // No need to sync it since it's not persistent state.
+ if (cmeta_) {
+ ClearLeaderUnlocked();
+ }
+
+ // If we were the leader, stop withholding votes.
+ if (withhold_votes_until_ == MonoTime::Max()) {
+ withhold_votes_until_ = MonoTime::Min();
+ }
+
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus is shut down!";
}
// Shut down things that might acquire locks during destruction.
if (raft_pool_token_) raft_pool_token_->Shutdown();
if (failure_detector_) DisableFailureDetector();
+}
+
+void RaftConsensus::Shutdown() {
+ // Avoid taking locks if already shut down so we don't violate
+ // ThreadRestrictions assertions in the case where the RaftConsensus
+ // destructor runs on the reactor thread due to an election callback being
+ // the last outstanding reference.
+ if (shutdown_.Load(kMemOrderAcquire)) return;
+ Stop();
+ {
+ LockGuard l(lock_);
+ SetStateUnlocked(kShutdown);
+ }
shutdown_.Store(true, kMemOrderRelease);
}
@@ -1826,13 +1892,13 @@ std::string RaftConsensus::GetRequestVoteLogPrefixUnlocked(const VoteRequestPB&
}
void RaftConsensus::FillVoteResponseVoteGranted(VoteResponsePB* response) {
- response->set_responder_term(GetCurrentTermUnlocked());
+ response->set_responder_term(CurrentTermUnlocked());
response->set_vote_granted(true);
}
void RaftConsensus::FillVoteResponseVoteDenied(ConsensusErrorPB::Code error_code,
VoteResponsePB* response) {
- response->set_responder_term(GetCurrentTermUnlocked());
+ response->set_responder_term(CurrentTermUnlocked());
response->set_vote_granted(false);
response->mutable_consensus_error()->set_code(error_code);
}
@@ -1845,7 +1911,7 @@ Status RaftConsensus::RequestVoteRespondInvalidTerm(const VoteRequestPB* request
GetRequestVoteLogPrefixUnlocked(*request),
request->candidate_uuid(),
request->candidate_term(),
- GetCurrentTermUnlocked());
+ CurrentTermUnlocked());
LOG(INFO) << msg;
StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status());
return Status::OK();
@@ -1869,7 +1935,7 @@ Status RaftConsensus::RequestVoteRespondAlreadyVotedForOther(const VoteRequestPB
"Already voted for candidate $3 in this term.",
GetRequestVoteLogPrefixUnlocked(*request),
request->candidate_uuid(),
- GetCurrentTermUnlocked(),
+ CurrentTermUnlocked(),
GetVotedForCurrentTermUnlocked());
LOG(INFO) << msg;
StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status());
@@ -1944,7 +2010,7 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request
LOG(INFO) << Substitute("$0: Granting yes vote for candidate $1 in term $2.",
GetRequestVoteLogPrefixUnlocked(*request),
request->candidate_uuid(),
- GetCurrentTermUnlocked());
+ CurrentTermUnlocked());
return Status::OK();
}
@@ -1954,6 +2020,35 @@ RaftPeerPB::Role RaftConsensus::role() const {
return cmeta_->active_role();
}
+int64_t RaftConsensus::CurrentTerm() const {
+ LockGuard l(lock_);
+ return CurrentTermUnlocked();
+}
+
+void RaftConsensus::SetStateUnlocked(State new_state) {
+ switch (new_state) {
+ case kInitialized:
+ CHECK_EQ(kNew, state_);
+ break;
+ case kRunning:
+ CHECK_EQ(kInitialized, state_);
+ break;
+ case kStopping:
+ CHECK(state_ != kStopped && state_ != kShutdown) << "State = " << State_Name(state_);
+ break;
+ case kStopped:
+ CHECK_EQ(kStopping, state_);
+ break;
+ case kShutdown:
+ CHECK(state_ == kStopped || state_ == kShutdown) << "State = " << State_Name(state_);
+ break;
+ default:
+ LOG(FATAL) << "Disallowed transition to state = " << State_Name(new_state);
+ break;
+ }
+ state_ = new_state;
+}
+
const char* RaftConsensus::State_Name(State state) {
switch (state) {
case kNew:
@@ -1962,9 +2057,11 @@ const char* RaftConsensus::State_Name(State state) {
return "Initialized";
case kRunning:
return "Running";
- case kShuttingDown:
- return "Shutting down";
- case kShutDown:
+ case kStopping:
+ return "Stopping";
+ case kStopped:
+ return "Stopped";
+ case kShutdown:
return "Shut down";
default:
LOG(DFATAL) << "Unknown State value: " << state;
@@ -2018,7 +2115,7 @@ Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
// TODO(todd): should use queue committed index here? in that case do
// we need to pass it in at all?
queue_->SetLeaderMode(pending_->GetCommittedIndex(),
- GetCurrentTermUnlocked(),
+ CurrentTermUnlocked(),
active_config);
RETURN_NOT_OK(peer_manager_->UpdateRaftConfig(active_config));
return Status::OK();
@@ -2045,6 +2142,16 @@ RaftConfigPB RaftConsensus::CommittedConfig() const {
}
void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
+ RaftPeerPB::Role role;
+ {
+ LockGuard l(lock_);
+ if (state_ != kRunning) {
+ out << "Tablet " << EscapeForHtmlToString(tablet_id()) << " not running" << std::endl;
+ return;
+ }
+ role = cmeta_->active_role();
+ }
+
out << "<h1>Raft Consensus State</h1>" << std::endl;
out << "<h2>State</h2>" << std::endl;
@@ -2053,12 +2160,6 @@ void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl;
// Dump the queues on a leader.
- RaftPeerPB::Role role;
- {
- ThreadRestrictions::AssertWaitAllowed();
- LockGuard l(lock_);
- role = cmeta_->active_role();
- }
if (role == RaftPeerPB::LEADER) {
out << "<h2>Queue overview</h2>" << std::endl;
out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl;
@@ -2108,7 +2209,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
// because it already voted in term 2. The check below ensures that peer B
// will bump to term 2 when it gets the vote rejection, such that its
// next pre-election (for term 3) would succeed.
- if (result.highest_voter_term > GetCurrentTermUnlocked()) {
+ if (result.highest_voter_term > CurrentTermUnlocked()) {
HandleTermAdvanceUnlocked(result.highest_voter_term);
}
@@ -2138,7 +2239,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
election_started_in_term--;
}
- if (election_started_in_term != GetCurrentTermUnlocked()) {
+ if (election_started_in_term != CurrentTermUnlocked()) {
LOG_WITH_PREFIX_UNLOCKED(INFO)
<< "Leader " << election_type << " decision vote started in "
<< "defunct term " << election_started_in_term << ": "
@@ -2350,7 +2451,7 @@ void RaftConsensus::DisableFailureDetector() {
void RaftConsensus::SnoozeFailureDetector(AllowLogging allow_logging,
boost::optional<MonoDelta> delta) {
- if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
+ if (PREDICT_TRUE(failure_detector_ && FLAGS_enable_leader_failure_detection)) {
if (allow_logging == ALLOW_LOGGING) {
LOG(INFO) << LogPrefixThreadSafe()
<< Substitute("Snoozing failure detection for $0",
@@ -2393,13 +2494,13 @@ MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() {
Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
FlushToDisk flush) {
DCHECK(lock_.is_locked());
- if (new_term <= GetCurrentTermUnlocked()) {
+ if (new_term <= CurrentTermUnlocked()) {
return Status::IllegalState(Substitute("Can't advance term to: $0 current term: $1 is higher.",
- new_term, GetCurrentTermUnlocked()));
+ new_term, CurrentTermUnlocked()));
}
if (cmeta_->active_role() == RaftPeerPB::LEADER) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Stepping down as leader of term "
- << GetCurrentTermUnlocked();
+ << CurrentTermUnlocked();
RETURN_NOT_OK(BecomeReplicaUnlocked());
}
@@ -2508,10 +2609,10 @@ Status RaftConsensus::SetCurrentTermUnlocked(int64_t new_term,
TRACE_EVENT1("consensus", "RaftConsensus::SetCurrentTermUnlocked",
"term", new_term);
DCHECK(lock_.is_locked());
- if (PREDICT_FALSE(new_term <= GetCurrentTermUnlocked())) {
+ if (PREDICT_FALSE(new_term <= CurrentTermUnlocked())) {
return Status::IllegalState(
Substitute("Cannot change term to a term that is lower than or equal to the current one. "
- "Current: $0, Proposed: $1", GetCurrentTermUnlocked(), new_term));
+ "Current: $0, Proposed: $1", CurrentTermUnlocked(), new_term));
}
cmeta_->set_current_term(new_term);
cmeta_->clear_voted_for();
@@ -2522,7 +2623,7 @@ Status RaftConsensus::SetCurrentTermUnlocked(int64_t new_term,
return Status::OK();
}
-const int64_t RaftConsensus::GetCurrentTermUnlocked() const {
+const int64_t RaftConsensus::CurrentTermUnlocked() const {
DCHECK(lock_.is_locked());
return cmeta_->current_term();
}
@@ -2574,11 +2675,14 @@ string RaftConsensus::LogPrefix() const {
string RaftConsensus::LogPrefixUnlocked() const {
DCHECK(lock_.is_locked());
- return Substitute("T $0 P $1 [term $2 $3]: ",
- options_.tablet_id,
- peer_uuid(),
- GetCurrentTermUnlocked(),
- RaftPeerPB::Role_Name(cmeta_->active_role()));
+ // 'cmeta_' may not be set if initialization failed.
+ string cmeta_info;
+ if (cmeta_) {
+ cmeta_info = Substitute(" [term $0 $1]",
+ cmeta_->current_term(),
+ RaftPeerPB::Role_Name(cmeta_->active_role()));
+ }
+ return Substitute("T $0 P $1$2: ", options_.tablet_id, peer_uuid(), cmeta_info);
}
string RaftConsensus::LogPrefixThreadSafe() const {
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index efcc06e..ee61fff 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -44,6 +44,7 @@
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/atomic.h"
#include "kudu/util/locks.h"
+#include "kudu/util/make_shared.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/random.h"
@@ -126,16 +127,14 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
EXTERNAL_REQUEST
};
- RaftConsensus(ConsensusOptions options,
- RaftPeerPB local_peer_pb,
- scoped_refptr<ConsensusMetadataManager> cmeta_manager,
- ThreadPool* raft_pool);
~RaftConsensus();
- // Initializes the RaftConsensus object. This should be called before
- // publishing this object to any thread other than the thread that invoked
- // the constructor.
- Status Init();
+ // Factory method to construct and initialize a RaftConsensus instance.
+ static Status Create(ConsensusOptions options,
+ RaftPeerPB local_peer_pb,
+ scoped_refptr<ConsensusMetadataManager> cmeta_manager,
+ ThreadPool* raft_pool,
+ std::shared_ptr<RaftConsensus>* consensus_out);
// Starts running the Raft consensus algorithm.
// Start() is not thread-safe. Calls to Start() should be externally
@@ -245,7 +244,12 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
// Messages sent from CANDIDATEs to voting peers to request their vote
// in leader election.
+ //
+ // If 'tombstone_last_logged_opid' is set, this replica will attempt to vote
+ // in kInitialized and kStopped states, instead of just in the kRunning
+ // state.
Status RequestVote(const VoteRequestPB* request,
+ boost::optional<OpId> tombstone_last_logged_opid,
VoteResponsePB* response);
// Implement a ChangeConfig() request.
@@ -265,6 +269,9 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
// Returns the current Raft role of this instance.
RaftPeerPB::Role role() const;
+ // Returns the current term.
+ int64_t CurrentTerm() const;
+
// Returns the uuid of this peer.
// Thread-safe.
const std::string& peer_uuid() const;
@@ -283,7 +290,14 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
void DumpStatusHtml(std::ostream& out) const;
- // Stop running the Raft consensus algorithm.
+ // Transition to kStopped state. See State enum definition for details.
+ // This is a no-op if Raft is already in the kStopping, kStopped, or
+ // kShutdown state; otherwise, Raft will pass through the kStopping state
+ // on the way to kStopped.
+ void Stop();
+
+ // Transition to kShutdown state. See State enum definition for details.
+ // It is legal to call this method while in any lifecycle state.
void Shutdown();
// Makes this peer advance it's term (and step down if leader), for tests.
@@ -317,7 +331,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
log::RetentionIndexes GetRetentionIndexes();
private:
- friend class RefCountedThreadSafe<RaftConsensus>;
+ ALLOW_MAKE_SHARED(RaftConsensus);
friend class RaftConsensusQuorumTest;
FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind);
FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind);
@@ -325,26 +339,40 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
FRIEND_TEST(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty);
FRIEND_TEST(RaftConsensusQuorumTest, TestRequestVote);
+ // RaftConsensus lifecycle states.
+ //
+ // Legal state transitions:
+ //
+ // kNew -> kInitialized -+-> kRunning -> kStopping -> kStopped -> kShutdown
+ // `----------------^
+ //
// NOTE: When adding / changing values in this enum, add the corresponding
- // values to State_Name().
+ // values to State_Name() as well.
+ //
enum State {
- // RaftConsensus has been freshly constructed.
+ // The RaftConsensus object has been freshly constructed and is not yet
+ // initialized. A RaftConsensus object will never be made externally
+ // visible in this state.
kNew,
- // RaftConsensus has been initialized.
+ // Raft has been initialized. It cannot accept writes, but it may be able
+ // to vote. See RequestVote() for details.
kInitialized,
- // State signaling the replica accepts requests (from clients
- // if leader, from leader if follower)
+ // Raft is running normally and will accept write requests and vote
+ // requests.
kRunning,
- // State signaling that the replica is shutting down and no longer accepting
- // new transactions or commits.
- kShuttingDown,
+ // Raft is in the process of stopping and will not accept writes. Voting
+ // may still be allowed. See RequestVote() for details.
+ kStopping,
+
+ // Raft is stopped and no longer accepting writes. However, voting may
+ // still be allowed. See RequestVote() for details.
+ kStopped,
- // State signaling the replica is shut down and does not accept
- // any more requests.
- kShutDown,
+ // Raft is fully shut down and cannot accept writes or vote requests.
+ kShutdown,
};
// Control whether printing of log messages should be done for a particular
@@ -376,6 +404,19 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
using LockGuard = std::lock_guard<simple_spinlock>;
using UniqueLock = std::unique_lock<simple_spinlock>;
+ RaftConsensus(ConsensusOptions options,
+ RaftPeerPB local_peer_pb,
+ scoped_refptr<ConsensusMetadataManager> cmeta_manager,
+ ThreadPool* raft_pool);
+
+ // Initializes the RaftConsensus object, including loading the consensus
+ // metadata.
+ Status Init();
+
+ // Change the lifecycle state of RaftConsensus. The definition of the State
+ // enum documents legal state transitions.
+ void SetStateUnlocked(State new_state);
+
// Returns string description for State enum value.
static const char* State_Name(State state);
@@ -658,7 +699,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
FlushToDisk flush) WARN_UNUSED_RESULT;
// Returns the term set in the last config change round.
- const int64_t GetCurrentTermUnlocked() const;
+ const int64_t CurrentTermUnlocked() const;
// Accessors for the leader of the current term.
std::string GetLeaderUuidUnlocked() const;
@@ -761,6 +802,9 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
Callback<void(const std::string& reason)> mark_dirty_clbk_;
+ // A flag to help us avoid taking a lock on the reactor thread if the object
+ // is already in kShutdown state.
+ // TODO(mpercy): Try to get rid of this extra flag.
AtomicBool shutdown_;
// The number of times Update() has been called, used for some test assertions.
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 45d5cec..a6ecf6f 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -194,13 +194,12 @@ class RaftConsensusQuorumTest : public KuduTest {
RaftPeerPB local_peer_pb;
RETURN_NOT_OK(GetRaftConfigMember(config_, fs_managers_[i]->uuid(), &local_peer_pb));
- shared_ptr<RaftConsensus> peer(
- new RaftConsensus(options_,
- config_.peers(i),
- cmeta_managers_[i],
- raft_pool_.get()));
- RETURN_NOT_OK(peer->Init());
-
+ shared_ptr<RaftConsensus> peer;
+ RETURN_NOT_OK(RaftConsensus::Create(options_,
+ config_.peers(i),
+ cmeta_managers_[i],
+ raft_pool_.get(),
+ &peer));
peers_->AddPeer(config_.peers(i).permanent_uuid(), peer);
}
return Status::OK();
@@ -1062,7 +1061,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
VoteResponsePB response;
request.set_candidate_uuid(fs_managers_[0]->uuid());
request.set_candidate_term(last_op_id.term() + 1);
- ASSERT_OK(peer->RequestVote(&request, &response));
+ ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
ASSERT_FALSE(response.vote_granted());
ASSERT_EQ(ConsensusErrorPB::LEADER_IS_ALIVE, response.consensus_error().code());
ASSERT_EQ(0, flush_count() - flush_count_before)
@@ -1074,7 +1073,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
// This will allow the rest of the requests in the test to go through.
flush_count_before = flush_count();
request.set_ignore_live_leader(true);
- ASSERT_OK(peer->RequestVote(&request, &response));
+ ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
ASSERT_TRUE(response.vote_granted());
ASSERT_EQ(last_op_id.term() + 1, response.responder_term());
ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 1,
@@ -1085,7 +1084,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
// Ensure we get same response for same term and same UUID.
response.Clear();
flush_count_before = flush_count();
- ASSERT_OK(peer->RequestVote(&request, &response));
+ ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
ASSERT_TRUE(response.vote_granted());
ASSERT_EQ(0, flush_count() - flush_count_before)
<< "Confirming a previous vote should not flush";
@@ -1094,7 +1093,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
flush_count_before = flush_count();
response.Clear();
request.set_candidate_uuid(fs_managers_[2]->uuid());
- ASSERT_OK(peer->RequestVote(&request, &response));
+ ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
ASSERT_FALSE(response.vote_granted());
ASSERT_TRUE(response.has_consensus_error());
ASSERT_EQ(ConsensusErrorPB::ALREADY_VOTED, response.consensus_error().code());
@@ -1114,7 +1113,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
request.set_candidate_uuid(fs_managers_[0]->uuid());
request.set_candidate_term(last_op_id.term() + 2);
response.Clear();
- ASSERT_OK(peer->RequestVote(&request, &response));
+ ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
ASSERT_TRUE(response.vote_granted());
ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2,
@@ -1128,7 +1127,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
flush_count_before = flush_count();
request.set_candidate_term(last_op_id.term() + 1);
response.Clear();
- ASSERT_OK(peer->RequestVote(&request, &response));
+ ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
ASSERT_FALSE(response.vote_granted());
ASSERT_TRUE(response.has_consensus_error());
ASSERT_EQ(ConsensusErrorPB::INVALID_TERM, response.consensus_error().code());
@@ -1144,7 +1143,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
request.set_candidate_term(last_op_id.term() + 3);
request.set_is_pre_election(true);
response.Clear();
- ASSERT_OK(peer->RequestVote(&request, &response));
+ ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
ASSERT_TRUE(response.vote_granted());
ASSERT_FALSE(response.has_consensus_error());
ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
@@ -1163,7 +1162,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
request.set_candidate_term(last_op_id.term() + 3);
request.mutable_candidate_status()->mutable_last_received()->CopyFrom(MinimumOpId());
response.Clear();
- ASSERT_OK(peer->RequestVote(&request, &response));
+ ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
ASSERT_FALSE(response.vote_granted());
ASSERT_TRUE(response.has_consensus_error());
ASSERT_EQ(ConsensusErrorPB::LAST_OPID_TOO_OLD, response.consensus_error().code());
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index d13d396..4920e88 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -95,6 +95,8 @@ ADD_KUDU_TEST(tablet_copy-itest)
ADD_KUDU_TEST(tablet_copy_client_session-itest)
ADD_KUDU_TEST(tablet_history_gc-itest)
ADD_KUDU_TEST(tablet_replacement-itest)
+ADD_KUDU_TEST(tombstoned_voting-itest)
+ADD_KUDU_TEST(tombstoned_voting-stress-test RUN_SERIAL true)
ADD_KUDU_TEST(token_signer-itest RESOURCE_LOCK "master-rpc-ports")
ADD_KUDU_TEST(ts_recovery-itest)
ADD_KUDU_TEST(ts_tablet_manager-itest)
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/cluster_itest_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc b/src/kudu/integration-tests/cluster_itest_util.cc
index 051c8b5..7d6f55a 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -73,6 +73,8 @@ using consensus::OpIdType;
using consensus::RaftPeerPB;
using consensus::RunLeaderElectionResponsePB;
using consensus::RunLeaderElectionRequestPB;
+using consensus::VoteRequestPB;
+using consensus::VoteResponsePB;
using consensus::kInvalidOpIdIndex;
using master::ListTabletServersResponsePB;
using master::ListTabletServersResponsePB_Entry;
@@ -601,6 +603,33 @@ Status StartElection(const TServerDetails* replica,
return Status::OK();
}
+Status RequestVote(const TServerDetails* replica,
+ const std::string& tablet_id,
+ const std::string& candidate_uuid,
+ int64_t candidate_term,
+ const consensus::OpId& last_logged_opid,
+ boost::optional<bool> ignore_live_leader,
+ boost::optional<bool> is_pre_election,
+ const MonoDelta& timeout) {
+ DCHECK(last_logged_opid.IsInitialized());
+ VoteRequestPB req;
+ req.set_dest_uuid(replica->uuid());
+ req.set_tablet_id(tablet_id);
+ req.set_candidate_uuid(candidate_uuid);
+ req.set_candidate_term(candidate_term);
+ *req.mutable_candidate_status()->mutable_last_received() = last_logged_opid;
+ if (ignore_live_leader) req.set_ignore_live_leader(*ignore_live_leader);
+ if (is_pre_election) req.set_is_pre_election(*is_pre_election);
+ VoteResponsePB resp;
+ RpcController rpc;
+ rpc.set_timeout(timeout);
+ RETURN_NOT_OK(replica->consensus_proxy->RequestConsensusVote(req, &resp, &rpc));
+ if (resp.has_vote_granted() && resp.vote_granted()) return Status::OK();
+ if (resp.has_error()) return StatusFromPB(resp.error().status());
+ if (resp.has_consensus_error()) return StatusFromPB(resp.consensus_error().status());
+ return Status::IllegalState("Unknown error");
+}
+
Status LeaderStepDown(const TServerDetails* replica,
const string& tablet_id,
const MonoDelta& timeout,
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/cluster_itest_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.h b/src/kudu/integration-tests/cluster_itest_util.h
index e8e49b5..49bf0ff 100644
--- a/src/kudu/integration-tests/cluster_itest_util.h
+++ b/src/kudu/integration-tests/cluster_itest_util.h
@@ -233,6 +233,18 @@ Status StartElection(const TServerDetails* replica,
const std::string& tablet_id,
const MonoDelta& timeout);
+// Request the given replica to vote. This is a thin wrapper around
+// RequestConsensusVote(). See the definition of VoteRequestPB in
+// consensus.proto for parameter details.
+Status RequestVote(const TServerDetails* replica,
+ const std::string& tablet_id,
+ const std::string& candidate_uuid,
+ int64_t candidate_term,
+ const consensus::OpId& last_logged_opid,
+ boost::optional<bool> ignore_live_leader,
+ boost::optional<bool> is_pre_election,
+ const MonoDelta& timeout);
+
// Cause a leader to step down on the specified server.
// 'timeout' refers to the RPC timeout waiting synchronously for stepdown to
// complete on the leader side. Since that does not require communication with
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/delete_table-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/delete_table-itest.cc b/src/kudu/integration-tests/delete_table-itest.cc
index 5fdcc9d..105fd77 100644
--- a/src/kudu/integration-tests/delete_table-itest.cc
+++ b/src/kudu/integration-tests/delete_table-itest.cc
@@ -817,14 +817,22 @@ TEST_F(DeleteTableITest, TestMergeConsensusMetadata) {
TABLET_DATA_TOMBSTONED, boost::none, timeout));
NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_EXPECTED));
+ // Shut down the tablet server so it won't vote while tombstoned.
+ cluster_->tablet_server(kTsIndex)->Shutdown();
+
ASSERT_OK(cluster_->tablet_server(1)->Restart());
ASSERT_OK(cluster_->tablet_server(2)->Restart());
NO_FATALS(WaitUntilTabletRunning(1, tablet_id));
NO_FATALS(WaitUntilTabletRunning(2, tablet_id));
ASSERT_OK(itest::StartElection(leader, tablet_id, timeout));
+ ASSERT_OK(itest::WaitUntilLeader(leader, tablet_id, timeout));
+
+ // Now restart the replica. It will receive a tablet copy from the leader.
+ ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_READY }));
- // The election history should have been wiped out.
+ // The election history should have been wiped out for the new term, since
+ // this node did not participate.
ASSERT_OK(inspect_->ReadConsensusMetadataOnTS(kTsIndex, tablet_id, &cmeta_pb));
ASSERT_EQ(3, cmeta_pb.current_term());
ASSERT_TRUE(!cmeta_pb.has_voted_for()) << SecureShortDebugString(cmeta_pb);
@@ -1066,7 +1074,8 @@ TEST_F(DeleteTableITest, TestWebPageForTombstonedTablet) {
cluster_->tablet_server(0)->bound_http_hostport().ToString(),
page,
tablet_id), &buf));
- ASSERT_STR_CONTAINS(buf.ToString(), tablet_id);
+ ASSERT_STR_CONTAINS(buf.ToString(), tablet_id)
+ << "Page: " << page << "; tablet_id: " << tablet_id;
}
}
@@ -1292,7 +1301,7 @@ TEST_P(DeleteTableTombstonedParamTest, TestTabletTombstone) {
ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 2, timeout, &tablets));
for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
if (t.tablet_status().tablet_id() == tablet_id) {
- ASSERT_EQ(tablet::SHUTDOWN, t.tablet_status().state());
+ ASSERT_EQ(tablet::STOPPED, t.tablet_status().state());
ASSERT_EQ(TABLET_DATA_TOMBSTONED, t.tablet_status().tablet_data_state())
<< t.tablet_status().tablet_id() << " not tombstoned";
}
@@ -1313,10 +1322,10 @@ TEST_P(DeleteTableTombstonedParamTest, TestTabletTombstone) {
NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_EXPECTED));
// The tombstoned tablets will still show up in ListTablets(),
// just with their data state set as TOMBSTONED. They should also be listed
- // as NOT_STARTED because we restarted the server.
+ // as INITIALIZED because we restarted the server.
ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 2, timeout, &tablets));
for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
- ASSERT_EQ(tablet::NOT_STARTED, t.tablet_status().state());
+ ASSERT_EQ(tablet::INITIALIZED, t.tablet_status().state());
ASSERT_EQ(TABLET_DATA_TOMBSTONED, t.tablet_status().tablet_data_state())
<< t.tablet_status().tablet_id() << " not tombstoned";
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/external_mini_cluster-itest-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster-itest-base.cc b/src/kudu/integration-tests/external_mini_cluster-itest-base.cc
index 71d7573..94c313e 100644
--- a/src/kudu/integration-tests/external_mini_cluster-itest-base.cc
+++ b/src/kudu/integration-tests/external_mini_cluster-itest-base.cc
@@ -22,6 +22,7 @@
#include <string>
#include <vector>
+#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
@@ -33,6 +34,9 @@
#include "kudu/util/pstack_watcher.h"
#include "kudu/util/test_macros.h"
+DEFINE_bool(test_dump_stacks_on_failure, true,
+ "Whether to dump ExternalMiniCluster process stacks on test failure");
+
namespace kudu {
void ExternalMiniClusterITestBase::TearDown() {
@@ -69,7 +73,7 @@ void ExternalMiniClusterITestBase::StopCluster() {
return;
}
- if (HasFatalFailure()) {
+ if (HasFatalFailure() && FLAGS_test_dump_stacks_on_failure) {
LOG(INFO) << "Found fatal failure";
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
if (!cluster_->tablet_server(i)->IsProcessAlive()) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/tombstoned_voting-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tombstoned_voting-itest.cc b/src/kudu/integration-tests/tombstoned_voting-itest.cc
new file mode 100644
index 0000000..b723738
--- /dev/null
+++ b/src/kudu/integration-tests/tombstoned_voting-itest.cc
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/opid.pb.h"
+#include "kudu/consensus/opid_util.h"
+#include "kudu/consensus/raft_consensus.h"
+#include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/internal_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/internal_mini_cluster.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/tablet/metadata.pb.h"
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/env.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(allow_unsafe_replication_factor);
+DECLARE_bool(enable_tablet_copy);
+DECLARE_bool(raft_enable_tombstoned_voting);
+
+using kudu::consensus::MakeOpId;
+using kudu::consensus::LeaderStepDownResponsePB;
+using kudu::consensus::OpId;
+using kudu::consensus::RECEIVED_OPID;
+using kudu::consensus::RaftConsensus;
+using kudu::consensus::RaftPeerPB;
+using kudu::itest::DeleteTablet;
+using kudu::itest::TServerDetails;
+using kudu::itest::WaitForServersToAgree;
+using kudu::tablet::TABLET_DATA_TOMBSTONED;
+using kudu::tablet::TabletReplica;
+using kudu::tablet::TabletStatePB;
+using kudu::tserver::TabletServerErrorPB;
+using kudu::tserver::TSTabletManager;
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+// Test fixture for the tombstoned-voting integration tests; all cluster
+// setup is inherited from MiniClusterITestBase.
+class TombstonedVotingITest : public MiniClusterITestBase {};
+
+// Ensure that a tombstoned replica cannot vote after we call Shutdown() on it.
+// With only two replicas, TS0 needs TS1's vote to win an election, so TS0's
+// election outcome tells us whether the tombstoned replica on TS1 voted.
+TEST_F(TombstonedVotingITest, TestNoVoteAfterShutdown) {
+ // This test waits for several seconds, so only run it in slow mode.
+ if (!AllowSlowTests()) return;
+
+ FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
+ FLAGS_enable_tablet_copy = false; // Tablet copy would interfere with this test.
+
+ const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+ NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
+ TestWorkload workload(cluster_.get());
+ workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
+ workload.Setup();
+ workload.Start();
+ while (workload.rows_inserted() < 50) {
+ SleepFor(MonoDelta::FromMilliseconds(50));
+ }
+ workload.StopAndJoin();
+
+ // Figure out the tablet id to mess with.
+ vector<string> tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+ ASSERT_EQ(1, tablet_ids.size());
+ const string& tablet_id = tablet_ids[0];
+
+ // Ensure all servers are up to date.
+ ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
+
+ // Manually tombstone the replica on TS1, start an election on TS0, and wait
+ // until TS0 gets elected. If TS0 gets elected then TS1 was able to vote
+ // while tombstoned.
+ TSTabletManager* ts_tablet_manager = cluster_->mini_tablet_server(1)->server()->tablet_manager();
+ scoped_refptr<TabletReplica> ts1_replica;
+ ASSERT_OK(ts_tablet_manager->GetTabletReplica(tablet_id, &ts1_replica));
+
+ // Tombstone TS1's replica. Tombstoning leaves the TabletReplica object in
+ // the STOPPED state (asserted below) rather than shutting it down.
+ LOG(INFO) << "Tombstoning ts1...";
+ boost::optional<TabletServerErrorPB::Code> error_code;
+ ASSERT_OK(ts_tablet_manager->DeleteTablet(tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
+ &error_code));
+ ASSERT_EQ(TabletStatePB::STOPPED, ts1_replica->state());
+
+ scoped_refptr<TabletReplica> ts0_replica;
+ ASSERT_OK(cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletReplica(
+ tablet_id, &ts0_replica));
+ LeaderStepDownResponsePB resp;
+ ts0_replica->consensus()->StepDown(&resp); // Ignore result, in case TS1 was the leader.
+ ASSERT_EQ(RaftPeerPB::FOLLOWER, ts0_replica->consensus()->role());
+ ASSERT_OK(ts0_replica->consensus()->StartElection(
+ RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, RaftConsensus::EXTERNAL_REQUEST));
+
+ // Wait until TS0 is leader.
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_EQ(RaftPeerPB::LEADER, ts0_replica->consensus()->role());
+ });
+
+ // Now shut down TS1's tombstoned replica (only the TabletReplica object; the
+ // TS1 tablet server process keeps running). This will ensure that TS0 cannot
+ // get re-elected.
+ LOG(INFO) << "Shutting down ts1...";
+ ts1_replica->Shutdown();
+
+ // Start another election and wait for some time to see if it can get elected.
+ ASSERT_OK(ts0_replica->consensus()->StepDown(&resp));
+ ASSERT_OK(ts0_replica->consensus()->StartElection(
+ RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, RaftConsensus::EXTERNAL_REQUEST));
+
+ // Wait for some time to ensure TS0 cannot get elected. TS0 must remain a
+ // FOLLOWER for the entire window, polling every 50 ms.
+ MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
+ while (MonoTime::Now() < deadline) {
+ ASSERT_EQ(RaftPeerPB::FOLLOWER, ts0_replica->consensus()->role());
+ SleepFor(MonoDelta::FromMilliseconds(50));
+ }
+}
+
+// Test that a tombstoned replica will vote correctly.
+// This is implemented by directly exercising the RPC API with different vote request parameters.
+// The same sequence of vote requests is issued twice: first against the
+// freshly tombstoned replica (expected state STOPPED), then again after the
+// hosting tablet server restarts (expected state INITIALIZED).
+TEST_F(TombstonedVotingITest, TestVotingLogic) {
+  // This test waits for several seconds, so only run it in slow mode.
+  if (!AllowSlowTests()) return;
+
+  FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
+  FLAGS_enable_tablet_copy = false; // Tablet copy would interfere with this test.
+
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 50) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+  workload.StopAndJoin();
+
+  // Figure out the tablet id to mess with.
+  vector<string> tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+  ASSERT_EQ(1, tablet_ids.size());
+  const string& tablet_id = tablet_ids[0];
+
+  // Ensure all servers are up to date.
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
+
+  // Shut down TS0 so it doesn't interfere with our testing.
+  cluster_->mini_tablet_server(0)->Shutdown();
+
+  // Figure out the last logged opid of TS1.
+  OpId last_logged_opid;
+  ASSERT_OK(itest::GetLastOpIdForReplica(tablet_id,
+                                         ts_map_[cluster_->mini_tablet_server(1)->uuid()],
+                                         RECEIVED_OPID,
+                                         kTimeout,
+                                         &last_logged_opid));
+
+  // Tombstone TS1 (actually, the tablet replica hosted on TS1).
+  ASSERT_OK(itest::DeleteTablet(ts_map_[cluster_->mini_tablet_server(1)->uuid()], tablet_id,
+                                TABLET_DATA_TOMBSTONED, boost::none, kTimeout));
+
+  // Loop this series of tests twice: the first time without restarting the TS,
+  // the 2nd time after a restart.
+  for (int i = 0; i < 2; i++) {
+    if (i == 1) {
+      // Restart tablet server #1 on the 2nd loop.
+      LOG(INFO) << "Restarting TS1...";
+      cluster_->mini_tablet_server(1)->Shutdown();
+      ASSERT_OK(cluster_->mini_tablet_server(1)->Restart());
+      ASSERT_OK(cluster_->mini_tablet_server(1)->WaitStarted());
+    }
+
+    scoped_refptr<TabletReplica> replica;
+    ASSERT_OK(cluster_->mini_tablet_server(1)->server()->tablet_manager()->GetTabletReplica(
+        tablet_id, &replica));
+    ASSERT_EQ(i == 0 ? tablet::STOPPED : tablet::INITIALIZED, replica->state());
+
+    int64_t current_term = replica->consensus()->CurrentTerm();
+    current_term++;
+
+    // Ask TS1 for a vote that should be granted (new term, acceptable opid).
+    // Note: peers are required to vote regardless of whether they recognize the
+    // candidate's UUID or not, so the ID used here ("A") is not important.
+    TServerDetails* ts1_ets = ts_map_[cluster_->mini_tablet_server(1)->uuid()];
+    ASSERT_OK(itest::RequestVote(ts1_ets, tablet_id, "A", current_term, last_logged_opid,
+                                 /*ignore_live_leader=*/ true, /*is_pre_election=*/ false,
+                                 kTimeout));
+
+    // Ask TS1 for a vote that should be denied (different candidate, same term).
+    Status s = itest::RequestVote(ts1_ets, tablet_id, "B", current_term, last_logged_opid,
+                                  /*ignore_live_leader=*/ true, /*is_pre_election=*/ false,
+                                  kTimeout);
+    ASSERT_TRUE(s.IsInvalidArgument());
+    ASSERT_STR_CONTAINS(s.ToString(), "Already voted for candidate A in this term");
+
+    // Ask TS1 for a vote that should be denied (old term).
+    s = itest::RequestVote(ts1_ets, tablet_id, "B", current_term - 1, last_logged_opid,
+                           /*ignore_live_leader=*/ true, /*is_pre_election=*/ false, kTimeout);
+    ASSERT_TRUE(s.IsInvalidArgument());
+    ASSERT_STR_MATCHES(s.ToString(), "Denying vote to candidate B for earlier term");
+
+    // Increment term.
+    current_term++;
+    OpId old_opid = MakeOpId(last_logged_opid.term(), last_logged_opid.index() - 1);
+
+    // Ask TS1 for a vote that should be denied (old last-logged opid).
+    s = itest::RequestVote(ts1_ets, tablet_id, "B", current_term, old_opid,
+                           /*ignore_live_leader=*/ true, /*is_pre_election=*/ false, kTimeout);
+    ASSERT_TRUE(s.IsInvalidArgument());
+    ASSERT_STR_MATCHES(s.ToString(),
+                       "Denying vote to candidate B.*greater than that of the candidate");
+
+    // Ask for a successful vote for candidate B.
+    ASSERT_OK(itest::RequestVote(ts1_ets, tablet_id, "B", current_term, last_logged_opid,
+                                 /*ignore_live_leader=*/ true, /*is_pre_election=*/ false,
+                                 kTimeout));
+  }
+}
+
+// Disable tombstoned voting and ensure that an election that would require it fails.
+// With two replicas, TS0 can only win if the tombstoned replica on TS1 votes
+// for it, which must not happen while --raft_enable_tombstoned_voting=false.
+TEST_F(TombstonedVotingITest, TestNoVoteIfTombstonedVotingDisabled) {
+ // This test waits for several seconds, so only run it in slow mode.
+ if (!AllowSlowTests()) return;
+
+ FLAGS_raft_enable_tombstoned_voting = false; // Disable tombstoned voting.
+ FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
+ FLAGS_enable_tablet_copy = false; // Tablet copy would interfere with this test.
+
+ const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+ NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
+ TestWorkload workload(cluster_.get());
+ workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
+ workload.Setup();
+ workload.Start();
+ while (workload.rows_inserted() < 50) {
+ SleepFor(MonoDelta::FromMilliseconds(50));
+ }
+ workload.StopAndJoin();
+
+ // Figure out the tablet id to mess with.
+ vector<string> tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+ ASSERT_EQ(1, tablet_ids.size());
+ const string& tablet_id = tablet_ids[0];
+
+ // Ensure all servers are up to date.
+ ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
+
+ // Tombstone TS1, then start an election on TS0 so that TS0 must ask the
+ // tombstoned replica on TS1 for its vote.
+ TServerDetails* ts1 = ts_map_[cluster_->mini_tablet_server(1)->uuid()];
+ ASSERT_OK(DeleteTablet(ts1, tablet_id, TABLET_DATA_TOMBSTONED, boost::none, kTimeout));
+
+ scoped_refptr<TabletReplica> ts0_replica;
+ ASSERT_OK(cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletReplica(
+ tablet_id, &ts0_replica));
+ LeaderStepDownResponsePB resp;
+ ts0_replica->consensus()->StepDown(&resp); // Ignore result, in case TS1 was the leader.
+ ASSERT_OK(ts0_replica->consensus()->StartElection(
+ RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, RaftConsensus::EXTERNAL_REQUEST));
+
+ // Wait for some time to ensure TS0 cannot get elected.
+ MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
+ while (MonoTime::Now() < deadline) {
+ ASSERT_EQ(RaftPeerPB::FOLLOWER, ts0_replica->consensus()->role());
+ SleepFor(MonoDelta::FromMilliseconds(50));
+ }
+}
+
+// Test that a replica will not vote while tombstoned if it was deleted while
+// the last-logged opid was unknown. This may occur if a tablet is tombstoned
+// while in a FAILED state.
+TEST_F(TombstonedVotingITest, TestNoVoteIfNoLastLoggedOpId) {
+  if (!AllowSlowTests()) return; // This test waits for several seconds.
+
+  FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
+
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 50) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+
+  tserver::MiniTabletServer* ts0 = cluster_->mini_tablet_server(0);
+  const string ts0_uuid = ts0->uuid();
+  tserver::MiniTabletServer* ts1 = cluster_->mini_tablet_server(1);
+
+  // Determine the tablet id.
+  vector<string> tablet_ids = ts0->ListTablets();
+  ASSERT_EQ(1, tablet_ids.size());
+  const string& tablet_id = tablet_ids[0];
+
+  // Ensure all servers are in sync.
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
+
+  // Shut down each TS, then corrupt the TS0 cmeta.
+  string ts0_cmeta_path = ts0->server()->fs_manager()->GetConsensusMetadataPath(tablet_id);
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    cluster_->mini_tablet_server(i)->Shutdown();
+  }
+
+  // Overwrite TS0's cmeta file so it can no longer be loaded.
+  // NOTE(review): if Append() builds its Slice from a C string (strlen-based),
+  // "\0" appends zero bytes and the corruption comes solely from
+  // NewWritableFile() replacing the existing contents. Confirm whether a
+  // non-empty payload was intended.
+  std::unique_ptr<WritableFile> file;
+  ASSERT_OK(env_->NewWritableFile(ts0_cmeta_path, &file));
+  ASSERT_OK(file->Append("\0"));
+  ASSERT_OK(file->Close());
+
+  // Restart each TS so it comes back up on the same ports.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    ASSERT_OK(cluster_->mini_tablet_server(i)->Restart());
+  }
+
+  // Wait until the tablet is in FAILED state.
+  ASSERT_OK(itest::WaitUntilTabletInState(ts_map_[ts0_uuid], tablet_id, TabletStatePB::FAILED,
+                                          kTimeout));
+  scoped_refptr<TabletReplica> replica;
+  ASSERT_TRUE(ts0->server() != nullptr);
+  ASSERT_TRUE(ts0->server()->tablet_manager() != nullptr);
+  ASSERT_TRUE(ts0->server()->tablet_manager()->LookupTablet(tablet_id, &replica));
+  ASSERT_EQ(tablet::FAILED, replica->state());
+
+  // Now tombstone the failed replica on TS0.
+  ASSERT_OK(itest::DeleteTablet(ts_map_[ts0_uuid], tablet_id,
+                                TABLET_DATA_TOMBSTONED, boost::none, kTimeout));
+
+  // Wait until TS1 is running.
+  ASSERT_EVENTUALLY([&] {
+    TSTabletManager* tablet_manager = ts1->server()->tablet_manager();
+    ASSERT_TRUE(tablet_manager->LookupTablet(tablet_id, &replica));
+    ASSERT_EQ(tablet::RUNNING, replica->state());
+  });
+
+  // Ensure that TS1 cannot become leader because TS0 will not vote.
+  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
+  while (MonoTime::Now() < deadline) {
+    scoped_refptr<TabletReplica> ts1_replica;
+    TSTabletManager* tablet_manager = ts1->server()->tablet_manager();
+    ASSERT_TRUE(tablet_manager != nullptr);
+    ASSERT_TRUE(tablet_manager->LookupTablet(tablet_id, &ts1_replica));
+    // shared_consensus() may return null; only check the role when it is set.
+    std::shared_ptr<RaftConsensus> consensus = ts1_replica->shared_consensus();
+    if (consensus) {
+      ASSERT_EQ(RaftPeerPB::FOLLOWER, consensus->role());
+    }
+    // Poll at the same cadence as the other tests instead of busy-spinning.
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+}
+
+// Whether TestTombstonedVoter restarts the server hosting the tombstoned
+// replica before the surviving replica runs for election.
+enum RestartAfterTombstone {
+  kNoRestart,  // Leave the tombstoned replica's tablet server running.
+  kRestart,    // Restart the tombstoned replica's tablet server.
+};
+
+// Fixture for TestTombstonedVoter, parameterized on RestartAfterTombstone.
+class TsRecoveryTombstonedITest
+    : public MiniClusterITestBase,
+      public ::testing::WithParamInterface<RestartAfterTombstone> {};
+
+INSTANTIATE_TEST_CASE_P(Restart, TsRecoveryTombstonedITest,
+                        ::testing::Values(kNoRestart, kRestart));
+
+// Basic tombstoned voting test: with TS0 down, tombstone the replica on TS1
+// (optionally restarting TS1), then bring TS0 back. TS0 can only become leader
+// if the tombstoned replica on TS1 votes for it, after which TS0 copies the
+// tablet back onto TS1.
+TEST_P(TsRecoveryTombstonedITest, TestTombstonedVoter) {
+  const RestartAfterTombstone to_restart = GetParam();
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
+  NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 50) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+  workload.StopAndJoin();
+
+  // Figure out the tablet id.
+  vector<string> tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+  ASSERT_EQ(1, tablet_ids.size());
+  const string& tablet_id = tablet_ids[0];
+
+  // Ensure all servers are up to date.
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
+
+  // NOTE(review): live_ts_map contains only TS0, so the agreement checks below
+  // do not compare TS1 directly. TS1's participation is implied because the
+  // 2-replica config needs both replicas to commit the extra writes.
+  auto live_ts_map = ts_map_;
+  ASSERT_EQ(1, live_ts_map.erase(cluster_->mini_tablet_server(1)->uuid()));
+
+  // Shut down TS 0 then tombstone TS 1. Restart TS 0.
+  // TS 0 should get a vote from TS 1 and then make a copy on TS 1, bringing
+  // the cluster back up to full strength.
+  LOG(INFO) << "shutting down TS " << cluster_->mini_tablet_server(0)->uuid();
+  cluster_->mini_tablet_server(0)->Shutdown();
+
+  LOG(INFO) << "tombstoning replica on TS " << cluster_->mini_tablet_server(1)->uuid();
+  TServerDetails* ts1 = ts_map_[cluster_->mini_tablet_server(1)->uuid()];
+  ASSERT_OK(DeleteTablet(ts1, tablet_id, TABLET_DATA_TOMBSTONED, boost::none, kTimeout));
+
+  if (to_restart == kRestart) {
+    LOG(INFO) << "restarting tombstoned TS " << cluster_->mini_tablet_server(1)->uuid();
+    cluster_->mini_tablet_server(1)->Shutdown();
+    ASSERT_OK(cluster_->mini_tablet_server(1)->Restart());
+  }
+
+  // This restarts TS 0, so log TS 0's uuid.
+  LOG(INFO) << "restarting TS " << cluster_->mini_tablet_server(0)->uuid();
+  ASSERT_OK(cluster_->mini_tablet_server(0)->Restart());
+
+  // Wait for the tablet copy to complete.
+  LOG(INFO) << "waiting for leader election and tablet copy to complete...";
+  ASSERT_OK(WaitForServersToAgree(kTimeout, live_ts_map, tablet_id, workload.batches_completed()));
+
+  LOG(INFO) << "attempting to write a few more rows...";
+
+  // Write a little bit more.
+  const int64_t target_rows = workload.rows_inserted() + 100;
+  workload.Start();
+  while (workload.rows_inserted() < target_rows) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+  workload.StopAndJoin();
+
+  // Do a final verification that the servers match.
+  LOG(INFO) << "waiting for final agreement...";
+  ASSERT_OK(WaitForServersToAgree(kTimeout, live_ts_map, tablet_id, workload.batches_completed()));
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/tombstoned_voting-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tombstoned_voting-stress-test.cc b/src/kudu/integration-tests/tombstoned_voting-stress-test.cc
new file mode 100644
index 0000000..2c1cecb
--- /dev/null
+++ b/src/kudu/integration-tests/tombstoned_voting-stress-test.cc
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/opid.pb.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/external_mini_cluster.h"
+#include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/tablet/metadata.pb.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DEFINE_int32(test_num_iterations, 5,
+ "Number of tombstoned voting stress test iterations");
+
+using kudu::consensus::COMMITTED_OPID;
+using kudu::consensus::OpId;
+using kudu::itest::DeleteTablet;
+using kudu::itest::TServerDetails;
+using kudu::itest::WaitForServersToAgree;
+using kudu::tablet::TABLET_DATA_TOMBSTONED;
+using std::atomic;
+using std::string;
+using std::thread;
+using std::unique_lock;
+using std::vector;
+
+namespace kudu {
+
+static const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+// Fixture for the tombstoned voting stress test. A single voter thread
+// (RunVoteRequestLoop) continuously requests votes from TS1 while the test
+// thread drives TS1's replica through tombstone / copy cycles. State changes
+// are synchronized with the voter via a block-and-wait handshake
+// (GetState / SetState).
+class TombstonedVotingStressTest : public ExternalMiniClusterITestBase {
+ public:
+ TombstonedVotingStressTest()
+ : num_workers_(1),
+ cond_all_workers_blocked_(&lock_),
+ cond_workers_unblocked_(&lock_),
+ current_term_(1) {
+ }
+
+ protected:
+ enum State {
+ kRunning, // The tablet is running normally.
+ kTombstoning, // We are tombstoning the tablet.
+ kTombstoned, // The tombstoning is complete.
+ kCopying, // We are copying the tablet.
+ kTestComplete, // The test is complete and about to exit.
+ };
+
+ // Returns the enumerator name for 'state' (for logging).
+ string State_Name(State state);
+
+ // Called by worker threads.
+ // 1. Check if workers should block, block if required.
+ // 2. Return current state.
+ State GetState();
+
+ // Called by the test thread.
+ // 1. Block worker threads.
+ // 2. Wait for all workers to be blocked.
+ // 3. Change state.
+ // 4. Unblock workers.
+ void SetState(State state);
+
+ // Body of the worker thread that loops and requests votes from TS1.
+ void RunVoteRequestLoop();
+
+ // Set-once shared state: written by the test thread before the voter
+ // thread is started, and only read afterwards.
+ string tablet_id_;
+ OpId last_logged_opid_;
+
+ // NOTE: lock_ must stay declared before the condition variables below,
+ // since they are constructed with &lock_ in the constructor.
+ Mutex lock_;
+ const int num_workers_;
+ // Protected by lock_.
+ int num_workers_blocked_ = 0;
+ // Protected by lock_.
+ bool block_workers_ = false;
+ ConditionVariable cond_all_workers_blocked_; // Triggers once all worker threads are blocked.
+ ConditionVariable cond_workers_unblocked_; // Triggers when the workers become unblocked.
+
+ // Protected by lock_.
+ State state_ = kRunning;
+
+ // Incremented by the voter thread and also read by the test thread (when
+ // starting a tablet copy), hence atomic.
+ atomic<int64_t> current_term_;
+};
+
+// Returns the enumerator spelling of 'state'. Any value outside the enum is a
+// programming error and crashes the test.
+string TombstonedVotingStressTest::State_Name(State state) {
+  const char* name = nullptr;
+  switch (state) {
+    case kRunning:      name = "kRunning";      break;
+    case kTombstoning:  name = "kTombstoning";  break;
+    case kTombstoned:   name = "kTombstoned";   break;
+    case kCopying:      name = "kCopying";      break;
+    case kTestComplete: name = "kTestComplete"; break;
+  }
+  if (name == nullptr) {
+    LOG(FATAL) << "Unknown state: " << state;
+    __builtin_unreachable();
+  }
+  return name;
+}
+
+// Worker side of the state-change handshake. If the test thread has requested
+// a block (block_workers_), register this worker as blocked, wake SetState()
+// once every worker has checked in, and wait until SetState() clears the
+// request. Returns the (possibly new) state, read under lock_.
+TombstonedVotingStressTest::State TombstonedVotingStressTest::GetState() {
+ unique_lock<Mutex> l(lock_);
+ bool blocked = false;
+ if (block_workers_) {
+ num_workers_blocked_++;
+ blocked = true;
+ // The last worker to block tells SetState() it may proceed.
+ if (num_workers_blocked_ == num_workers_) {
+ cond_all_workers_blocked_.Signal();
+ }
+ }
+ // Loop to tolerate spurious wakeups.
+ while (block_workers_) {
+ cond_workers_unblocked_.Wait();
+ }
+ if (blocked) num_workers_blocked_--;
+ return state_;
+}
+
+// Test-thread side of the state-change handshake: changes state_ only while
+// every worker is parked inside GetState(), so no worker acts on a stale state
+// across the transition.
+// NOTE(review): num_workers_blocked_ is decremented by a worker only after it
+// re-acquires lock_, so a back-to-back SetState() call may observe the count
+// left over from the previous transition; the worker then simply sees the
+// latest state_.
+void TombstonedVotingStressTest::SetState(State state) {
+ // 1. Block worker threads.
+ // 2. Wait for all workers to be blocked.
+ // 3. Change state.
+ // 4. Unblock workers.
+ LOG(INFO) << "setting state to " << State_Name(state);
+ unique_lock<Mutex> l(lock_);
+ block_workers_ = true;
+ while (num_workers_blocked_ != num_workers_) {
+ cond_all_workers_blocked_.Wait();
+ }
+ state_ = state;
+ block_workers_ = false;
+ cond_workers_unblocked_.Broadcast();
+}
+
+// Voter thread body: until the test completes, bump the term and ask TS1 for
+// a vote on behalf of candidate "A". Votes must succeed in kRunning and
+// kTombstoned; in the transitional states kTombstoning and kCopying a failed
+// vote is only logged.
+void TombstonedVotingStressTest::RunVoteRequestLoop() {
+  TServerDetails* ts1_ets = ts_map_[cluster_->tablet_server(1)->uuid()];
+  while (true) {
+    const State state = GetState();
+    if (state == kTestComplete) break;
+    // Capture the incremented term once so the requested term is exactly the
+    // value this iteration bumped to.
+    const int64_t term = ++current_term_;
+    Status s = itest::RequestVote(ts1_ets, tablet_id_, "A", term, last_logged_opid_,
+                                  /*ignore_live_leader=*/ true, /*is_pre_election=*/ false,
+                                  kTimeout);
+    switch (state) {
+      case kRunning: FALLTHROUGH_INTENDED;
+      case kTombstoned:
+        // We should always be able to vote in this case.
+        if (s.ok()) {
+          LOG(INFO) << "Vote OK: state = " << State_Name(state);
+        } else {
+          LOG(FATAL) << s.ToString() << ": tablet = " << tablet_id_
+                     << ": state = " << State_Name(state);
+        }
+        break;
+
+      // The vote can fail while in the process of tombstoning a replica
+      // because there is a small window of time where we have stopped
+      // RaftConsensus but we haven't yet recorded the last-logged opid in the
+      // tablet metadata.
+      case kTombstoning: FALLTHROUGH_INTENDED;
+      case kCopying:
+        if (s.ok()) {
+          LOG(INFO) << "Vote OK: state = " << State_Name(state);
+        } else {
+          LOG(WARNING) << "Got bad vote while copying or tombstoning: " << s.ToString()
+                       << ": state = " << State_Name(state);
+        }
+        break;
+
+      default:
+        // We're shutting down.
+        continue;
+    }
+    SleepFor(MonoDelta::FromMilliseconds(1)); // Don't run too hot.
+  }
+}
+
+// Stress test for tombstoned voting, including tombstoning and copying
+// replicas while a background thread continuously requests votes from TS1.
+TEST_F(TombstonedVotingStressTest, TestTombstonedVotingUnderStress) {
+  // This test waits for several seconds, so only run it in slow mode.
+  if (!AllowSlowTests()) return;
+
+  // Uses the file-scope kTimeout (30 seconds).
+
+  // We want to control leader election manually and we only want 2 replicas.
+  NO_FATALS(StartCluster({ "--enable_leader_failure_detection=false" },
+                         { "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
+                           "--allow_unsafe_replication_factor=true" },
+                         /*num_tablet_servers=*/ 2));
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
+  workload.Setup();
+  ASSERT_OK(inspect_->WaitForReplicaCount(2));
+
+  // Figure out the tablet id.
+  vector<string> tablets = inspect_->ListTabletsOnTS(1);
+  ASSERT_EQ(1, tablets.size());
+  tablet_id_ = tablets[0];
+
+  // Wait for the replica on every tablet server (including TS0) to be RUNNING.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+                                            tablet_id_, kTimeout));
+    LOG(INFO) << "TabletReplica is RUNNING: T " << tablet_id_
+              << " P " << cluster_->tablet_server(i)->uuid();
+  }
+
+  // Elect a leader and run some data through the cluster.
+  LOG(INFO) << "electing a leader...";
+  TServerDetails* ts0_ets = ts_map_[cluster_->tablet_server(0)->uuid()];
+  TServerDetails* ts1_ets = ts_map_[cluster_->tablet_server(1)->uuid()];
+  ASSERT_EVENTUALLY([&] {
+    // The tablet can report that it's running but still be bootstrapping, so
+    // retry until the election starts.
+    ASSERT_OK(itest::StartElection(ts0_ets, tablet_id_, kTimeout));
+  });
+
+  LOG(INFO) << "loading data...";
+  workload.Start();
+  while (workload.rows_inserted() < 100) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id_, workload.batches_completed()));
+  ASSERT_OK(itest::GetLastOpIdForReplica(tablet_id_, ts0_ets, COMMITTED_OPID, kTimeout,
+                                         &last_logged_opid_));
+
+  // Have the leader step down so we can test voting on the other replica.
+  // We don't shut this node down because it will serve as the tablet copy
+  // "source" during the test.
+  LOG(INFO) << "forcing leader to step down...";
+  ASSERT_OK(itest::LeaderStepDown(ts0_ets, tablet_id_, kTimeout));
+
+  // Now we are done with setup. Start the "stress" part of the test.
+  // Startup the voting thread.
+  LOG(INFO) << "starting stress thread...";
+  thread voter_thread([this] { RunVoteRequestLoop(); });
+  auto cleanup = MakeScopedCleanup([&] {
+    SetState(kTestComplete);
+    voter_thread.join();
+  });
+
+  for (int iter = 1; iter <= FLAGS_test_num_iterations; iter++) {
+    LOG(INFO) << "iteration " << iter << " of " << FLAGS_test_num_iterations;
+    // Loop on voting for a while in running state. We want to give an
+    // opportunity for many votes during this time, and since voting involves
+    // fsyncing to disk, we wait for plenty of time here (and below).
+    SleepFor(MonoDelta::FromMilliseconds(500));
+
+    // 1. Tombstone tablet.
+    LOG(INFO) << "tombstoning tablet...";
+    SetState(kTombstoning);
+    ASSERT_OK(itest::DeleteTablet(ts1_ets, tablet_id_, TABLET_DATA_TOMBSTONED, boost::none,
+                                  kTimeout));
+    SetState(kTombstoned);
+
+    // Loop on voting for a while in tombstoned state.
+    SleepFor(MonoDelta::FromMilliseconds(500));
+
+    // 2. Copy tablet.
+    LOG(INFO) << "copying tablet...";
+    HostPort source_hp;
+    ASSERT_OK(HostPortFromPB(ts0_ets->registration.rpc_addresses(0), &source_hp));
+    SetState(kCopying);
+    ASSERT_OK(itest::StartTabletCopy(ts1_ets, tablet_id_, ts0_ets->uuid(), source_hp,
+                                     current_term_, kTimeout));
+    LOG(INFO) << "waiting for servers to agree...";
+    ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id_, workload.batches_completed()));
+
+    SetState(kRunning);
+  }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index f6d7a19..0e3a9f1 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -344,6 +344,12 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
local_peer_pb_,
master_->tablet_apply_pool(),
Bind(&SysCatalogTable::SysCatalogStateChanged, Unretained(this), metadata->tablet_id())));
+ Status s = tablet_replica_->Init(master_->raft_pool());
+ if (!s.ok()) {
+ tablet_replica_->SetError(s);
+ tablet_replica_->Shutdown();
+ return s;
+ }
scoped_refptr<ConsensusMetadata> cmeta;
RETURN_NOT_OK(cmeta_manager_->Load(metadata->tablet_id(), &cmeta));
@@ -371,7 +377,6 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
master_->messenger(),
scoped_refptr<rpc::ResultTracker>(),
log,
- master_->raft_pool(),
master_->tablet_prepare_pool()),
"Failed to Start() TabletReplica");
http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tablet/metadata.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/metadata.proto b/src/kudu/tablet/metadata.proto
index 83bffbc..70d8c1f 100644
--- a/src/kudu/tablet/metadata.proto
+++ b/src/kudu/tablet/metadata.proto
@@ -131,13 +131,29 @@ message TabletSuperBlockPB {
optional DataDirGroupPB data_dir_group = 15;
}
-// The enum of tablet states.
-// Tablet states are sent in TabletReports and kept in TabletReplica.
+// Tablet states represent stages of a TabletReplica's object lifecycle and are
+// reported to the master via tablet reports.
+//
+// Legal state transitions for a single TabletReplica object:
+//
+// NOT_INITIALIZED -> INITIALIZED -> BOOTSTRAPPING -> RUNNING -> STOPPING -> STOPPED -> SHUTDOWN
+//        |                |               |                       ^ ^ ^
+//        |                |               |                       | | |
+//        |                |               +-----------------------+ | |
+//        |                +-----------------------------------------+ |
+//        +------------------------------------------------------------+
+//
+// Since a TabletReplica instance is replaced when a Tablet Copy operation
+// occurs, from a remote perspective it is possible for a tablet replica to
+// appear to transition from SHUTDOWN back to NOT_INITIALIZED.
enum TabletStatePB {
UNKNOWN = 999;
- // Tablet has not yet started.
- NOT_STARTED = 5;
+ // Tablet has not yet been initialized.
+ NOT_INITIALIZED = 6;
+
+ // Tablet has been initialized but not yet started.
+ INITIALIZED = 5;
// Indicates the Tablet is bootstrapping, i.e. that the Tablet is not
// available for RPC.
@@ -152,8 +168,11 @@ enum TabletStatePB {
FAILED = 2;
// The Tablet is shutting down, and will not accept further requests.
- QUIESCING = 3;
+ STOPPING = 3;
+
+ // The tablet has been stopped, possibly because it has been tombstoned.
+ STOPPED = 7;
- // The Tablet has been stopped.
+ // The Tablet has been completely shut down.
SHUTDOWN = 4;
}