You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/09/08 02:12:21 UTC

[1/4] kudu git commit: Cleanup/refactor tracking of consensus watermarks

Repository: kudu
Updated Branches:
  refs/heads/master b1f1388e2 -> 46d9ed7aa


http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index c971b02..451d5f1 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -151,19 +151,23 @@ class RaftConsensus : public Consensus,
   // can cause consensus to deadlock.
   ReplicaState* GetReplicaStateForTests();
 
+  virtual Status GetLastOpId(OpIdType type, OpId* id) OVERRIDE;
+
+
+  //------------------------------------------------------------
+  // PeerMessageQueueObserver implementation
+  //------------------------------------------------------------
+
   // Updates the committed_index and triggers the Apply()s for whatever
   // transactions were pending.
   // This is idempotent.
-  void UpdateMajorityReplicated(const OpId& majority_replicated,
-                                OpId* committed_index) OVERRIDE;
+  void NotifyCommitIndex(int64_t commit_index) override;
 
-  virtual void NotifyTermChange(int64_t term) OVERRIDE;
+  void NotifyTermChange(int64_t term) override;
 
-  virtual void NotifyFailedFollower(const std::string& uuid,
-                                    int64_t term,
-                                    const std::string& reason) OVERRIDE;
-
-  virtual Status GetLastOpId(OpIdType type, OpId* id) OVERRIDE;
+  void NotifyFailedFollower(const std::string& uuid,
+                            int64_t term,
+                            const std::string& reason) override;
 
  protected:
   // Trigger that a non-Transaction ConsensusRound has finished replication.
@@ -347,9 +351,6 @@ class RaftConsensus : public Consensus,
   Status RequestVoteRespondVoteGranted(const VoteRequestPB* request,
                                        VoteResponsePB* response);
 
-  void UpdateMajorityReplicatedUnlocked(const OpId& majority_replicated,
-                                        OpId* committed_index);
-
   // Callback for leader election driver. ElectionCallback is run on the
   // reactor thread, so it simply defers its work to DoElectionCallback.
   void ElectionCallback(const ElectionResult& result);

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index ee1a568..2e7a746 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -301,7 +301,7 @@ class RaftConsensusQuorumTest : public KuduTest {
 
   // Waits for an operation to be (database) committed in the replica at index
   // 'peer_idx'. If the operation was already committed this returns immediately.
-  void WaitForCommitIfNotAlreadyPresent(const OpId& to_wait_for,
+  void WaitForCommitIfNotAlreadyPresent(int64_t to_wait_for,
                                         int peer_idx,
                                         int leader_idx) {
     MonoDelta timeout(MonoDelta::FromSeconds(10));
@@ -313,13 +313,13 @@ class RaftConsensusQuorumTest : public KuduTest {
 
     int backoff_exp = 0;
     const int kMaxBackoffExp = 8;
-    OpId committed_op_id;
+    int64_t committed_op_idx;
     while (true) {
       {
         ReplicaState::UniqueLock lock;
         CHECK_OK(state->LockForRead(&lock));
-        committed_op_id = state->GetCommittedOpIdUnlocked();
-        if (OpIdCompare(committed_op_id, to_wait_for) >= 0) {
+        committed_op_idx = state->GetCommittedIndexUnlocked();
+        if (committed_op_idx >= to_wait_for) {
           return;
         }
       }
@@ -332,7 +332,7 @@ class RaftConsensusQuorumTest : public KuduTest {
 
     LOG(ERROR) << "Max timeout reached (" << timeout.ToString() << ") while waiting for commit of "
                << "op " << to_wait_for << " on replica. Last committed op on replica: "
-               << committed_op_id << ". Dumping state and quitting.";
+               << committed_op_idx << ". Dumping state and quitting.";
     vector<string> lines;
     scoped_refptr<RaftConsensus> leader;
     CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader));
@@ -608,8 +608,8 @@ TEST_F(RaftConsensusQuorumTest, TestFollowersReplicateAndCommitMessage) {
   // We thus wait for the commit callback to trigger, ensuring durability
   // on the leader and then for the commits to be present on the replicas.
   ASSERT_OK(commit_sync->Wait());
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, kLeaderIdx);
   VerifyLogs(2, 0, 1);
 }
 
@@ -646,8 +646,8 @@ TEST_F(RaftConsensusQuorumTest, TestFollowersReplicateAndCommitSequence) {
   // See comment at the end of TestFollowersReplicateAndCommitMessage
   // for an explanation on this waiting sequence.
   ASSERT_OK(commit_sync->Wait());
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, kLeaderIdx);
   VerifyLogs(2, 0, 1);
 }
 
@@ -686,12 +686,12 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) {
     // this would hang here). We know he must have replicated but make sure
     // by calling Wait().
     WaitForReplicateIfNotAlreadyPresent(last_replicate, kFollower1Idx);
-    WaitForCommitIfNotAlreadyPresent(last_replicate, kFollower1Idx, kLeaderIdx);
+    WaitForCommitIfNotAlreadyPresent(last_replicate.index(), kFollower1Idx, kLeaderIdx);
   }
 
   // After we let the lock go the remaining follower should get up-to-date
   WaitForReplicateIfNotAlreadyPresent(last_replicate, kFollower0Idx);
-  WaitForCommitIfNotAlreadyPresent(last_replicate, kFollower0Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_replicate.index(), kFollower0Idx, kLeaderIdx);
   VerifyLogs(2, 0, 1);
 }
 
@@ -738,8 +738,8 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) {
   // Assert that everything was ok
   WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower0Idx);
   WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower1Idx);
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, kLeaderIdx);
   VerifyLogs(2, 0, 1);
 }
 
@@ -772,8 +772,8 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasHandleCommunicationErrors) {
 
   // The commit should eventually reach both followers as well.
   last_op_id = round->id();
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, kLeaderIdx);
 
   // Append a sequence of messages, and keep injecting errors into the
   // replica proxies.
@@ -804,8 +804,8 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasHandleCommunicationErrors) {
   // See comment at the end of TestFollowersReplicateAndCommitMessage
   // for an explanation on this waiting sequence.
   ASSERT_OK(commit_sync->Wait());
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, kLeaderIdx);
   VerifyLogs(2, 0, 1);
 }
 
@@ -847,8 +847,8 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) {
   OpId config_round;
   config_round.set_term(1);
   config_round.set_index(1);
-  WaitForCommitIfNotAlreadyPresent(config_round, kFollower0Idx, kLeaderIdx);
-  WaitForCommitIfNotAlreadyPresent(config_round, kFollower1Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(config_round.index(), kFollower0Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(config_round.index(), kFollower1Idx, kLeaderIdx);
 
   int repl0_init_count = counter_hook_rpl0->num_pre_update_calls();
   int repl1_init_count = counter_hook_rpl1->num_pre_update_calls();
@@ -896,7 +896,7 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) {
     // Make sure the last operation is committed everywhere
     ASSERT_OK(last_commit_sync->Wait());
     for (int i = 0; i < current_config_size - 1; i++) {
-      WaitForCommitIfNotAlreadyPresent(last_op_id, i, current_config_size - 1);
+      WaitForCommitIfNotAlreadyPresent(last_op_id.index(), i, current_config_size - 1);
     }
 
     // Now shutdown the current leader.
@@ -929,7 +929,7 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) {
     // Make sure the last operation is committed everywhere
     ASSERT_OK(last_commit_sync->Wait());
     for (int i = 0; i < current_config_size - 2; i++) {
-      WaitForCommitIfNotAlreadyPresent(last_op_id, i, current_config_size - 2);
+      WaitForCommitIfNotAlreadyPresent(last_op_id.index(), i, current_config_size - 2);
     }
   }
   // We can only verify the logs of the peers that were not killed, due to the
@@ -953,8 +953,8 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty) {
 
   // Make sure the last operation is committed everywhere
   ASSERT_OK(last_commit_sync->Wait());
-  WaitForCommitIfNotAlreadyPresent(last_op_id, 0, 2);
-  WaitForCommitIfNotAlreadyPresent(last_op_id, 1, 2);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), 0, 2);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), 1, 2);
 
   // Now replicas should only accept operations with
   // 'last_id' as the preceding id.
@@ -971,7 +971,7 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty) {
   req.set_caller_uuid(leader->peer_uuid());
   req.set_caller_term(last_op_id.term());
   req.mutable_preceding_id()->CopyFrom(last_op_id);
-  req.mutable_committed_index()->CopyFrom(last_op_id);
+  req.set_committed_index(last_op_id.index());
 
   ReplicateMsg* replicate = req.add_ops();
   replicate->set_timestamp(clock_->Now().ToUint64());
@@ -1016,8 +1016,8 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
 
   // Make sure the last operation is committed everywhere
   ASSERT_OK(last_commit_sync->Wait());
-  WaitForCommitIfNotAlreadyPresent(last_op_id, 0, 2);
-  WaitForCommitIfNotAlreadyPresent(last_op_id, 1, 2);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), 0, 2);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), 1, 2);
 
   // Ensure last-logged OpId is > (0,0).
   ASSERT_TRUE(OpIdLessThan(MinimumOpId(), last_op_id));
@@ -1109,7 +1109,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   ConsensusRequestPB req;
   req.set_caller_term(last_op_id.term());
   req.set_caller_uuid("peer-0");
