You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/06/02 18:47:09 UTC
kudu git commit: consensus: Remove Consensus interface
Repository: kudu
Updated Branches:
refs/heads/master 037f72b37 -> e3b7d4dc1
consensus: Remove Consensus interface
We only have one Consensus implementation now, and have no plans for
additional implementations in the future. So we can remove this
interface.
Change-Id: I21b976cb92619083e1f1766b13634b841b554c8c
Reviewed-on: http://gerrit.cloudera.org:8080/7040
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e3b7d4dc
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e3b7d4dc
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e3b7d4dc
Branch: refs/heads/master
Commit: e3b7d4dc1a6be020f3f126a66192ca1451e95e6f
Parents: 037f72b
Author: Mike Percy <mp...@apache.org>
Authored: Wed May 31 21:21:22 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Fri Jun 2 18:46:53 2017 +0000
----------------------------------------------------------------------
src/kudu/consensus/CMakeLists.txt | 1 -
src/kudu/consensus/consensus-test-util.h | 5 +-
src/kudu/consensus/consensus.cc | 81 ----
src/kudu/consensus/consensus.h | 430 -------------------
src/kudu/consensus/leader_election.h | 2 +-
src/kudu/consensus/pending_rounds.cc | 1 +
src/kudu/consensus/pending_rounds.h | 8 +-
src/kudu/consensus/raft_consensus.cc | 71 ++-
src/kudu/consensus/raft_consensus.h | 357 +++++++++++++--
.../consensus/raft_consensus_quorum-test.cc | 4 +-
.../ts_tablet_manager-itest.cc | 4 +-
src/kudu/master/catalog_manager.cc | 12 +-
src/kudu/master/sys_catalog.cc | 2 +-
src/kudu/tablet/tablet_replica.cc | 4 +-
src/kudu/tablet/tablet_replica.h | 10 +-
src/kudu/tablet/transactions/transaction.h | 4 +-
.../tablet/transactions/transaction_driver.cc | 9 +-
.../tablet/transactions/transaction_driver.h | 14 +-
.../tablet/transactions/write_transaction.h | 4 -
src/kudu/tserver/tablet_copy_source_session.cc | 4 +-
src/kudu/tserver/tablet_service.cc | 30 +-
src/kudu/tserver/ts_tablet_manager.cc | 8 +-
src/kudu/tserver/tserver-path-handlers.cc | 4 +-
23 files changed, 448 insertions(+), 621 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt
index e93ea94..ec53919 100644
--- a/src/kudu/consensus/CMakeLists.txt
+++ b/src/kudu/consensus/CMakeLists.txt
@@ -96,7 +96,6 @@ target_link_libraries(log
consensus_metadata_proto)
set(CONSENSUS_SRCS
- consensus.cc
consensus_meta.cc
consensus_peers.cc
consensus_queue.cc
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index 89707aa..380d82b 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -27,7 +27,6 @@
#include "kudu/common/timestamp.h"
#include "kudu/common/wire_protocol.h"
-#include "kudu/consensus/consensus.h"
#include "kudu/consensus/consensus_peers.h"
#include "kudu/consensus/consensus_queue.h"
#include "kudu/consensus/log.h"
@@ -643,7 +642,7 @@ class TestTransactionFactory : public ReplicaTransactionFactory {
CHECK_OK(ThreadPoolBuilder("test-txn-factory").set_max_threads(1).Build(&pool_));
}
- void SetConsensus(Consensus* consensus) {
+ void SetConsensus(RaftConsensus* consensus) {
consensus_ = consensus;
}
@@ -673,7 +672,7 @@ class TestTransactionFactory : public ReplicaTransactionFactory {
private:
gscoped_ptr<ThreadPool> pool_;
- Consensus* consensus_;
+ RaftConsensus* consensus_;
Log* log_;
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.cc b/src/kudu/consensus/consensus.cc
deleted file mode 100644
index 0553b5c..0000000
--- a/src/kudu/consensus/consensus.cc
+++ /dev/null
@@ -1,81 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/consensus/consensus.h"
-
-#include <set>
-
-#include "kudu/consensus/log_util.h"
-#include "kudu/consensus/opid_util.h"
-#include "kudu/gutil/stl_util.h"
-#include "kudu/gutil/strings/substitute.h"
-
-namespace kudu {
-namespace consensus {
-
-using std::shared_ptr;
-using strings::Substitute;
-
-ConsensusBootstrapInfo::ConsensusBootstrapInfo()
- : last_id(MinimumOpId()),
- last_committed_id(MinimumOpId()) {
-}
-
-ConsensusBootstrapInfo::~ConsensusBootstrapInfo() {
- STLDeleteElements(&orphaned_replicates);
-}
-
-ConsensusRound::ConsensusRound(Consensus* consensus,
- gscoped_ptr<ReplicateMsg> replicate_msg,
- ConsensusReplicatedCallback replicated_cb)
- : consensus_(consensus),
- replicate_msg_(new RefCountedReplicate(replicate_msg.release())),
- replicated_cb_(std::move(replicated_cb)),
- bound_term_(-1) {}
-
-ConsensusRound::ConsensusRound(Consensus* consensus,
- const ReplicateRefPtr& replicate_msg)
- : consensus_(consensus),
- replicate_msg_(replicate_msg),
- bound_term_(-1) {
- DCHECK(replicate_msg_);
-}
-
-void ConsensusRound::NotifyReplicationFinished(const Status& status) {
- if (PREDICT_FALSE(replicated_cb_.is_null())) return;
- replicated_cb_.Run(status);
-}
-
-Status ConsensusRound::CheckBoundTerm(int64_t current_term) const {
- if (PREDICT_FALSE(bound_term_ != -1 &&
- bound_term_ != current_term)) {
- return Status::Aborted(
- strings::Substitute(
- "Transaction submitted in term $0 cannot be replicated in term $1",
- bound_term_, current_term));
- }
- return Status::OK();
-}
-
-scoped_refptr<ConsensusRound> Consensus::NewRound(
- gscoped_ptr<ReplicateMsg> replicate_msg,
- const ConsensusReplicatedCallback& replicated_cb) {
- return make_scoped_refptr(new ConsensusRound(this, std::move(replicate_msg), replicated_cb));
-}
-
-} // namespace consensus
-} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.h b/src/kudu/consensus/consensus.h
deleted file mode 100644
index 59910be..0000000
--- a/src/kudu/consensus/consensus.h
+++ /dev/null
@@ -1,430 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#ifndef KUDO_QUORUM_CONSENSUS_H_
-#define KUDO_QUORUM_CONSENSUS_H_
-
-#include <boost/optional/optional_fwd.hpp>
-#include <iosfwd>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "kudu/consensus/consensus.pb.h"
-#include "kudu/consensus/ref_counted_replicate.h"
-#include "kudu/gutil/callback.h"
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/util/status.h"
-#include "kudu/util/status_callback.h"
-
-namespace kudu {
-class MonoDelta;
-
-namespace log {
-class Log;
-struct RetentionIndexes;
-}
-
-namespace master {
-class SysCatalogTable;
-}
-
-namespace server {
-class Clock;
-}
-
-namespace tablet {
-class TabletReplica;
-}
-
-namespace tserver {
-class TabletServerErrorPB;
-}
-
-namespace consensus {
-
-class ConsensusCommitContinuation;
-class ConsensusRound;
-class ReplicaTransactionFactory;
-class TimeManager;
-
-typedef int64_t ConsensusTerm;
-
-typedef StatusCallback ConsensusReplicatedCallback;
-
-struct ConsensusOptions {
- std::string tablet_id;
-};
-
-// After completing bootstrap, some of the results need to be plumbed through
-// into the consensus implementation.
-struct ConsensusBootstrapInfo {
- ConsensusBootstrapInfo();
- ~ConsensusBootstrapInfo();
-
- // The id of the last operation in the log
- OpId last_id;
-
- // The id of the last committed operation in the log.
- OpId last_committed_id;
-
- // REPLICATE messages which were in the log with no accompanying
- // COMMIT. These need to be passed along to consensus init in order
- // to potentially commit them.
- //
- // These are owned by the ConsensusBootstrapInfo instance.
- std::vector<ReplicateMsg*> orphaned_replicates;
-
- private:
- DISALLOW_COPY_AND_ASSIGN(ConsensusBootstrapInfo);
-};
-
-// The external interface for a consensus peer.
-//
-// Note: Even though Consensus points to Log, it needs to be destroyed
-// after it. See Log class header comment for the reason why. On the other
-// hand Consensus must be quiesced before closing the log, otherwise it
-// will try to write to a destroyed/closed log.
-//
-// The order of these operations on shutdown must therefore be:
-// 1 - quiesce Consensus
-// 2 - close/destroy Log
-// 3 - destroy Consensus
-class Consensus : public RefCountedThreadSafe<Consensus> {
- public:
- Consensus() {}
-
- // Starts running the consensus algorithm.
- virtual Status Start(const ConsensusBootstrapInfo& info) = 0;
-
- // Returns true if consensus is running.
- virtual bool IsRunning() const = 0;
-
- // Emulates a leader election by simply making this peer leader.
- virtual Status EmulateElection() = 0;
-
- // Modes for StartElection().
- enum ElectionMode {
- // A normal leader election. Peers will not vote for this node
- // if they believe that a leader is alive.
- NORMAL_ELECTION,
-
- // A "pre-election". Peers will vote as they would for a normal
- // election, except that the votes will not be "binding". In other
- // words, they will not durably record their vote.
- PRE_ELECTION,
-
- // In this mode, peers will vote for this candidate even if they
- // think a leader is alive. This can be used for a faster hand-off
- // between a leader and one of its replicas.
- ELECT_EVEN_IF_LEADER_IS_ALIVE
- };
-
- // Reasons for StartElection().
- enum ElectionReason {
- // The election is being called because the Raft configuration has only
- // a single node and has just started up.
- INITIAL_SINGLE_NODE_ELECTION,
-
- // The election is being called because the timeout expired. In other
- // words, the previous leader probably failed (or there was no leader
- // in this term)
- ELECTION_TIMEOUT_EXPIRED,
-
- // The election is being started because of an explicit external request.
- EXTERNAL_REQUEST
- };
-
- // Triggers a leader election.
- virtual Status StartElection(ElectionMode mode, ElectionReason reason) = 0;
-
- // Wait until the node has LEADER role.
- // Returns Status::TimedOut if the role is not LEADER within 'timeout'.
- virtual Status WaitUntilLeaderForTests(const MonoDelta& timeout) = 0;
-
- // Implement a LeaderStepDown() request.
- virtual Status StepDown(LeaderStepDownResponsePB* resp) {
- return Status::NotSupported("Not implemented.");
- }
-
- // Creates a new ConsensusRound, the entity that owns all the data
- // structures required for a consensus round, such as the ReplicateMsg
- // (and later on the CommitMsg). ConsensusRound will also point to and
- // increase the reference count for the provided callbacks.
- scoped_refptr<ConsensusRound> NewRound(
- gscoped_ptr<ReplicateMsg> replicate_msg,
- const ConsensusReplicatedCallback& replicated_cb);
-
- // Called by a Leader to replicate an entry to the state machine.
- //
- // From the leader instance perspective execution proceeds as follows:
- //
- // Leader RaftConfig
- // + +
- // 1) Req->| Replicate() |
- // | |
- // 2) +-------------replicate-------------->|
- // |<---------------ACK------------------+
- // | |
- // 3) +--+ |
- // <----+ round.NotifyReplicationFinished()|
- // | |
- // 3a) | +------ update commitIndex ------->|
- // | |
- //
- // 1) Caller calls Replicate(), method returns immediately to the caller and
- // runs asynchronously.
- //
- // 2) Leader replicates the entry to the peers using the consensus
- // algorithm, proceeds as soon as a majority of voters acknowledges the
- // entry.
- //
- // 3) Leader defers to the caller by calling ConsensusRound::NotifyReplicationFinished,
- // which calls the ConsensusReplicatedCallback.
- //
- // 3a) The leader asynchronously notifies other peers of the new
- // commit index, which tells them to apply the operation.
- //
- // This method can only be called on the leader, i.e. role() == LEADER
- virtual Status Replicate(const scoped_refptr<ConsensusRound>& round) = 0;
-
- // Ensures that the consensus implementation is currently acting as LEADER,
- // and thus is allowed to submit operations to be prepared before they are
- // replicated. To avoid a time-of-check-to-time-of-use (TOCTOU) race, the
- // implementation also stores the current term inside the round's "bound_term"
- // member. When we eventually are about to replicate the transaction, we verify
- // that the term has not changed in the meantime.
- virtual Status CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) {
- return Status::OK();
- }
-
- // Messages sent from LEADER to FOLLOWERS and LEARNERS to update their
- // state machines. This is equivalent to "AppendEntries()" in Raft
- // terminology.
- //
- // ConsensusRequestPB contains a sequence of 0 or more operations to apply
- // on the replica. If there are 0 operations the request is considered
- // 'status-only' i.e. the leader is communicating with the follower only
- // in order to pass back and forth information on watermarks (eg committed
- // operation ID, replicated op id, etc).
- //
- // If the sequence contains 1 or more operations they will be replicated
- // in the same order as the leader, and submitted for asynchronous Prepare
- // in the same order.
- //
- // The leader also provides information on the index of the latest
- // operation considered committed by consensus. The replica uses this
- // information to update the state of any pending (previously replicated/prepared)
- // transactions.
- //
- // Returns Status::OK if the response has been filled (regardless of accepting
- // or rejecting the specific request). Returns non-OK Status if a specific
- // error response could not be formed, which will result in the service
- // returning an UNKNOWN_ERROR RPC error code to the caller and including the
- // stringified Status message.
- virtual Status Update(const ConsensusRequestPB* request,
- ConsensusResponsePB* response) = 0;
-
- // Messages sent from CANDIDATEs to voting peers to request their vote
- // in leader election.
- virtual Status RequestVote(const VoteRequestPB* request,
- VoteResponsePB* response) = 0;
-
- // Implement a ChangeConfig() request.
- virtual Status ChangeConfig(const ChangeConfigRequestPB& req,
- const StatusCallback& client_cb,
- boost::optional<tserver::TabletServerErrorPB::Code>* error) {
- return Status::NotSupported("Not implemented.");
- }
-
- virtual Status UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
- tserver::TabletServerErrorPB::Code* error) = 0;
-
- // Returns the current Raft role of this instance.
- virtual RaftPeerPB::Role role() const = 0;
-
- // Returns the uuid of this peer.
- virtual const std::string& peer_uuid() const = 0;
-
- // Returns the id of the tablet whose updates this consensus instance helps coordinate.
- virtual const std::string& tablet_id() const = 0;
-
- virtual scoped_refptr<TimeManager> time_manager() const = 0;
-
- // Returns a copy of the state of the consensus system.
- virtual ConsensusStatePB ConsensusState() const = 0;
-
- // Returns a copy of the current committed Raft configuration.
- virtual RaftConfigPB CommittedConfig() const = 0;
-
- virtual void DumpStatusHtml(std::ostream& out) const = 0;
-
- // Stops running the consensus algorithm.
- virtual void Shutdown() = 0;
-
- // Returns the last OpId (either received or committed, depending on the
- // 'type' argument) that the Consensus implementation knows about.
- // Primarily used for testing purposes.
- virtual Status GetLastOpId(OpIdType type, OpId* id) {
- return Status::NotFound("Not implemented.");
- }
-
-
- // Return the log indexes which the consensus implementation would like to retain.
- //
- // The returned 'for_durability' index ensures that no logs are GCed before
- // the operation is fully committed. The returned 'for_peers' index indicates
- // the index of the farthest-behind peer so that the log will try to avoid
- // GCing these before the peer has caught up.
- virtual log::RetentionIndexes GetRetentionIndexes() = 0;
-
- protected:
- friend class RefCountedThreadSafe<Consensus>;
- friend class tablet::TabletReplica;
- friend class master::SysCatalogTable;
-
- // This class is refcounted.
- virtual ~Consensus() {}
-
- enum State {
- kNotInitialized,
- kInitializing,
- kConfiguring,
- kRunning,
- };
- private:
- DISALLOW_COPY_AND_ASSIGN(Consensus);
-};
-
-// Factory for replica transactions.
-// An implementation of this factory must be registered prior to consensus
-// start, and is used to create transactions when the consensus implementation receives
-// messages from the leader.
-//
-// Replica transactions execute the following way:
-//
-// - When a ReplicateMsg is first received from the leader, the Consensus
-// instance creates the ConsensusRound and calls StartReplicaTransaction().
-// This will trigger the Prepare(). At the same time replica consensus
-// instance immediately stores the ReplicateMsg in the Log. Once the replicate
-// message is stored in stable storage an ACK is sent to the leader (i.e. the
-// replica Consensus instance does not wait for Prepare() to finish).
-//
-// - When the CommitMsg for a replicate is first received from the leader
-// the replica waits for the corresponding Prepare() to finish (if it has
-// not completed yet) and then proceeds to trigger the Apply().
-//
-// - Once Apply() completes the ReplicaTransactionFactory is responsible for logging
-// a CommitMsg to the log to ensure that the operation can be properly restored
-// on a restart.
-class ReplicaTransactionFactory {
- public:
- virtual Status StartReplicaTransaction(const scoped_refptr<ConsensusRound>& context) = 0;
-
- virtual ~ReplicaTransactionFactory() {}
-};
-
-// Context for a consensus round on the LEADER side, typically created as an
-// out-parameter of Consensus::Append.
-// This class is ref-counted because we want to ensure it stays alive for the
-// duration of the Transaction when it is associated with a Transaction, while
-// we also want to ensure it has a proper lifecycle when a ConsensusRound is
-// pushed that is not associated with a Tablet transaction.
-class ConsensusRound : public RefCountedThreadSafe<ConsensusRound> {
-
- public:
- // Ctor used for leader transactions. Leader transactions can and must specify the
- // callbacks prior to initiating the consensus round.
- ConsensusRound(Consensus* consensus, gscoped_ptr<ReplicateMsg> replicate_msg,
- ConsensusReplicatedCallback replicated_cb);
-
- // Ctor used for follower/learner transactions. These transactions do not use the
- // replicate callback and the commit callback is set later, after the transaction
- // is actually started.
- ConsensusRound(Consensus* consensus,
- const ReplicateRefPtr& replicate_msg);
-
- ReplicateMsg* replicate_msg() {
- return replicate_msg_->get();
- }
-
- const ReplicateRefPtr& replicate_scoped_refptr() {
- return replicate_msg_;
- }
-
- // Returns the id of the (replicate) operation this context
- // refers to. This is only set _after_ Consensus::Replicate(context).
- OpId id() const {
- return replicate_msg_->get()->id();
- }
-
- // Register a callback that is called by Consensus to notify that the round
- // is considered either replicated, if 'status' is OK(), or that it has
- // permanently failed to replicate if 'status' is anything else. If 'status'
- // is OK() then the operation can be applied to the state machine, otherwise
- // the operation should be aborted.
- void SetConsensusReplicatedCallback(const ConsensusReplicatedCallback& replicated_cb) {
- replicated_cb_ = replicated_cb;
- }
-
- // If a continuation was set, notifies it that the round has been replicated.
- void NotifyReplicationFinished(const Status& status);
-
- // Binds this round such that it may not be eventually executed in any term
- // other than 'term'.
- // See CheckBoundTerm().
- void BindToTerm(int64_t term) {
- DCHECK_EQ(bound_term_, -1);
- bound_term_ = term;
- }
-
- // Check for a rare race in which an operation is submitted to the LEADER in some term,
- // then before the operation is prepared, the replica loses its leadership, receives
- // more operations as a FOLLOWER, and then regains its leadership. We detect this case
- // by setting the ConsensusRound's "bound term" when it is first submitted to the
- // PREPARE queue, and validate that the term is still the same when we have finished
- // preparing it. See KUDU-597 for details.
- //
- // If this round has not been bound to any term, this is a no-op.
- Status CheckBoundTerm(int64_t current_term) const;
-
- private:
- friend class RaftConsensusQuorumTest;
- friend class RefCountedThreadSafe<ConsensusRound>;
-
- ~ConsensusRound() {}
-
- Consensus* consensus_;
- // This round's replicate message.
- ReplicateRefPtr replicate_msg_;
-
- // The continuation that will be called once the transaction is
- // deemed committed/aborted by consensus.
- ConsensusReplicatedCallback replicated_cb_;
-
- // The leader term that this round was submitted in. CheckBoundTerm()
- // ensures that, when it is eventually replicated, the term has not
- // changed in the meantime.
- //
- // Set to -1 if no term has been bound.
- int64_t bound_term_;
-};
-
-} // namespace consensus
-} // namespace kudu
-
-#endif /* CONSENSUS_H_ */
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/leader_election.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/leader_election.h b/src/kudu/consensus/leader_election.h
index d0129f4..d0f15ce 100644
--- a/src/kudu/consensus/leader_election.h
+++ b/src/kudu/consensus/leader_election.h
@@ -23,8 +23,8 @@
#include <unordered_map>
#include <vector>
-#include "kudu/consensus/consensus.h"
#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/callback.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/macros.h"
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/pending_rounds.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/pending_rounds.cc b/src/kudu/consensus/pending_rounds.cc
index 121cb24..adbca50 100644
--- a/src/kudu/consensus/pending_rounds.cc
+++ b/src/kudu/consensus/pending_rounds.cc
@@ -17,6 +17,7 @@
#include "kudu/consensus/pending_rounds.h"
+#include "kudu/consensus/raft_consensus.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/pending_rounds.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/pending_rounds.h b/src/kudu/consensus/pending_rounds.h
index 02a8686..ccc9cd4 100644
--- a/src/kudu/consensus/pending_rounds.h
+++ b/src/kudu/consensus/pending_rounds.h
@@ -20,14 +20,16 @@
#include <map>
#include <string>
-#include "kudu/consensus/consensus.h"
+#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/gutil/macros.h"
-#include "kudu/util/status.h"
+#include "kudu/gutil/ref_counted.h"
namespace kudu {
-namespace consensus {
+class Status;
+namespace consensus {
+class ConsensusRound;
class TimeManager;
// Tracks the pending consensus rounds being managed by a Raft replica (either leader
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index b7f5cdc..d50bf9a 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -377,24 +377,24 @@ Status RaftConsensus::EmulateElection() {
}
namespace {
-const char* ModeString(Consensus::ElectionMode mode) {
+const char* ModeString(RaftConsensus::ElectionMode mode) {
switch (mode) {
- case Consensus::NORMAL_ELECTION:
+ case RaftConsensus::NORMAL_ELECTION:
return "leader election";
- case Consensus::PRE_ELECTION:
+ case RaftConsensus::PRE_ELECTION:
return "pre-election";
- case Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE:
+ case RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE:
return "forced leader election";
}
__builtin_unreachable(); // silence gcc warnings
}
-string ReasonString(Consensus::ElectionReason reason, StringPiece leader_uuid) {
+string ReasonString(RaftConsensus::ElectionReason reason, StringPiece leader_uuid) {
switch (reason) {
- case Consensus::INITIAL_SINGLE_NODE_ELECTION:
+ case RaftConsensus::INITIAL_SINGLE_NODE_ELECTION:
return "initial election of a single-replica configuration";
- case Consensus::EXTERNAL_REQUEST:
+ case RaftConsensus::EXTERNAL_REQUEST:
return "received explicit request";
- case Consensus::ELECTION_TIMEOUT_EXPIRED:
+ case RaftConsensus::ELECTION_TIMEOUT_EXPIRED:
if (leader_uuid.empty()) {
return "no leader contacted us within the election timeout";
}
@@ -522,6 +522,12 @@ Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) {
return Status::OK();
}
+scoped_refptr<ConsensusRound> RaftConsensus::NewRound(
+ gscoped_ptr<ReplicateMsg> replicate_msg,
+ const ConsensusReplicatedCallback& replicated_cb) {
+ return make_scoped_refptr(new ConsensusRound(this, std::move(replicate_msg), replicated_cb));
+}
+
void RaftConsensus::ReportFailureDetected(const std::string& name, const Status& /*msg*/) {
DCHECK_EQ(name, kTimerId);
// Start an election.
@@ -2676,5 +2682,54 @@ ConsensusMetadata* RaftConsensus::consensus_metadata_for_tests() const {
return cmeta_.get();
}
+////////////////////////////////////////////////////////////////////////
+// ConsensusBootstrapInfo
+////////////////////////////////////////////////////////////////////////
+
+ConsensusBootstrapInfo::ConsensusBootstrapInfo()
+ : last_id(MinimumOpId()),
+ last_committed_id(MinimumOpId()) {
+}
+
+ConsensusBootstrapInfo::~ConsensusBootstrapInfo() {
+ STLDeleteElements(&orphaned_replicates);
+}
+
+////////////////////////////////////////////////////////////////////////
+// ConsensusRound
+////////////////////////////////////////////////////////////////////////
+
+ConsensusRound::ConsensusRound(RaftConsensus* consensus,
+ gscoped_ptr<ReplicateMsg> replicate_msg,
+ ConsensusReplicatedCallback replicated_cb)
+ : consensus_(consensus),
+ replicate_msg_(new RefCountedReplicate(replicate_msg.release())),
+ replicated_cb_(std::move(replicated_cb)),
+ bound_term_(-1) {}
+
+ConsensusRound::ConsensusRound(RaftConsensus* consensus,
+ const ReplicateRefPtr& replicate_msg)
+ : consensus_(consensus),
+ replicate_msg_(replicate_msg),
+ bound_term_(-1) {
+ DCHECK(replicate_msg_);
+}
+
+void ConsensusRound::NotifyReplicationFinished(const Status& status) {
+ if (PREDICT_FALSE(replicated_cb_.is_null())) return;
+ replicated_cb_.Run(status);
+}
+
+Status ConsensusRound::CheckBoundTerm(int64_t current_term) const {
+ if (PREDICT_FALSE(bound_term_ != -1 &&
+ bound_term_ != current_term)) {
+ return Status::Aborted(
+ strings::Substitute(
+ "Transaction submitted in term $0 cannot be replicated in term $1",
+ bound_term_, current_term));
+ }
+ return Status::OK();
+}
+
} // namespace consensus
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index f00a8c2..9ce29ec 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -24,41 +24,99 @@
#include <utility>
#include <vector>
-#include "kudu/consensus/consensus.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/consensus_queue.h"
+#include "kudu/consensus/peer_manager.h"
#include "kudu/consensus/pending_rounds.h"
#include "kudu/consensus/time_manager.h"
+#include "kudu/gutil/ref_counted.h"
#include "kudu/util/atomic.h"
#include "kudu/util/failure_detector.h"
+#include "kudu/util/status_callback.h"
namespace kudu {
class Counter;
class FailureDetector;
class HostPort;
+class MonoDelta;
+class Status;
class ThreadPool;
-namespace server {
-class Clock;
+namespace log {
+class Log;
+struct RetentionIndexes;
}
namespace rpc {
class Messenger;
}
+namespace server {
+class Clock;
+}
+
+namespace tserver {
+class TabletServerErrorPB;
+}
+
namespace consensus {
+
class ConsensusMetadata;
+class ConsensusRound;
class Peer;
class PeerProxyFactory;
class PeerManager;
+class ReplicaTransactionFactory;
class TimeManager;
+
+struct ConsensusBootstrapInfo;
struct ElectionResult;
-class RaftConsensus : public Consensus,
+struct ConsensusOptions {
+ std::string tablet_id;
+};
+
+typedef int64_t ConsensusTerm;
+typedef StatusCallback ConsensusReplicatedCallback;
+
+class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
public PeerMessageQueueObserver {
public:
+
+ // Modes for StartElection().
+ enum ElectionMode {
+ // A normal leader election. Peers will not vote for this node
+ // if they believe that a leader is alive.
+ NORMAL_ELECTION,
+
+ // A "pre-election". Peers will vote as they would for a normal
+ // election, except that the votes will not be "binding". In other
+ // words, they will not durably record their vote.
+ PRE_ELECTION,
+
+ // In this mode, peers will vote for this candidate even if they
+ // think a leader is alive. This can be used for a faster hand-off
+ // between a leader and one of its replicas.
+ ELECT_EVEN_IF_LEADER_IS_ALIVE
+ };
+
+ // Reasons for StartElection().
+ enum ElectionReason {
+ // The election is being called because the Raft configuration has only
+ // a single node and has just started up.
+ INITIAL_SINGLE_NODE_ELECTION,
+
+ // The election is being called because the timeout expired. In other
+ // words, the previous leader probably failed (or there was no leader
+ // in this term).
+ ELECTION_TIMEOUT_EXPIRED,
+
+ // The election is being started because of an explicit external request.
+ EXTERNAL_REQUEST
+ };
+
static scoped_refptr<RaftConsensus> Create(
ConsensusOptions options,
std::unique_ptr<ConsensusMetadata> cmeta,
@@ -85,64 +143,150 @@ class RaftConsensus : public Consensus,
std::shared_ptr<MemTracker> parent_mem_tracker,
Callback<void(const std::string& reason)> mark_dirty_clbk);
- virtual ~RaftConsensus();
+ // Starts running the Raft consensus algorithm.
+ Status Start(const ConsensusBootstrapInfo& info);
- Status Start(const ConsensusBootstrapInfo& info) override;
-
- bool IsRunning() const override;
+ // Returns true if RaftConsensus is running.
+ bool IsRunning() const;
// Emulates an election by increasing the term number and asserting leadership
// in the configuration by sending a NO_OP to other peers.
// This is NOT safe to use in a distributed configuration with failure detection
// enabled, as it could result in a split-brain scenario.
- Status EmulateElection() override;
+ Status EmulateElection();
+
+ // Triggers a leader election.
+ Status StartElection(ElectionMode mode, ElectionReason reason);
- Status StartElection(ElectionMode mode, ElectionReason reason) override;
+ // Wait until the node has LEADER role.
+ // Returns Status::TimedOut if the role is not LEADER within 'timeout'.
+ Status WaitUntilLeaderForTests(const MonoDelta& timeout);
- Status WaitUntilLeaderForTests(const MonoDelta& timeout) override;
+ // Implement a LeaderStepDown() request.
+ Status StepDown(LeaderStepDownResponsePB* resp);
- Status StepDown(LeaderStepDownResponsePB* resp) override;
+ // Creates a new ConsensusRound, the entity that owns all the data
+ // structures required for a consensus round, such as the ReplicateMsg
+ // (and later on the CommitMsg). ConsensusRound will also point to and
+ // increase the reference count for the provided callbacks.
+ scoped_refptr<ConsensusRound> NewRound(
+ gscoped_ptr<ReplicateMsg> replicate_msg,
+ const ConsensusReplicatedCallback& replicated_cb);
// Call StartElection(), log a warning if the call fails (usually due to
// being shut down).
void ReportFailureDetected(const std::string& name, const Status& msg);
- Status Replicate(const scoped_refptr<ConsensusRound>& round) override;
-
- Status CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) override;
-
+ // Called by a Leader to replicate an entry to the state machine.
+ //
+ // From the leader instance perspective execution proceeds as follows:
+ //
+ // Leader RaftConfig
+ // + +
+ // 1) Req->| Replicate() |
+ // | |
+ // 2) +-------------replicate-------------->|
+ // |<---------------ACK------------------+
+ // | |
+ // 3) +--+ |
+ // <----+ round.NotifyReplicationFinished()|
+ // | |
+ // 3a) | +------ update commitIndex ------->|
+ // | |
+ //
+ // 1) Caller calls Replicate(), method returns immediately to the caller and
+ // runs asynchronously.
+ //
+ // 2) Leader replicates the entry to the peers using the consensus
+ // algorithm, proceeds as soon as a majority of voters acknowledges the
+ // entry.
+ //
+ // 3) Leader defers to the caller by calling ConsensusRound::NotifyReplicationFinished,
+ // which calls the ConsensusReplicatedCallback.
+ //
+ // 3a) The leader asynchronously notifies other peers of the new
+ // commit index, which tells them to apply the operation.
+ //
+ // This method can only be called on the leader, i.e. role() == LEADER.
+ Status Replicate(const scoped_refptr<ConsensusRound>& round);
+
+ // Ensures that the consensus implementation is currently acting as LEADER,
+ // and thus is allowed to submit operations to be prepared before they are
+ // replicated. To avoid a time-of-check-to-time-of-use (TOCTOU) race, the
+ // implementation also stores the current term inside the round's "bound_term"
+ // member. When we eventually are about to replicate the transaction, we verify
+ // that the term has not changed in the meantime.
+ Status CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round);
+
+ // Messages sent from LEADER to FOLLOWERS and LEARNERS to update their
+ // state machines. This is equivalent to "AppendEntries()" in Raft
+ // terminology.
+ //
+ // ConsensusRequestPB contains a sequence of 0 or more operations to apply
+ // on the replica. If there are 0 operations the request is considered
+ // 'status-only' i.e. the leader is communicating with the follower only
+ // in order to pass back and forth information on watermarks (e.g. committed
+ // operation ID, replicated op id, etc.).
+ //
+ // If the sequence contains 1 or more operations they will be replicated
+ // in the same order as the leader, and submitted for asynchronous Prepare
+ // in the same order.
+ //
+ // The leader also provides information on the index of the latest
+ // operation considered committed by consensus. The replica uses this
+ // information to update the state of any pending (previously replicated/prepared)
+ // transactions.
+ //
+ // Returns Status::OK if the response has been filled (regardless of accepting
+ // or rejecting the specific request). Returns non-OK Status if a specific
+ // error response could not be formed, which will result in the service
+ // returning an UNKNOWN_ERROR RPC error code to the caller and including the
+ // stringified Status message.
Status Update(const ConsensusRequestPB* request,
- ConsensusResponsePB* response) override;
+ ConsensusResponsePB* response);
+ // Messages sent from CANDIDATEs to voting peers to request their vote
+ // in leader election.
Status RequestVote(const VoteRequestPB* request,
- VoteResponsePB* response) override;
+ VoteResponsePB* response);
+ // Implement a ChangeConfig() request.
Status ChangeConfig(const ChangeConfigRequestPB& req,
const StatusCallback& client_cb,
- boost::optional<tserver::TabletServerErrorPB::Code>* error_code) override;
+ boost::optional<tserver::TabletServerErrorPB::Code>* error_code);
+ // Implement an UnsafeChangeConfig() request.
Status UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
- tserver::TabletServerErrorPB::Code* error_code) override;
+ tserver::TabletServerErrorPB::Code* error_code);
- Status GetLastOpId(OpIdType type, OpId* id) override;
+ // Returns the last OpId (either received or committed, depending on the
+ // 'type' argument) that the Consensus implementation knows about.
+ // Primarily used for testing purposes.
+ Status GetLastOpId(OpIdType type, OpId* id);
- RaftPeerPB::Role role() const override;
+ // Returns the current Raft role of this instance.
+ RaftPeerPB::Role role() const;
+ // Returns the uuid of this peer.
// Thread-safe.
- const std::string& peer_uuid() const override;
+ const std::string& peer_uuid() const;
+ // Returns the id of the tablet whose updates this consensus instance helps coordinate.
// Thread-safe.
- const std::string& tablet_id() const override;
+ const std::string& tablet_id() const;
- scoped_refptr<TimeManager> time_manager() const override { return time_manager_; }
+ scoped_refptr<TimeManager> time_manager() const { return time_manager_; }
- ConsensusStatePB ConsensusState() const override;
+ // Returns a copy of the state of the consensus system.
+ ConsensusStatePB ConsensusState() const;
- RaftConfigPB CommittedConfig() const override;
+ // Returns a copy of the current committed Raft configuration.
+ RaftConfigPB CommittedConfig() const;
- void DumpStatusHtml(std::ostream& out) const override;
+ void DumpStatusHtml(std::ostream& out) const;
- void Shutdown() override;
+ // Stop running the Raft consensus algorithm.
+ void Shutdown();
// Makes this peer advance its term (and step down if leader), for tests.
Status AdvanceTermForTests(int64_t new_term);
@@ -158,17 +302,24 @@ class RaftConsensus : public Consensus,
// Updates the committed_index and triggers the Apply()s for whatever
// transactions were pending.
// This is idempotent.
- void NotifyCommitIndex(int64_t commit_index) override;
+ void NotifyCommitIndex(int64_t commit_index);
- void NotifyTermChange(int64_t term) override;
+ void NotifyTermChange(int64_t term);
void NotifyFailedFollower(const std::string& uuid,
int64_t term,
- const std::string& reason) override;
+ const std::string& reason);
- log::RetentionIndexes GetRetentionIndexes() override;
+ // Return the log indexes which the consensus implementation would like to retain.
+ //
+ // The returned 'for_durability' index ensures that no logs are GCed before
+ // the operation is fully committed. The returned 'for_peers' index indicates
+ // the index of the farthest-behind peer so that the log will try to avoid
+ // GCing these before the peer has caught up.
+ log::RetentionIndexes GetRetentionIndexes();
private:
+ friend class RefCountedThreadSafe<RaftConsensus>;
friend class RaftConsensusQuorumTest;
FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind);
FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind);
@@ -224,6 +375,9 @@ class RaftConsensus : public Consensus,
using LockGuard = std::lock_guard<simple_spinlock>;
using UniqueLock = std::unique_lock<simple_spinlock>;
+ // Private because this class is refcounted.
+ ~RaftConsensus();
+
// Returns string description for State enum value.
static const char* State_Name(State state);
@@ -636,5 +790,142 @@ class RaftConsensus : public Consensus,
DISALLOW_COPY_AND_ASSIGN(RaftConsensus);
};
+// After completing bootstrap, some of the results need to be plumbed through
+// into the consensus implementation.
+struct ConsensusBootstrapInfo {
+ ConsensusBootstrapInfo();
+ ~ConsensusBootstrapInfo();
+
+ // The id of the last operation in the log.
+ OpId last_id;
+
+ // The id of the last committed operation in the log.
+ OpId last_committed_id;
+
+ // REPLICATE messages which were in the log with no accompanying
+ // COMMIT. These need to be passed along to consensus init in order
+ // to potentially commit them.
+ //
+ // These are owned by the ConsensusBootstrapInfo instance.
+ std::vector<ReplicateMsg*> orphaned_replicates;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(ConsensusBootstrapInfo);
+};
+
+// Factory for replica transactions.
+// An implementation of this factory must be registered prior to consensus
+// start, and is used to create transactions when the consensus implementation receives
+// messages from the leader.
+//
+// Replica transactions execute the following way:
+//
+// - When a ReplicateMsg is first received from the leader, the RaftConsensus
+// instance creates the ConsensusRound and calls StartReplicaTransaction().
+// This will trigger the Prepare(). At the same time, the replica consensus
+// instance immediately stores the ReplicateMsg in the Log. Once the replicate
+// message is stored in stable storage, an ACK is sent to the leader (i.e. the
+// replica RaftConsensus instance does not wait for Prepare() to finish).
+//
+// - When the CommitMsg for a replicate is first received from the leader,
+// the replica waits for the corresponding Prepare() to finish (if it has
+// not completed yet) and then proceeds to trigger the Apply().
+//
+// - Once Apply() completes, the ReplicaTransactionFactory is responsible for logging
+// a CommitMsg to the log to ensure that the operation can be properly restored
+// on a restart.
+class ReplicaTransactionFactory {
+ public:
+ virtual Status StartReplicaTransaction(const scoped_refptr<ConsensusRound>& context) = 0;
+
+ virtual ~ReplicaTransactionFactory() {}
+};
+
+// Context for a consensus round on the LEADER side, typically created as an
+// out-parameter of RaftConsensus::Append().
+// This class is ref-counted because we want to ensure it stays alive for the
+// duration of the Transaction when it is associated with a Transaction, while
+// we also want to ensure it has a proper lifecycle when a ConsensusRound is
+// pushed that is not associated with a Tablet transaction.
+class ConsensusRound : public RefCountedThreadSafe<ConsensusRound> {
+
+ public:
+ // Ctor used for leader transactions. Leader transactions can and must specify the
+ // callbacks prior to initiating the consensus round.
+ ConsensusRound(RaftConsensus* consensus, gscoped_ptr<ReplicateMsg> replicate_msg,
+ ConsensusReplicatedCallback replicated_cb);
+
+ // Ctor used for follower/learner transactions. These transactions do not use the
+ // replicate callback and the commit callback is set later, after the transaction
+ // is actually started.
+ ConsensusRound(RaftConsensus* consensus,
+ const ReplicateRefPtr& replicate_msg);
+
+ ReplicateMsg* replicate_msg() {
+ return replicate_msg_->get();
+ }
+
+ const ReplicateRefPtr& replicate_scoped_refptr() {
+ return replicate_msg_;
+ }
+
+ // Returns the id of the (replicate) operation this context
+ // refers to. This is only set _after_ RaftConsensus::Replicate(context).
+ OpId id() const {
+ return replicate_msg_->get()->id();
+ }
+
+ // Register a callback that is called by RaftConsensus to notify that the round
+ // is considered either replicated, if 'status' is OK(), or that it has
+ // permanently failed to replicate if 'status' is anything else. If 'status'
+ // is OK() then the operation can be applied to the state machine, otherwise
+ // the operation should be aborted.
+ void SetConsensusReplicatedCallback(const ConsensusReplicatedCallback& replicated_cb) {
+ replicated_cb_ = replicated_cb;
+ }
+
+ // If a continuation was set, notifies it that the round has been replicated.
+ void NotifyReplicationFinished(const Status& status);
+
+ // Binds this round such that it may not be eventually executed in any term
+ // other than 'term'.
+ // See CheckBoundTerm().
+ void BindToTerm(int64_t term) {
+ DCHECK_EQ(bound_term_, -1);
+ bound_term_ = term;
+ }
+
+ // Check for a rare race in which an operation is submitted to the LEADER in some term,
+ // then before the operation is prepared, the replica loses its leadership, receives
+ // more operations as a FOLLOWER, and then regains its leadership. We detect this case
+ // by setting the ConsensusRound's "bound term" when it is first submitted to the
+ // PREPARE queue, and validate that the term is still the same when we have finished
+ // preparing it. See KUDU-597 for details.
+ //
+ // If this round has not been bound to any term, this is a no-op.
+ Status CheckBoundTerm(int64_t current_term) const;
+
+ private:
+ friend class RefCountedThreadSafe<ConsensusRound>;
+ friend class RaftConsensusQuorumTest;
+
+ ~ConsensusRound() {}
+
+ RaftConsensus* consensus_;
+ // This round's replicate message.
+ ReplicateRefPtr replicate_msg_;
+
+ // The continuation that will be called once the transaction is
+ // deemed committed/aborted by consensus.
+ ConsensusReplicatedCallback replicated_cb_;
+
+ // The leader term that this round was submitted in. CheckBoundTerm()
+ // ensures that, when it is eventually replicated, the term has not
+ // changed in the meantime.
+ //
+ // Set to -1 if no term has been bound.
+ int64_t bound_term_;
+};
+
} // namespace consensus
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index a0933c8..d45890b 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -890,8 +890,8 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) {
// non-shutdown peer in the list become leader.
int flush_count_before = new_leader->consensus_metadata_for_tests()->flush_count_for_tests();
LOG(INFO) << "Running election for future leader with index " << (current_config_size - 1);
- ASSERT_OK(new_leader->StartElection(Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE,
- Consensus::EXTERNAL_REQUEST));
+ ASSERT_OK(new_leader->StartElection(RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE,
+ RaftConsensus::EXTERNAL_REQUEST));
WaitUntilLeaderForTests(new_leader.get());
LOG(INFO) << "Election won";
int flush_count_after = new_leader->consensus_metadata_for_tests()->flush_count_for_tests();
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/integration-tests/ts_tablet_manager-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index 9d5c6a5..547fef0 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -134,7 +134,7 @@ TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) {
ASSERT_OK(CreateTabletServerMap(master_proxy, client_messenger_, &ts_map));
ValueDeleter deleter(&ts_map);
- // Collect the TabletReplicas so we get direct access to Consensus.
+ // Collect the TabletReplicas so we get direct access to RaftConsensus.
vector<scoped_refptr<TabletReplica> > tablet_replicas;
for (int replica = 0; replica < kNumReplicas; replica++) {
MiniTabletServer* ts = cluster_->mini_tablet_server(replica);
@@ -157,7 +157,7 @@ TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) {
SCOPED_TRACE(Substitute("Iter: $0", i));
int new_leader_idx = rand() % 2;
LOG(INFO) << "Electing peer " << new_leader_idx << "...";
- consensus::Consensus* con = CHECK_NOTNULL(tablet_replicas[new_leader_idx]->consensus());
+ consensus::RaftConsensus* con = CHECK_NOTNULL(tablet_replicas[new_leader_idx]->consensus());
ASSERT_OK(con->EmulateElection());
LOG(INFO) << "Waiting for servers to agree...";
ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(5),
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index c67b423..df7935e 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -222,10 +222,10 @@ namespace master {
using base::subtle::NoBarrier_CompareAndSwap;
using base::subtle::NoBarrier_Load;
using cfile::TypeEncodingInfo;
-using consensus::Consensus;
using consensus::ConsensusServiceProxy;
using consensus::ConsensusStatePB;
using consensus::GetConsensusRole;
+using consensus::RaftConsensus;
using consensus::RaftPeerPB;
using consensus::StartTabletCopyRequestPB;
using consensus::kMinimumTerm;
@@ -747,7 +747,7 @@ Status CatalogManager::WaitUntilCaughtUpAsLeader(const MonoDelta& timeout) {
const string& uuid = master_->fs_manager()->uuid();
if (!cstate.has_leader_uuid() || cstate.leader_uuid() != uuid) {
return Status::IllegalState(
- Substitute("Node $0 not leader. Consensus state: $1",
+ Substitute("Node $0 not leader. Raft Consensus state: $1",
uuid, SecureShortDebugString(cstate)));
}
@@ -925,7 +925,7 @@ void CatalogManager::PrepareForLeadershipTask() {
// Hack to block this function until InitSysCatalogAsync() is finished.
shared_lock<LockType> l(lock_);
}
- const Consensus* consensus = sys_catalog_->tablet_replica()->consensus();
+ const RaftConsensus* consensus = sys_catalog_->tablet_replica()->consensus();
const int64_t term_before_wait = consensus->ConsensusState().current_term();
{
std::lock_guard<simple_spinlock> l(state_lock_);
@@ -967,7 +967,7 @@ void CatalogManager::PrepareForLeadershipTask() {
// task. If the error is considered fatal, LOG(FATAL) is called.
const auto check = [this](
std::function<Status()> func,
- const Consensus& consensus,
+ const RaftConsensus& consensus,
int64_t start_term,
const char* op_description) {
@@ -1101,7 +1101,7 @@ bool CatalogManager::IsInitialized() const {
}
RaftPeerPB::Role CatalogManager::Role() const {
- scoped_refptr<consensus::Consensus> consensus;
+ scoped_refptr<consensus::RaftConsensus> consensus;
{
std::lock_guard<simple_spinlock> l(state_lock_);
if (state_ == kRunning) {
@@ -4164,7 +4164,7 @@ CatalogManager::ScopedLeaderSharedLock::ScopedLeaderSharedLock(
const string& uuid = catalog_->master_->fs_manager()->uuid();
if (PREDICT_FALSE(!cstate.has_leader_uuid() || cstate.leader_uuid() != uuid)) {
leader_status_ = Status::IllegalState(
- Substitute("Not the leader. Local UUID: $0, Consensus state: $1",
+ Substitute("Not the leader. Local UUID: $0, Raft Consensus state: $1",
uuid, SecureShortDebugString(cstate)));
return;
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index c25aae7..bf7ae7a 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -276,7 +276,7 @@ Status SysCatalogTable::CreateDistributedConfig(const MasterOptions& options,
void SysCatalogTable::SysCatalogStateChanged(const string& tablet_id, const string& reason) {
CHECK_EQ(tablet_id, tablet_replica_->tablet_id());
- scoped_refptr<consensus::Consensus> consensus = tablet_replica_->shared_consensus();
+ scoped_refptr<consensus::RaftConsensus> consensus = tablet_replica_->shared_consensus();
if (!consensus) {
LOG_WITH_PREFIX(WARNING) << "Received notification of tablet state change "
<< "but tablet no longer running. Tablet ID: "
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index 67c408b..61cb3bb 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -25,7 +25,6 @@
#include <utility>
#include <vector>
-#include "kudu/consensus/consensus.h"
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_anchor_registry.h"
@@ -84,7 +83,6 @@ METRIC_DEFINE_histogram(tablet, op_prepare_run_time, "Operation Prepare Run Time
"locks.",
10000000, 2);
-using consensus::Consensus;
using consensus::ConsensusBootstrapInfo;
using consensus::ConsensusMetadata;
using consensus::ConsensusOptions;
@@ -318,7 +316,7 @@ Status TabletReplica::WaitUntilConsensusRunning(const MonoDelta& timeout) {
MonoTime now(MonoTime::Now());
MonoDelta elapsed(now - start);
if (elapsed > timeout) {
- return Status::TimedOut(Substitute("Consensus is not running after waiting for $0. State; $1",
+ return Status::TimedOut(Substitute("Raft Consensus is not running after waiting for $0: $1",
elapsed.ToString(), TabletStatePB_Name(cached_state)));
}
SleepFor(MonoDelta::FromMilliseconds(1L << backoff_exp));
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/tablet_replica.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 51cd387..15ec7dc 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -24,8 +24,8 @@
#include <string>
#include <vector>
-#include "kudu/consensus/consensus.h"
#include "kudu/consensus/log.h"
+#include "kudu/consensus/raft_consensus.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/callback.h"
#include "kudu/gutil/ref_counted.h"
@@ -73,7 +73,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
Callback<void(const std::string& reason)> mark_dirty_clbk);
// Initializes the TabletReplica, namely creating the Log and initializing
- // Consensus.
+ // RaftConsensus.
Status Init(const std::shared_ptr<tablet::Tablet>& tablet,
const scoped_refptr<server::Clock>& clock,
const std::shared_ptr<rpc::Messenger>& messenger,
@@ -122,12 +122,12 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
virtual Status StartReplicaTransaction(
const scoped_refptr<consensus::ConsensusRound>& round) OVERRIDE;
- consensus::Consensus* consensus() {
+ consensus::RaftConsensus* consensus() {
std::lock_guard<simple_spinlock> lock(lock_);
return consensus_.get();
}
- scoped_refptr<consensus::Consensus> shared_consensus() const {
+ scoped_refptr<consensus::RaftConsensus> shared_consensus() const {
std::lock_guard<simple_spinlock> lock(lock_);
return consensus_;
}
@@ -288,7 +288,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
scoped_refptr<log::Log> log_;
std::shared_ptr<Tablet> tablet_;
std::shared_ptr<rpc::Messenger> messenger_;
- scoped_refptr<consensus::Consensus> consensus_;
+ scoped_refptr<consensus::RaftConsensus> consensus_;
simple_spinlock prepare_replicate_lock_;
// Lock protecting state_, last_status_, as well as smart pointers to collaborating
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/transactions/transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction.h b/src/kudu/tablet/transactions/transaction.h
index f0696a6..72db821 100644
--- a/src/kudu/tablet/transactions/transaction.h
+++ b/src/kudu/tablet/transactions/transaction.h
@@ -24,11 +24,11 @@
#include "kudu/common/timestamp.h"
#include "kudu/common/wire_protocol.h"
-#include "kudu/consensus/consensus.h"
+#include "kudu/consensus/raft_consensus.h"
#include "kudu/util/auto_release_pool.h"
#include "kudu/util/locks.h"
-#include "kudu/util/status.h"
#include "kudu/util/memory/arena.h"
+#include "kudu/util/status.h"
namespace kudu {
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/transactions/transaction_driver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc
index cb6bf64..57f841a 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -19,7 +19,6 @@
#include <mutex>
-#include "kudu/consensus/consensus.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/rpc/result_tracker.h"
@@ -36,11 +35,9 @@ namespace kudu {
namespace tablet {
using consensus::CommitMsg;
-using consensus::Consensus;
-using consensus::ConsensusRound;
-using consensus::ReplicateMsg;
-using consensus::CommitMsg;
using consensus::DriverType;
+using consensus::RaftConsensus;
+using consensus::ReplicateMsg;
using log::Log;
using rpc::RequestIdPB;
using rpc::ResultTracker;
@@ -83,7 +80,7 @@ class FollowerTransactionCompletionCallback : public TransactionCompletionCallba
////////////////////////////////////////////////////////////
TransactionDriver::TransactionDriver(TransactionTracker *txn_tracker,
- Consensus* consensus,
+ RaftConsensus* consensus,
Log* log,
ThreadPool* prepare_pool,
ThreadPool* apply_pool,
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/transactions/transaction_driver.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.h b/src/kudu/tablet/transactions/transaction_driver.h
index 044edda..e0a987c 100644
--- a/src/kudu/tablet/transactions/transaction_driver.h
+++ b/src/kudu/tablet/transactions/transaction_driver.h
@@ -21,7 +21,7 @@
#include <string>
#include <kudu/rpc/result_tracker.h>
-#include "kudu/consensus/consensus.h"
+#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/walltime.h"
#include "kudu/server/clock.h"
@@ -69,7 +69,7 @@ class TransactionTracker;
// follower and ReplicationFinished() has already been called, then we can move
// on to ApplyAsync().
//
-// 4 - The Consensus implementation calls ReplicationFinished()
+// 4 - RaftConsensus calls ReplicationFinished()
//
// This is triggered by consensus when the commit index moves past our own
// OpId. On followers, this can happen before Prepare() finishes, and thus
@@ -220,7 +220,7 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
// Construct TransactionDriver. TransactionDriver does not take ownership
// of any of the objects pointed to in the constructor's arguments.
TransactionDriver(TransactionTracker* txn_tracker,
- consensus::Consensus* consensus,
+ consensus::RaftConsensus* consensus,
log::Log* log,
ThreadPool* prepare_pool,
ThreadPool* apply_pool,
@@ -247,7 +247,7 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
// at the next synchronization point.
void Abort(const Status& status);
- // Callback from Consensus when replication is complete, and thus the operation
+ // Callback from RaftConsensus when replication is complete, and thus the operation
// is considered "committed" from the consensus perspective (ie it will be
// applied on every node, and not ever truncated from the state machine history).
// If status is anything different from OK() we don't proceed with the apply.
@@ -307,7 +307,7 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
// Submits ApplyTask to the apply pool.
Status ApplyAsync();
- // Calls Transaction::Apply() followed by Consensus::Commit() with the
+ // Calls Transaction::Apply() followed by RaftConsensus::Commit() with the
// results from the Apply().
void ApplyTask();
@@ -343,7 +343,7 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
void RegisterFollowerTransactionOnResultTracker();
TransactionTracker* const txn_tracker_;
- consensus::Consensus* const consensus_;
+ consensus::RaftConsensus* const consensus_;
log::Log* const log_;
ThreadPool* const prepare_pool_;
ThreadPool* const apply_pool_;
@@ -355,7 +355,7 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
mutable simple_spinlock lock_;
// A copy of the transaction's OpId, set when the transaction first
- // receives one from Consensus and uninitialized until then.
+ // receives one from RaftConsensus and uninitialized until then.
// TODO(todd): we have three separate copies of this now -- in TransactionState,
// CommitMsg, and here... we should be able to consolidate!
consensus::OpId op_id_copy_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tablet/transactions/write_transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/write_transaction.h b/src/kudu/tablet/transactions/write_transaction.h
index f8ea48d..c9aa80f 100644
--- a/src/kudu/tablet/transactions/write_transaction.h
+++ b/src/kudu/tablet/transactions/write_transaction.h
@@ -36,10 +36,6 @@ struct DecodedRowOperation;
class ConstContiguousRow;
class RowwiseRowBlockPB;
-namespace consensus {
-class Consensus;
-}
-
namespace tserver {
class WriteRequestPB;
class WriteResponsePB;
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tserver/tablet_copy_source_session.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session.cc b/src/kudu/tserver/tablet_copy_source_session.cc
index 2926792..85faa49 100644
--- a/src/kudu/tserver/tablet_copy_source_session.cc
+++ b/src/kudu/tserver/tablet_copy_source_session.cc
@@ -120,12 +120,12 @@ Status TabletCopySourceSession::Init() {
// We do this after snapshotting the log to avoid a scenario where the latest
// entry in the log has a term higher than the term stored in the consensus
// metadata, which will result in a CHECK failure on RaftConsensus init.
- scoped_refptr<consensus::Consensus> consensus = tablet_replica_->shared_consensus();
+ scoped_refptr<consensus::RaftConsensus> consensus = tablet_replica_->shared_consensus();
if (!consensus) {
tablet::TabletStatePB tablet_state = tablet_replica_->state();
return Status::IllegalState(Substitute(
"Unable to initialize tablet copy session for tablet $0. "
- "Consensus is not available. Tablet state: $1 ($2)",
+ "Raft Consensus is not available. Tablet state: $1 ($2)",
tablet_id, tablet::TabletStatePB_Name(tablet_state), tablet_state));
}
initial_cstate_ = consensus->ConsensusState();
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 82740ab..68fa88b 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -28,7 +28,7 @@
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
-#include "kudu/consensus/consensus.h"
+#include "kudu/consensus/raft_consensus.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/casts.h"
@@ -103,7 +103,6 @@ DECLARE_int32(tablet_history_max_age_sec);
using google::protobuf::RepeatedPtrField;
using kudu::consensus::ChangeConfigRequestPB;
using kudu::consensus::ChangeConfigResponsePB;
-using kudu::consensus::Consensus;
using kudu::consensus::ConsensusRequestPB;
using kudu::consensus::ConsensusResponsePB;
using kudu::consensus::GetLastOpIdRequestPB;
@@ -113,6 +112,7 @@ using kudu::consensus::LeaderStepDownRequestPB;
using kudu::consensus::LeaderStepDownResponsePB;
using kudu::consensus::UnsafeChangeConfigRequestPB;
using kudu::consensus::UnsafeChangeConfigResponsePB;
+using kudu::consensus::RaftConsensus;
using kudu::consensus::RunLeaderElectionRequestPB;
using kudu::consensus::RunLeaderElectionResponsePB;
using kudu::consensus::StartTabletCopyRequestPB;
@@ -214,10 +214,10 @@ template<class RespClass>
bool GetConsensusOrRespond(const scoped_refptr<TabletReplica>& replica,
RespClass* resp,
rpc::RpcContext* context,
- scoped_refptr<Consensus>* consensus) {
+ scoped_refptr<RaftConsensus>* consensus) {
*consensus = replica->shared_consensus();
if (!*consensus) {
- Status s = Status::ServiceUnavailable("Consensus unavailable. Tablet not running");
+ Status s = Status::ServiceUnavailable("Raft Consensus unavailable. Tablet not running");
SetupErrorAndRespond(resp->mutable_error(), s,
TabletServerErrorPB::TABLET_NOT_RUNNING, context);
return false;
@@ -855,8 +855,8 @@ void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req,
replica->permanent_uuid();
- // Submit the update directly to the TabletReplica's Consensus instance.
- scoped_refptr<Consensus> consensus;
+ // Submit the update directly to the TabletReplica's RaftConsensus instance.
+ scoped_refptr<RaftConsensus> consensus;
if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
Status s = consensus->Update(req, resp);
if (PREDICT_FALSE(!s.ok())) {
@@ -886,7 +886,7 @@ void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req,
}
// Submit the vote request directly to the consensus instance.
- scoped_refptr<Consensus> consensus;
+ scoped_refptr<RaftConsensus> consensus;
if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
Status s = consensus->RequestVote(req, resp);
if (PREDICT_FALSE(!s.ok())) {
@@ -911,7 +911,7 @@ void ConsensusServiceImpl::ChangeConfig(const ChangeConfigRequestPB* req,
return;
}
- scoped_refptr<Consensus> consensus;
+ scoped_refptr<RaftConsensus> consensus;
if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
boost::optional<TabletServerErrorPB::Code> error_code;
Status s = consensus->ChangeConfig(*req, BindHandleResponse(req, resp, context), &error_code);
@@ -935,7 +935,7 @@ void ConsensusServiceImpl::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB*
return;
}
- scoped_refptr<Consensus> consensus;
+ scoped_refptr<RaftConsensus> consensus;
if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
TabletServerErrorPB::Code error_code;
Status s = consensus->UnsafeChangeConfig(*req, &error_code);
@@ -966,11 +966,11 @@ void ConsensusServiceImpl::RunLeaderElection(const RunLeaderElectionRequestPB* r
return;
}
- scoped_refptr<Consensus> consensus;
+ scoped_refptr<RaftConsensus> consensus;
if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
Status s = consensus->StartElection(
- consensus::Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE,
- consensus::Consensus::EXTERNAL_REQUEST);
+ consensus::RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE,
+ consensus::RaftConsensus::EXTERNAL_REQUEST);
if (PREDICT_FALSE(!s.ok())) {
SetupErrorAndRespond(resp->mutable_error(), s,
TabletServerErrorPB::UNKNOWN_ERROR,
@@ -992,7 +992,7 @@ void ConsensusServiceImpl::LeaderStepDown(const LeaderStepDownRequestPB* req,
return;
}
- scoped_refptr<Consensus> consensus;
+ scoped_refptr<RaftConsensus> consensus;
if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
Status s = consensus->StepDown(resp);
if (PREDICT_FALSE(!s.ok())) {
@@ -1022,7 +1022,7 @@ void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *re
TabletServerErrorPB::TABLET_NOT_RUNNING, context);
return;
}
- scoped_refptr<Consensus> consensus;
+ scoped_refptr<RaftConsensus> consensus;
if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
if (PREDICT_FALSE(req->opid_type() == consensus::UNKNOWN_OPID_TYPE)) {
HandleUnknownError(Status::InvalidArgument("Invalid opid_type specified to GetLastOpId()"),
@@ -1057,7 +1057,7 @@ void ConsensusServiceImpl::GetConsensusState(const consensus::GetConsensusStateR
continue;
}
- scoped_refptr<Consensus> consensus(replica->shared_consensus());
+ scoped_refptr<RaftConsensus> consensus(replica->shared_consensus());
if (!consensus) {
continue;
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index db4c51d..e4a676c 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -303,7 +303,7 @@ Status TSTabletManager::CreateNewTablet(const string& table_id,
"Couldn't create tablet metadata");
// We must persist the consensus metadata to disk before starting a new
- // tablet's TabletReplica and Consensus implementation.
+ // tablet's TabletReplica and RaftConsensus implementation.
unique_ptr<ConsensusMetadata> cmeta;
RETURN_NOT_OK_PREPEND(ConsensusMetadata::Create(fs_manager_, tablet_id, fs_manager_->uuid(),
config, consensus::kMinimumTerm, &cmeta),
@@ -645,10 +645,10 @@ Status TSTabletManager::DeleteTablet(
// restarting the tablet if the local replica committed a higher config
// change op during that time, or potentially something else more invasive.
if (cas_config_opid_index_less_or_equal && !tablet_deleted) {
- scoped_refptr<consensus::Consensus> consensus = replica->shared_consensus();
+ scoped_refptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
if (!consensus) {
*error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
- return Status::IllegalState("Consensus not available. Tablet shutting down");
+ return Status::IllegalState("Raft Consensus not available. Tablet shutting down");
}
RaftConfigPB committed_config = consensus->CommittedConfig();
if (committed_config.opid_index() > *cas_config_opid_index_less_or_equal) {
@@ -987,7 +987,7 @@ void TSTabletManager::CreateReportedTabletPB(const string& tablet_id,
reported_tablet->set_schema_version(replica->tablet_metadata()->schema_version());
// We cannot get consensus state information unless the TabletReplica is running.
- scoped_refptr<consensus::Consensus> consensus = replica->shared_consensus();
+ scoped_refptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
if (consensus) {
*reported_tablet->mutable_consensus_state() = consensus->ConsensusState();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/e3b7d4dc/src/kudu/tserver/tserver-path-handlers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver-path-handlers.cc b/src/kudu/tserver/tserver-path-handlers.cc
index 4aa9407..c4599ae 100644
--- a/src/kudu/tserver/tserver-path-handlers.cc
+++ b/src/kudu/tserver/tserver-path-handlers.cc
@@ -253,7 +253,7 @@ void TabletServerPathHandlers::HandleTabletsPage(const Webserver::WebRequest& re
.PartitionDebugString(replica->tablet_metadata()->partition(),
replica->tablet_metadata()->schema());
- scoped_refptr<consensus::Consensus> consensus = replica->shared_consensus();
+ scoped_refptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
(*output) << Substitute(
// Table name, tablet id, partition
"<tr><td>$0</td><td>$1</td><td>$2</td>"
@@ -456,7 +456,7 @@ void TabletServerPathHandlers::HandleConsensusStatusPage(const Webserver::WebReq
string id;
scoped_refptr<TabletReplica> replica;
if (!LoadTablet(tserver_, req, &id, &replica, output)) return;
- scoped_refptr<consensus::Consensus> consensus = replica->shared_consensus();
+ scoped_refptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
if (!consensus) {
*output << "Tablet " << EscapeForHtmlToString(id) << " not running";
return;