You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/10/19 22:16:07 UTC

[4/7] kudu git commit: consensus: move more logic from ReplicaState to RaftConsensus

consensus: move more logic from ReplicaState to RaftConsensus

ReplicaState currently has two main responsibilities:
* manage the set of pending transactions, including committing
  and aborting them. The main state here is the pending_txns_
  map.
* act as a wrapper to ConsensusMeta (stored as cmeta_).

These two items are relatively distinct. I drew a graph of the
dependencies between methods and members of ReplicaState[1] and it
became clear that most methods either pertained to one or the
other of these responsibilities, but not both. So, this patch seeks to
try to clean up the methods which were straddling the two
responsibilities as follows:

1. AbortOpsAfterUnlocked, AddPendingOperation, AdvanceCommittedIndexUnlocked

  These three are mainly focused around managing pending transactions,
  but were also taking care of setting the pending and committed Raft
  configuration in the case of configuration change operations.

  The patch moves the handling of configuration changes into
  RaftConsensus itself by using the existing "replicated callback" to
  perform the necessary work when the operation finishes, and by
  checking for config changes before adding pending operations.

4. CheckHasCommittedOpInCurrentTermUnlocked

  This one is used to check whether the last committed operation is in
  the current term. Recently the PeerMessageQueue became responsible for
  tracking the committed OpId, and already has the necessary state to
  provide the same functionality.

After the refactor, the class clearly shows two separate groups of
methods and members[2]. This will make it easier to maintain/understand
the code as well as open more opportunities for finer-grained locking by
having smaller classes with better-defined responsibilities.

I looped raft_consensus-itest 100 times with this patch and passed
100%[3].

NOTE: for the sake of the dependency graphs, I removed the state_
variable, ToString methods, and locking methods and left only the
"important" bits.

[1] http://i.imgur.com/fpyWwyM.png
[2] http://i.imgur.com/7K9Dc5J.png
[3] http://dist-test.cloudera.org/job?job_id=todd.1476324853.3445

Change-Id: I0f377ebb132f3f58f984605197831f41198d78c5
Reviewed-on: http://gerrit.cloudera.org:8080/4709
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f2776c89
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f2776c89
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f2776c89

Branch: refs/heads/master
Commit: f2776c892de9631dfe4ee62d1321889751572b21
Parents: 46d4d78
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Oct 12 18:51:32 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Oct 19 22:07:45 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_queue.cc           |   6 ++
 src/kudu/consensus/consensus_queue.h            |   7 +-
 src/kudu/consensus/raft_consensus.cc            | 104 +++++++++++++++++--
 src/kudu/consensus/raft_consensus.h             |  17 +++
 src/kudu/consensus/raft_consensus_state.cc      |  90 +---------------
 src/kudu/consensus/raft_consensus_state.h       |   3 -
 .../integration-tests/raft_consensus-itest.cc   |   2 +-
 7 files changed, 129 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/f2776c89/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 6598662..1d06adb 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -789,6 +789,12 @@ int64_t PeerMessageQueue::GetCommittedIndex() const {
   return queue_state_.committed_index;
 }
 