-  req.mutable_committed_index()->CopyFrom(last_op_id);
+  req.set_committed_index(last_op_id.index());
   ConsensusResponsePB res;
   Status s = peer->Update(&req, &res);
   ASSERT_EQ(last_op_id.term() + 3, res.responder_term());

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/raft_consensus_state.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc
index 1ab803b..7631f26 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -52,7 +52,7 @@ ReplicaState::ReplicaState(ConsensusOptions options, string peer_uuid,
       txn_factory_(txn_factory),
       last_received_op_id_(MinimumOpId()),
       last_received_op_id_current_leader_(MinimumOpId()),
-      last_committed_index_(MinimumOpId()),
+      last_committed_op_id_(MinimumOpId()),
       state_(kInitialized) {
   CHECK(cmeta_) << "ConsensusMeta passed as NULL";
 }
@@ -118,22 +118,6 @@ Status ReplicaState::LockForCommit(UniqueLock* lock) const {
   return Status::OK();
 }
 
-Status ReplicaState::LockForMajorityReplicatedIndexUpdate(UniqueLock* lock) const {
-  TRACE_EVENT0("consensus", "ReplicaState::LockForMajorityReplicatedIndexUpdate");
-  ThreadRestrictions::AssertWaitAllowed();
-  UniqueLock l(update_lock_);
-
-  if (PREDICT_FALSE(state_ != kRunning)) {
-    return Status::IllegalState("Replica not in running state");
-  }
-
-  if (PREDICT_FALSE(GetActiveRoleUnlocked() != RaftPeerPB::LEADER)) {
-    return Status::IllegalState("Replica not LEADER");
-  }
-  lock->swap(l);
-  return Status::OK();
-}
-
 Status ReplicaState::CheckActiveLeaderUnlocked() const {
   RaftPeerPB::Role role = GetActiveRoleUnlocked();
   switch (role) {
@@ -278,7 +262,7 @@ bool ReplicaState::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch
 
   *term_mismatch = false;
 
-  if (op_id.index() <= GetCommittedOpIdUnlocked().index()) {
+  if (op_id.index() <= GetCommittedIndexUnlocked()) {
     return true;
   }
 
@@ -392,7 +376,7 @@ Status ReplicaState::CancelPendingTransactions() {
 void ReplicaState::GetUncommittedPendingOperationsUnlocked(
     vector<scoped_refptr<ConsensusRound> >* ops) {
   for (const IndexToRoundMap::value_type& entry : pending_txns_) {
-    if (entry.first > last_committed_index_.index()) {
+    if (entry.first > last_committed_op_id_.index()) {
       ops->push_back(entry.second);
     }
   }
@@ -414,8 +398,8 @@ Status ReplicaState::AbortOpsAfterUnlocked(int64_t new_preceding_idx) {
     new_preceding = (*iter).second->replicate_msg()->id();
     ++iter;
   } else {
-    CHECK_EQ(new_preceding_idx, last_committed_index_.index());
-    new_preceding = last_committed_index_;
+    CHECK_EQ(new_preceding_idx, last_committed_op_id_.index());
+    new_preceding = last_committed_op_id_;
   }
 
   // This is the same as UpdateLastReceivedOpIdUnlocked() but we do it
@@ -503,93 +487,45 @@ scoped_refptr<ConsensusRound> ReplicaState::GetPendingOpByIndexOrNullUnlocked(in
   return FindPtrOrNull(pending_txns_, index);
 }
 
-Status ReplicaState::UpdateMajorityReplicatedUnlocked(const OpId& majority_replicated,
-                                                      OpId* committed_index,
-                                                      bool* committed_index_changed) {
-  DCHECK(update_lock_.is_locked());
-  DCHECK(majority_replicated.IsInitialized());
-  DCHECK(last_committed_index_.IsInitialized());
-  if (PREDICT_FALSE(state_ == kShuttingDown || state_ == kShutDown)) {
-    return Status::ServiceUnavailable("Cannot trigger apply. Replica is shutting down.");
-  }
-  if (PREDICT_FALSE(state_ != kRunning)) {
-    return Status::IllegalState("Cannot trigger apply. Replica is not in kRunning state.");
-  }
-
-  // If the last committed operation was in the current term (the normal case)
-  // then 'committed_index' is simply equal to majority replicated.
-  if (last_committed_index_.term() == GetCurrentTermUnlocked()) {
-    RETURN_NOT_OK(AdvanceCommittedIndexUnlocked(majority_replicated,
-                                                committed_index_changed));
-    committed_index->CopyFrom(last_committed_index_);
-    return Status::OK();
-  }
-
-  // If the last committed operation is not in the current term (such as when
-  // we change leaders) but 'majority_replicated' is then we can advance the
-  // 'committed_index' too.
-  if (majority_replicated.term() == GetCurrentTermUnlocked()) {
-    OpId previous = last_committed_index_;
-    RETURN_NOT_OK(AdvanceCommittedIndexUnlocked(majority_replicated,
-                                                committed_index_changed));
-    committed_index->CopyFrom(last_committed_index_);
-    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Advanced the committed_index across terms."
-        << " Last committed operation was: " << previous.ShortDebugString()
-        << " New committed index is: " << last_committed_index_.ShortDebugString();
-    return Status::OK();
-  }
-
-  committed_index->CopyFrom(last_committed_index_);
-  KLOG_EVERY_N_SECS(WARNING, 1) << LogPrefixUnlocked()
-          << "Can't advance the committed index across term boundaries"
-          << " until operations from the current term are replicated."
-          << " Last committed operation was: " << last_committed_index_.ShortDebugString() << ","
-          << " New majority replicated is: " << majority_replicated.ShortDebugString() << ","
-          << " Current term is: " << GetCurrentTermUnlocked();
-
-  return Status::OK();
-}
-
-Status ReplicaState::AdvanceCommittedIndexUnlocked(const OpId& committed_index,
-                                                   bool *committed_index_changed) {
-  *committed_index_changed = false;
+Status ReplicaState::AdvanceCommittedIndexUnlocked(int64_t committed_index) {
   // If we already committed up to (or past) 'id' return.
   // This can happen in the case that multiple UpdateConsensus() calls end
   // up in the RPC queue at the same time, and then might get interleaved out
   // of order.
-  if (last_committed_index_.index() >= committed_index.index()) {
+  if (last_committed_op_id_.index() >= committed_index) {
     VLOG_WITH_PREFIX_UNLOCKED(1)
-      << "Already marked ops through " << last_committed_index_ << " as committed. "
+      << "Already marked ops through " << last_committed_op_id_ << " as committed. "
       << "Now trying to mark " << committed_index << " which would be a no-op.";
     return Status::OK();
   }
 
   if (pending_txns_.empty()) {
-    last_committed_index_.CopyFrom(committed_index);
+    LOG(ERROR) << "Advancing commit index to " << committed_index
+               << " from " << last_committed_op_id_
+               << " we have no pending txns"
+               << GetStackTrace();
     VLOG_WITH_PREFIX_UNLOCKED(1) << "No transactions to mark as committed up to: "
-        << committed_index.ShortDebugString();
+                                 << committed_index;
     return Status::OK();
   }
 
   // Start at the operation after the last committed one.
-  auto iter = pending_txns_.upper_bound(last_committed_index_.index());
+  auto iter = pending_txns_.upper_bound(last_committed_op_id_.index());
   // Stop at the operation after the last one we must commit.
-  auto end_iter = pending_txns_.upper_bound(committed_index.index());
+  auto end_iter = pending_txns_.upper_bound(committed_index);
   CHECK(iter != pending_txns_.end());
 
   VLOG_WITH_PREFIX_UNLOCKED(1) << "Last triggered apply was: "
-      <<  last_committed_index_.ShortDebugString()
+      <<  last_committed_op_id_
       << " Starting to apply from log index: " << (*iter).first;
 
-  OpId prev_id = last_committed_index_;
-
   while (iter != end_iter) {
     scoped_refptr<ConsensusRound> round = (*iter).second; // Make a copy.
     DCHECK(round);
     const OpId& current_id = round->id();
 
-    if (PREDICT_TRUE(!OpIdEquals(prev_id, MinimumOpId()))) {
-      CHECK_OK(CheckOpInSequence(prev_id, current_id));
+    if (PREDICT_TRUE(!OpIdEquals(last_committed_op_id_, MinimumOpId()))) {
+      CHECK_OK(CheckOpInSequence(last_committed_op_id_, current_id));
     }
 
     pending_txns_.erase(iter++);
@@ -622,23 +558,53 @@ Status ReplicaState::AdvanceCommittedIndexUnlocked(const OpId& committed_index,
       }
     }
 
-    prev_id.CopyFrom(round->id());
+    last_committed_op_id_ = round->id();
     round->NotifyReplicationFinished(Status::OK());
   }
 
-  last_committed_index_.CopyFrom(committed_index);
-  *committed_index_changed = true;
   return Status::OK();
 }
 
-const OpId& ReplicaState::GetCommittedOpIdUnlocked() const {
+
+Status ReplicaState::SetInitialCommittedOpIdUnlocked(const OpId& committed_op) {
+  CHECK_EQ(last_committed_op_id_.index(), 0);
+  if (!pending_txns_.empty()) {
+    int64_t first_pending_index = pending_txns_.begin()->first;
+    if (committed_op.index() < first_pending_index) {
+      if (committed_op.index() != first_pending_index - 1) {
+        return Status::Corruption(Substitute(
+            "pending operations should start at first operation "
+            "after the committed operation (committed=$0, first pending=$1)",
+            OpIdToString(committed_op), first_pending_index));
+      }
+      last_committed_op_id_ = committed_op;
+    }
+
+    RETURN_NOT_OK(AdvanceCommittedIndexUnlocked(committed_op.index()));
+    CHECK_EQ(last_committed_op_id_.ShortDebugString(),
+             committed_op.ShortDebugString());
+
+  } else {
+    last_committed_op_id_ = committed_op;
+    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "setting committed at start to " << committed_op;
+  }
+  return Status::OK();
+}
+
+int64_t ReplicaState::GetCommittedIndexUnlocked() const {
   DCHECK(update_lock_.is_locked());
-  return last_committed_index_;
+  return last_committed_op_id_.index();
 }
 
+int64_t ReplicaState::GetTermWithLastCommittedOpUnlocked() const {
+  DCHECK(update_lock_.is_locked());
+  return last_committed_op_id_.term();
+}
+
+
 Status ReplicaState::CheckHasCommittedOpInCurrentTermUnlocked() const {
   int64_t term = GetCurrentTermUnlocked();
-  const OpId& opid = GetCommittedOpIdUnlocked();
+  const OpId& opid = last_committed_op_id_;
   if (opid.term() != term) {
     return Status::IllegalState("Latest committed op is not from this term", OpIdToString(opid));
   }
@@ -737,7 +703,7 @@ string ReplicaState::ToStringUnlocked() const {
 
   SubstituteAndAppend(&ret, "Watermarks: {Received: $0 Committed: $1}\n",
                       last_received_op_id_.ShortDebugString(),
-                      last_committed_index_.ShortDebugString());
+                      last_committed_op_id_.ShortDebugString());
   return ret;
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/raft_consensus_state.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.h b/src/kudu/consensus/raft_consensus_state.h
index 9ce24ab..aa30ead 100644
--- a/src/kudu/consensus/raft_consensus_state.h
+++ b/src/kudu/consensus/raft_consensus_state.h
@@ -138,12 +138,6 @@ class ReplicaState {
   // Obtains the lock for a state read, does not check state.
   Status LockForRead(UniqueLock* lock) const WARN_UNUSED_RESULT;
 
-  // Obtains the lock so that we can advance the majority replicated
-  // index and possibly the committed index.
-  // Requires that this peer is leader.
-  Status LockForMajorityReplicatedIndexUpdate(
-      UniqueLock* lock) const WARN_UNUSED_RESULT;
-
   // Ensure the local peer is the active leader.
   // Returns OK if leader, IllegalState otherwise.
   Status CheckActiveLeaderUnlocked() const;
@@ -247,27 +241,21 @@ class ReplicaState {
   // Add 'round' to the set of rounds waiting to be committed.
   Status AddPendingOperation(const scoped_refptr<ConsensusRound>& round);
 
-  // Marks ReplicaTransactions up to 'id' as majority replicated, meaning the
-  // transaction may Apply() (immediately if Prepare() has completed or when Prepare()
-  // completes, if not).
-  //
-  // If this advanced the committed index, sets *committed_index_changed to true.
-  Status UpdateMajorityReplicatedUnlocked(const OpId& majority_replicated,
-                                          OpId* committed_index,
-                                          bool* committed_index_changed);
-
   // Advances the committed index.
   // This is a no-op if the committed index has not changed.
-  // Returns in '*committed_index_changed' whether the operation actually advanced
-  // the index.
-  Status AdvanceCommittedIndexUnlocked(const OpId& committed_index,
-                                       bool* committed_index_changed);
+  Status AdvanceCommittedIndexUnlocked(int64_t committed_index);
+
+  // Set the committed op during startup. This should be done after
+  // appending any of the pending transactions, and will take care
+  // of triggering any that are now considered committed.
+  Status SetInitialCommittedOpIdUnlocked(const OpId& committed_op);
 
   // Returns the watermark below which all operations are known to
   // be committed according to consensus.
   //
   // This must be called under a lock.
-  const OpId& GetCommittedOpIdUnlocked() const;
+  int64_t GetCommittedIndexUnlocked() const;
+  int64_t GetTermWithLastCommittedOpUnlocked() const;
 
   // Returns OK iff an op from the current term has been committed.
   Status CheckHasCommittedOpInCurrentTermUnlocked() const;
@@ -378,9 +366,12 @@ class ReplicaState {
   // involved in resetting this every time a new node becomes leader.
   OpId last_received_op_id_current_leader_;
 
-  // The id of the Apply that was last triggered when the last message from the leader
+  // The OpId of the Apply that was last triggered when the last message from the leader
   // was received. Initialized to MinimumOpId().
-  OpId last_committed_index_;
+  //
+  // TODO: are there cases where this doesn't track the actual commit index,
+  // if there are no-ops?
+  OpId last_committed_op_id_;
 
   State state_;
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 387c048..63be00b 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -973,6 +973,7 @@ TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) {
   }
 
   for (CountDownLatch* latch : latches) {
+    NO_FATALS(cluster_->AssertNoCrashes());
     latch->Wait();
     StopOrKillLeaderAndElectNewOne();
   }
@@ -1175,7 +1176,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
   req.set_dest_uuid(replica_ts->uuid());
   req.set_caller_uuid("fake_caller");
   req.set_caller_term(2);
-  req.mutable_committed_index()->CopyFrom(MakeOpId(1, 1));
+  req.set_committed_index(1);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1));
 
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
@@ -1234,7 +1235,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
   req.clear_ops();
   req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 2));
   AddOp(MakeOpId(2, 3), &req);
-  req.mutable_committed_index()->CopyFrom(MakeOpId(2, 4));
+  req.set_committed_index(4);
   rpc.Reset();
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
   ASSERT_FALSE(resp.has_error()) << resp.DebugString();
@@ -1249,7 +1250,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
   resp.Clear();
   req.clear_ops();
   // Now send some more ops, and commit the earlier ones.
-  req.mutable_committed_index()->CopyFrom(MakeOpId(2, 4));
+  req.set_committed_index(4);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
   AddOp(MakeOpId(2, 5), &req);
   AddOp(MakeOpId(2, 6), &req);
@@ -1314,7 +1315,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
   // the earlier ops.
   {
     req.mutable_preceding_id()->CopyFrom(MakeOpId(leader_term, 6));
-    req.mutable_committed_index()->CopyFrom(MakeOpId(leader_term, 6));
+    req.set_committed_index(6);
     req.clear_ops();
     rpc.Reset();
     ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
@@ -2034,7 +2035,7 @@ TEST_F(RaftConsensusITest, TestEarlyCommitDespiteMemoryPressure) {
   req.set_tablet_id(tablet_id_);
   req.set_caller_uuid(tservers[2]->instance_id.permanent_uuid());
   req.set_caller_term(1);
-  req.mutable_committed_index()->CopyFrom(MakeOpId(1, 1));
+  req.set_committed_index(1);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1));
   for (int i = 0; i < kNumOps; i++) {
     AddOp(MakeOpId(1, 2 + i), &req);
@@ -2058,7 +2059,7 @@ TEST_F(RaftConsensusITest, TestEarlyCommitDespiteMemoryPressure) {
   // 1. Replicate just one new operation.
   // 2. Tell the follower that the previous set of operations were committed.
   req.mutable_preceding_id()->CopyFrom(last_opid);
-  req.mutable_committed_index()->CopyFrom(last_opid);
+  req.set_committed_index(last_opid.index());
   req.mutable_ops()->Clear();
   AddOp(MakeOpId(1, last_opid.index() + 1), &req);
   rpc.Reset();
@@ -2540,7 +2541,7 @@ TEST_F(RaftConsensusITest, TestUpdateConsensusErrorNonePrepared) {
   req.set_tablet_id(tablet_id_);
   req.set_caller_uuid(tservers[2]->instance_id.permanent_uuid());
   req.set_caller_term(0);
-  req.mutable_committed_index()->CopyFrom(MakeOpId(0, 0));
+  req.set_committed_index(0);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(0, 0));
   for (int i = 0; i < kNumOps; i++) {
     AddOp(MakeOpId(0, 1 + i), &req);


[4/4] kudu git commit: tablet_server-stress-test: limit runtime of this test

Posted by to...@apache.org.
tablet_server-stress-test: limit runtime of this test

This stress test is typically the long pole in the dist-test runtime.
For example, in a recent ASAN run[1] the test took around 730 seconds.

This changes the behavior of the test to run for a prescribed amount of
time (60 seconds in slow mode, 10 in fast). This should keep relatively
good coverage while avoiding such long test runs.

[1] http://dist-test.cloudera.org/trace?job_id=jenkins-slave.1473295331.9755

Change-Id: I7441f50bcd4788e3e54a90bd5f782201a7d4c6af
Reviewed-on: http://gerrit.cloudera.org:8080/4329
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/46d9ed7a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/46d9ed7a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/46d9ed7a

Branch: refs/heads/master
Commit: 46d9ed7aa86e7bcd9649ec37af1fbd8369d5c0fe
Parents: 2876683
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Sep 7 18:23:52 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Sep 8 02:06:54 2016 +0000

----------------------------------------------------------------------
 src/kudu/scripts/benchmarks.sh                |  2 +-
 src/kudu/tserver/tablet_server-stress-test.cc | 39 +++++++++++++++++-----
 2 files changed, 32 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/46d9ed7a/src/kudu/scripts/benchmarks.sh
----------------------------------------------------------------------
diff --git a/src/kudu/scripts/benchmarks.sh b/src/kudu/scripts/benchmarks.sh
index 799d7be..be110f1 100755
--- a/src/kudu/scripts/benchmarks.sh
+++ b/src/kudu/scripts/benchmarks.sh
@@ -240,7 +240,7 @@ run_benchmarks() {
   # Run multi-threaded TS insert benchmark
   for i in $(seq 1 $NUM_SAMPLES) ; do
     KUDU_ALLOW_SLOW_TESTS=1 build/latest/bin/tablet_server-stress-test \
-      --num_inserts_per_thread=30000 &> $LOGDIR/${TS_8THREAD_BENCH}$i.log
+      --num_inserts_per_thread=30000 -runtime_secs=0 &> $LOGDIR/${TS_8THREAD_BENCH}$i.log
   done
 
   # Run full stack scan/insert test using MRS only, ~26s each

http://git-wip-us.apache.org/repos/asf/kudu/blob/46d9ed7a/src/kudu/tserver/tablet_server-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-stress-test.cc b/src/kudu/tserver/tablet_server-stress-test.cc
index cfe24b1..87d24fd 100644
--- a/src/kudu/tserver/tablet_server-stress-test.cc
+++ b/src/kudu/tserver/tablet_server-stress-test.cc
@@ -16,12 +16,22 @@
 // under the License.
 #include "kudu/tserver/tablet_server-test-base.h"
 
+#include <thread>
+
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/stopwatch.h"
 
+DEFINE_int32(runtime_secs, 10,
+             "Maximum number of seconds to run. If the threads have not completed "
+             "inserting by this time, they will stop regardless. Set to 0 to disable "
+             "the timeout.");
 DEFINE_int32(num_inserter_threads, 8, "Number of inserter threads to run");
-DEFINE_int32(num_inserts_per_thread, 0, "Number of inserts from each thread");
+DEFINE_int32(num_inserts_per_thread, 100000000,
+             "Number of inserts from each thread. If 'runtime_secs' is non-zero, threads will "
+             "exit after that time out even if they have not inserted the desired number. The "
+             "default is set high so that, typically, the 'runtime_secs' parameter determines "
+             "how long this test will run.");
 DECLARE_bool(enable_maintenance_manager);
 
 METRIC_DEFINE_histogram(test, insert_latency,
@@ -37,11 +47,10 @@ namespace tserver {
 class TSStressTest : public TabletServerTestBase {
  public:
   TSStressTest()
-    : start_latch_(FLAGS_num_inserter_threads) {
+      : start_latch_(FLAGS_num_inserter_threads),
+        stop_latch_(1) {
 
-    if (FLAGS_num_inserts_per_thread == 0) {
-      FLAGS_num_inserts_per_thread = AllowSlowTests() ? 100000 : 1000;
-    }
+    OverrideFlagForSlowTests("runtime_secs", "60");
 
     // Re-enable the maintenance manager which is disabled by default
     // in TS tests. We want to stress the whole system including
@@ -76,6 +85,7 @@ class TSStressTest : public TabletServerTestBase {
  protected:
   scoped_refptr<Histogram> histogram_;
   CountDownLatch start_latch_;
+  CountDownLatch stop_latch_;
   std::vector<scoped_refptr<kudu::Thread> > threads_;
 };
 
@@ -87,7 +97,7 @@ void TSStressTest::InserterThread(int thread_idx) {
 
   uint64_t max_rows = FLAGS_num_inserts_per_thread;
   int start_row = thread_idx * max_rows;
-  for (int i = start_row; i < start_row + max_rows ; i++) {
+  for (int i = start_row; i < start_row + max_rows && stop_latch_.count() > 0; i++) {
     MonoTime before = MonoTime::Now();
     InsertTestRowsRemote(thread_idx, i, 1);
     MonoTime after = MonoTime::Now();
@@ -98,23 +108,36 @@ void TSStressTest::InserterThread(int thread_idx) {
 }
 
 TEST_F(TSStressTest, TestMTInserts) {
+  std::thread timeout_thread;
   StartThreads();
   Stopwatch s(Stopwatch::ALL_THREADS);
   s.start();
+
+  // Start a thread to fire 'stop_latch_' after the prescribed number of seconds.
+  if (FLAGS_runtime_secs > 0) {
+    timeout_thread = std::thread([&]() {
+      stop_latch_.WaitFor(MonoDelta::FromSeconds(FLAGS_runtime_secs));
+      stop_latch_.CountDown();
+    });
+  }
   JoinThreads();
   s.stop();
-  int num_rows = (FLAGS_num_inserter_threads * FLAGS_num_inserts_per_thread);
+
+  int num_rows = histogram_->TotalCount();
   LOG(INFO) << "Inserted " << num_rows << " rows in " << s.elapsed().wall_millis() << " ms";
   LOG(INFO) << "Throughput: " << (num_rows * 1000 / s.elapsed().wall_millis()) << " rows/sec";
   LOG(INFO) << "CPU efficiency: " << (num_rows / s.elapsed().user_cpu_seconds()) << " rows/cpusec";
 
-
   // Generate the JSON.
   std::stringstream out;
   JsonWriter writer(&out, JsonWriter::PRETTY);
   ASSERT_OK(histogram_->WriteAsJson(&writer, MetricJsonOptions()));
 
   LOG(INFO) << out.str();
+
+  // Ensure the timeout thread is stopped before exiting.
+  stop_latch_.CountDown();
+  if (timeout_thread.joinable()) timeout_thread.join();
 }
 
 } // namespace tserver


[2/4] kudu git commit: Cleanup/refactor tracking of consensus watermarks

Posted by to...@apache.org.
Cleanup/refactor tracking of consensus watermarks

This is a fairly invasive cleanup/refactor to consensus in preparation
for propagating replication watermarks from the leader to the followers.

The key item here is to address a long-standing TODO that the
PeerMessageQueue should itself calculate the commit index, rather than
delegate that job to ReplicaState. The flow used to be something like
this on the leader upon receiving a response from a peer that it has
replicated some new operations:

 - PeerMessageQueue::ResponseFromPeer
   - updates peer last_received
   - calls PeerMessageQueue::AdvanceQueueWatermark
     - updates PeerMessageQueue::majority_replicated to the new majority
       watermark
   - calls NotifyObserversOfMajorityReplOp
   --> submits an async task to the raft threadpool
        - for each observer (best I can tell there is currently always
          only one, but that's a separate issue)
        -- observer->UpdateMajorityReplicated() to grab committed index
           (the observer is always the RaftConsensus instance)
        --- RaftConsensus::UpdateMajorityReplicated
        ---- ReplicaState::UpdateMajorityReplicatedUnlocked
        ----- this checks the condition that the majority-replicated
              watermark is within the leader's term before advancing the
              commit index.
        ----- calls AdvanceCommittedIndexUnlocked
        ------- commits stuff, etc
        ----- sets *committed_index out-param
        -- PeerMessageQueue picks up this out-param and sets the new
           value within the PeerMessageQueue

This was very difficult to follow, given that the "observer" in fact was
passing data back to the "notifier" through an out-parameter. Moreover,
it wasn't obvious to see which class was "authoritative" for the
tracking and advancement of watermarks.

The new design makes the PeerMessageQueue itself fully responsible for
calculating when the commit index can advance. The flow is now the
following, with '!' at the beginning of the line to denote where it is
changed from before:

 - PeerMessageQueue::ResponseFromPeer
   - updates peer last_received
   - calls PeerMessageQueue::AdvanceQueueWatermark
     - updates PeerMessageQueue::majority_replicated to the new majority
       watermark.
!  - If the majority-replicated watermark is within the current leader
!    term, advances the commit index (stored in QueueState)
!    - notifies observers of the new commit index
!      - calls observer->NotifyCommitIndex() with the new commit index
!        - RaftConsensus::NotifyCommitIndex()
!          - ReplicaState::AdvanceCommittedIndexUnlocked
             - commits stuff, etc.

This required a fairly invasive surgery because the PeerMessageQueue
itself doesn't remember the terms of pending operations, and thus the
watermarks became indexes instead of OpIds.

This is itself also a simplification -- we previously were very messy
about using the word "index" when in fact we had an OpId type. In almost
every case, we only used the index of those OpIds. In the Raft paper
itself, these watermarks are just indexes and not OpIds, so this change
brings us closer to the implementation described in the original design.

I looped raft-consensus-itest.MultiThreadedInsertWithFailovers 800
times and the whole suite 500 times and got no failures[1].

[1] http://dist-test.cloudera.org//job?job_id=todd.1473280738.28972

Change-Id: I2aa294472f018013d88f36a9358e9ebd9d5ed8f8
Reviewed-on: http://gerrit.cloudera.org:8080/4133
Reviewed-by: Mike Percy <mp...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ce0fcd4d
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ce0fcd4d
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ce0fcd4d

Branch: refs/heads/master
Commit: ce0fcd4dd65a23aebf2601570e335c1b6ce2a0c9
Parents: b1f1388
Author: Todd Lipcon <to...@apache.org>
Authored: Fri Aug 26 02:15:49 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Sep 8 01:03:45 2016 +0000

----------------------------------------------------------------------
 docs/release_notes.adoc                         |   9 +
 src/kudu/consensus/consensus-test-util.h        |  24 --
 src/kudu/consensus/consensus.proto              |  26 +--
 src/kudu/consensus/consensus_peers-test.cc      |  50 ++--
 src/kudu/consensus/consensus_peers.cc           |   4 +-
 src/kudu/consensus/consensus_queue-test.cc      | 184 ++++++++-------
 src/kudu/consensus/consensus_queue.cc           | 226 ++++++++++++-------
 src/kudu/consensus/consensus_queue.h            |  60 +++--
 src/kudu/consensus/raft_consensus-test.cc       | 111 +--------
 src/kudu/consensus/raft_consensus.cc            |  91 ++++----
 src/kudu/consensus/raft_consensus.h             |  23 +-
 .../consensus/raft_consensus_quorum-test.cc     |  54 ++---
 src/kudu/consensus/raft_consensus_state.cc      | 144 +++++-------
 src/kudu/consensus/raft_consensus_state.h       |  35 ++-
 .../integration-tests/raft_consensus-itest.cc   |  15 +-
 15 files changed, 464 insertions(+), 592 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/docs/release_notes.adoc
----------------------------------------------------------------------
diff --git a/docs/release_notes.adoc b/docs/release_notes.adoc
index 281eb81..be89002 100644
--- a/docs/release_notes.adoc
+++ b/docs/release_notes.adoc
@@ -38,6 +38,15 @@ If you are new to Kudu, check out its list of link:index.html[features and benef
 Kudu 1.0.0 delivers a number of new features, bug fixes, and optimizations,
 detailed below.
 
+Kudu 1.0.0 maintains client-server wire-compatibility with previous releases,
+that applications using the Kudu client libraries may be upgraded either
+before, at the same time, or after the Kudu servers.
+
+Kudu 1.0.0 does _not_ maintain server-server wire compatibility with previous
+releases. Therefore, rolling upgrades between earlier versions of Kudu and
+Kudu 1.0.0 are not supported.
+
+
 [[rn_1.0.0_incompatible_changes]]
 
 - The `kudu-pbc-dump` tool has been removed. The same functionality is now

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index ed3922e..b6fc201 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -841,30 +841,6 @@ class CounterHooks : public Consensus::ConsensusFaultHooks {
   mutable simple_spinlock lock_;
 };
 
-class TestRaftConsensusQueueIface : public PeerMessageQueueObserver {
- public:
-  bool IsMajorityReplicated(int64_t index) {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    return index <= majority_replicated_index_;
-  }
-
- protected:
-  virtual void UpdateMajorityReplicated(const OpId& majority_replicated,
-                                        OpId* committed_index) OVERRIDE {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    majority_replicated_index_ = majority_replicated.index();
-    committed_index->CopyFrom(majority_replicated);
-  }
-  virtual void NotifyTermChange(int64_t term) OVERRIDE {}
-  virtual void NotifyFailedFollower(const std::string& uuid,
-                                    int64_t term,
-                                    const std::string& reason) OVERRIDE {}
-
- private:
-  mutable simple_spinlock lock_;
-  int64_t majority_replicated_index_;
-};
-
 }  // namespace consensus
 }  // namespace kudu
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/consensus.proto
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.proto b/src/kudu/consensus/consensus.proto
index 5091c3c..bde2b2a 100644
--- a/src/kudu/consensus/consensus.proto
+++ b/src/kudu/consensus/consensus.proto
@@ -199,25 +199,12 @@ message CommitMsg {
 //  Internal Consensus Messages and State
 // ===========================================================================
 
-// NoOp requests, mostly used in tests.
+// NO_OP requests are replicated by a peer after being elected leader.
 message NoOpRequestPB {
  // Allows to set a dummy payload, for tests.
  optional bytes payload_for_tests = 1;
 }
 
-// NoOp responses, mostly used in tests.
-message NoOpResponsePB {
-   // Allows to set a dummy payload, for tests.
-   optional bytes payload_for_tests = 1;
-}
-
-message PerOpErrorPB {
-  // The id of the operation that failed in the other peer.
-  required OpId id = 1;
-  // The Status explaining why the operation failed.
-  required AppStatusPB status = 2;
-}
-
 // Status message received in the peer responses.
 message ConsensusStatusPB {
 
@@ -330,12 +317,17 @@ message ConsensusRequestPB {
   // This must be set if 'ops' is non-empty.
   optional OpId preceding_id = 4;
 
-  // The id of the last committed operation in the configuration. This is the
-  // id of the last operation the leader deemed committed from a consensus
+  // The index of the last committed operation in the configuration. This is the
+  // index of the last operation the leader deemed committed from a consensus
   // standpoint (not the last operation the leader applied).
   //
   // Raft calls this field 'leaderCommit'.
-  required OpId committed_index = 5;
+  optional int64 committed_index = 8;
+
+  // Deprecated field used in Kudu 0.10.0 and earlier. Remains here to prevent
+  // accidental reuse and provide a nicer error message if the user attempts
+  // a rolling upgrade.
+  optional OpId DEPRECATED_committed_index = 5;
 
   // Sequence of operations to be replicated by this peer.
   // These will be committed when committed_index advances above their

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/consensus_peers-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc
index 116ddea..7159867 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -66,12 +66,10 @@ class ConsensusPeersTest : public KuduTest {
     clock_.reset(new server::HybridClock());
     ASSERT_OK(clock_->Init());
 
-    consensus_.reset(new TestRaftConsensusQueueIface());
     message_queue_.reset(new PeerMessageQueue(metric_entity_,
                                               log_.get(),
                                               FakeRaftPeerPB(kLeaderUuid),
                                               kTabletId));
-    message_queue_->RegisterObserver(consensus_.get());
   }
 
   virtual void TearDown() OVERRIDE {
@@ -113,18 +111,13 @@ class ConsensusPeersTest : public KuduTest {
   // Registers a callback triggered when the op with the provided term and index
   // is committed in the test consensus impl.
   // This must be called _before_ the operation is committed.
-  void WaitForMajorityReplicatedIndex(int index) {
-    for (int i = 0; i < 100; i++) {
-      if (consensus_->IsMajorityReplicated(index)) {
-        return;
-      }
-      SleepFor(MonoDelta::FromMilliseconds(i));
-    }
-    FAIL() << "Never replicated index " << index << " on a majority";
+  void WaitForCommitIndex(int index) {
+    AssertEventually([&]() {
+        ASSERT_GE(message_queue_->GetCommittedIndexForTests(), index);
+      });
   }
 
  protected:
-  gscoped_ptr<TestRaftConsensusQueueIface> consensus_;
   MetricRegistry metric_registry_;
   scoped_refptr<MetricEntity> metric_entity_;
   gscoped_ptr<FsManager> fs_manager_;
@@ -146,8 +139,8 @@ TEST_F(ConsensusPeersTest, TestRemotePeer) {
   // We use a majority size of 2 since we make one fake remote peer
   // in addition to our real local log.
   message_queue_->Init(MinimumOpId());
-  message_queue_->SetLeaderMode(MinimumOpId(),
-                                MinimumOpId().term(),
+  message_queue_->SetLeaderMode(kMinimumOpIdIndex,
+                                kMinimumTerm,
                                 BuildRaftConfigPBForTests(3));
 
   gscoped_ptr<Peer> remote_peer;
@@ -166,7 +159,7 @@ TEST_F(ConsensusPeersTest, TestRemotePeer) {
   // now wait on the status of the last operation
   // this will complete once the peer has logged all
   // requests.
-  WaitForMajorityReplicatedIndex(20);
+  WaitForCommitIndex(20);
   // verify that the replicated watermark corresponds to the last replicated
   // message.
   CheckLastRemoteEntry(proxy, 2, 20);
@@ -174,8 +167,8 @@ TEST_F(ConsensusPeersTest, TestRemotePeer) {
 
 TEST_F(ConsensusPeersTest, TestRemotePeers) {
   message_queue_->Init(MinimumOpId());
-  message_queue_->SetLeaderMode(MinimumOpId(),
-                                MinimumOpId().term(),
+  message_queue_->SetLeaderMode(kMinimumOpIdIndex,
+                                kMinimumTerm,
                                 BuildRaftConfigPBForTests(3));
 
   // Create a set of remote peers
@@ -201,7 +194,7 @@ TEST_F(ConsensusPeersTest, TestRemotePeers) {
   // Now wait for the message to be replicated, this should succeed since
   // majority = 2 and only one peer was delayed. The majority is made up
   // of remote-peer1 and the local log.
-  WaitForMajorityReplicatedIndex(first.index());
+  WaitForCommitIndex(first.index());
 
   CheckLastLogEntry(first.term(), first.index());
   CheckLastRemoteEntry(remote_peer1_proxy, first.term(), first.index());
@@ -210,31 +203,31 @@ TEST_F(ConsensusPeersTest, TestRemotePeers) {
   // Wait until all peers have replicated the message, otherwise
   // when we add the next one remote_peer2 might find the next message
   // in the queue and will replicate it, which is not what we want.
-  while (!OpIdEquals(message_queue_->GetAllReplicatedIndexForTests(), first)) {
+  while (message_queue_->GetAllReplicatedIndexForTests() != first.index()) {
     SleepFor(MonoDelta::FromMilliseconds(1));
   }
 
   // Now append another message to the queue
   AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 2, 1);
 
-  // We should not see it replicated, even after 10ms,
+  // We should not see it committed, even after 10ms,
   // since only the local peer replicates the message.
   SleepFor(MonoDelta::FromMilliseconds(10));
-  ASSERT_FALSE(consensus_->IsMajorityReplicated(2));
+  ASSERT_LT(message_queue_->GetCommittedIndexForTests(), 2);
 
   // Signal one of the two remote peers.
   remote_peer1->SignalRequest();
   // We should now be able to wait for it to replicate, since two peers (a majority)
   // have replicated the message.
-  WaitForMajorityReplicatedIndex(2);
+  WaitForCommitIndex(2);
 }
 
 // Regression test for KUDU-699: even if a peer isn't making progress,
 // and thus always has data pending, we should be able to close the peer.
 TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) {
   message_queue_->Init(MinimumOpId());
-  message_queue_->SetLeaderMode(MinimumOpId(),
-                                MinimumOpId().term(),
+  message_queue_->SetLeaderMode(kMinimumOpIdIndex,
+                                kMinimumTerm,
                                 BuildRaftConfigPBForTests(3));
 
   auto mock_proxy = new MockedPeerProxy(pool_.get());
@@ -271,8 +264,8 @@ TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) {
 
 TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
   message_queue_->Init(MinimumOpId());
-  message_queue_->SetLeaderMode(MinimumOpId(),
-                                MinimumOpId().term(),
+  message_queue_->SetLeaderMode(kMinimumOpIdIndex,
+                                kMinimumTerm,
                                 BuildRaftConfigPBForTests(3));
 
   auto mock_proxy = new MockedPeerProxy(pool_.get());
@@ -294,7 +287,10 @@ TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
       MakeOpId(1, 1));
   initial_resp.mutable_status()->mutable_last_received_current_leader()->CopyFrom(
       MakeOpId(1, 1));
-  initial_resp.mutable_status()->set_last_committed_idx(0);
+  // We have to set the last_committed_index to 1 to avoid a tight loop
+  // where the peer manager keeps trying to update the peer's committed
+  // index.
+  initial_resp.mutable_status()->set_last_committed_idx(1);
   mock_proxy->set_update_response(initial_resp);
 
   AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 1, 1);
@@ -302,7 +298,7 @@ TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
 
   // Now wait for the message to be replicated, this should succeed since
   // the local (leader) peer always acks and the follower also acked this time.
-  WaitForMajorityReplicatedIndex(1);
+  WaitForCommitIndex(1);
 
   // Set up the peer to respond with an error.
   ConsensusResponsePB error_resp;

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index 29e8524..6776b50 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -176,11 +176,11 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
   // The peer has no pending request nor is sending: send the request.
   bool needs_tablet_copy = false;
   int64_t commit_index_before = request_.has_committed_index() ?
-      request_.committed_index().index() : kMinimumOpIdIndex;
+      request_.committed_index() : kMinimumOpIdIndex;
   Status s = queue_->RequestForPeer(peer_pb_.permanent_uuid(), &request_,
                                     &replicate_msg_refs_, &needs_tablet_copy);
   int64_t commit_index_after = request_.has_committed_index() ?
-      request_.committed_index().index() : kMinimumOpIdIndex;
+      request_.committed_index() : kMinimumOpIdIndex;
 
   if (PREDICT_FALSE(!s.ok())) {
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Could not obtain request from queue for peer: "

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/consensus_queue-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc
index 41c539e..33a5f6c 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -69,9 +69,7 @@ class ConsensusQueueTest : public KuduTest {
     clock_.reset(new server::HybridClock());
     ASSERT_OK(clock_->Init());
 
-    consensus_.reset(new TestRaftConsensusQueueIface());
     CloseAndReopenQueue();
-    queue_->RegisterObserver(consensus_.get());
   }
 
   void CloseAndReopenQueue() {
@@ -185,7 +183,6 @@ class ConsensusQueueTest : public KuduTest {
   }
 
  protected:
-  gscoped_ptr<TestRaftConsensusQueueIface> consensus_;
   const Schema schema_;
   gscoped_ptr<FsManager> fs_manager_;
   MetricRegistry metric_registry_;
@@ -202,7 +199,7 @@ class ConsensusQueueTest : public KuduTest {
 // falls in the middle of the current messages in the queue.
 TEST_F(ConsensusQueueTest, TestStartTrackingAfterStart) {
   queue_->Init(MinimumOpId());
-  queue_->SetLeaderMode(MinimumOpId(), MinimumOpId().term(), BuildRaftConfigPBForTests(2));
+  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(2));
   AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100);
 
   ConsensusRequestPB request;
@@ -242,15 +239,13 @@ TEST_F(ConsensusQueueTest, TestStartTrackingAfterStart) {
 // being 'consensus_max_batch_size_bytes'
 TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
   queue_->Init(MinimumOpId());
-  queue_->SetLeaderMode(MinimumOpId(), MinimumOpId().term(), BuildRaftConfigPBForTests(2));
+  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(2));
 
   // helper to estimate request size so that we can set the max batch size appropriately
   ConsensusRequestPB page_size_estimator;
   page_size_estimator.set_caller_term(14);
-  OpId* committed_index = page_size_estimator.mutable_committed_index();
-  OpId* preceding_id = page_size_estimator.mutable_preceding_id();
-  committed_index->CopyFrom(MinimumOpId());
-  preceding_id->CopyFrom(MinimumOpId());
+  page_size_estimator.set_committed_index(0);
+  page_size_estimator.mutable_preceding_id()->CopyFrom(MinimumOpId());
 
   // We're going to add 100 messages to the queue so we make each page fetch 9 of those,
   // for a total of 12 pages. The last page should have a single op.
@@ -307,18 +302,16 @@ TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
 
 TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
   queue_->Init(MinimumOpId());
-  queue_->SetLeaderMode(MinimumOpId(), MinimumOpId().term(), BuildRaftConfigPBForTests(3));
+  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(3));
   AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100);
 
   // Wait for the local peer to append all messages
   WaitForLocalPeerToAckIndex(100);
 
-  OpId all_replicated = MakeOpId(14, 100);
-
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), MinimumOpId());
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 0);
   // Since we're tracking a single peer still this should have moved the all
   // replicated watermark to the last op appended to the local log.
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), MakeOpId(14, 100));
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 100);
 
   // Start to track the peer after the queue has some messages in it
   // at a point that is halfway through the current messages in the queue.
@@ -333,8 +326,8 @@ TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
   ASSERT_TRUE(more_pending);
 
   // Tracking a peer a new peer should have moved the all replicated watermark back.
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), MinimumOpId());
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), MinimumOpId());
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 0);
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 0);
 
   vector<ReplicateRefPtr> refs;
   bool needs_tablet_copy;
@@ -350,8 +343,8 @@ TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
   queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
   ASSERT_TRUE(more_pending) << "Queue didn't have anymore requests pending";
 
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), MakeOpId(14, 100));
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), MakeOpId(14, 100));
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 100);
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 100);
 
   // if we ask for a new request, it should come back with the rest of the messages
   ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
@@ -367,8 +360,8 @@ TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
 
   WaitForLocalPeerToAckIndex(expected.index());
 
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), expected);
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), expected);
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), expected.index());
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), expected.index());
 
   // extract the ops from the request to avoid double free
   request.mutable_ops()->ExtractSubrange(0, request.ops_size(), nullptr);
@@ -376,22 +369,21 @@ TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
 
 TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
   queue_->Init(MinimumOpId());
-  queue_->SetLeaderMode(MinimumOpId(), MinimumOpId().term(), BuildRaftConfigPBForTests(5));
+  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(5));
   // Track 4 additional peers (in addition to the local peer)
   queue_->TrackPeer("peer-1");
   queue_->TrackPeer("peer-2");
   queue_->TrackPeer("peer-3");
   queue_->TrackPeer("peer-4");
 
-  // Append 10 messages to the queue with a majority of 2 for a total of 3 peers.
+  // Append 10 messages to the queue.
   // This should add messages 0.1 -> 0.7, 1.8 -> 1.10 to the queue.
   AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 10);
   WaitForLocalPeerToAckIndex(10);
 
-  // Since only the local log might have ACKed at this point,
+  // Since only the local log has ACKed at this point,
   // the committed_index should be MinimumOpId().
-  queue_->observers_pool_->Wait();
-  ASSERT_OPID_EQ(queue_->GetCommittedIndexForTests(), MinimumOpId());
+  ASSERT_EQ(queue_->GetCommittedIndexForTests(), 0);
 
   // NOTE: We don't need to get operations from the queue. The queue
   // only cares about what the peer reported as received, not what was sent.
@@ -401,7 +393,7 @@ TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
   bool more_pending;
   OpId last_sent = MakeOpId(0, 5);
 
-  // Ack the first five operations for peer-1
+  // Ack the first five operations for peer-1.
   response.set_responder_uuid("peer-1");
   SetLastReceivedAndLastCommitted(&response, last_sent, MinimumOpId().index());
 
@@ -409,40 +401,47 @@ TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
   ASSERT_TRUE(more_pending);
 
   // Committed index should be the same
-  queue_->observers_pool_->Wait();
-  ASSERT_OPID_EQ(queue_->GetCommittedIndexForTests(), MinimumOpId());
+  ASSERT_EQ(queue_->GetCommittedIndexForTests(), 0);
 
-  // Ack the first five operations for peer-2
+  // Ack the first five operations for peer-2.
   response.set_responder_uuid("peer-2");
   queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
   ASSERT_TRUE(more_pending);
 
-  // A majority has now replicated up to 0.5.
-  queue_->observers_pool_->Wait();
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), MakeOpId(0, 5));
+  // A majority has now replicated up to 0.5: local, 'peer-1', and 'peer-2'.
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 5);
+  // However, this leader has appended operations in term 1, so we can't
+  // advance the committed index yet.
+  ASSERT_EQ(queue_->GetCommittedIndexForTests(), 0);
+  // Moreover, 'peer-3' and 'peer-4' have not acked yet, so the "all-replicated"
+  // index also cannot advance.
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 0);
 
-  // Ack all operations for peer-3
+  // Ack all operations for peer-3.
   response.set_responder_uuid("peer-3");
   last_sent = MakeOpId(1, 10);
   SetLastReceivedAndLastCommitted(&response, last_sent, MinimumOpId().index());
-
   queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
-  // The committed index moved so 'more_pending' should be true so that the peer is
-  // notified.
-  ASSERT_TRUE(more_pending);
 
-  // Majority replicated watermark should be the same
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), MakeOpId(0, 5));
+  // peer-3 now has all operations, and the commit index hasn't advanced.
+  EXPECT_FALSE(more_pending);
+
+  // Watermarks should remain the same as above: we still have not majority-replicated
+  // anything in the current term, so committed index cannot advance.
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 5);
+  ASSERT_EQ(queue_->GetCommittedIndexForTests(), 0);
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 0);
 
