You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/05/27 02:13:02 UTC
kudu git commit: consensus: Move PendingRounds to its own file
Repository: kudu
Updated Branches:
refs/heads/master d0270172e -> 15d2fbaf4
consensus: Move PendingRounds to its own file
This simply resolves an existing TODO and does not change any existing
functionality.
Change-Id: Ibc7356fcae9835f12f06fa80edf5df200f5f958b
Reviewed-on: http://gerrit.cloudera.org:8080/7003
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/15d2fbaf
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/15d2fbaf
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/15d2fbaf
Branch: refs/heads/master
Commit: 15d2fbaf45eddd5cb4ea954218e0ddd041540922
Parents: d027017
Author: Mike Percy <mp...@apache.org>
Authored: Fri May 26 18:09:13 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Sat May 27 02:12:47 2017 +0000
----------------------------------------------------------------------
src/kudu/consensus/CMakeLists.txt | 1 +
src/kudu/consensus/pending_rounds.cc | 228 ++++++++++++++++++++++++
src/kudu/consensus/pending_rounds.h | 116 ++++++++++++
src/kudu/consensus/raft_consensus.h | 1 +
src/kudu/consensus/raft_consensus_state.cc | 206 +--------------------
src/kudu/consensus/raft_consensus_state.h | 102 +----------
6 files changed, 349 insertions(+), 305 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/15d2fbaf/src/kudu/consensus/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt
index d8d3306..929fb16 100644
--- a/src/kudu/consensus/CMakeLists.txt
+++ b/src/kudu/consensus/CMakeLists.txt
@@ -103,6 +103,7 @@ set(CONSENSUS_SRCS
leader_election.cc
log_cache.cc
peer_manager.cc
+ pending_rounds.cc
quorum_util.cc
raft_consensus.cc
raft_consensus_state.cc
http://git-wip-us.apache.org/repos/asf/kudu/blob/15d2fbaf/src/kudu/consensus/pending_rounds.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/pending_rounds.cc b/src/kudu/consensus/pending_rounds.cc
new file mode 100644
index 0000000..121cb24
--- /dev/null
+++ b/src/kudu/consensus/pending_rounds.cc
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/consensus/pending_rounds.h"
+
+#include "kudu/consensus/time_manager.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace consensus {
+
+using std::string;
+using strings::Substitute;
+
+//------------------------------------------------------------
+// PendingRounds
+//------------------------------------------------------------
+
+PendingRounds::PendingRounds(string log_prefix, scoped_refptr<TimeManager> time_manager)
+ : log_prefix_(std::move(log_prefix)),
+ last_committed_op_id_(MinimumOpId()),
+ time_manager_(std::move(time_manager)) {}
+
+PendingRounds::~PendingRounds() {
+}
+
+Status PendingRounds::CancelPendingTransactions() {
+ ThreadRestrictions::AssertWaitAllowed();
+ if (pending_txns_.empty()) {
+ return Status::OK();
+ }
+
+ LOG_WITH_PREFIX(INFO) << "Trying to abort " << pending_txns_.size()
+ << " pending transactions.";
+ for (const auto& txn : pending_txns_) {
+ const scoped_refptr<ConsensusRound>& round = txn.second;
+ // We cancel only transactions whose applies have not yet been triggered.
+ LOG_WITH_PREFIX(INFO) << "Aborting transaction as it isn't in flight: "
+ << SecureShortDebugString(*txn.second->replicate_msg());
+ round->NotifyReplicationFinished(Status::Aborted("Transaction aborted"));
+ }
+ return Status::OK();
+}
+
+void PendingRounds::AbortOpsAfter(int64_t index) {
+ LOG_WITH_PREFIX(INFO) << "Aborting all transactions after (but not including) "
+ << index;
+
+ DCHECK_GE(index, 0);
+ OpId new_preceding;
+
+ auto iter = pending_txns_.lower_bound(index);
+
+ // Either the new preceding id is in the pendings set or it must be equal to the
+ // committed index since we can't truncate already committed operations.
+ if (iter != pending_txns_.end() && (*iter).first == index) {
+ new_preceding = (*iter).second->replicate_msg()->id();
+ ++iter;
+ } else {
+ CHECK_EQ(index, last_committed_op_id_.index());
+ new_preceding = last_committed_op_id_;
+ }
+
+ for (; iter != pending_txns_.end();) {
+ const scoped_refptr<ConsensusRound>& round = (*iter).second;
+ auto op_type = round->replicate_msg()->op_type();
+ LOG_WITH_PREFIX(INFO)
+ << "Aborting uncommitted " << OperationType_Name(op_type)
+ << " operation due to leader change: " << round->replicate_msg()->id();
+
+ round->NotifyReplicationFinished(Status::Aborted("Transaction aborted by new leader"));
+ // Erase the entry from pendings.
+ pending_txns_.erase(iter++);
+ }
+}
+
+Status PendingRounds::AddPendingOperation(const scoped_refptr<ConsensusRound>& round) {
+ InsertOrDie(&pending_txns_, round->replicate_msg()->id().index(), round);
+ return Status::OK();
+}
+
+scoped_refptr<ConsensusRound> PendingRounds::GetPendingOpByIndexOrNull(int64_t index) {
+ return FindPtrOrNull(pending_txns_, index);
+}
+
+bool PendingRounds::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch) {
+
+ *term_mismatch = false;
+
+ if (op_id.index() <= GetCommittedIndex()) {
+ return true;
+ }
+
+ scoped_refptr<ConsensusRound> round = GetPendingOpByIndexOrNull(op_id.index());
+ if (!round) {
+ return false;
+ }
+
+ if (round->id().term() != op_id.term()) {
+ *term_mismatch = true;
+ return false;
+ }
+ return true;
+}
+
+OpId PendingRounds::GetLastPendingTransactionOpId() const {
+ return pending_txns_.empty()
+ ? MinimumOpId() : (--pending_txns_.end())->second->id();
+}
+
+Status PendingRounds::AdvanceCommittedIndex(int64_t committed_index) {
+ // If we already committed up to (or past) 'committed_index', return.
+ // This can happen in the case that multiple UpdateConsensus() calls end
+ // up in the RPC queue at the same time, and then might get interleaved out
+ // of order.
+ if (last_committed_op_id_.index() >= committed_index) {
+ VLOG_WITH_PREFIX(1)
+ << "Already marked ops through " << last_committed_op_id_ << " as committed. "
+ << "Now trying to mark " << committed_index << " which would be a no-op.";
+ return Status::OK();
+ }
+
+ if (pending_txns_.empty()) {
+ LOG(ERROR) << "Advancing commit index to " << committed_index
+ << " from " << last_committed_op_id_
+ << " we have no pending txns"
+ << GetStackTrace();
+ VLOG_WITH_PREFIX(1) << "No transactions to mark as committed up to: "
+ << committed_index;
+ return Status::OK();
+ }
+
+ // Start at the operation after the last committed one.
+ auto iter = pending_txns_.upper_bound(last_committed_op_id_.index());
+ // Stop at the operation after the last one we must commit.
+ auto end_iter = pending_txns_.upper_bound(committed_index);
+ CHECK(iter != pending_txns_.end());
+
+ VLOG_WITH_PREFIX(1) << "Last triggered apply was: "
+ << last_committed_op_id_
+ << " Starting to apply from log index: " << (*iter).first;
+
+ while (iter != end_iter) {
+ scoped_refptr<ConsensusRound> round = (*iter).second; // Make a copy.
+ DCHECK(round);
+ const OpId& current_id = round->id();
+
+ if (PREDICT_TRUE(!OpIdEquals(last_committed_op_id_, MinimumOpId()))) {
+ CHECK_OK(CheckOpInSequence(last_committed_op_id_, current_id));
+ }
+
+ pending_txns_.erase(iter++);
+ last_committed_op_id_ = round->id();
+ time_manager_->AdvanceSafeTimeWithMessage(*round->replicate_msg());
+ round->NotifyReplicationFinished(Status::OK());
+ }
+
+ return Status::OK();
+}
+
+Status PendingRounds::SetInitialCommittedOpId(const OpId& committed_op) {
+ CHECK_EQ(last_committed_op_id_.index(), 0);
+ if (!pending_txns_.empty()) {
+ int64_t first_pending_index = pending_txns_.begin()->first;
+ if (committed_op.index() < first_pending_index) {
+ if (committed_op.index() != first_pending_index - 1) {
+ return Status::Corruption(Substitute(
+ "pending operations should start at first operation "
+ "after the committed operation (committed=$0, first pending=$1)",
+ OpIdToString(committed_op), first_pending_index));
+ }
+ last_committed_op_id_ = committed_op;
+ }
+
+ RETURN_NOT_OK(AdvanceCommittedIndex(committed_op.index()));
+ CHECK_EQ(SecureShortDebugString(last_committed_op_id_),
+ SecureShortDebugString(committed_op));
+
+ } else {
+ last_committed_op_id_ = committed_op;
+ }
+ return Status::OK();
+}
+
+Status PendingRounds::CheckOpInSequence(const OpId& previous, const OpId& current) {
+ if (current.term() < previous.term()) {
+ return Status::Corruption(Substitute("New operation's term is not >= than the previous "
+ "op's term. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous)));
+ }
+ if (current.index() != previous.index() + 1) {
+ return Status::Corruption(Substitute("New operation's index does not follow the previous"
+ " op's index. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous)));
+ }
+ return Status::OK();
+}
+
+int64_t PendingRounds::GetCommittedIndex() const {
+ return last_committed_op_id_.index();
+}
+
+int64_t PendingRounds::GetTermWithLastCommittedOp() const {
+ return last_committed_op_id_.term();
+}
+
+int PendingRounds::GetNumPendingTxns() const {
+ return pending_txns_.size();
+}
+
+} // namespace consensus
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/15d2fbaf/src/kudu/consensus/pending_rounds.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/pending_rounds.h b/src/kudu/consensus/pending_rounds.h
new file mode 100644
index 0000000..02a8686
--- /dev/null
+++ b/src/kudu/consensus/pending_rounds.h
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <map>
+#include <string>
+
+#include "kudu/consensus/consensus.h"
+#include "kudu/consensus/opid_util.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace consensus {
+
+class TimeManager;
+
+// Tracks the pending consensus rounds being managed by a Raft replica (either leader
+// or follower).
+//
+// This class is not thread-safe.
+//
+// TODO(todd): this class inconsistently uses the term "round", "op", and "transaction".
+// We should consolidate to "round".
+class PendingRounds {
+ public:
+ PendingRounds(std::string log_prefix, scoped_refptr<TimeManager> time_manager);
+ ~PendingRounds();
+
+ // Set the committed op during startup. This should be done after
+ // appending any of the pending transactions, and will take care
+ // of triggering any that are now considered committed.
+ Status SetInitialCommittedOpId(const OpId& committed_op);
+
+ // Returns the ConsensusRound with the provided index, if there is any, or NULL
+ // if there isn't.
+ scoped_refptr<ConsensusRound> GetPendingOpByIndexOrNull(int64_t index);
+
+ // Add 'round' to the set of rounds waiting to be committed.
+ Status AddPendingOperation(const scoped_refptr<ConsensusRound>& round);
+
+ // Advances the committed index.
+ // This is a no-op if the committed index has not changed.
+ Status AdvanceCommittedIndex(int64_t committed_index);
+
+ // Aborts pending operations after, but not including 'index'. The OpId with 'index'
+ // will become our new last received id. If there are pending operations with indexes
+ // higher than 'index' those operations are aborted.
+ void AbortOpsAfter(int64_t index);
+
+ // Returns true if an operation is in this replica's log, namely:
+ // - If the op's index is lower than or equal to our committed index
+ // - If the op id matches an inflight op.
+ // If an operation with the same index is in our log but the terms
+ // are different 'term_mismatch' is set to true, it is false otherwise.
+ bool IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch);
+
+ // Returns the id of the latest pending transaction (i.e. the one with the
+ // latest index). This must be called under the lock.
+ OpId GetLastPendingTransactionOpId() const;
+
+ // Used by replicas to cancel pending transactions. Pending transactions are those
+ // that have completed prepare/replicate but are waiting on the LEADER's commit
+ // to complete. This does not cancel transactions being applied.
+ Status CancelPendingTransactions();
+
+ // Returns the number of transactions that are currently in the pending state
+ // i.e. transactions for which Prepare() is done or under way.
+ int GetNumPendingTxns() const;
+
+ // Returns the watermark below which all operations are known to
+ // be committed according to consensus.
+ // TODO(todd): these should probably be removed in favor of using the queue.
+ int64_t GetCommittedIndex() const;
+ int64_t GetTermWithLastCommittedOp() const;
+
+ // Checks that 'current' correctly follows 'previous'. Specifically it checks
+ // that the term is the same or higher and that the index is sequential.
+ static Status CheckOpInSequence(const OpId& previous, const OpId& current);
+
+ private:
+ const std::string& LogPrefix() const { return log_prefix_; }
+
+ const std::string log_prefix_;
+
+ // Index=>Round map that manages pending ops, i.e. operations for which we've
+ // received a replicate message from the leader but have yet to be committed.
+ // The key is the index of the replicate operation.
+ typedef std::map<int64_t, scoped_refptr<ConsensusRound> > IndexToRoundMap;
+ IndexToRoundMap pending_txns_;
+
+ // The OpId of the round that was last committed. Initialized to MinimumOpId().
+ OpId last_committed_op_id_;
+
+ scoped_refptr<TimeManager> time_manager_;
+
+ DISALLOW_COPY_AND_ASSIGN(PendingRounds);
+};
+
+} // namespace consensus
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/15d2fbaf/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 0c6fa34..b2186db 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -29,6 +29,7 @@
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/consensus_queue.h"
+#include "kudu/consensus/pending_rounds.h"
#include "kudu/consensus/raft_consensus_state.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/util/atomic.h"
http://git-wip-us.apache.org/repos/asf/kudu/blob/15d2fbaf/src/kudu/consensus/raft_consensus_state.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc
index e06eea7..c7c26db 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -15,22 +15,14 @@
// specific language governing permissions and limitations
// under the License.
-#include <algorithm>
-#include <memory>
+#include "kudu/consensus/raft_consensus_state.h"
-#include "kudu/consensus/log_util.h"
#include "kudu/consensus/quorum_util.h"
-#include "kudu/consensus/raft_consensus_state.h"
-#include "kudu/consensus/time_manager.h"
-#include "kudu/gutil/map-util.h"
-#include "kudu/gutil/strings/join.h"
-#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/logging.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
-#include "kudu/util/trace.h"
namespace kudu {
namespace consensus {
@@ -38,7 +30,6 @@ namespace consensus {
using std::string;
using std::unique_ptr;
using strings::Substitute;
-using strings::SubstituteAndAppend;
//////////////////////////////////////////////////
// ReplicaState
@@ -364,201 +355,6 @@ string ReplicaState::ToStringUnlocked() const {
peer_uuid_, state_, RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
}
-//------------------------------------------------------------
-// PendingRounds
-// TODO(todd): move to its own file
-//------------------------------------------------------------
-
-PendingRounds::PendingRounds(string log_prefix, scoped_refptr<TimeManager> time_manager)
- : log_prefix_(std::move(log_prefix)),
- last_committed_op_id_(MinimumOpId()),
- time_manager_(std::move(time_manager)) {}
-
-PendingRounds::~PendingRounds() {
-}
-
-Status PendingRounds::CancelPendingTransactions() {
- ThreadRestrictions::AssertWaitAllowed();
- if (pending_txns_.empty()) {
- return Status::OK();
- }
-
- LOG_WITH_PREFIX(INFO) << "Trying to abort " << pending_txns_.size()
- << " pending transactions.";
- for (const auto& txn : pending_txns_) {
- const scoped_refptr<ConsensusRound>& round = txn.second;
- // We cancel only transactions whose applies have not yet been triggered.
- LOG_WITH_PREFIX(INFO) << "Aborting transaction as it isn't in flight: "
- << SecureShortDebugString(*txn.second->replicate_msg());
- round->NotifyReplicationFinished(Status::Aborted("Transaction aborted"));
- }
- return Status::OK();
-}
-
-void PendingRounds::AbortOpsAfter(int64_t index) {
- LOG_WITH_PREFIX(INFO) << "Aborting all transactions after (but not including) "
- << index;
-
- DCHECK_GE(index, 0);
- OpId new_preceding;
-
- auto iter = pending_txns_.lower_bound(index);
-
- // Either the new preceding id is in the pendings set or it must be equal to the
- // committed index since we can't truncate already committed operations.
- if (iter != pending_txns_.end() && (*iter).first == index) {
- new_preceding = (*iter).second->replicate_msg()->id();
- ++iter;
- } else {
- CHECK_EQ(index, last_committed_op_id_.index());
- new_preceding = last_committed_op_id_;
- }
-
- for (; iter != pending_txns_.end();) {
- const scoped_refptr<ConsensusRound>& round = (*iter).second;
- auto op_type = round->replicate_msg()->op_type();
- LOG_WITH_PREFIX(INFO)
- << "Aborting uncommitted " << OperationType_Name(op_type)
- << " operation due to leader change: " << round->replicate_msg()->id();
-
- round->NotifyReplicationFinished(Status::Aborted("Transaction aborted by new leader"));
- // Erase the entry from pendings.
- pending_txns_.erase(iter++);
- }
-}
-
-Status PendingRounds::AddPendingOperation(const scoped_refptr<ConsensusRound>& round) {
- InsertOrDie(&pending_txns_, round->replicate_msg()->id().index(), round);
- return Status::OK();
-}
-
-scoped_refptr<ConsensusRound> PendingRounds::GetPendingOpByIndexOrNull(int64_t index) {
- return FindPtrOrNull(pending_txns_, index);
-}
-
-bool PendingRounds::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch) {
-
- *term_mismatch = false;
-
- if (op_id.index() <= GetCommittedIndex()) {
- return true;
- }
-
- scoped_refptr<ConsensusRound> round = GetPendingOpByIndexOrNull(op_id.index());
- if (!round) {
- return false;
- }
-
- if (round->id().term() != op_id.term()) {
- *term_mismatch = true;
- return false;
- }
- return true;
-}
-
-OpId PendingRounds::GetLastPendingTransactionOpId() const {
- return pending_txns_.empty()
- ? MinimumOpId() : (--pending_txns_.end())->second->id();
-}
-
-Status PendingRounds::AdvanceCommittedIndex(int64_t committed_index) {
- // If we already committed up to (or past) 'id' return.
- // This can happen in the case that multiple UpdateConsensus() calls end
- // up in the RPC queue at the same time, and then might get interleaved out
- // of order.
- if (last_committed_op_id_.index() >= committed_index) {
- VLOG_WITH_PREFIX(1)
- << "Already marked ops through " << last_committed_op_id_ << " as committed. "
- << "Now trying to mark " << committed_index << " which would be a no-op.";
- return Status::OK();
- }
-
- if (pending_txns_.empty()) {
- LOG(ERROR) << "Advancing commit index to " << committed_index
- << " from " << last_committed_op_id_
- << " we have no pending txns"
- << GetStackTrace();
- VLOG_WITH_PREFIX(1) << "No transactions to mark as committed up to: "
- << committed_index;
- return Status::OK();
- }
-
- // Start at the operation after the last committed one.
- auto iter = pending_txns_.upper_bound(last_committed_op_id_.index());
- // Stop at the operation after the last one we must commit.
- auto end_iter = pending_txns_.upper_bound(committed_index);
- CHECK(iter != pending_txns_.end());
-
- VLOG_WITH_PREFIX(1) << "Last triggered apply was: "
- << last_committed_op_id_
- << " Starting to apply from log index: " << (*iter).first;
-
- while (iter != end_iter) {
- scoped_refptr<ConsensusRound> round = (*iter).second; // Make a copy.
- DCHECK(round);
- const OpId& current_id = round->id();
-
- if (PREDICT_TRUE(!OpIdEquals(last_committed_op_id_, MinimumOpId()))) {
- CHECK_OK(CheckOpInSequence(last_committed_op_id_, current_id));
- }
-
- pending_txns_.erase(iter++);
- last_committed_op_id_ = round->id();
- time_manager_->AdvanceSafeTimeWithMessage(*round->replicate_msg());
- round->NotifyReplicationFinished(Status::OK());
- }
-
- return Status::OK();
-}
-
-Status PendingRounds::SetInitialCommittedOpId(const OpId& committed_op) {
- CHECK_EQ(last_committed_op_id_.index(), 0);
- if (!pending_txns_.empty()) {
- int64_t first_pending_index = pending_txns_.begin()->first;
- if (committed_op.index() < first_pending_index) {
- if (committed_op.index() != first_pending_index - 1) {
- return Status::Corruption(Substitute(
- "pending operations should start at first operation "
- "after the committed operation (committed=$0, first pending=$1)",
- OpIdToString(committed_op), first_pending_index));
- }
- last_committed_op_id_ = committed_op;
- }
-
- RETURN_NOT_OK(AdvanceCommittedIndex(committed_op.index()));
- CHECK_EQ(SecureShortDebugString(last_committed_op_id_),
- SecureShortDebugString(committed_op));
-
- } else {
- last_committed_op_id_ = committed_op;
- }
- return Status::OK();
-}
-
-Status PendingRounds::CheckOpInSequence(const OpId& previous, const OpId& current) {
- if (current.term() < previous.term()) {
- return Status::Corruption(Substitute("New operation's term is not >= than the previous "
- "op's term. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous)));
- }
- if (current.index() != previous.index() + 1) {
- return Status::Corruption(Substitute("New operation's index does not follow the previous"
- " op's index. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous)));
- }
- return Status::OK();
-}
-
-int64_t PendingRounds::GetCommittedIndex() const {
- return last_committed_op_id_.index();
-}
-
-int64_t PendingRounds::GetTermWithLastCommittedOp() const {
- return last_committed_op_id_.term();
-}
-
-int PendingRounds::GetNumPendingTxns() const {
- return pending_txns_.size();
-}
-
} // namespace consensus
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/15d2fbaf/src/kudu/consensus/raft_consensus_state.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.h b/src/kudu/consensus/raft_consensus_state.h
index 9350ab8..ee53f25 100644
--- a/src/kudu/consensus/raft_consensus_state.h
+++ b/src/kudu/consensus/raft_consensus_state.h
@@ -14,37 +14,21 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#ifndef KUDU_CONSENSUS_RAFT_CONSENSUS_UTIL_H_
-#define KUDU_CONSENSUS_RAFT_CONSENSUS_UTIL_H_
-#include <map>
+#pragma once
+
#include <memory>
-#include <mutex>
-#include <set>
#include <string>
-#include <utility>
-#include <vector>
#include "kudu/consensus/consensus.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus_meta.h"
-#include "kudu/consensus/log_util.h"
#include "kudu/consensus/opid_util.h"
-#include "kudu/gutil/port.h"
#include "kudu/util/locks.h"
#include "kudu/util/status.h"
namespace kudu {
-
-class HostPort;
-class ThreadPool;
-
-namespace rpc {
-class Messenger;
-}
-
namespace consensus {
-class TimeManager;
// Class that coordinates access to the persistent Raft state (independently of Role).
// This has a 1-1 relationship with RaftConsensus and is essentially responsible for
@@ -244,87 +228,5 @@ class ReplicaState {
State state_;
};
-// Tracks the pending consensus rounds being managed by a Raft replica (either leader
-// or follower).
-//
-// This class is not thread-safe.
-//
-// TODO(todd): this class inconsistently uses the term "round", "op", and "transaction".
-// We should consolidate to "round".
-class PendingRounds {
- public:
- PendingRounds(std::string log_prefix, scoped_refptr<TimeManager> time_manager);
- ~PendingRounds();
-
- // Set the committed op during startup. This should be done after
- // appending any of the pending transactions, and will take care
- // of triggering any that are now considered committed.
- Status SetInitialCommittedOpId(const OpId& committed_op);
-
- // Returns the the ConsensusRound with the provided index, if there is any, or NULL
- // if there isn't.
- scoped_refptr<ConsensusRound> GetPendingOpByIndexOrNull(int64_t index);
-
- // Add 'round' to the set of rounds waiting to be committed.
- Status AddPendingOperation(const scoped_refptr<ConsensusRound>& round);
-
- // Advances the committed index.
- // This is a no-op if the committed index has not changed.
- Status AdvanceCommittedIndex(int64_t committed_index);
-
- // Aborts pending operations after, but not including 'index'. The OpId with 'index'
- // will become our new last received id. If there are pending operations with indexes
- // higher than 'index' those operations are aborted.
- void AbortOpsAfter(int64_t index);
-
- // Returns true if an operation is in this replica's log, namely:
- // - If the op's index is lower than or equal to our committed index
- // - If the op id matches an inflight op.
- // If an operation with the same index is in our log but the terms
- // are different 'term_mismatch' is set to true, it is false otherwise.
- bool IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch);
-
- // Returns the id of the latest pending transaction (i.e. the one with the
- // latest index). This must be called under the lock.
- OpId GetLastPendingTransactionOpId() const;
-
- // Used by replicas to cancel pending transactions. Pending transaction are those
- // that have completed prepare/replicate but are waiting on the LEADER's commit
- // to complete. This does not cancel transactions being applied.
- Status CancelPendingTransactions();
-
- // Returns the number of transactions that are currently in the pending state
- // i.e. transactions for which Prepare() is done or under way.
- int GetNumPendingTxns() const;
-
- // Returns the watermark below which all operations are known to
- // be committed according to consensus.
- // TODO(todd): these should probably be removed in favor of using the queue.
- int64_t GetCommittedIndex() const;
- int64_t GetTermWithLastCommittedOp() const;
-
- // Checks that 'current' correctly follows 'previous'. Specifically it checks
- // that the term is the same or higher and that the index is sequential.
- static Status CheckOpInSequence(const OpId& previous, const OpId& current);
-
- private:
- const std::string& LogPrefix() const { return log_prefix_; }
-
- const std::string log_prefix_;
-
- // Index=>Round map that manages pending ops, i.e. operations for which we've
- // received a replicate message from the leader but have yet to be committed.
- // The key is the index of the replicate operation.
- typedef std::map<int64_t, scoped_refptr<ConsensusRound> > IndexToRoundMap;
- IndexToRoundMap pending_txns_;
-
- // The OpId of the round that was last committed. Initialized to MinimumOpId().
- OpId last_committed_op_id_;
-
- scoped_refptr<TimeManager> time_manager_;
-};
-
} // namespace consensus
} // namespace kudu
-
-#endif /* KUDU_CONSENSUS_RAFT_CONSENSUS_UTIL_H_ */