+bool PeerMessageQueue::IsCommittedIndexInCurrentTerm() const {
+  std::lock_guard<simple_spinlock> lock(queue_lock_);
+  return queue_state_.first_index_in_current_term != boost::none &&
+      queue_state_.committed_index >= *queue_state_.first_index_in_current_term;
+}
+
 int64_t PeerMessageQueue::GetMajorityReplicatedIndexForTests() const {
   std::lock_guard<simple_spinlock> lock(queue_lock_);
   return queue_state_.majority_replicated_index;

http://git-wip-us.apache.org/repos/asf/kudu/blob/f2776c89/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index c5fc98c..9b8e8ea 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -62,9 +62,11 @@ extern const char kConsensusQueueParentTrackerId[];
 //
 // This class is used only on the LEADER side.
 //
-// TODO Right now this class is able to track one outstanding operation
+// TODO(todd): Right now this class is able to track one outstanding operation
 // per peer. If we want to have more than one outstanding RPC we need to
 // modify it.
+//
+// TODO(todd): make methods non-virtual since they aren't overridden.
 class PeerMessageQueue {
  public:
   struct TrackedPeer {
@@ -251,6 +253,9 @@ class PeerMessageQueue {
   // this index have been committed.
   virtual int64_t GetCommittedIndex() const;
 
+  // Return true if the committed index falls within the current term.
+  bool IsCommittedIndexInCurrentTerm() const;
+
   // Returns the current majority replicated index, for tests.
   virtual int64_t GetMajorityReplicatedIndexForTests() const;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/f2776c89/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index ac12ec7..6e4b708 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -590,7 +590,7 @@ Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRo
 
 Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round) {
   *round->replicate_msg()->mutable_id() = queue_->GetNextOpId();
-  RETURN_NOT_OK(state_->AddPendingOperation(round));
+  RETURN_NOT_OK(AddPendingOperationUnlocked(round));
 
   Status s = queue_->AppendOperation(round->replicate_scoped_refptr());
 
@@ -611,6 +611,45 @@ Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<Consensu
   return Status::OK();
 }
 
+Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusRound>& round) {
+  // If we are adding a pending config change, we need to propagate it to the
+  // metadata.
+  if (PREDICT_FALSE(round->replicate_msg()->op_type() == CHANGE_CONFIG_OP)) {
+    DCHECK(round->replicate_msg()->change_config_record().has_old_config());
+    DCHECK(round->replicate_msg()->change_config_record().old_config().has_opid_index());
+    DCHECK(round->replicate_msg()->change_config_record().has_new_config());
+    DCHECK(!round->replicate_msg()->change_config_record().new_config().has_opid_index());
+    const RaftConfigPB& old_config = round->replicate_msg()->change_config_record().old_config();
+    const RaftConfigPB& new_config = round->replicate_msg()->change_config_record().new_config();
+    if (state_->GetActiveRoleUnlocked() != RaftPeerPB::LEADER) {
+      // The leader has to mark the configuration as pending before it gets here
+      // because the active configuration affects the replication queue.
+      // Do one last sanity check.
+      Status s = state_->CheckNoConfigChangePendingUnlocked();
+      if (PREDICT_FALSE(!s.ok())) {
+        s = s.CloneAndAppend(Substitute("\n  New config: $0", new_config.ShortDebugString()));
+        LOG_WITH_PREFIX_UNLOCKED(INFO) << s.ToString();
+        return s;
+      }
+      // Check if the pending Raft config has an OpId less than the committed
+      // config. If so, this is a replay at startup in which the COMMIT
+      // messages were delayed.
+      const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked();
+      if (round->replicate_msg()->id().index() > committed_config.opid_index()) {
+        CHECK_OK(state_->SetPendingConfigUnlocked(new_config));
+      } else {
+        LOG_WITH_PREFIX_UNLOCKED(INFO) << "Ignoring setting pending config change with OpId "
+            << round->replicate_msg()->id() << " because the committed config has OpId index "
+            << committed_config.opid_index() << ". The config change we are ignoring is: "
+            << "Old config: { " << old_config.ShortDebugString() << " }. "
+            << "New config: { " << new_config.ShortDebugString() << " }";
+      }
+    }
+  }
+
+  return state_->AddPendingOperation(round);
+}
+
 void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
   ReplicaState::UniqueLock lock;
   Status s = state_->LockForCommit(&lock);
@@ -748,7 +787,7 @@ Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg
   ConsensusRound* round_ptr = round.get();
   RETURN_NOT_OK(state_->GetReplicaTransactionFactoryUnlocked()->
       StartReplicaTransaction(round));
-  return state_->AddPendingOperation(round_ptr);
+  return AddPendingOperationUnlocked(round_ptr);
 }
 
 Status RaftConsensus::IsSingleVoterConfig(bool* single_voter) const {
@@ -1434,10 +1473,14 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
     RETURN_NOT_OK(state_->LockForConfigChange(&lock));
     RETURN_NOT_OK(state_->CheckActiveLeaderUnlocked());
     RETURN_NOT_OK(state_->CheckNoConfigChangePendingUnlocked());
+
     // We are required by Raft to reject config change operations until we have
     // committed at least one operation in our current term as leader.
     // See https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E
-    RETURN_NOT_OK(state_->CheckHasCommittedOpInCurrentTermUnlocked());
+    if (!queue_->IsCommittedIndexInCurrentTerm()) {
+      return Status::IllegalState("Leader has not yet committed an operation in its own term");
+    }
+
     if (!server.has_permanent_uuid()) {
       return Status::InvalidArgument("server must have permanent_uuid specified",
                                      req.ShortDebugString());
@@ -1569,7 +1612,7 @@ Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg
                                                   Unretained(this),
                                                   string("Replicated consensus-only round"),
                                                   Bind(&DoNothingStatusCB))));
-  return state_->AddPendingOperation(round);
+  return AddPendingOperationUnlocked(round);
 }
 
 Status RaftConsensus::AdvanceTermForTests(int64_t new_term) {
@@ -1727,6 +1770,7 @@ void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) {
   MarkDirty("New leader " + uuid);
 }
 