-  // Ack the remaining operations for peer-4
+  // Ack the remaining operations for peer-4.
   response.set_responder_uuid("peer-4");
   queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
-  ASSERT_TRUE(more_pending);
+  EXPECT_TRUE(more_pending);
 
   // Now that a majority of peers have replicated an operation in the queue's
   // term the committed index should advance.
-  queue_->observers_pool_->Wait();
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), MakeOpId(1, 10));
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 10);
+  ASSERT_EQ(queue_->GetCommittedIndexForTests(), 10);
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 5);
 }
 
 // In this test we append a sequence of operations to a log
@@ -461,26 +460,25 @@ TEST_F(ConsensusQueueTest, TestQueueLoadsOperationsForPeer) {
   }
   ASSERT_OK(log_->WaitUntilAllFlushed());
 
-  OpId queues_last_op = opid;
-  queues_last_op.set_index(queues_last_op.index() - 1);
+  OpId leader_last_op;
+  log_->GetLatestEntryOpId(&leader_last_op);
 
   // Now reset the queue so that we can pass a new committed index,
   // the last operation in the log.
   CloseAndReopenQueue();
 
-  OpId committed_index;
-  committed_index.set_term(1);
-  committed_index.set_index(100);
-  queue_->Init(committed_index);
-  queue_->SetLeaderMode(committed_index, committed_index.term(), BuildRaftConfigPBForTests(3));
+  queue_->Init(leader_last_op);
+  queue_->SetLeaderMode(leader_last_op.index(),
+                        leader_last_op.term(),
+                        BuildRaftConfigPBForTests(3));
 
   ConsensusRequestPB request;
   ConsensusResponsePB response;
   response.set_responder_uuid(kPeerUuid);
   bool more_pending = false;
 
