You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/05/27 02:13:02 UTC

kudu git commit: consensus: Move PendingRounds to its own file

Repository: kudu
Updated Branches:
  refs/heads/master d0270172e -> 15d2fbaf4


consensus: Move PendingRounds to its own file

This simply resolves an existing TODO and does not change any existing
functionality.

Change-Id: Ibc7356fcae9835f12f06fa80edf5df200f5f958b
Reviewed-on: http://gerrit.cloudera.org:8080/7003
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/15d2fbaf
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/15d2fbaf
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/15d2fbaf

Branch: refs/heads/master
Commit: 15d2fbaf45eddd5cb4ea954218e0ddd041540922
Parents: d027017
Author: Mike Percy <mp...@apache.org>
Authored: Fri May 26 18:09:13 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Sat May 27 02:12:47 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/CMakeLists.txt          |   1 +
 src/kudu/consensus/pending_rounds.cc       | 228 ++++++++++++++++++++++++
 src/kudu/consensus/pending_rounds.h        | 116 ++++++++++++
 src/kudu/consensus/raft_consensus.h        |   1 +
 src/kudu/consensus/raft_consensus_state.cc | 206 +--------------------
 src/kudu/consensus/raft_consensus_state.h  | 102 +----------
 6 files changed, 349 insertions(+), 305 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/15d2fbaf/src/kudu/consensus/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt
index d8d3306..929fb16 100644
--- a/src/kudu/consensus/CMakeLists.txt
+++ b/src/kudu/consensus/CMakeLists.txt
@@ -103,6 +103,7 @@ set(CONSENSUS_SRCS
   leader_election.cc
   log_cache.cc
   peer_manager.cc
+  pending_rounds.cc
   quorum_util.cc
   raft_consensus.cc
   raft_consensus_state.cc

http://git-wip-us.apache.org/repos/asf/kudu/blob/15d2fbaf/src/kudu/consensus/pending_rounds.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/pending_rounds.cc b/src/kudu/consensus/pending_rounds.cc
new file mode 100644
index 0000000..121cb24
--- /dev/null
+++ b/src/kudu/consensus/pending_rounds.cc
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/consensus/pending_rounds.h"
+
+#include "kudu/consensus/time_manager.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace consensus {
+
+using std::string;
+using strings::Substitute;
+
+//------------------------------------------------------------
+// PendingRounds
+//------------------------------------------------------------
+
+PendingRounds::PendingRounds(string log_prefix,
+                             scoped_refptr<TimeManager> time_manager)
+    : log_prefix_(std::move(log_prefix)), last_committed_op_id_(MinimumOpId()),
+      time_manager_(std::move(time_manager)) {}
+
+// Kept out of line, where TimeManager is complete; the header only forward-declares it.
+PendingRounds::~PendingRounds() {}
+
+Status PendingRounds::CancelPendingTransactions() {
+  ThreadRestrictions::AssertWaitAllowed();
+  if (!pending_txns_.empty()) {
+    LOG_WITH_PREFIX(INFO) << "Trying to abort " << pending_txns_.size()
+                          << " pending transactions.";
+    // Every round still in 'pending_txns_' has not had its apply triggered
+    // (AdvanceCommittedIndex() erases rounds as it commits them), so all of
+    // them are aborted. NOTE: the map itself is left untouched here.
+    for (const auto& entry : pending_txns_) {
+      const scoped_refptr<ConsensusRound>& pending = entry.second;
+      LOG_WITH_PREFIX(INFO) << "Aborting transaction as it isn't in flight: "
+                            << SecureShortDebugString(*pending->replicate_msg());
+      pending->NotifyReplicationFinished(Status::Aborted("Transaction aborted"));
+    }
+  }
+  return Status::OK();
+}
+
+void PendingRounds::AbortOpsAfter(int64_t index) {
+  LOG_WITH_PREFIX(INFO) << "Aborting all transactions after (but not including) "
+                        << index;
+
+  DCHECK_GE(index, 0);
+
+  // First candidate: the round at 'index' if it is pending, else the next one.
+  auto iter = pending_txns_.lower_bound(index);
+
+  // Either the op at 'index' is still pending (it is kept and becomes the new
+  // last received op), or 'index' must equal the committed index since we
+  // can't truncate already-committed operations.
+  if (iter != pending_txns_.end() && iter->first == index) {
+    ++iter;
+  } else {
+    CHECK_EQ(index, last_committed_op_id_.index());
+  }
+
+  // Abort and erase every pending round with an index greater than 'index'.
+  while (iter != pending_txns_.end()) {
+    const scoped_refptr<ConsensusRound>& round = iter->second;
+    auto op_type = round->replicate_msg()->op_type();
+    LOG_WITH_PREFIX(INFO)
+        << "Aborting uncommitted " << OperationType_Name(op_type)
+        << " operation due to leader change: " << round->replicate_msg()->id();
+
+    round->NotifyReplicationFinished(Status::Aborted("Transaction aborted by new leader"));
+    // Post-increment advances 'iter' before the entry it pointed at is erased.
+    pending_txns_.erase(iter++);
+  }
+}
+
+Status PendingRounds::AddPendingOperation(const scoped_refptr<ConsensusRound>& round) {
+  // Keyed by log index; InsertOrDie() dies if that index is already pending.
+  const int64_t index = round->replicate_msg()->id().index();
+  InsertOrDie(&pending_txns_, index, round);
+  return Status::OK();
+}
+
+scoped_refptr<ConsensusRound> PendingRounds::GetPendingOpByIndexOrNull(int64_t index) {
+  // FindPtrOrNull() yields a null scoped_refptr when 'index' is not pending.
+  return FindPtrOrNull(pending_txns_, index);
+}
+
+bool PendingRounds::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch) {
+  *term_mismatch = false;
+
+  // Anything at or below the committed watermark is considered present.
+  if (op_id.index() <= GetCommittedIndex()) return true;
+
+  // Above the watermark, the op must be pending with a matching term.
+  scoped_refptr<ConsensusRound> pending = GetPendingOpByIndexOrNull(op_id.index());
+  if (!pending) return false;
+
+  // Same index but a different term means a different op occupies that slot.
+  const bool same_term = pending->id().term() == op_id.term();
+  *term_mismatch = !same_term;
+  return same_term;
+}
+
+OpId PendingRounds::GetLastPendingTransactionOpId() const {
+  // 'pending_txns_' is ordered by index, so rbegin() is the latest pending round.
+  if (pending_txns_.empty()) return MinimumOpId();
+  return pending_txns_.rbegin()->second->id();
+}
+
+Status PendingRounds::AdvanceCommittedIndex(int64_t committed_index) {
+  // Already committed up to (or past) 'committed_index': nothing to do. This can
+  // happen when multiple UpdateConsensus() calls end up in the RPC queue at the
+  // same time and then get interleaved out of order.
+  if (last_committed_op_id_.index() >= committed_index) {
+    VLOG_WITH_PREFIX(1)
+      << "Already marked ops through " << last_committed_op_id_ << " as committed. "
+      << "Now trying to mark " << committed_index << " which would be a no-op.";
+    return Status::OK();
+  }
+
+  if (pending_txns_.empty()) {
+    LOG_WITH_PREFIX(ERROR) << "Advancing commit index to " << committed_index
+                           << " from " << last_committed_op_id_
+                           << " but we have no pending txns. Stack trace:\n"
+                           << GetStackTrace();
+    VLOG_WITH_PREFIX(1) << "No transactions to mark as committed up to: "
+                        << committed_index;
+    return Status::OK();
+  }
+
+  // Start at the operation after the last committed one.
+  auto iter = pending_txns_.upper_bound(last_committed_op_id_.index());
+  // Stop at the operation after the last one we must commit.
+  auto end_iter = pending_txns_.upper_bound(committed_index);
+  CHECK(iter != pending_txns_.end());
+
+  VLOG_WITH_PREFIX(1) << "Last triggered apply was: " << last_committed_op_id_
+      << " Starting to apply from log index: " << iter->first;
+
+  while (iter != end_iter) {
+    // Copy the ref: the round must outlive its erasure from 'pending_txns_' below.
+    scoped_refptr<ConsensusRound> round = iter->second;
+    DCHECK(round);
+    const OpId& current_id = round->id();
+
+    // No predecessor to check against while 'last_committed_op_id_' is MinimumOpId().
+    if (PREDICT_TRUE(!OpIdEquals(last_committed_op_id_, MinimumOpId()))) {
+      CHECK_OK(CheckOpInSequence(last_committed_op_id_, current_id));
+    }
+
+    pending_txns_.erase(iter++);
+    last_committed_op_id_ = current_id;
+    time_manager_->AdvanceSafeTimeWithMessage(*round->replicate_msg());
+    round->NotifyReplicationFinished(Status::OK());
+  }
+
+  return Status::OK();
+}
+
+Status PendingRounds::SetInitialCommittedOpId(const OpId& committed_op) {
+  CHECK_EQ(last_committed_op_id_.index(), 0);
+  if (pending_txns_.empty()) {
+    last_committed_op_id_ = committed_op;
+    return Status::OK();
+  }
+
+  // If 'committed_op' precedes all pending ops, it must directly precede the first.
+  const int64_t first_pending_index = pending_txns_.begin()->first;
+  if (committed_op.index() < first_pending_index) {
+    if (committed_op.index() != first_pending_index - 1) {
+      return Status::Corruption(Substitute(
+          "pending operations should start at first operation "
+          "after the committed operation (committed=$0, first pending=$1)",
+          OpIdToString(committed_op), first_pending_index));
+    }
+    last_committed_op_id_ = committed_op;
+  }
+  RETURN_NOT_OK(AdvanceCommittedIndex(committed_op.index()));
+  CHECK_EQ(SecureShortDebugString(last_committed_op_id_),
+           SecureShortDebugString(committed_op));
+  return Status::OK();
+}
+
+Status PendingRounds::CheckOpInSequence(const OpId& previous, const OpId& current) {
+  // Terms must never decrease, and indexes must be strictly consecutive.
+  if (current.term() < previous.term()) {
+    return Status::Corruption(Substitute(
+        "New operation's term is not >= than the previous op's term. Current: $0. Previous: $1",
+        OpIdToString(current), OpIdToString(previous)));
+  }
+  if (current.index() == previous.index() + 1) return Status::OK();
+  return Status::Corruption(Substitute("New operation's index does not follow the previous"
+      " op's index. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous)));
+}
+
+// Accessors over 'last_committed_op_id_' (initialized to MinimumOpId()).
+int64_t PendingRounds::GetCommittedIndex() const {
+  return last_committed_op_id_.index();
+}
+int64_t PendingRounds::GetTermWithLastCommittedOp() const {
+  return last_committed_op_id_.term();
+}
+
+int PendingRounds::GetNumPendingTxns() const {
+  return static_cast<int>(pending_txns_.size());  // Narrowed from size_t.
+}
+
+}  // namespace consensus
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/15d2fbaf/src/kudu/consensus/pending_rounds.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/pending_rounds.h b/src/kudu/consensus/pending_rounds.h
new file mode 100644
index 0000000..02a8686
--- /dev/null
+++ b/src/kudu/consensus/pending_rounds.h
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <map>
+#include <string>
+
+#include "kudu/consensus/consensus.h"
+#include "kudu/consensus/opid_util.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace consensus {
+
+class TimeManager;
+
+// Tracks the pending consensus rounds being managed by a Raft replica (either leader
+// or follower).
+//
+// This class is not thread-safe; callers must synchronize access externally.
+//
+// TODO(todd): this class inconsistently uses the term "round", "op", and "transaction".
+// We should consolidate to "round".
+class PendingRounds {
+ public:
+  PendingRounds(std::string log_prefix, scoped_refptr<TimeManager> time_manager);
+  ~PendingRounds();
+
+  // Set the committed op during startup. This should be done after
+  // appending any of the pending transactions, and will take care
+  // of triggering any that are now considered committed.
+  Status SetInitialCommittedOpId(const OpId& committed_op);
+
+  // Returns the pending ConsensusRound with index 'index', or null if there is none.
+  scoped_refptr<ConsensusRound> GetPendingOpByIndexOrNull(int64_t index);
+
+  // Add 'round' to the set of rounds waiting to be committed. Adding a round whose
+  // index is already pending is a fatal error.
+  Status AddPendingOperation(const scoped_refptr<ConsensusRound>& round);
+
+  // Advances the committed index, notifying each pending round up to and including
+  // 'committed_index'. This is a no-op if the index would not move forward.
+  Status AdvanceCommittedIndex(int64_t committed_index);
+
+  // Aborts and removes pending operations after, but not including, 'index'. The
+  // OpId with 'index' will become our new last received id; 'index' must either be
+  // pending or equal to the committed index.
+  void AbortOpsAfter(int64_t index);
+
+  // Returns true if an operation is in this replica's log, namely:
+  // - If the op's index is lower than or equal to our committed index
+  // - If the op id matches an inflight op.
+  // If an operation with the same index is in our log but the terms
+  // are different 'term_mismatch' is set to true, it is false otherwise.
+  bool IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch);
+
+  // Returns the id of the latest pending transaction (i.e. the one with the
+  // latest index), or MinimumOpId() if there are no pending transactions.
+  OpId GetLastPendingTransactionOpId() const;
+
+  // Used by replicas to cancel pending transactions, i.e. those waiting on the
+  // LEADER's commit. Each is notified with an Aborted status but is NOT removed
+  // from the pending set. This does not cancel transactions being applied.
+  Status CancelPendingTransactions();
+
+  // Returns the number of transactions that are currently in the pending state
+  // i.e. transactions for which Prepare() is done or under way.
+  int GetNumPendingTxns() const;
+
+  // Returns the watermark at or below which all operations are known to
+  // be committed according to consensus.
+  // TODO(todd): these should probably be removed in favor of using the queue.
+  int64_t GetCommittedIndex() const;
+  int64_t GetTermWithLastCommittedOp() const;
+
+  // Checks that 'current' correctly follows 'previous'. Specifically it checks
+  // that the term is the same or higher and that the index is sequential.
+  static Status CheckOpInSequence(const OpId& previous, const OpId& current);
+
+ private:
+  const std::string& LogPrefix() const { return log_prefix_; }
+
+  const std::string log_prefix_;
+
+  // Index=>Round map that manages pending ops, i.e. operations for which we've
+  // received a replicate message from the leader but have yet to be committed.
+  // The key is the index of the replicate operation.
+  using IndexToRoundMap = std::map<int64_t, scoped_refptr<ConsensusRound>>;
+  IndexToRoundMap pending_txns_;
+
+  // The OpId of the round that was last committed. Initialized to MinimumOpId().
+  OpId last_committed_op_id_;
+
+  scoped_refptr<TimeManager> time_manager_;  // Safe time is advanced on each commit.
+
+  DISALLOW_COPY_AND_ASSIGN(PendingRounds);
+};
+
+}  // namespace consensus
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/15d2fbaf/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 0c6fa34..b2186db 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -29,6 +29,7 @@
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus_meta.h"
 #include "kudu/consensus/consensus_queue.h"