+
 Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_config,
                                                     const RaftConfigPB& new_config,
                                                     const StatusCallback& client_cb) {
@@ -1986,12 +2030,18 @@ void RaftConsensus::MarkDirtyOnSuccess(const string& reason,
 void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
                                                   const StatusCallback& client_cb,
                                                   const Status& status) {
+  // NOTE: the ReplicaState lock is held here because this is triggered by
+  // ReplicaState's abort or commit paths.
   OperationType op_type = round->replicate_msg()->op_type();
   const string& op_type_str = OperationType_Name(op_type);
   CHECK(IsConsensusOnlyOperation(op_type)) << "Unexpected op type: " << op_type_str;
+
+  if (op_type == CHANGE_CONFIG_OP) {
+    CompleteConfigChangeRoundUnlocked(round, status);
+    // Fall through to the generic handling.
+  }
+
   if (!status.ok()) {
-    // In the case that a change-config operation is aborted, RaftConsensusState
-    // already handled clearing the pending config state.
     LOG(INFO) << state_->LogPrefixThreadSafe() << op_type_str << " replication failed: "
               << status.ToString();
     client_cb.Run(status);
@@ -2010,6 +2060,48 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
   client_cb.Run(status);
 }
 
+void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, const Status& status) {
+  if (!status.ok()) {
+    // Abort a failed config change.
+    CHECK(state_->IsConfigChangePendingUnlocked())
+        << LogPrefixUnlocked() << "Aborting CHANGE_CONFIG_OP but "
+        << "there was no pending config set. Op: "
+        << round->replicate_msg()->ShortDebugString();
+    state_->ClearPendingConfigUnlocked();
+    return;
+  }
+
+  // Commit the successful config change.
+  const OpId& op_id = round->replicate_msg()->id();
+
+  DCHECK(round->replicate_msg()->change_config_record().has_old_config());
+  DCHECK(round->replicate_msg()->change_config_record().has_new_config());
+  RaftConfigPB old_config = round->replicate_msg()->change_config_record().old_config();
+  RaftConfigPB new_config = round->replicate_msg()->change_config_record().new_config();
+  DCHECK(old_config.has_opid_index());
+  DCHECK(!new_config.has_opid_index());
+  new_config.set_opid_index(op_id.index());
+  // Check if the pending Raft config has an OpId less than the committed
+  // config. If so, this is a replay at startup in which the COMMIT
+  // messages were delayed.
+  const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked();
+  if (new_config.opid_index() > committed_config.opid_index()) {
+    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Committing config change with OpId "
+                                   << op_id << ": "
+                                   << DiffRaftConfigs(old_config, new_config)
+                                   << ". New config: { " << new_config.ShortDebugString() << " }";
+    CHECK_OK(state_->SetCommittedConfigUnlocked(new_config));
+  } else {
+    LOG_WITH_PREFIX_UNLOCKED(INFO)
+        << "Ignoring commit of config change with OpId "
+        << op_id << " because the committed config has OpId index "
+        << committed_config.opid_index() << ". The config change we are ignoring is: "
+        << "Old config: { " << old_config.ShortDebugString() << " }. "
+        << "New config: { " << new_config.ShortDebugString() << " }";
+  }
+}
+
+
 Status RaftConsensus::EnsureFailureDetectorEnabledUnlocked() {
   if (PREDICT_FALSE(!FLAGS_enable_leader_failure_detection)) {
     return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/f2776c89/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 37f7f83..866b2a9 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -401,12 +401,23 @@ class RaftConsensus : public Consensus,
                              const RaftConfigPB& committed_config,
                              const std::string& reason);
 
+
+  // Handle the completion of replication of a config change operation.
+  // If 'status' is OK, this takes care of persisting the new configuration
+  // to disk as the committed configuration. A non-OK status indicates that
+  // the replication failed, in which case the pending configuration needs
+  // to be cleared such that we revert back to the old configuration.
+  void CompleteConfigChangeRoundUnlocked(ConsensusRound* round,
+                                         const Status& status);
+
   // Trigger that a non-Transaction ConsensusRound has finished replication.
   // If the replication was successful, an status will be OK. Otherwise, it
   // may be Aborted or some other error status.
   // If 'status' is OK, write a Commit message to the local WAL based on the
   // type of message it is.
   // The 'client_cb' will be invoked at the end of this execution.
+  //
+  // NOTE: this must be called with the ReplicaState lock held.
   void NonTxRoundReplicationFinished(ConsensusRound* round,
                                      const StatusCallback& client_cb,
                                      const Status& status);
@@ -419,6 +430,12 @@ class RaftConsensus : public Consensus,
   // Only virtual and protected for mocking purposes.
   Status StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg);
 
+  // Add a new pending operation to the ReplicaState, including the special handling
+  // necessary if this round contains a configuration change. These rounds must
+  // take effect as soon as they are received, rather than waiting for commitment
+  // (see Diego Ongaro's thesis section 4.1).
+  Status AddPendingOperationUnlocked(const scoped_refptr<ConsensusRound>& round);
+
   // Threadpool for constructing requests to peers, handling RPC callbacks,
   // etc.
   gscoped_ptr<ThreadPool> thread_pool_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/f2776c89/src/kudu/consensus/raft_consensus_state.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc
index f99fb28..366af51 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -403,16 +403,6 @@ void ReplicaState::AbortOpsAfterUnlocked(int64_t index) {
         << "Aborting uncommitted " << OperationType_Name(op_type)
         << " operation due to leader change: " << round->replicate_msg()->id();
 
-    // When aborting a config-change operation, go back to using the committed
-    // configuration.
-    if (PREDICT_FALSE(op_type == CHANGE_CONFIG_OP)) {
-      CHECK(IsConfigChangePendingUnlocked())
-          << LogPrefixUnlocked() << "Aborting CHANGE_CONFIG_OP but "
-          << "there was no pending config set. Op: "
-          << round->replicate_msg()->ShortDebugString();
-      ClearPendingConfigUnlocked();
-    }
-
     round->NotifyReplicationFinished(Status::Aborted("Transaction aborted by new leader"));
     // Erase the entry from pendings.
     pending_txns_.erase(iter++);
@@ -422,46 +412,7 @@ void ReplicaState::AbortOpsAfterUnlocked(int64_t index) {
 Status ReplicaState::AddPendingOperation(const scoped_refptr<ConsensusRound>& round) {
   DCHECK(update_lock_.is_locked());
   if (PREDICT_FALSE(state_ != kRunning)) {
-    // Special case when we're configuring and this is a config change, refuse
-    // everything else.
-    // TODO: Don't require a NO_OP to get to kRunning state
-    if (round->replicate_msg()->op_type() != NO_OP) {
-      return Status::IllegalState("Cannot trigger prepare. Replica is not in kRunning state.");
-    }
-  }
-
-  // Mark pending configuration.
-  if (PREDICT_FALSE(round->replicate_msg()->op_type() == CHANGE_CONFIG_OP)) {
-    DCHECK(round->replicate_msg()->change_config_record().has_old_config());
-    DCHECK(round->replicate_msg()->change_config_record().old_config().has_opid_index());
-    DCHECK(round->replicate_msg()->change_config_record().has_new_config());
-    DCHECK(!round->replicate_msg()->change_config_record().new_config().has_opid_index());
-    const RaftConfigPB& old_config = round->replicate_msg()->change_config_record().old_config();
-    const RaftConfigPB& new_config = round->replicate_msg()->change_config_record().new_config();
-    if (GetActiveRoleUnlocked() != RaftPeerPB::LEADER) {
-      // The leader has to mark the configuration as pending before it gets here
-      // because the active configuration affects the replication queue.
-      // Do one last sanity check.
-      Status s = CheckNoConfigChangePendingUnlocked();
-      if (PREDICT_FALSE(!s.ok())) {
-        s = s.CloneAndAppend(Substitute("\n  New config: $0", new_config.ShortDebugString()));
-        LOG_WITH_PREFIX_UNLOCKED(INFO) << s.ToString();
-        return s;
-      }
-      // Check if the pending Raft config has an OpId less than the committed
-      // config. If so, this is a replay at startup in which the COMMIT
-      // messages were delayed.
-      const RaftConfigPB& committed_config = GetCommittedConfigUnlocked();
-      if (round->replicate_msg()->id().index() > committed_config.opid_index()) {
-        CHECK_OK(SetPendingConfigUnlocked(new_config));
-      } else {
-        LOG_WITH_PREFIX_UNLOCKED(INFO) << "Ignoring setting pending config change with OpId "
-            << round->replicate_msg()->id() << " because the committed config has OpId index "
-            << committed_config.opid_index() << ". The config change we are ignoring is: "
-            << "Old config: { " << old_config.ShortDebugString() << " }. "
-            << "New config: { " << new_config.ShortDebugString() << " }";
-      }
-    }
+    return Status::IllegalState("Cannot trigger prepare. Replica is not in kRunning state.");
   }
 
   InsertOrDie(&pending_txns_, round->replicate_msg()->id().index(), round);
@@ -515,35 +466,6 @@ Status ReplicaState::AdvanceCommittedIndexUnlocked(int64_t committed_index) {
     }
 
     pending_txns_.erase(iter++);
-
-    // Set committed configuration.
-    if (PREDICT_FALSE(round->replicate_msg()->op_type() == CHANGE_CONFIG_OP)) {
-      DCHECK(round->replicate_msg()->change_config_record().has_old_config());
-      DCHECK(round->replicate_msg()->change_config_record().has_new_config());
-      RaftConfigPB old_config = round->replicate_msg()->change_config_record().old_config();
-      RaftConfigPB new_config = round->replicate_msg()->change_config_record().new_config();
-      DCHECK(old_config.has_opid_index());
-      DCHECK(!new_config.has_opid_index());
-      new_config.set_opid_index(current_id.index());
-      // Check if the pending Raft config has an OpId less than the committed
-      // config. If so, this is a replay at startup in which the COMMIT
-      // messages were delayed.
-      const RaftConfigPB& committed_config = GetCommittedConfigUnlocked();
-      if (new_config.opid_index() > committed_config.opid_index()) {
-        LOG_WITH_PREFIX_UNLOCKED(INFO) << "Committing config change with OpId "
-            << current_id << ": "
-            << DiffRaftConfigs(old_config, new_config)
-            << ". New config: { " << new_config.ShortDebugString() << " }";
-        CHECK_OK(SetCommittedConfigUnlocked(new_config));
-      } else {
-        LOG_WITH_PREFIX_UNLOCKED(INFO) << "Ignoring commit of config change with OpId "
-            << current_id << " because the committed config has OpId index "
-            << committed_config.opid_index() << ". The config change we are ignoring is: "
-            << "Old config: { " << old_config.ShortDebugString() << " }. "
-            << "New config: { " << new_config.ShortDebugString() << " }";
-      }
-    }
-
     last_committed_op_id_ = round->id();
     round->NotifyReplicationFinished(Status::OK());
   }
@@ -586,16 +508,6 @@ int64_t ReplicaState::GetTermWithLastCommittedOpUnlocked() const {
   return last_committed_op_id_.term();
 }
 
-
-Status ReplicaState::CheckHasCommittedOpInCurrentTermUnlocked() const {
-  int64_t term = GetCurrentTermUnlocked();
-  const OpId& opid = last_committed_op_id_;
-  if (opid.term() != term) {
-    return Status::IllegalState("Latest committed op is not from this term", OpIdToString(opid));
-  }
-  return Status::OK();
-}
-
 OpId ReplicaState::GetLastPendingTransactionOpIdUnlocked() const {
   DCHECK(update_lock_.is_locked());
   return pending_txns_.empty()

http://git-wip-us.apache.org/repos/asf/kudu/blob/f2776c89/src/kudu/consensus/raft_consensus_state.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.h b/src/kudu/consensus/raft_consensus_state.h
index d5d6fe2..ed53deb 100644
--- a/src/kudu/consensus/raft_consensus_state.h
+++ b/src/kudu/consensus/raft_consensus_state.h
@@ -266,9 +266,6 @@ class ReplicaState {
   int64_t GetCommittedIndexUnlocked() const;
   int64_t GetTermWithLastCommittedOpUnlocked() const;
 
-  // Returns OK iff an op from the current term has been committed.
-  Status CheckHasCommittedOpInCurrentTermUnlocked() const;
-
   // Returns the id of the latest pending transaction (i.e. the one with the
   // latest index). This must be called under the lock.
   OpId GetLastPendingTransactionOpIdUnlocked() const;

http://git-wip-us.apache.org/repos/asf/kudu/blob/f2776c89/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index a950ad4..1acb63b 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -2562,7 +2562,7 @@ TEST_F(RaftConsensusITest, TestChangeConfigRejectedUnlessNoopReplicated) {
                                  tablet_servers_[cluster_->tablet_server(1)->uuid()],
                                  boost::none, timeout);
   ASSERT_TRUE(!s.ok()) << s.ToString();
-  ASSERT_STR_CONTAINS(s.ToString(), "Latest committed op is not from this term");
+  ASSERT_STR_CONTAINS(s.ToString(), "Leader has not yet committed an operation in its own term");
 }
 
 // Test that if for some reason none of the transactions can be prepared, that it will come