-  // The peer will actually be behind the first operation in the queue
-  // in this case about 50 operations before.
+  // The peer will actually be behind the first operation in the queue.
+  // In this case about 50 operations before.
   OpId peers_last_op;
   peers_last_op.set_term(1);
   peers_last_op.set_index(50);
@@ -536,13 +534,16 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
   }
 
 
-  // Now reset the queue so that we can pass a new committed index,
-  // op, 2.15.
+  // Now reset the queue so that we can pass a new committed index (15).
   CloseAndReopenQueue();
 
-  OpId committed_index = MakeOpId(2, 15);
-  queue_->Init(MakeOpId(2, 20));
-  queue_->SetLeaderMode(committed_index, committed_index.term(), BuildRaftConfigPBForTests(3));
+  OpId last_in_log;
+  log_->GetLatestEntryOpId(&last_in_log);
+  int64_t committed_index = 15;
+  queue_->Init(last_in_log);
+  queue_->SetLeaderMode(committed_index,
+                        last_in_log.term(),
+                        BuildRaftConfigPBForTests(3));
 
   // Now get a request for a simulated old leader, which contains more operations
   // in term 1 than the new leader has.
@@ -563,7 +564,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
   ASSERT_FALSE(needs_tablet_copy);
   ASSERT_EQ(request.ops_size(), 0);
   ASSERT_OPID_EQ(request.preceding_id(), MakeOpId(2, 20));