+#include "kudu/consensus/pending_rounds.h"
 #include "kudu/consensus/raft_consensus_state.h"
 #include "kudu/consensus/time_manager.h"
 #include "kudu/util/atomic.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/15d2fbaf/src/kudu/consensus/raft_consensus_state.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc
index e06eea7..c7c26db 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -15,22 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <algorithm>
-#include <memory>
+#include "kudu/consensus/raft_consensus_state.h"
 
-#include "kudu/consensus/log_util.h"
 #include "kudu/consensus/quorum_util.h"
-#include "kudu/consensus/raft_consensus_state.h"
-#include "kudu/consensus/time_manager.h"
-#include "kudu/gutil/map-util.h"
-#include "kudu/gutil/strings/join.h"
-#include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
-#include "kudu/util/trace.h"
 
 namespace kudu {
 namespace consensus {
@@ -38,7 +30,6 @@ namespace consensus {
 using std::string;
 using std::unique_ptr;
 using strings::Substitute;
-using strings::SubstituteAndAppend;
 
 //////////////////////////////////////////////////
 // ReplicaState
@@ -364,201 +355,6 @@ string ReplicaState::ToStringUnlocked() const {
                     peer_uuid_, state_, RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
 }
 
-//------------------------------------------------------------
-// PendingRounds
-// TODO(todd): move to its own file
-//------------------------------------------------------------
-
-PendingRounds::PendingRounds(string log_prefix, scoped_refptr<TimeManager> time_manager)
-    : log_prefix_(std::move(log_prefix)),
-      last_committed_op_id_(MinimumOpId()),
-      time_manager_(std::move(time_manager)) {}
-
-PendingRounds::~PendingRounds() {
-}
-
-Status PendingRounds::CancelPendingTransactions() {
-  ThreadRestrictions::AssertWaitAllowed();
-  if (pending_txns_.empty()) {
-    return Status::OK();
-  }
-
-  LOG_WITH_PREFIX(INFO) << "Trying to abort " << pending_txns_.size()
-                                 << " pending transactions.";
-  for (const auto& txn : pending_txns_) {
-    const scoped_refptr<ConsensusRound>& round = txn.second;
-    // We cancel only transactions whose applies have not yet been triggered.
-    LOG_WITH_PREFIX(INFO) << "Aborting transaction as it isn't in flight: "
-                                   << SecureShortDebugString(*txn.second->replicate_msg());
-    round->NotifyReplicationFinished(Status::Aborted("Transaction aborted"));
-  }
-  return Status::OK();
-}
-
-void PendingRounds::AbortOpsAfter(int64_t index) {
-  LOG_WITH_PREFIX(INFO) << "Aborting all transactions after (but not including) "
-                                 << index;
-
-  DCHECK_GE(index, 0);
-  OpId new_preceding;
-
-  auto iter = pending_txns_.lower_bound(index);
-
-  // Either the new preceding id is in the pendings set or it must be equal to the
-  // committed index since we can't truncate already committed operations.
-  if (iter != pending_txns_.end() && (*iter).first == index) {
-    new_preceding = (*iter).second->replicate_msg()->id();
-    ++iter;
-  } else {
-    CHECK_EQ(index, last_committed_op_id_.index());
-    new_preceding = last_committed_op_id_;
-  }
-
-  for (; iter != pending_txns_.end();) {
-    const scoped_refptr<ConsensusRound>& round = (*iter).second;
-    auto op_type = round->replicate_msg()->op_type();
-    LOG_WITH_PREFIX(INFO)
-        << "Aborting uncommitted " << OperationType_Name(op_type)
-        << " operation due to leader change: " << round->replicate_msg()->id();
-
-    round->NotifyReplicationFinished(Status::Aborted("Transaction aborted by new leader"));
-    // Erase the entry from pendings.
-    pending_txns_.erase(iter++);
-  }
-}
-
-Status PendingRounds::AddPendingOperation(const scoped_refptr<ConsensusRound>& round) {
-  InsertOrDie(&pending_txns_, round->replicate_msg()->id().index(), round);
-  return Status::OK();
-}
-
-scoped_refptr<ConsensusRound> PendingRounds::GetPendingOpByIndexOrNull(int64_t index) {
-  return FindPtrOrNull(pending_txns_, index);
-}
-
-bool PendingRounds::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch) {
-
-  *term_mismatch = false;
-
-  if (op_id.index() <= GetCommittedIndex()) {
-    return true;
-  }
-
-  scoped_refptr<ConsensusRound> round = GetPendingOpByIndexOrNull(op_id.index());
-  if (!round) {
-    return false;
-  }
-
-  if (round->id().term() != op_id.term()) {
-    *term_mismatch = true;
-    return false;
-  }
-  return true;
-}
-
-OpId PendingRounds::GetLastPendingTransactionOpId() const {
-  return pending_txns_.empty()
-      ? MinimumOpId() : (--pending_txns_.end())->second->id();
-}
-
-Status PendingRounds::AdvanceCommittedIndex(int64_t committed_index) {
-  // If we already committed up to (or past) 'id' return.
-  // This can happen in the case that multiple UpdateConsensus() calls end
-  // up in the RPC queue at the same time, and then might get interleaved out
-  // of order.
-  if (last_committed_op_id_.index() >= committed_index) {
-    VLOG_WITH_PREFIX(1)
-      << "Already marked ops through " << last_committed_op_id_ << " as committed. "
-      << "Now trying to mark " << committed_index << " which would be a no-op.";
-    return Status::OK();
-  }
-
-  if (pending_txns_.empty()) {
-    LOG(ERROR) << "Advancing commit index to " << committed_index
-               << " from " << last_committed_op_id_
-               << " we have no pending txns"
-               << GetStackTrace();
-    VLOG_WITH_PREFIX(1) << "No transactions to mark as committed up to: "
-                                 << committed_index;
-    return Status::OK();
-  }
-
-  // Start at the operation after the last committed one.
-  auto iter = pending_txns_.upper_bound(last_committed_op_id_.index());
-  // Stop at the operation after the last one we must commit.
-  auto end_iter = pending_txns_.upper_bound(committed_index);
-  CHECK(iter != pending_txns_.end());
-
-  VLOG_WITH_PREFIX(1) << "Last triggered apply was: "
-      <<  last_committed_op_id_
-      << " Starting to apply from log index: " << (*iter).first;
-
-  while (iter != end_iter) {
-    scoped_refptr<ConsensusRound> round = (*iter).second; // Make a copy.
-    DCHECK(round);
-    const OpId& current_id = round->id();
-
-    if (PREDICT_TRUE(!OpIdEquals(last_committed_op_id_, MinimumOpId()))) {
-      CHECK_OK(CheckOpInSequence(last_committed_op_id_, current_id));
-    }
-
-    pending_txns_.erase(iter++);
-    last_committed_op_id_ = round->id();
-    time_manager_->AdvanceSafeTimeWithMessage(*round->replicate_msg());
-    round->NotifyReplicationFinished(Status::OK());
-  }
-
-  return Status::OK();
-}
-
-Status PendingRounds::SetInitialCommittedOpId(const OpId& committed_op) {
-  CHECK_EQ(last_committed_op_id_.index(), 0);
-  if (!pending_txns_.empty()) {
-    int64_t first_pending_index = pending_txns_.begin()->first;
-    if (committed_op.index() < first_pending_index) {
-      if (committed_op.index() != first_pending_index - 1) {
-        return Status::Corruption(Substitute(
-            "pending operations should start at first operation "
-            "after the committed operation (committed=$0, first pending=$1)",
-            OpIdToString(committed_op), first_pending_index));
-      }
-      last_committed_op_id_ = committed_op;
-    }
-
-    RETURN_NOT_OK(AdvanceCommittedIndex(committed_op.index()));
-    CHECK_EQ(SecureShortDebugString(last_committed_op_id_),
-             SecureShortDebugString(committed_op));
-
-  } else {
-    last_committed_op_id_ = committed_op;
-  }
-  return Status::OK();
-}
-
-Status PendingRounds::CheckOpInSequence(const OpId& previous, const OpId& current) {
-  if (current.term() < previous.term()) {
-    return Status::Corruption(Substitute("New operation's term is not >= than the previous "
-        "op's term. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous)));
-  }
-  if (current.index() != previous.index() + 1) {
-    return Status::Corruption(Substitute("New operation's index does not follow the previous"
-        " op's index. Current: $0. Previous: $1", OpIdToString(current), OpIdToString(previous)));
-  }
-  return Status::OK();
-}
-
-int64_t PendingRounds::GetCommittedIndex() const {
-  return last_committed_op_id_.index();
-}
-
-int64_t PendingRounds::GetTermWithLastCommittedOp() const {
-  return last_committed_op_id_.term();
-}
-
-int PendingRounds::GetNumPendingTxns() const {
-  return pending_txns_.size();
-}
-
 }  // namespace consensus
 }  // namespace kudu
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/15d2fbaf/src/kudu/consensus/raft_consensus_state.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.h b/src/kudu/consensus/raft_consensus_state.h
index 9350ab8..ee53f25 100644
--- a/src/kudu/consensus/raft_consensus_state.h
+++ b/src/kudu/consensus/raft_consensus_state.h
@@ -14,37 +14,21 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_CONSENSUS_RAFT_CONSENSUS_UTIL_H_
-#define KUDU_CONSENSUS_RAFT_CONSENSUS_UTIL_H_
 
-#include <map>
+#pragma once
+
 #include <memory>
-#include <mutex>
-#include <set>
 #include <string>
-#include <utility>
-#include <vector>
 
 #include "kudu/consensus/consensus.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus_meta.h"
-#include "kudu/consensus/log_util.h"
 #include "kudu/consensus/opid_util.h"
-#include "kudu/gutil/port.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
-
-class HostPort;
-class ThreadPool;
-
-namespace rpc {
-class Messenger;
-}
-
 namespace consensus {
-class TimeManager;
 
 // Class that coordinates access to the persistent Raft state (independently of Role).
 // This has a 1-1 relationship with RaftConsensus and is essentially responsible for
@@ -244,87 +228,5 @@ class ReplicaState {
   State state_;
 };
 
-// Tracks the pending consensus rounds being managed by a Raft replica (either leader
-// or follower).
-//
-// This class is not thread-safe.
-//
-// TODO(todd): this class inconsistently uses the term "round", "op", and "transaction".
-// We should consolidate to "round".
-class PendingRounds {
- public:
-  PendingRounds(std::string log_prefix, scoped_refptr<TimeManager> time_manager);
-  ~PendingRounds();
-
-  // Set the committed op during startup. This should be done after
-  // appending any of the pending transactions, and will take care
-  // of triggering any that are now considered committed.
-  Status SetInitialCommittedOpId(const OpId& committed_op);
-
-  // Returns the the ConsensusRound with the provided index, if there is any, or NULL
-  // if there isn't.
-  scoped_refptr<ConsensusRound> GetPendingOpByIndexOrNull(int64_t index);
-
-  // Add 'round' to the set of rounds waiting to be committed.
-  Status AddPendingOperation(const scoped_refptr<ConsensusRound>& round);
-
-  // Advances the committed index.
-  // This is a no-op if the committed index has not changed.
-  Status AdvanceCommittedIndex(int64_t committed_index);
-
-  // Aborts pending operations after, but not including 'index'. The OpId with 'index'
-  // will become our new last received id. If there are pending operations with indexes
-  // higher than 'index' those operations are aborted.
-  void AbortOpsAfter(int64_t index);
-
-  // Returns true if an operation is in this replica's log, namely:
-  // - If the op's index is lower than or equal to our committed index
-  // - If the op id matches an inflight op.
-  // If an operation with the same index is in our log but the terms
-  // are different 'term_mismatch' is set to true, it is false otherwise.
-  bool IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch);
-
-  // Returns the id of the latest pending transaction (i.e. the one with the
-  // latest index). This must be called under the lock.
-  OpId GetLastPendingTransactionOpId() const;
-
-  // Used by replicas to cancel pending transactions. Pending transaction are those
-  // that have completed prepare/replicate but are waiting on the LEADER's commit
-  // to complete. This does not cancel transactions being applied.
-  Status CancelPendingTransactions();
-
-  // Returns the number of transactions that are currently in the pending state
-  // i.e. transactions for which Prepare() is done or under way.
-  int GetNumPendingTxns() const;
-
-  // Returns the watermark below which all operations are known to
-  // be committed according to consensus.
-  // TODO(todd): these should probably be removed in favor of using the queue.
-  int64_t GetCommittedIndex() const;
-  int64_t GetTermWithLastCommittedOp() const;
-
-  // Checks that 'current' correctly follows 'previous'. Specifically it checks
-  // that the term is the same or higher and that the index is sequential.
-  static Status CheckOpInSequence(const OpId& previous, const OpId& current);
-
- private:
-  const std::string& LogPrefix() const { return log_prefix_; }
-
-  const std::string log_prefix_;
-
-  // Index=>Round map that manages pending ops, i.e. operations for which we've
-  // received a replicate message from the leader but have yet to be committed.
-  // The key is the index of the replicate operation.
-  typedef std::map<int64_t, scoped_refptr<ConsensusRound> > IndexToRoundMap;
-  IndexToRoundMap pending_txns_;
-
-  // The OpId of the round that was last committed. Initialized to MinimumOpId().
-  OpId last_committed_op_id_;
-
-  scoped_refptr<TimeManager> time_manager_;
-};
-
 }  // namespace consensus
 }  // namespace kudu
-
-#endif /* KUDU_CONSENSUS_RAFT_CONSENSUS_UTIL_H_ */