-  ASSERT_OPID_EQ(request.committed_index(), committed_index);
+  ASSERT_EQ(request.committed_index(), committed_index);
 
   // The old leader was still in term 1 but it increased its term with our request.
   response.set_responder_term(2);
@@ -586,7 +587,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
 
   // We're waiting for a two nodes. The all committed watermark should be
   // 0.0 since we haven't had a successful exchange with the 'remote' peer.
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), MinimumOpId());
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 0);
 
   // Test even when a correct peer responds (meaning we actually get to execute
   // watermark advancement) we sill have the same all-replicated watermark.
@@ -594,7 +595,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
   ASSERT_OK(queue_->AppendOperation(make_scoped_refptr(new RefCountedReplicate(replicate))));
   WaitForLocalPeerToAckIndex(21);
 
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), MinimumOpId());
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 0);
 
   // Generate another request for the remote peer, which should include
   // all of the ops since the peer's last-known committed index.
@@ -609,7 +610,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
   queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
 
   // Now the watermark should have advanced.
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), MakeOpId(2, 21));
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 21);
 
   // The messages still belong to the queue so we have to release them.
   request.mutable_ops()->ExtractSubrange(0, request.ops().size(), nullptr);
@@ -624,12 +625,12 @@ TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
   // Append a bunch of messages.
   AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 10);
   log_->WaitUntilAllFlushed();
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), MakeOpId(1, 10));
+
   // Now rewrite some of the operations and wait for the log to append.
   Synchronizer synch;
   CHECK_OK(queue_->AppendOperations(
-        { make_scoped_refptr(new RefCountedReplicate(
-              CreateDummyReplicate(2, 5, clock_->Now(), 0).release())) },
+      { make_scoped_refptr(new RefCountedReplicate(
+          CreateDummyReplicate(2, 5, clock_->Now(), 0).release())) },
       synch.AsStatusCallback()));
 
   // Wait for the operation to be in the log.
@@ -639,15 +640,16 @@ TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
   // in log cache.
   synch.Reset();
   CHECK_OK(queue_->AppendOperations(
-        { make_scoped_refptr(new RefCountedReplicate(
-              CreateDummyReplicate(2, 6, clock_->Now(), 0).release())) },
+      { make_scoped_refptr(new RefCountedReplicate(
+          CreateDummyReplicate(2, 6, clock_->Now(), 0).release())) },
       synch.AsStatusCallback()));
 
   // Wait for the operation to be in the log.
   ASSERT_OK(synch.Wait());
 
-  // Now the all replicated watermark should have moved backward.
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), MakeOpId(2, 6));
+  // The replication watermark on a follower should not advance by virtue of appending
+  // entries to the log.
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 0);
 }
 
 // Tests that we're advancing the watermarks properly and only when the peer
@@ -678,21 +680,22 @@ TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
 TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog) {
   FLAGS_consensus_max_batch_size_bytes = 1024 * 10;
 
+  const int kInitialCommittedIndex = 31;
   queue_->Init(MakeOpId(72, 30));
-  queue_->SetLeaderMode(MakeOpId(72, 31), 76, BuildRaftConfigPBForTests(3));
+  queue_->SetLeaderMode(kInitialCommittedIndex, 76, BuildRaftConfigPBForTests(3));
 
   ConsensusRequestPB request;
   ConsensusResponsePB response;
   vector<ReplicateRefPtr> refs;
 
   bool more_pending;
-  // We expect the majority replicated watermark to star at the committed index.
-  OpId expected_majority_replicated = MakeOpId(72, 31);
+  // We expect the majority replicated watermark to start at the committed index.
+  int64_t expected_majority_replicated = kInitialCommittedIndex;
   // We expect the all replicated watermark to be reset when we track a new peer.
-  OpId expected_all_replicated = MinimumOpId();
+  int64_t expected_all_replicated = 0;
 
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), expected_majority_replicated);
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), expected_all_replicated);
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), expected_majority_replicated);
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), expected_all_replicated);
 
   UpdatePeerWatermarkToOp(&request, &response, MakeOpId(75, 49), MinimumOpId(), 31, &more_pending);
   ASSERT_TRUE(more_pending);
@@ -727,11 +730,10 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
   ASSERT_TRUE(more_pending);
 
   // We've sent (and received and ack) up to 72.40 from the remote peer
-  expected_majority_replicated = MakeOpId(72, 40);
-  expected_all_replicated = MakeOpId(72, 40);
+  expected_majority_replicated = expected_all_replicated = 40;
 
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), expected_majority_replicated);
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), expected_all_replicated);
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), expected_majority_replicated);
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), expected_all_replicated);
 
   // Another request for this peer should get another page of messages. Still not
   // on the queue's term (and thus without advancing watermarks).
@@ -746,11 +748,10 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
   queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
 
   // We've now sent (and received an ack) up to 73.39
-  expected_majority_replicated = MakeOpId(73, 49);
-  expected_all_replicated = MakeOpId(73, 49);
+  expected_majority_replicated = expected_all_replicated = 49;
 
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), expected_majority_replicated);
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), expected_all_replicated);
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), expected_majority_replicated);
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), expected_all_replicated);
 
   // The last page of request should overwrite the peer's operations and the
   // response should finally advance the watermarks.
@@ -761,15 +762,13 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
   ASSERT_OPID_EQ(request.ops(0).id(), MakeOpId(73, 50));
 
   // We're done, both watermarks should be at the end.
-  expected_majority_replicated = MakeOpId(76, 53);
-  expected_all_replicated = MakeOpId(76, 53);
+  expected_majority_replicated = expected_all_replicated = 53;
 
-  SetLastReceivedAndLastCommitted(&response, expected_majority_replicated,
-                                  expected_majority_replicated, 31);
+  SetLastReceivedAndLastCommitted(&response, MakeOpId(76, 53), 31);
   queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
 
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), expected_majority_replicated);
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), expected_all_replicated);
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), expected_majority_replicated);
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), expected_all_replicated);
 
   request.mutable_ops()->ExtractSubrange(0, request.ops().size(), nullptr);
 }
@@ -777,7 +776,7 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
 // Test that Tablet Copy is triggered when a "tablet not found" error occurs.
 TEST_F(ConsensusQueueTest, TestTriggerTabletCopyIfTabletNotFound) {
   queue_->Init(MinimumOpId());
-  queue_->SetLeaderMode(MinimumOpId(), MinimumOpId().term(), BuildRaftConfigPBForTests(3));
+  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(3));
   AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100);
 
   ConsensusRequestPB request;
@@ -824,13 +823,12 @@ TEST_F(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics) {
 
   // The committed_index should be MinimumOpId() since UpdateFollowerCommittedIndex
   // has not been called.
-  queue_->observers_pool_->Wait();
-  ASSERT_OPID_EQ(queue_->GetCommittedIndexForTests(), MinimumOpId());
+  ASSERT_EQ(queue_->GetCommittedIndexForTests(), 0);
 
   // Update the committed index. In real life, this would be done by the consensus
   // implementation when it receives an updated committed index from the leader.
-  queue_->UpdateFollowerCommittedIndex(MakeOpId(1, 10));
-  ASSERT_OPID_EQ(queue_->GetCommittedIndexForTests(), MakeOpId(1, 10));
+  queue_->UpdateFollowerCommittedIndex(10);
+  ASSERT_EQ(queue_->GetCommittedIndexForTests(), 10);
 
   // Check the metrics have the right values based on the updated committed index.
   ASSERT_EQ(queue_->metrics_.num_majority_done_ops->value(), 0);

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index a62a7e2..b7dff82 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -17,6 +17,8 @@
 #include "kudu/consensus/consensus_queue.h"
 
 #include <algorithm>
+#include <boost/optional.hpp>
+#include <boost/optional/optional_io.hpp>
 #include <gflags/gflags.h>
 #include <iostream>
 #include <mutex>
@@ -84,7 +86,7 @@ std::string PeerMessageQueue::TrackedPeer::ToString() const {
                     "Last known committed idx: $4, Last exchange result: $5, "
                     "Needs tablet copy: $6",
                     uuid, is_new, OpIdToString(last_received), next_index,
-                    last_known_committed_idx,
+                    last_known_committed_index,
                     is_last_exchange_successful ? "SUCCESS" : "ERROR",
                     needs_tablet_copy);
 }
@@ -107,10 +109,11 @@ PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_ent
       metrics_(metric_entity) {
   DCHECK(local_peer_pb_.has_permanent_uuid());
   DCHECK(local_peer_pb_.has_last_known_addr());
-  queue_state_.current_term = MinimumOpId().term();
-  queue_state_.committed_index = MinimumOpId();
-  queue_state_.all_replicated_opid = MinimumOpId();
-  queue_state_.majority_replicated_opid = MinimumOpId();
+  queue_state_.current_term = 0;
+  queue_state_.first_index_in_current_term = boost::none;
+  queue_state_.committed_index = 0;
+  queue_state_.all_replicated_index = 0;
+  queue_state_.majority_replicated_index = 0;
   queue_state_.state = kQueueConstructed;
   queue_state_.mode = NON_LEADER;
   queue_state_.majority_size_ = -1;
@@ -126,14 +129,18 @@ void PeerMessageQueue::Init(const OpId& last_locally_replicated) {
   TrackPeerUnlocked(local_peer_pb_.permanent_uuid());
 }
 
-void PeerMessageQueue::SetLeaderMode(const OpId& committed_index,
+void PeerMessageQueue::SetLeaderMode(int64_t committed_index,
                                      int64_t current_term,
                                      const RaftConfigPB& active_config) {
   std::lock_guard<simple_spinlock> lock(queue_lock_);
-  CHECK(committed_index.IsInitialized());
-  queue_state_.current_term = current_term;
+  if (current_term != queue_state_.current_term) {
+    CHECK_GT(current_term, queue_state_.current_term) << "Terms should only increase";
+    queue_state_.first_index_in_current_term = boost::none;
+    queue_state_.current_term = current_term;
+  }
+
   queue_state_.committed_index = committed_index;
-  queue_state_.majority_replicated_opid = committed_index;
+  queue_state_.majority_replicated_index = committed_index;
   queue_state_.active_config.reset(new RaftConfigPB(active_config));
   CHECK(IsRaftConfigVoter(local_peer_pb_.permanent_uuid(), *queue_state_.active_config))
       << local_peer_pb_.ShortDebugString() << " not a voter in config: "
@@ -186,8 +193,8 @@ void PeerMessageQueue::TrackPeerUnlocked(const string& uuid) {
   CheckPeersInActiveConfigIfLeaderUnlocked();
 
   // We don't know how far back this peer is, so set the all replicated watermark to
-  // MinimumOpId. We'll advance it when we know how far along the peer is.
-  queue_state_.all_replicated_opid = MinimumOpId();
+  // 0. We'll advance it when we know how far along the peer is.
+  queue_state_.all_replicated_index = 0;
 }
 
 void PeerMessageQueue::UntrackPeer(const string& uuid) {
@@ -229,7 +236,7 @@ void PeerMessageQueue::LocalPeerAppendFinished(const OpId& id,
   *fake_response.mutable_status()->mutable_last_received_current_leader() = id;
   {
     std::lock_guard<simple_spinlock> lock(queue_lock_);
-    fake_response.mutable_status()->set_last_committed_idx(queue_state_.committed_index.index());
+    fake_response.mutable_status()->set_last_committed_idx(queue_state_.committed_index);
   }
   bool junk;
   ResponseFromPeer(local_peer_pb_.permanent_uuid(), fake_response, &junk);
@@ -250,8 +257,22 @@ Status PeerMessageQueue::AppendOperations(const vector<ReplicateRefPtr>& msgs,
 
   OpId last_id = msgs.back()->get()->id();
 
-  if (last_id.term() > queue_state_.current_term) {
-    queue_state_.current_term = last_id.term();
+  // "Snoop" on the appended operations to watch for term changes (as follower)
+  // and to determine the first index in our term (as leader).
+  //
+  // TODO: it would be a cleaner design to explicitly set the first index in the
+  // leader term as part of SetLeaderMode(). However, we are currently also
+  // using that method to handle refreshing the peer list during configuration
+  // changes, so the refactor isn't trivial.
+  for (const auto& msg : msgs) {
+    const auto& id = msg->get()->id();
+    if (id.term() > queue_state_.current_term) {
+      queue_state_.current_term = id.term();
+      queue_state_.first_index_in_current_term = id.index();
+    } else if (id.term() == queue_state_.current_term &&
+               queue_state_.first_index_in_current_term == boost::none) {
+      queue_state_.first_index_in_current_term = id.index();
+    }
   }
 
   // Unlock ourselves during Append to prevent a deadlock: it's possible that
@@ -293,7 +314,7 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
     // This is initialized to the queue's last appended op but gets set to the id of the
     // log entry preceding the first one in 'messages' if messages are found for the peer.
     preceding_id = queue_state_.last_appended;
-    request->mutable_committed_index()->CopyFrom(queue_state_.committed_index);
+    request->set_committed_index(queue_state_.committed_index);
     request->set_caller_term(queue_state_.current_term);
   }
 
@@ -427,7 +448,7 @@ Status PeerMessageQueue::GetTabletCopyRequestForPeer(const string& uuid,
 }
 
 void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
-                                             OpId* watermark,
+                                             int64_t* watermark,
                                              const OpId& replicated_before,
                                              const OpId& replicated_after,
                                              int num_peers_required,
@@ -437,7 +458,7 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
     VLOG_WITH_PREFIX_UNLOCKED(2) << "Updating " << type << " watermark: "
         << "Peer (" << peer->ToString() << ") changed from "
         << replicated_before << " to " << replicated_after << ". "
-        << "Current value: " << watermark->ShortDebugString();
+                                 << "Current value: " << *watermark;
   }
 
   // Go through the peer's watermarks, we want the highest watermark that
@@ -447,10 +468,30 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
   // - Sort the vector
   // - Find the vector.size() - 'num_peers_required' position, this
   //   will be the new 'watermark'.
-  vector<const OpId*> watermarks;
+  vector<int64_t> watermarks;
   for (const PeersMap::value_type& peer : peers_map_) {
+    // TODO: The fact that we only consider peers whose last exchange was
+    // successful can cause the "all_replicated" watermark to lag behind
+    // farther than necessary. For example:
+    // - local peer has replicated opid 100
+    // - remote peer A has replicated opid 100
+    // - remote peer B has replication opid 10 and is catching up
+    // - remote peer A goes down
+    // Here we'd start getting 'is_last_exchange_successful == false' for peer A.
+    // In that case, the 'all_replicated_watermark', which requires 3 peers, would not
+    // be updateable, even once we've replicated peer 'B' up to opid 100. It would
+    // get "stuck" at 10. In fact, in this case, the 'majority_replicated_watermark' would
+    // also move *backwards* when peer A started getting errors.
+    //
+    // The issue with simply removing this condition is that 'last_received' does not
+    // perfectly correspond to the 'match_index' in Raft Figure 2. It is simply the
+    // highest operation in a peer's log, regardless of whether that peer currently
+    // holds a prefix of the leader's log. So, in the case that the last exchange
+    // was an error (LMP mismatch, for example), the 'last_received' is _not_ usable
+    // for watermark calculation. This could be fixed by separately storing the
+    // 'match_index' on a per-peer basis and using that for watermark calculation.
     if (peer.second->is_last_exchange_successful) {
-      watermarks.push_back(&peer.second->last_received);
+      watermarks.push_back(peer.second->last_received.index());
     }
   }
 
@@ -459,11 +500,11 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
     return;
   }
 
-  std::sort(watermarks.begin(), watermarks.end(), OpIdIndexLessThanPtrFunctor());
+  std::sort(watermarks.begin(), watermarks.end());
 
-  OpId new_watermark = *watermarks[watermarks.size() - num_peers_required];
-  OpId old_watermark = *watermark;
-  watermark->CopyFrom(new_watermark);
+  int64_t new_watermark = watermarks[watermarks.size() - num_peers_required];
+  int64_t old_watermark = *watermark;
+  *watermark = new_watermark;
 
   VLOG_WITH_PREFIX_UNLOCKED(1) << "Updated " << type << " watermark "
       << "from " << old_watermark << " to " << new_watermark;
@@ -473,24 +514,20 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
       VLOG_WITH_PREFIX_UNLOCKED(3) << "Peer: " << peer.second->ToString();
     }
     VLOG_WITH_PREFIX_UNLOCKED(3) << "Sorted watermarks:";
-    for (const OpId* watermark : watermarks) {
-      VLOG_WITH_PREFIX_UNLOCKED(3) << "Watermark: " << watermark->ShortDebugString();
+    for (int64_t watermark : watermarks) {
+      VLOG_WITH_PREFIX_UNLOCKED(3) << "Watermark: " << watermark;
     }
   }
 }
 
-void PeerMessageQueue::UpdateFollowerCommittedIndex(const OpId& committed_index) {
+void PeerMessageQueue::UpdateFollowerCommittedIndex(int64_t committed_index) {
   if (queue_state_.mode == NON_LEADER) {
     std::lock_guard<simple_spinlock> l(queue_lock_);
-    UpdateFollowerCommittedIndexUnlocked(committed_index);
+    queue_state_.committed_index = committed_index;
+    UpdateMetrics();
   }
 }
 
-void PeerMessageQueue::UpdateFollowerCommittedIndexUnlocked(const OpId& committed_index) {
-  queue_state_.committed_index.CopyFrom(committed_index);
-  UpdateMetrics();
-}
-
 void PeerMessageQueue::NotifyPeerIsResponsiveDespiteError(const std::string& peer_uuid) {
   std::lock_guard<simple_spinlock> l(queue_lock_);
   TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
@@ -504,7 +541,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
   DCHECK(response.IsInitialized()) << "Error: Uninitialized: "
       << response.InitializationErrorString() << ". Response: " << response.ShortDebugString();
 
-  OpId updated_majority_replicated_opid;
+  boost::optional<int64_t> updated_commit_index;
   Mode mode_copy;
   {
     std::lock_guard<simple_spinlock> scoped_lock(queue_lock_);
@@ -555,7 +592,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
 
     // Update the peer status based on the response.
     peer->is_new = false;
-    peer->last_known_committed_idx = status.last_committed_idx();
+    peer->last_known_committed_index = status.last_committed_idx();
     peer->last_successful_communication_time = MonoTime::Now();
 
     // If the reported last-received op for the replica is in our local log,
@@ -584,13 +621,13 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
       // stepping back one-by-one from the end until we no longer have an LMP
       // error, we jump back to the last committed op indicated by the peer with
       // the hope that doing so will result in a faster catch-up process.
-      DCHECK_GE(peer->last_known_committed_idx, 0);
-      peer->next_index = peer->last_known_committed_idx + 1;
+      DCHECK_GE(peer->last_known_committed_index, 0);
+      peer->next_index = peer->last_known_committed_index + 1;
       LOG_WITH_PREFIX_UNLOCKED(INFO)
           << "Peer " << peer_uuid << " log is divergent from this leader: "
           << "its last log entry " << OpIdToString(status.last_received()) << " is not in "
           << "this leader's log and it has not received anything from this leader yet. "
-          << "Falling back to committed index " << peer->last_known_committed_idx;
+          << "Falling back to committed index " << peer->last_known_committed_index;
     }
 
     if (PREDICT_FALSE(status.has_error())) {
@@ -640,39 +677,72 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
           << "Response: " << response.ShortDebugString();
     }
 
-    // If our log has the next request for the peer or if the peer's committed index is
-    // lower than our own, set 'more_pending' to true.
-    *more_pending = log_cache_.HasOpBeenWritten(peer->next_index) ||
-        (peer->last_known_committed_idx < queue_state_.committed_index.index());
-
     mode_copy = queue_state_.mode;
+
+    // If we're the leader, we can compute the new watermarks based on the progress
+    // of our followers.
+    // NOTE: it's possible this node might have lost its leadership (and the notification
+    // is just pending behind the lock we're holding), but any future leader will observe
+    // the same watermarks and make the same advancement, so this is safe.
     if (mode_copy == LEADER) {
       // Advance the majority replicated index.
       AdvanceQueueWatermark("majority_replicated",
-                            &queue_state_.majority_replicated_opid,
+                            &queue_state_.majority_replicated_index,
                             previous.last_received,
                             peer->last_received,
                             queue_state_.majority_size_,
                             peer);
 
-      updated_majority_replicated_opid = queue_state_.majority_replicated_opid;
+      // Advance the all replicated index.
+      AdvanceQueueWatermark("all_replicated",
+                            &queue_state_.all_replicated_index,
+                            previous.last_received,
+                            peer->last_received,
+                            peers_map_.size(),
+                            peer);
+
+      // If the majority-replicated index is in our current term,
+      // and it is above our current committed index, then
+      // we can advance the committed index.
+      //
+      // It would seem that the "it is above our current committed index"
+      // check is redundant (and could be a CHECK), but in fact the
+      // majority-replicated index can currently go down, since we don't
+      // consider peers whose last contact was an error in the watermark
+      // calculation. See the TODO in AdvanceQueueWatermark() for more details.
+      int64_t commit_index_before = queue_state_.committed_index;
+      if (queue_state_.first_index_in_current_term != boost::none &&
+          queue_state_.majority_replicated_index >= queue_state_.first_index_in_current_term &&
+          queue_state_.majority_replicated_index > queue_state_.committed_index) {
+        queue_state_.committed_index = queue_state_.majority_replicated_index;
+      } else {
+        VLOG_WITH_PREFIX_UNLOCKED(2) << "Cannot advance commit index, waiting for > "
+                                     << "first index in current leader term: "
+                                     << queue_state_.first_index_in_current_term;
+      }
+
+      // Only notify observers if the commit index actually changed.
+      if (queue_state_.committed_index != commit_index_before) {
+        DCHECK_GT(queue_state_.committed_index, commit_index_before);
+        updated_commit_index = queue_state_.committed_index;
+        VLOG_WITH_PREFIX_UNLOCKED(2) << "Commit index advanced from "
+                                     << commit_index_before << " to "
+                                     << *updated_commit_index;
+      }
     }
 
-    // Advance the all replicated index.
-    AdvanceQueueWatermark("all_replicated",
-                          &queue_state_.all_replicated_opid,
-                          previous.last_received,
-                          peer->last_received,
-                          peers_map_.size(),
-                          peer);
+    // If our log has the next request for the peer or if the peer's committed index is
+    // lower than our own, set 'more_pending' to true.
+    *more_pending = log_cache_.HasOpBeenWritten(peer->next_index) ||
+        (peer->last_known_committed_index < queue_state_.committed_index);
 
-    log_cache_.EvictThroughOp(queue_state_.all_replicated_opid.index());
+    log_cache_.EvictThroughOp(queue_state_.all_replicated_index);
 
     UpdateMetrics();
   }
 
-  if (mode_copy == LEADER) {
-    NotifyObserversOfMajorityReplOpChange(updated_majority_replicated_opid);
+  if (mode_copy == LEADER && updated_commit_index != boost::none) {
+    NotifyObserversOfCommitIndexChange(*updated_commit_index);
   }
 }
 
@@ -682,19 +752,19 @@ PeerMessageQueue::TrackedPeer PeerMessageQueue::GetTrackedPeerForTests(string uu
   return *tracked;
 }
 
-OpId PeerMessageQueue::GetAllReplicatedIndexForTests() const {
+int64_t PeerMessageQueue::GetAllReplicatedIndexForTests() const {
   std::lock_guard<simple_spinlock> lock(queue_lock_);
-  return queue_state_.all_replicated_opid;
+  return queue_state_.all_replicated_index;
 }
 
-OpId PeerMessageQueue::GetCommittedIndexForTests() const {
+int64_t PeerMessageQueue::GetCommittedIndexForTests() const {
   std::lock_guard<simple_spinlock> lock(queue_lock_);
   return queue_state_.committed_index;
 }
 
-OpId PeerMessageQueue::GetMajorityReplicatedOpIdForTests() const {
+int64_t PeerMessageQueue::GetMajorityReplicatedIndexForTests() const {
   std::lock_guard<simple_spinlock> lock(queue_lock_);
-  return queue_state_.majority_replicated_opid;
+  return queue_state_.majority_replicated_index;
 }
 
 
@@ -704,10 +774,10 @@ void PeerMessageQueue::UpdateMetrics() {
   // For non-leaders, majority_done_ops isn't meaningful because followers don't
   // track when an op is replicated to all peers.
   metrics_.num_majority_done_ops->set_value(queue_state_.mode == LEADER ?
-    queue_state_.committed_index.index() - queue_state_.all_replicated_opid.index()
+    queue_state_.committed_index - queue_state_.all_replicated_index
     : 0);
   metrics_.num_in_progress_ops->set_value(
-    queue_state_.last_appended.index() - queue_state_.committed_index.index());
+    queue_state_.last_appended.index() - queue_state_.committed_index);
 }
 
 void PeerMessageQueue::DumpToStrings(vector<string>* lines) const {
@@ -802,13 +872,12 @@ bool PeerMessageQueue::IsOpInLog(const OpId& desired_op) const {
   return false; // Unreachable; here to squelch GCC warning.
 }
 
-void PeerMessageQueue::NotifyObserversOfMajorityReplOpChange(
-    const OpId new_majority_replicated_op) {
+void PeerMessageQueue::NotifyObserversOfCommitIndexChange(int64_t commit_index) {
   WARN_NOT_OK(observers_pool_->SubmitClosure(
-      Bind(&PeerMessageQueue::NotifyObserversOfMajorityReplOpChangeTask,
-           Unretained(this), new_majority_replicated_op)),
+      Bind(&PeerMessageQueue::NotifyObserversOfCommitIndexChangeTask,
+           Unretained(this), commit_index)),
               LogPrefixUnlocked() + "Unable to notify RaftConsensus of "
-                                    "majority replicated op change.");
+                                    "commit index change.");
 }
 
 void PeerMessageQueue::NotifyObserversOfTermChange(int64_t term) {
@@ -818,27 +887,14 @@ void PeerMessageQueue::NotifyObserversOfTermChange(int64_t term) {
               LogPrefixUnlocked() + "Unable to notify RaftConsensus of term change.");
 }
 
-void PeerMessageQueue::NotifyObserversOfMajorityReplOpChangeTask(
-    const OpId new_majority_replicated_op) {
+void PeerMessageQueue::NotifyObserversOfCommitIndexChangeTask(int64_t new_commit_index) {
   std::vector<PeerMessageQueueObserver*> copy;
   {
     std::lock_guard<simple_spinlock> lock(queue_lock_);
     copy = observers_;
   }
-
-  // TODO move commit index advancement here so that the queue is not dependent on
-  // consensus at all, but that requires a bit more work.
-  OpId new_committed_index;
   for (PeerMessageQueueObserver* observer : copy) {
-    observer->UpdateMajorityReplicated(new_majority_replicated_op, &new_committed_index);
-  }
-
-  {
-    std::lock_guard<simple_spinlock> lock(queue_lock_);
-    if (new_committed_index.IsInitialized() &&
-        new_committed_index.index() > queue_state_.committed_index.index()) {
-      queue_state_.committed_index.CopyFrom(new_committed_index);
-    }
+    observer->NotifyCommitIndex(new_commit_index);
   }
 }
 
@@ -849,7 +905,6 @@ void PeerMessageQueue::NotifyObserversOfTermChangeTask(int64_t term) {
     std::lock_guard<simple_spinlock> lock(queue_lock_);
     copy = observers_;
   }
-  OpId new_committed_index;
   for (PeerMessageQueueObserver* observer : copy) {
     observer->NotifyTermChange(term);
   }
@@ -873,7 +928,6 @@ void PeerMessageQueue::NotifyObserversOfFailedFollowerTask(const string& uuid,
     std::lock_guard<simple_spinlock> lock(queue_lock_);
     observers_copy = observers_;
   }
-  OpId new_committed_index;
   for (PeerMessageQueueObserver* observer : observers_copy) {
     observer->NotifyFailedFollower(uuid, term, reason);
   }
@@ -895,11 +949,11 @@ string PeerMessageQueue::LogPrefixUnlocked() const {
 }
 
 string PeerMessageQueue::QueueState::ToString() const {
-  return Substitute("All replicated op: $0, Majority replicated op: $1, "
+  return Substitute("All replicated index: $0, Majority replicated index: $1, "
       "Committed index: $2, Last appended: $3, Current term: $4, Majority size: $5, "
       "State: $6, Mode: $7$8",
-      OpIdToString(all_replicated_opid), OpIdToString(majority_replicated_opid),
-      OpIdToString(committed_index), OpIdToString(last_appended), current_term,
+      all_replicated_index, majority_replicated_index,
+      committed_index, OpIdToString(last_appended), current_term,
       majority_size_, state, (mode == LEADER ? "LEADER" : "NON_LEADER"),
       active_config ? ", active raft config: " + active_config->ShortDebugString() : "");
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 6dbb477..59a570e 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -18,6 +18,7 @@
 #ifndef KUDU_CONSENSUS_CONSENSUS_QUEUE_H_
 #define KUDU_CONSENSUS_CONSENSUS_QUEUE_H_
 
+#include <boost/optional.hpp>
 #include <iosfwd>
 #include <map>
 #include <string>
@@ -72,7 +73,7 @@ class PeerMessageQueue {
           is_new(true),
           next_index(kInvalidOpIdIndex),
           last_received(MinimumOpId()),
-          last_known_committed_idx(MinimumOpId().index()),
+          last_known_committed_index(MinimumOpId().index()),
           is_last_exchange_successful(false),
           last_successful_communication_time(MonoTime::Now()),
           needs_tablet_copy(false),
@@ -102,7 +103,7 @@ class PeerMessageQueue {
     OpId last_received;
 
     // The last committed index this peer knows about.
-    int64_t last_known_committed_idx;
+    int64_t last_known_committed_index;
 
     // Whether the last exchange with this peer was successful.
     bool is_last_exchange_successful;
@@ -138,12 +139,13 @@ class PeerMessageQueue {
   // operations and notifies observers when those change.
   // 'committed_index' corresponds to the id of the last committed operation,
   // i.e. operations with ids <= 'committed_index' should be considered committed.
+  //
   // 'current_term' corresponds to the leader's current term, this is different
   // from 'committed_index.term()' if the leader has not yet committed an
   // operation in the current term.
   // 'active_config' is the currently-active Raft config. This must always be
   // a superset of the tracked peers, and that is enforced with runtime CHECKs.
-  virtual void SetLeaderMode(const OpId& committed_index,
+  virtual void SetLeaderMode(int64_t committed_index,
                              int64_t current_term,
                              const RaftConfigPB& active_config);
 
@@ -219,7 +221,7 @@ class PeerMessageQueue {
 
   // Called by the consensus implementation to update the follower queue's
   // committed index, which is mostly used for metrics.
-  void UpdateFollowerCommittedIndex(const OpId& committed_index);
+  void UpdateFollowerCommittedIndex(int64_t committed_index);
 
   // Closes the queue, peers are still allowed to call UntrackPeer() and
   // ResponseFromPeer() but no additional peers can be tracked or messages
@@ -229,13 +231,14 @@ class PeerMessageQueue {
   virtual int64_t GetQueuedOperationsSizeBytesForTests() const;
 
   // Returns the last message replicated by all peers, for tests.
-  virtual OpId GetAllReplicatedIndexForTests() const;
-
+  virtual int64_t GetAllReplicatedIndexForTests() const;
 
-  virtual OpId GetCommittedIndexForTests() const;
+  // Returns the committed index. All operations with index less than or equal to
+  // this index have been committed.
+  virtual int64_t GetCommittedIndexForTests() const;
 
-  // Returns the current majority replicated OpId, for tests.
-  virtual OpId GetMajorityReplicatedOpIdForTests() const;
+  // Returns the current majority replicated index, for tests.
+  virtual int64_t GetMajorityReplicatedIndexForTests() const;
 
   // Returns a copy of the TrackedPeer with 'uuid' or crashes if the peer is
   // not being tracked.
@@ -288,15 +291,15 @@ class PeerMessageQueue {
 
     // The first operation that has been replicated to all currently
     // tracked peers.
-    OpId all_replicated_opid;
+    int64_t all_replicated_index;
 
     // The index of the last operation replicated to a majority.
     // This is usually the same as 'committed_index' but might not
     // be if the terms changed.
-    OpId majority_replicated_opid;
+    int64_t majority_replicated_index;
 
     // The index of the last operation to be considered committed.
-    OpId committed_index;
+    int64_t committed_index;
 
     // The opid of the last operation appended to the queue.
     OpId last_appended;
@@ -305,10 +308,13 @@ class PeerMessageQueue {
     // Set by the last appended operation.
     // If the queue owner's term is less than the term observed
     // from another peer the queue owner must step down.
-    // TODO: it is likely to be cleaner to get this from the ConsensusMetadata
-    // rather than by snooping on what operations are appended to the queue.
     int64_t current_term;
 
+    // The first index that we saw that was part of this current term.
+    // When the term advances, this is set to boost::none, and then set
+    // when the first operation is appended in the new term.
+    boost::optional<int64_t> first_index_in_current_term;
+
     // The size of the majority for the queue.
     int majority_size_;
 
@@ -329,8 +335,8 @@ class PeerMessageQueue {
   // fatal error.
   bool IsOpInLog(const OpId& desired_op) const;
 
-  void NotifyObserversOfMajorityReplOpChange(const OpId new_majority_replicated_op);
-  void NotifyObserversOfMajorityReplOpChangeTask(const OpId new_majority_replicated_op);
+  void NotifyObserversOfCommitIndexChange(int64_t new_commit_index);
+  void NotifyObserversOfCommitIndexChangeTask(int64_t new_commit_index);
 
   void NotifyObserversOfTermChange(int64_t term);
   void NotifyObserversOfTermChangeTask(int64_t term);
@@ -373,14 +379,12 @@ class PeerMessageQueue {
 
   // Advances 'watermark' to the smallest op that 'num_peers_required' have.
   void AdvanceQueueWatermark(const char* type,
-                             OpId* watermark,
+                             int64_t* watermark,
                              const OpId& replicated_before,
                              const OpId& replicated_after,
                              int num_peers_required,
                              const TrackedPeer* who_caused);
 
-  void UpdateFollowerCommittedIndexUnlocked(const OpId& committed_index);
-
   std::vector<PeerMessageQueueObserver*> observers_;
 
   // The pool which executes observer notifications.
@@ -412,22 +416,14 @@ class PeerMessageQueue {
 // The interface between RaftConsensus and the PeerMessageQueue.
 class PeerMessageQueueObserver {
  public:
-  // Called by the queue each time the response for a peer is handled with
-  // the resulting majority replicated index.
-  // The consensus implementation decides the commit index based on that
-  // and triggers the apply for pending transactions.
-  // 'committed_index' is set to the id of the last operation considered
-  // committed by consensus.
-  // The implementation is idempotent, i.e. independently of the ordering of
-  // calls to this method only non-triggered applys will be started.
-  virtual void UpdateMajorityReplicated(const OpId& majority_replicated,
-                                        OpId* committed_index) = 0;
-
-  // Notify the Consensus implementation that a follower replied with a term
+  // Notify the observer that the commit index has advanced to 'committed_index'.
+  virtual void NotifyCommitIndex(int64_t committed_index) = 0;
+
+  // Notify the observer that a follower replied with a term
   // higher than that established in the queue.
   virtual void NotifyTermChange(int64_t term) = 0;
 
-  // Notify Consensus that a peer is unable to catch up due to falling behind
+  // Notify the observer that a peer is unable to catch up due to falling behind
   // the leader's log GC threshold.
   virtual void NotifyFailedFollower(const std::string& peer_uuid,
                                     int64_t term,

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/raft_consensus-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus-test.cc b/src/kudu/consensus/raft_consensus-test.cc
index 64fce69..3ba3d64 100644
--- a/src/kudu/consensus/raft_consensus-test.cc
+++ b/src/kudu/consensus/raft_consensus-test.cc
@@ -68,7 +68,7 @@ class MockQueue : public PeerMessageQueue {
   explicit MockQueue(const scoped_refptr<MetricEntity>& metric_entity, log::Log* log)
     : PeerMessageQueue(metric_entity, log, FakeRaftPeerPB(kLocalPeerUuid), kTestTablet) {}
   MOCK_METHOD1(Init, void(const OpId& locally_replicated_index));
-  MOCK_METHOD3(SetLeaderMode, void(const OpId& committed_opid,
+  MOCK_METHOD3(SetLeaderMode, void(int64_t committed_opid,
                                    int64_t current_term,
                                    const RaftConfigPB& active_config));
   MOCK_METHOD0(SetNonLeaderMode, void());
@@ -336,90 +336,6 @@ void RaftConsensusTest::AddNoOpToConsensusRequest(ConsensusRequestPB* request,
   noop_msg->mutable_noop_request();
 }
 
-// Tests that the committed index moves along with the majority replicated
-// index when the terms are the same.
-TEST_F(RaftConsensusTest, TestCommittedIndexWhenInSameTerm) {
-  SetUpConsensus();
-  SetUpGeneralExpectations();
-  EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_))
-      .Times(1)
-      .WillOnce(Return(Status::OK()));
-  EXPECT_CALL(*queue_, Init(_))
-      .Times(1);
-  EXPECT_CALL(*queue_, SetLeaderMode(_, _, _))
-      .Times(1);
-  EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
-      .Times(11);
-  EXPECT_CALL(*queue_, AppendOperationsMock(_, _))
-      .Times(11).WillRepeatedly(Return(Status::OK()));
-
-  ConsensusBootstrapInfo info;
-  ASSERT_OK(consensus_->Start(info));
-  ASSERT_OK(consensus_->EmulateElection());
-
-  // Commit the first noop round, created on EmulateElection();
-  OpId committed_index;
-  ASSERT_FALSE(rounds_.empty()) << "rounds_ is empty!";
-  consensus_->UpdateMajorityReplicated(rounds_[0]->id(), &committed_index);
-
-  ASSERT_OPID_EQ(rounds_[0]->id(), committed_index);
-
-  // Append 10 rounds
-  for (int i = 0; i < 10; i++) {
-    scoped_refptr<ConsensusRound> round = AppendNoOpRound();
-    // queue reports majority replicated index in the leader's term
-    // committed index should move accordingly.
-    consensus_->UpdateMajorityReplicated(round->id(), &committed_index);
-    ASSERT_OPID_EQ(round->id(), committed_index);
-  }
-}
-
-// Tests that, when terms change, the commit index only advances when the majority
-// replicated index is in the current term.
-TEST_F(RaftConsensusTest, TestCommittedIndexWhenTermsChange) {
-  SetUpConsensus();
-  SetUpGeneralExpectations();
-  EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_))
-      .Times(2)
-      .WillRepeatedly(Return(Status::OK()));
-  EXPECT_CALL(*queue_, Init(_))
-      .Times(1);
-  EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
-      .Times(3);
-  EXPECT_CALL(*queue_, AppendOperationsMock(_, _))
-      .Times(3).WillRepeatedly(Return(Status::OK()));;
-
-  ConsensusBootstrapInfo info;
-  ASSERT_OK(consensus_->Start(info));
-  ASSERT_OK(consensus_->EmulateElection());
-
-  OpId committed_index;
-  consensus_->UpdateMajorityReplicated(rounds_[0]->id(), &committed_index);
-  ASSERT_OPID_EQ(rounds_[0]->id(), committed_index);
-
-  // Append another round in the current term (besides the original config round).
-  scoped_refptr<ConsensusRound> round = AppendNoOpRound();
-
-  // Now emulate an election, the same guy will be leader but the term
-  // will change.
-  ASSERT_OK(consensus_->EmulateElection());
-
-  // Now tell consensus that 'round' has been majority replicated, this _shouldn't_
-  // advance the committed index, since that belongs to a previous term.
-  OpId new_committed_index;
-  consensus_->UpdateMajorityReplicated(round->id(), &new_committed_index);
-  ASSERT_OPID_EQ(committed_index, new_committed_index);
-
-  const scoped_refptr<ConsensusRound>& last_config_round = rounds_[2];
-
-  // Now notify that the last change config was committed, this should advance the
-  // commit index to the id of the last change config.
-  consensus_->UpdateMajorityReplicated(last_config_round->id(), &committed_index);
-
-  DumpRounds();
-  ASSERT_OPID_EQ(last_config_round->id(), committed_index);
-}
-
 // Asserts that a ConsensusRound has an OpId set in its ReplicateMsg.
 MATCHER(HasOpId, "") { return arg->id().IsInitialized(); }
 
@@ -453,7 +369,7 @@ TEST_F(RaftConsensusTest, TestPendingTransactions) {
 
   {
     InSequence dummy;
-    // On start we expect 10 NO_OPs to be enqueues, with 5 of those having
+    // On start we expect 10 NO_OPs to be enqueued, with 5 of those having
     // their commit continuation called immediately.
     EXPECT_CALL(*consensus_.get(), StartConsensusOnlyRoundUnlocked(_))
         .Times(10);
@@ -469,7 +385,6 @@ TEST_F(RaftConsensusTest, TestPendingTransactions) {
   ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(txn_factory_.get()));
   ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(peer_manager_));
   ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(consensus_.get()));
-
   // Now we test what this peer does with the pending operations once it's elected leader.
   {
     InSequence dummy;
@@ -504,25 +419,11 @@ TEST_F(RaftConsensusTest, TestPendingTransactions) {
   EXPECT_CALL(*queue_, Close())
       .Times(1);
 
-  // Now tell consensus all original orphaned replicates were majority replicated.
-  // This should not advance the committed index because we haven't replicated
-  // anything in the current term.
-  OpId committed_index;
-  consensus_->UpdateMajorityReplicated(info.orphaned_replicates.back()->id(),
-                                       &committed_index);
-  // Should still be the last committed in the the wal.
-  ASSERT_OPID_EQ(committed_index, info.last_committed_id);
-
   // Now mark the last operation (the no-op round) as committed.
   // This should advance the committed index, since that round in on our current term,
   // and we should be able to commit all previous rounds.
-  OpId cc_round_id = info.orphaned_replicates.back()->id();
-  cc_round_id.set_term(11);
-  cc_round_id.set_index(cc_round_id.index() + 1);
-  consensus_->UpdateMajorityReplicated(cc_round_id,
-                                       &committed_index);
-
-  ASSERT_OPID_EQ(committed_index, cc_round_id);
+  int64_t cc_round_index = info.orphaned_replicates.back()->id().index() + 1;
+  consensus_->NotifyCommitIndex(cc_round_index);
 }
 
 MATCHER_P2(RoundHasOpId, term, index, "") {
@@ -619,7 +520,7 @@ TEST_F(RaftConsensusTest, TestAbortOperations) {
     replicate->set_timestamp(clock_->Now().ToUint64());
   }
 
-  request.mutable_committed_index()->CopyFrom(MakeOpId(3, 6));
+  request.set_committed_index(6);
 
   ConsensusResponsePB response;
   ASSERT_OK(consensus_->Update(&request, &response));
@@ -635,7 +536,7 @@ TEST_F(RaftConsensusTest, TestAbortOperations) {
 
   request.mutable_ops()->Clear();
   request.mutable_preceding_id()->CopyFrom(MakeOpId(3, 9));
-  request.mutable_committed_index()->CopyFrom(MakeOpId(3, 9));
+  request.set_committed_index(9);
 
   ASSERT_OK(consensus_->Update(&request, &response));
   ASSERT_FALSE(response.has_error());

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 51af20a..6a4195a 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -270,7 +270,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
     state_->ClearLeaderUnlocked();
 
     RETURN_NOT_OK_PREPEND(state_->StartUnlocked(info.last_id),
-                          "Unable to start RAFT ReplicaState");
+                          "Unable to start Raft ReplicaState");
 
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Replica starting. Triggering "
                                    << info.orphaned_replicates.size()
@@ -281,8 +281,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
       RETURN_NOT_OK(StartReplicaTransactionUnlocked(replicate_ptr));
     }
 
-    bool committed_index_changed = false;
-    state_->AdvanceCommittedIndexUnlocked(info.last_committed_id, &committed_index_changed);
+    state_->SetInitialCommittedOpIdUnlocked(info.last_committed_id);
 
     queue_->Init(state_->GetLastReceivedOpIdUnlocked());
   }
@@ -574,38 +573,21 @@ Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<Consensu
   return Status::OK();
 }
 
-void RaftConsensus::UpdateMajorityReplicated(const OpId& majority_replicated,
-                                             OpId* committed_index) {
+void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
   ReplicaState::UniqueLock lock;
-  Status s = state_->LockForMajorityReplicatedIndexUpdate(&lock);
+  Status s = state_->LockForCommit(&lock);
   if (PREDICT_FALSE(!s.ok())) {
     LOG_WITH_PREFIX(WARNING)
         << "Unable to take state lock to update committed index: "
         << s.ToString();
     return;
   }
-  UpdateMajorityReplicatedUnlocked(majority_replicated, committed_index);
-}
 
-void RaftConsensus::UpdateMajorityReplicatedUnlocked(const OpId& majority_replicated,
-                                                     OpId* committed_index) {
-  VLOG_WITH_PREFIX_UNLOCKED(1) << "Marking majority replicated up to "
-      << majority_replicated.ShortDebugString();
-  TRACE("Marking majority replicated up to $0", majority_replicated.ShortDebugString());
-  bool committed_index_changed = false;
-  Status s = state_->UpdateMajorityReplicatedUnlocked(majority_replicated, committed_index,
-                                                      &committed_index_changed);
-  if (PREDICT_FALSE(!s.ok())) {
-    string msg = Substitute("Unable to mark committed up to $0: $1",
-                            majority_replicated.ShortDebugString(),
-                            s.ToString());
-    TRACE(msg);
-    LOG_WITH_PREFIX_UNLOCKED(WARNING) << msg;
-    return;
-  }
+  state_->AdvanceCommittedIndexUnlocked(commit_index);
 
-  if (committed_index_changed &&
-      state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
+  // TODO: is this right? the goal is to signal a new request
+  // whenever we have new commit-index to propagate.
+  if (state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
     peer_manager_->SignalRequest(false);
   }
 }
@@ -614,7 +596,7 @@ void RaftConsensus::NotifyTermChange(int64_t term) {
   ReplicaState::UniqueLock lock;
   Status s = state_->LockForConfigChange(&lock);
   if (PREDICT_FALSE(!s.ok())) {
-    LOG(WARNING) << state_->LogPrefixThreadSafe() << "Unable to lock ReplicaState for config change"
+    LOG(WARNING) << state_->LogPrefixThreadSafe() << "Unable to lock ReplicaState for term change"
                  << " when notified of new term " << term << ": " << s.ToString();
     return;
   }
@@ -767,7 +749,7 @@ std::string RaftConsensus::LeaderRequest::OpsRangeString() const {
 
 void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req,
                                                      LeaderRequest* deduplicated_req) {
-  const OpId& last_committed = state_->GetCommittedOpIdUnlocked();
+  int64_t last_committed_index = state_->GetCommittedIndexUnlocked();
 
   // The leader's preceding id.
   deduplicated_req->preceding_opid = &rpc_req->preceding_id();
@@ -781,7 +763,7 @@ void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req
   for (int i = 0; i < rpc_req->ops_size(); i++) {
     ReplicateMsg* leader_msg = rpc_req->mutable_ops(i);
 
-    if (leader_msg->id().index() <= last_committed.index()) {
+    if (leader_msg->id().index() <= last_committed_index) {
       VLOG_WITH_PREFIX_UNLOCKED(2) << "Skipping op id " << leader_msg->id()
                                    << " (already committed)";
       deduplicated_req->preceding_opid = &leader_msg->id();
@@ -793,7 +775,9 @@ void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req
       // pendings set.
       scoped_refptr<ConsensusRound> round =
           state_->GetPendingOpByIndexOrNullUnlocked(leader_msg->id().index());
-      DCHECK(round);
+      DCHECK(round) << "Could not find op with index " << leader_msg->id().index()
+                    << " in pending set. committed= " << last_committed_index
+                    << " dedup=" << dedup_up_to_index;
 
       // If the OpIds match, i.e. if they have the same term and id, then this is just
       // duplicate, we skip...
@@ -894,6 +878,12 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque
                                                  ConsensusResponsePB* response,
                                                  LeaderRequest* deduped_req) {
 
+  if (request->has_deprecated_committed_index()) {
+    return Status::InvalidArgument("Leader appears to be running an earlier version "
+                                   "of Kudu. Please shut down and upgrade all servers "
+                                   "before restarting.");
+  }
+
   ConsensusRequestPB* mutable_req = const_cast<ConsensusRequestPB*>(request);
   DeduplicateLeaderRequestUnlocked(mutable_req, deduped_req);
 
@@ -1099,16 +1089,14 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     // 1. As many pending transactions as we can, except...
     // 2. ...if we commit beyond the preceding index, we'd regress KUDU-639, and...
     // 3. ...the leader's committed index is always our upper bound.
-    OpId early_apply_up_to = state_->GetLastPendingTransactionOpIdUnlocked();
-    CopyIfOpIdLessThan(*deduped_req.preceding_opid, &early_apply_up_to);
-    CopyIfOpIdLessThan(request->committed_index(), &early_apply_up_to);
+    int64_t early_apply_up_to = std::min<int64_t>({
+        state_->GetLastPendingTransactionOpIdUnlocked().index(),
+        deduped_req.preceding_opid->index(),
+        request->committed_index()});
 
-    VLOG_WITH_PREFIX_UNLOCKED(1) << "Early marking committed up to " <<
-        early_apply_up_to.ShortDebugString();
-    TRACE("Early marking committed up to $0",
-          early_apply_up_to.ShortDebugString());
-    bool committed_index_changed = false;
-    CHECK_OK(state_->AdvanceCommittedIndexUnlocked(early_apply_up_to, &committed_index_changed));
+    VLOG_WITH_PREFIX_UNLOCKED(1) << "Early marking committed up to " << early_apply_up_to;
+    TRACE("Early marking committed up to index $0", early_apply_up_to);
+    CHECK_OK(state_->AdvanceCommittedIndexUnlocked(early_apply_up_to));
 
     // 2 - Enqueue the prepares
 
@@ -1202,10 +1190,10 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     // Choose the last operation to be applied. This will either be 'committed_index', if
     // no prepare enqueuing failed, or the minimum between 'committed_index' and the id of
     // the last successfully enqueued prepare, if some prepare failed to enqueue.
-    OpId apply_up_to;
-    if (last_from_leader.index() < request->committed_index().index()) {
+    int64_t apply_up_to;
+    if (last_from_leader.index() < request->committed_index()) {
       // we should never apply anything later than what we received in this request
-      apply_up_to = last_from_leader;
+      apply_up_to = last_from_leader.index();
 
       VLOG_WITH_PREFIX_UNLOCKED(2) << "Received commit index "
           << request->committed_index() << " from the leader but only"
@@ -1214,9 +1202,9 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
       apply_up_to = request->committed_index();
     }
 
-    VLOG_WITH_PREFIX_UNLOCKED(1) << "Marking committed up to " << apply_up_to.ShortDebugString();
-    TRACE(Substitute("Marking committed up to $0", apply_up_to.ShortDebugString()));
-    CHECK_OK(state_->AdvanceCommittedIndexUnlocked(apply_up_to, &committed_index_changed));
+    VLOG_WITH_PREFIX_UNLOCKED(1) << "Marking committed up to " << apply_up_to;
+    TRACE("Marking committed up to $0", apply_up_to);
+    CHECK_OK(state_->AdvanceCommittedIndexUnlocked(apply_up_to));
     queue_->UpdateFollowerCommittedIndex(apply_up_to);
 
     // We can now update the last received watermark.
@@ -1276,8 +1264,9 @@ void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* respons
       state_->GetLastReceivedOpIdUnlocked());
   response->mutable_status()->mutable_last_received_current_leader()->CopyFrom(
       state_->GetLastReceivedOpIdCurLeaderUnlocked());
+  // TODO: interrogate queue rather than state?
   response->mutable_status()->set_last_committed_idx(
-      state_->GetCommittedOpIdUnlocked().index());
+      state_->GetCommittedIndexUnlocked());
 }
 
 void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response,
@@ -1742,7 +1731,7 @@ Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
   // in the queue -- when the queue is in LEADER mode, it checks that all
   // registered peers are a part of the active config.
   peer_manager_->Close();
-  queue_->SetLeaderMode(state_->GetCommittedOpIdUnlocked(),
+  queue_->SetLeaderMode(state_->GetCommittedIndexUnlocked(),
                         state_->GetCurrentTermUnlocked(),
                         active_config);
   RETURN_NOT_OK(peer_manager_->UpdateRaftConfig(active_config));
@@ -1877,7 +1866,8 @@ Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) {
   if (type == RECEIVED_OPID) {
     *DCHECK_NOTNULL(id) = state_->GetLastReceivedOpIdUnlocked();
   } else if (type == COMMITTED_OPID) {
-    *DCHECK_NOTNULL(id) = state_->GetCommittedOpIdUnlocked();
+    id->set_term(state_->GetTermWithLastCommittedOpUnlocked());
+    id->set_index(state_->GetCommittedIndexUnlocked());
   } else {
     return Status::InvalidArgument("Unsupported OpIdType", OpIdType_Name(type));
   }
@@ -1986,8 +1976,9 @@ MonoDelta RaftConsensus::MinimumElectionTimeout() const {
 MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() {
   // Compute a backoff factor based on how many leader elections have
   // taken place since a leader was successfully elected.
-  int term_difference = state_->GetCurrentTermUnlocked() -
-    state_->GetCommittedOpIdUnlocked().term();
+  int term_difference =  state_->GetCurrentTermUnlocked() -
+      state_->GetTermWithLastCommittedOpUnlocked();
+
   double backoff_factor = pow(1.1, term_difference);
   double min_timeout = MinimumElectionTimeout().ToMilliseconds();
   double max_timeout = std::min<double>(


[3/4] kudu git commit: Fix compilation error after ce0fcd4dd65a23aebf2601570e335c1b6ce2a0c9

Posted by to...@apache.org.
Fix compilation error after ce0fcd4dd65a23aebf2601570e335c1b6ce2a0c9

This commit had a small semantic conflict with another commit that went
into master before it. This fixes the trivial issue.

Change-Id: I54f714d6f1eb2929784f0a2cf2e18a18c97ac980
Reviewed-on: http://gerrit.cloudera.org:8080/4331
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/2876683b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/2876683b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/2876683b

Branch: refs/heads/master
Commit: 2876683bc9f2798f1acc7d73b1939d7134718578
Parents: ce0fcd4
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Sep 7 18:38:13 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Sep 8 02:03:46 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_queue.cc | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/2876683b/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index b7dff82..dd905b8 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -398,10 +398,10 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
   // logging this fact periodically.
   if (request->ops_size() > 0) {
     int64_t last_op_sent = request->ops(request->ops_size() - 1).id().index();
-    if (last_op_sent < request->committed_index().index()) {
+    if (last_op_sent < request->committed_index()) {
       KLOG_EVERY_N_SECS_THROTTLER(INFO, 3, peer->status_log_throttler, "lagging")
           << LogPrefixUnlocked() << "Peer " << uuid << " is lagging by at least "
-          << (request->committed_index().index() - last_op_sent)
+          << (request->committed_index() - last_op_sent)
           << " ops behind the committed index " << THROTTLE_MSG;
     }
   }