Posted to commits@kudu.apache.org by to...@apache.org on 2016/09/08 02:12:22 UTC

[2/4] kudu git commit: Cleanup/refactor tracking of consensus watermarks

Cleanup/refactor tracking of consensus watermarks

This is a fairly invasive cleanup/refactor of the consensus code in
preparation for propagating replication watermarks from the leader to
the followers.

The key item here is to address a long-standing TODO: the
PeerMessageQueue should itself calculate the commit index, rather than
delegate that job to ReplicaState. On the leader, upon receiving a
response from a peer indicating that it has replicated some new
operations, the flow used to be roughly the following:

 - PeerMessageQueue::ResponseFromPeer
   - updates peer last_received
   - calls PeerMessageQueue::AdvanceQueueWatermark
     - updates PeerMessageQueue::majority_replicated to the new majority
       watermark
   - calls NotifyObserversOfMajorityReplOp
   --> submits an async task to the raft threadpool
        - for each observer (best I can tell there is currently always
          only one, but that's a separate issue)
        -- observer->UpdateMajorityReplicated() to grab committed index
           (the observer is always the RaftConsensus instance)
        --- RaftConsensus::UpdateMajorityReplicated
        ---- ReplicaState::UpdateMajorityReplicatedUnlocked
        ----- this checks the condition that the majority-replicated
              watermark is within the leader's term before advancing the
              commit index.
        ----- calls AdvanceCommittedIndexUnlocked
        ------- commits stuff, etc
        ----- sets *committed_index out-param
        -- PeerMessageQueue picks up this out-param and sets the new
           value within the PeerMessageQueue

This was very difficult to follow, given that the "observer" was in
fact passing data back to the "notifier" through an out-parameter.
Moreover, it wasn't obvious which class was "authoritative" for the
tracking and advancement of watermarks.
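
For reference, the old observer interface looked roughly like this (a
simplified sketch based on the PeerMessageQueueObserver methods removed
by this patch; note the committed index flowing backwards through the
out-parameter):

  // Old interface (simplified). The queue "notifies" the observer of
  // the new majority-replicated OpId, and the observer (in practice
  // always RaftConsensus) hands the resulting committed index *back*
  // to the queue through the out-parameter.
  class PeerMessageQueueObserver {
   public:
    virtual void UpdateMajorityReplicated(const OpId& majority_replicated,
                                          OpId* committed_index) = 0;
    // ... other notifications elided ...
  };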

The new design makes the PeerMessageQueue itself fully responsible for
calculating when the commit index can advance. The flow is now the
following, with '!' at the beginning of a line denoting where it
differs from before (a condensed sketch of the new logic follows the
list):

 - PeerMessageQueue::ResponseFromPeer
   - updates peer last_received
   - calls PeerMessageQueue::AdvanceQueueWatermark
     - updates PeerMessageQueue::majority_replicated to the new majority
       watermark.
!  - If the majority-replicated watermark is within the current leader
!    term, advances the commit index (stored in QueueState)
!    - notifies observers of the new commit index
!      - calls observer->NotifyCommitIndex() with the new commit index
!        - RaftConsensus::NotifyCommitIndex()
!          - ReplicaState::AdvanceCommittedIndexUnlocked
             - commits stuff, etc.
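
A condensed sketch of the new advancement logic, simplified from the
PeerMessageQueue::ResponseFromPeer() changes in this patch (the real
code runs under queue_lock_ and defers the observer notification to a
task on the raft threadpool):

  // Advance the commit index only if the majority-replicated watermark
  // has reached an operation in the current leader term (the Raft rule
  // that a leader never directly commits entries from earlier terms),
  // and only if it actually moved forward.
  if (queue_state_.first_index_in_current_term != boost::none &&
      queue_state_.majority_replicated_index >=
          queue_state_.first_index_in_current_term &&
      queue_state_.majority_replicated_index > queue_state_.committed_index) {
    queue_state_.committed_index = queue_state_.majority_replicated_index;
    // Observers now simply learn the new commit index; no data flows
    // back through an out-parameter.
    NotifyObserversOfCommitIndexChange(queue_state_.committed_index);
  }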

This required fairly invasive surgery: the PeerMessageQueue itself
doesn't remember the terms of pending operations, and thus the
watermarks became indexes instead of OpIds.

This is itself also a simplification -- we were previously very messy
about using the word "index" when in fact we had an OpId type. In
almost every case, we only used the index portion of those OpIds. In
the Raft paper itself, these watermarks are plain indexes, not OpIds,
so this change brings us closer to the algorithm described in the
original design.
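
As an illustration, the majority watermark calculation in
AdvanceQueueWatermark() now reduces to a sort over plain integers (a
sketch along the lines of the new code, with locking and logging
omitted):

  // Collect the last-received index from each peer whose last exchange
  // succeeded, then take the highest index that at least
  // 'num_peers_required' peers have reached.
  vector<int64_t> watermarks;
  for (const auto& entry : peers_map_) {
    if (entry.second->is_last_exchange_successful) {
      watermarks.push_back(entry.second->last_received.index());
    }
  }
  if (watermarks.size() < num_peers_required) {
    return;  // Not enough healthy peers to move the watermark.
  }
  std::sort(watermarks.begin(), watermarks.end());
  *watermark = watermarks[watermarks.size() - num_peers_required];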

I looped raft-consensus-itest.MultiThreadedInsertWithFailovers 800
times and the whole suite 500 times and got no failures[1].

[1] http://dist-test.cloudera.org//job?job_id=todd.1473280738.28972

Change-Id: I2aa294472f018013d88f36a9358e9ebd9d5ed8f8
Reviewed-on: http://gerrit.cloudera.org:8080/4133
Reviewed-by: Mike Percy <mp...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ce0fcd4d
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ce0fcd4d
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ce0fcd4d

Branch: refs/heads/master
Commit: ce0fcd4dd65a23aebf2601570e335c1b6ce2a0c9
Parents: b1f1388
Author: Todd Lipcon <to...@apache.org>
Authored: Fri Aug 26 02:15:49 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Sep 8 01:03:45 2016 +0000

----------------------------------------------------------------------
 docs/release_notes.adoc                         |   9 +
 src/kudu/consensus/consensus-test-util.h        |  24 --
 src/kudu/consensus/consensus.proto              |  26 +--
 src/kudu/consensus/consensus_peers-test.cc      |  50 ++--
 src/kudu/consensus/consensus_peers.cc           |   4 +-
 src/kudu/consensus/consensus_queue-test.cc      | 184 ++++++++-------
 src/kudu/consensus/consensus_queue.cc           | 226 ++++++++++++-------
 src/kudu/consensus/consensus_queue.h            |  60 +++--
 src/kudu/consensus/raft_consensus-test.cc       | 111 +--------
 src/kudu/consensus/raft_consensus.cc            |  91 ++++----
 src/kudu/consensus/raft_consensus.h             |  23 +-
 .../consensus/raft_consensus_quorum-test.cc     |  54 ++---
 src/kudu/consensus/raft_consensus_state.cc      | 144 +++++-------
 src/kudu/consensus/raft_consensus_state.h       |  35 ++-
 .../integration-tests/raft_consensus-itest.cc   |  15 +-
 15 files changed, 464 insertions(+), 592 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/docs/release_notes.adoc
----------------------------------------------------------------------
diff --git a/docs/release_notes.adoc b/docs/release_notes.adoc
index 281eb81..be89002 100644
--- a/docs/release_notes.adoc
+++ b/docs/release_notes.adoc
@@ -38,6 +38,15 @@ If you are new to Kudu, check out its list of link:index.html[features and benef
 Kudu 1.0.0 delivers a number of new features, bug fixes, and optimizations,
 detailed below.
 
+Kudu 1.0.0 maintains client-server wire-compatibility with previous releases,
+meaning that applications using the Kudu client libraries may be upgraded
+either before, at the same time, or after the Kudu servers.
+
+Kudu 1.0.0 does _not_ maintain server-server wire compatibility with previous
+releases. Therefore, rolling upgrades between earlier versions of Kudu and
+Kudu 1.0.0 are not supported.
+
+
 [[rn_1.0.0_incompatible_changes]]
 
 - The `kudu-pbc-dump` tool has been removed. The same functionality is now

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index ed3922e..b6fc201 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -841,30 +841,6 @@ class CounterHooks : public Consensus::ConsensusFaultHooks {
   mutable simple_spinlock lock_;
 };
 
-class TestRaftConsensusQueueIface : public PeerMessageQueueObserver {
- public:
-  bool IsMajorityReplicated(int64_t index) {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    return index <= majority_replicated_index_;
-  }
-
- protected:
-  virtual void UpdateMajorityReplicated(const OpId& majority_replicated,
-                                        OpId* committed_index) OVERRIDE {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    majority_replicated_index_ = majority_replicated.index();
-    committed_index->CopyFrom(majority_replicated);
-  }
-  virtual void NotifyTermChange(int64_t term) OVERRIDE {}
-  virtual void NotifyFailedFollower(const std::string& uuid,
-                                    int64_t term,
-                                    const std::string& reason) OVERRIDE {}
-
- private:
-  mutable simple_spinlock lock_;
-  int64_t majority_replicated_index_;
-};
-
 }  // namespace consensus
 }  // namespace kudu
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/consensus.proto
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.proto b/src/kudu/consensus/consensus.proto
index 5091c3c..bde2b2a 100644
--- a/src/kudu/consensus/consensus.proto
+++ b/src/kudu/consensus/consensus.proto
@@ -199,25 +199,12 @@ message CommitMsg {
 //  Internal Consensus Messages and State
 // ===========================================================================
 
-// NoOp requests, mostly used in tests.
+// NO_OP requests are replicated by a peer after being elected leader.
 message NoOpRequestPB {
  // Allows to set a dummy payload, for tests.
  optional bytes payload_for_tests = 1;
 }
 
-// NoOp responses, mostly used in tests.
-message NoOpResponsePB {
-   // Allows to set a dummy payload, for tests.
-   optional bytes payload_for_tests = 1;
-}
-
-message PerOpErrorPB {
-  // The id of the operation that failed in the other peer.
-  required OpId id = 1;
-  // The Status explaining why the operation failed.
-  required AppStatusPB status = 2;
-}
-
 // Status message received in the peer responses.
 message ConsensusStatusPB {
 
@@ -330,12 +317,17 @@ message ConsensusRequestPB {
   // This must be set if 'ops' is non-empty.
   optional OpId preceding_id = 4;
 
-  // The id of the last committed operation in the configuration. This is the
-  // id of the last operation the leader deemed committed from a consensus
+  // The index of the last committed operation in the configuration. This is the
+  // index of the last operation the leader deemed committed from a consensus
   // standpoint (not the last operation the leader applied).
   //
   // Raft calls this field 'leaderCommit'.
-  required OpId committed_index = 5;
+  optional int64 committed_index = 8;
+
+  // Deprecated field used in Kudu 0.10.0 and earlier. Remains here to prevent
+  // accidental reuse and provide a nicer error message if the user attempts
+  // a rolling upgrade.
+  optional OpId DEPRECATED_committed_index = 5;
 
   // Sequence of operations to be replicated by this peer.
   // These will be committed when committed_index advances above their

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/consensus_peers-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc
index 116ddea..7159867 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -66,12 +66,10 @@ class ConsensusPeersTest : public KuduTest {
     clock_.reset(new server::HybridClock());
     ASSERT_OK(clock_->Init());
 
-    consensus_.reset(new TestRaftConsensusQueueIface());
     message_queue_.reset(new PeerMessageQueue(metric_entity_,
                                               log_.get(),
                                               FakeRaftPeerPB(kLeaderUuid),
                                               kTabletId));
-    message_queue_->RegisterObserver(consensus_.get());
   }
 
   virtual void TearDown() OVERRIDE {
@@ -113,18 +111,13 @@ class ConsensusPeersTest : public KuduTest {
   // Registers a callback triggered when the op with the provided term and index
   // is committed in the test consensus impl.
   // This must be called _before_ the operation is committed.
-  void WaitForMajorityReplicatedIndex(int index) {
-    for (int i = 0; i < 100; i++) {
-      if (consensus_->IsMajorityReplicated(index)) {
-        return;
-      }
-      SleepFor(MonoDelta::FromMilliseconds(i));
-    }
-    FAIL() << "Never replicated index " << index << " on a majority";
+  void WaitForCommitIndex(int index) {
+    AssertEventually([&]() {
+        ASSERT_GE(message_queue_->GetCommittedIndexForTests(), index);
+      });
   }
 
  protected:
-  gscoped_ptr<TestRaftConsensusQueueIface> consensus_;
   MetricRegistry metric_registry_;
   scoped_refptr<MetricEntity> metric_entity_;
   gscoped_ptr<FsManager> fs_manager_;
@@ -146,8 +139,8 @@ TEST_F(ConsensusPeersTest, TestRemotePeer) {
   // We use a majority size of 2 since we make one fake remote peer
   // in addition to our real local log.
   message_queue_->Init(MinimumOpId());
-  message_queue_->SetLeaderMode(MinimumOpId(),
-                                MinimumOpId().term(),
+  message_queue_->SetLeaderMode(kMinimumOpIdIndex,
+                                kMinimumTerm,
                                 BuildRaftConfigPBForTests(3));
 
   gscoped_ptr<Peer> remote_peer;
@@ -166,7 +159,7 @@ TEST_F(ConsensusPeersTest, TestRemotePeer) {
   // now wait on the status of the last operation
   // this will complete once the peer has logged all
   // requests.
-  WaitForMajorityReplicatedIndex(20);
+  WaitForCommitIndex(20);
   // verify that the replicated watermark corresponds to the last replicated
   // message.
   CheckLastRemoteEntry(proxy, 2, 20);
@@ -174,8 +167,8 @@ TEST_F(ConsensusPeersTest, TestRemotePeer) {
 
 TEST_F(ConsensusPeersTest, TestRemotePeers) {
   message_queue_->Init(MinimumOpId());
-  message_queue_->SetLeaderMode(MinimumOpId(),
-                                MinimumOpId().term(),
+  message_queue_->SetLeaderMode(kMinimumOpIdIndex,
+                                kMinimumTerm,
                                 BuildRaftConfigPBForTests(3));
 
   // Create a set of remote peers
@@ -201,7 +194,7 @@ TEST_F(ConsensusPeersTest, TestRemotePeers) {
   // Now wait for the message to be replicated, this should succeed since
   // majority = 2 and only one peer was delayed. The majority is made up
   // of remote-peer1 and the local log.
-  WaitForMajorityReplicatedIndex(first.index());
+  WaitForCommitIndex(first.index());
 
   CheckLastLogEntry(first.term(), first.index());
   CheckLastRemoteEntry(remote_peer1_proxy, first.term(), first.index());
@@ -210,31 +203,31 @@ TEST_F(ConsensusPeersTest, TestRemotePeers) {
   // Wait until all peers have replicated the message, otherwise
   // when we add the next one remote_peer2 might find the next message
   // in the queue and will replicate it, which is not what we want.
-  while (!OpIdEquals(message_queue_->GetAllReplicatedIndexForTests(), first)) {
+  while (message_queue_->GetAllReplicatedIndexForTests() != first.index()) {
     SleepFor(MonoDelta::FromMilliseconds(1));
   }
 
   // Now append another message to the queue
   AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 2, 1);
 
-  // We should not see it replicated, even after 10ms,
+  // We should not see it committed, even after 10ms,
   // since only the local peer replicates the message.
   SleepFor(MonoDelta::FromMilliseconds(10));
-  ASSERT_FALSE(consensus_->IsMajorityReplicated(2));
+  ASSERT_LT(message_queue_->GetCommittedIndexForTests(), 2);
 
   // Signal one of the two remote peers.
   remote_peer1->SignalRequest();
   // We should now be able to wait for it to replicate, since two peers (a majority)
   // have replicated the message.
-  WaitForMajorityReplicatedIndex(2);
+  WaitForCommitIndex(2);
 }
 
 // Regression test for KUDU-699: even if a peer isn't making progress,
 // and thus always has data pending, we should be able to close the peer.
 TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) {
   message_queue_->Init(MinimumOpId());
-  message_queue_->SetLeaderMode(MinimumOpId(),
-                                MinimumOpId().term(),
+  message_queue_->SetLeaderMode(kMinimumOpIdIndex,
+                                kMinimumTerm,
                                 BuildRaftConfigPBForTests(3));
 
   auto mock_proxy = new MockedPeerProxy(pool_.get());
@@ -271,8 +264,8 @@ TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) {
 
 TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
   message_queue_->Init(MinimumOpId());
-  message_queue_->SetLeaderMode(MinimumOpId(),
-                                MinimumOpId().term(),
+  message_queue_->SetLeaderMode(kMinimumOpIdIndex,
+                                kMinimumTerm,
                                 BuildRaftConfigPBForTests(3));
 
   auto mock_proxy = new MockedPeerProxy(pool_.get());
@@ -294,7 +287,10 @@ TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
       MakeOpId(1, 1));
   initial_resp.mutable_status()->mutable_last_received_current_leader()->CopyFrom(
       MakeOpId(1, 1));
-  initial_resp.mutable_status()->set_last_committed_idx(0);
+  // We have to set the last_committed_index to 1 to avoid a tight loop
+  // where the peer manager keeps trying to update the peer's committed
+  // index.
+  initial_resp.mutable_status()->set_last_committed_idx(1);
   mock_proxy->set_update_response(initial_resp);
 
   AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 1, 1);
@@ -302,7 +298,7 @@ TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
 
   // Now wait for the message to be replicated, this should succeed since
   // the local (leader) peer always acks and the follower also acked this time.
-  WaitForMajorityReplicatedIndex(1);
+  WaitForCommitIndex(1);
 
   // Set up the peer to respond with an error.
   ConsensusResponsePB error_resp;

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index 29e8524..6776b50 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -176,11 +176,11 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
   // The peer has no pending request nor is sending: send the request.
   bool needs_tablet_copy = false;
   int64_t commit_index_before = request_.has_committed_index() ?
-      request_.committed_index().index() : kMinimumOpIdIndex;
+      request_.committed_index() : kMinimumOpIdIndex;
   Status s = queue_->RequestForPeer(peer_pb_.permanent_uuid(), &request_,
                                     &replicate_msg_refs_, &needs_tablet_copy);
   int64_t commit_index_after = request_.has_committed_index() ?
-      request_.committed_index().index() : kMinimumOpIdIndex;
+      request_.committed_index() : kMinimumOpIdIndex;
 
   if (PREDICT_FALSE(!s.ok())) {
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Could not obtain request from queue for peer: "

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/consensus_queue-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc
index 41c539e..33a5f6c 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -69,9 +69,7 @@ class ConsensusQueueTest : public KuduTest {
     clock_.reset(new server::HybridClock());
     ASSERT_OK(clock_->Init());
 
-    consensus_.reset(new TestRaftConsensusQueueIface());
     CloseAndReopenQueue();
-    queue_->RegisterObserver(consensus_.get());
   }
 
   void CloseAndReopenQueue() {
@@ -185,7 +183,6 @@ class ConsensusQueueTest : public KuduTest {
   }
 
  protected:
-  gscoped_ptr<TestRaftConsensusQueueIface> consensus_;
   const Schema schema_;
   gscoped_ptr<FsManager> fs_manager_;
   MetricRegistry metric_registry_;
@@ -202,7 +199,7 @@ class ConsensusQueueTest : public KuduTest {
 // falls in the middle of the current messages in the queue.
 TEST_F(ConsensusQueueTest, TestStartTrackingAfterStart) {
   queue_->Init(MinimumOpId());
-  queue_->SetLeaderMode(MinimumOpId(), MinimumOpId().term(), BuildRaftConfigPBForTests(2));
+  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(2));
   AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100);
 
   ConsensusRequestPB request;
@@ -242,15 +239,13 @@ TEST_F(ConsensusQueueTest, TestStartTrackingAfterStart) {
 // being 'consensus_max_batch_size_bytes'
 TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
   queue_->Init(MinimumOpId());
-  queue_->SetLeaderMode(MinimumOpId(), MinimumOpId().term(), BuildRaftConfigPBForTests(2));
+  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(2));
 
   // helper to estimate request size so that we can set the max batch size appropriately
   ConsensusRequestPB page_size_estimator;
   page_size_estimator.set_caller_term(14);
-  OpId* committed_index = page_size_estimator.mutable_committed_index();
-  OpId* preceding_id = page_size_estimator.mutable_preceding_id();
-  committed_index->CopyFrom(MinimumOpId());
-  preceding_id->CopyFrom(MinimumOpId());
+  page_size_estimator.set_committed_index(0);
+  page_size_estimator.mutable_preceding_id()->CopyFrom(MinimumOpId());
 
   // We're going to add 100 messages to the queue so we make each page fetch 9 of those,
   // for a total of 12 pages. The last page should have a single op.
@@ -307,18 +302,16 @@ TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
 
 TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
   queue_->Init(MinimumOpId());
-  queue_->SetLeaderMode(MinimumOpId(), MinimumOpId().term(), BuildRaftConfigPBForTests(3));
+  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(3));
   AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100);
 
   // Wait for the local peer to append all messages
   WaitForLocalPeerToAckIndex(100);
 
-  OpId all_replicated = MakeOpId(14, 100);
-
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), MinimumOpId());
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 0);
   // Since we're tracking a single peer still this should have moved the all
   // replicated watermark to the last op appended to the local log.
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), MakeOpId(14, 100));
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 100);
 
   // Start to track the peer after the queue has some messages in it
   // at a point that is halfway through the current messages in the queue.
@@ -333,8 +326,8 @@ TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
   ASSERT_TRUE(more_pending);
 
  // Tracking a new peer should have moved the all replicated watermark back.
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), MinimumOpId());
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), MinimumOpId());
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 0);
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 0);
 
   vector<ReplicateRefPtr> refs;
   bool needs_tablet_copy;
@@ -350,8 +343,8 @@ TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
   queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
   ASSERT_TRUE(more_pending) << "Queue didn't have anymore requests pending";
 
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), MakeOpId(14, 100));
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), MakeOpId(14, 100));
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 100);
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 100);
 
   // if we ask for a new request, it should come back with the rest of the messages
   ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
@@ -367,8 +360,8 @@ TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
 
   WaitForLocalPeerToAckIndex(expected.index());
 
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), expected);
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), expected);
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), expected.index());
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), expected.index());
 
   // extract the ops from the request to avoid double free
   request.mutable_ops()->ExtractSubrange(0, request.ops_size(), nullptr);
@@ -376,22 +369,21 @@ TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
 
 TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
   queue_->Init(MinimumOpId());
-  queue_->SetLeaderMode(MinimumOpId(), MinimumOpId().term(), BuildRaftConfigPBForTests(5));
+  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(5));
   // Track 4 additional peers (in addition to the local peer)
   queue_->TrackPeer("peer-1");
   queue_->TrackPeer("peer-2");
   queue_->TrackPeer("peer-3");
   queue_->TrackPeer("peer-4");
 
-  // Append 10 messages to the queue with a majority of 2 for a total of 3 peers.
+  // Append 10 messages to the queue.
   // This should add messages 0.1 -> 0.7, 1.8 -> 1.10 to the queue.
   AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 10);
   WaitForLocalPeerToAckIndex(10);
 
-  // Since only the local log might have ACKed at this point,
+  // Since only the local log has ACKed at this point,
   // the committed_index should be MinimumOpId().
-  queue_->observers_pool_->Wait();
-  ASSERT_OPID_EQ(queue_->GetCommittedIndexForTests(), MinimumOpId());
+  ASSERT_EQ(queue_->GetCommittedIndexForTests(), 0);
 
   // NOTE: We don't need to get operations from the queue. The queue
   // only cares about what the peer reported as received, not what was sent.
@@ -401,7 +393,7 @@ TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
   bool more_pending;
   OpId last_sent = MakeOpId(0, 5);
 
-  // Ack the first five operations for peer-1
+  // Ack the first five operations for peer-1.
   response.set_responder_uuid("peer-1");
   SetLastReceivedAndLastCommitted(&response, last_sent, MinimumOpId().index());
 
@@ -409,40 +401,47 @@ TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
   ASSERT_TRUE(more_pending);
 
   // Committed index should be the same
-  queue_->observers_pool_->Wait();
-  ASSERT_OPID_EQ(queue_->GetCommittedIndexForTests(), MinimumOpId());
+  ASSERT_EQ(queue_->GetCommittedIndexForTests(), 0);
 
-  // Ack the first five operations for peer-2
+  // Ack the first five operations for peer-2.
   response.set_responder_uuid("peer-2");
   queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
   ASSERT_TRUE(more_pending);
 
-  // A majority has now replicated up to 0.5.
-  queue_->observers_pool_->Wait();
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), MakeOpId(0, 5));
+  // A majority has now replicated up to 0.5: local, 'peer-1', and 'peer-2'.
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 5);
+  // However, this leader has appended operations in term 1, so we can't
+  // advance the committed index yet.
+  ASSERT_EQ(queue_->GetCommittedIndexForTests(), 0);
+  // Moreover, 'peer-3' and 'peer-4' have not acked yet, so the "all-replicated"
+  // index also cannot advance.
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 0);
 
-  // Ack all operations for peer-3
+  // Ack all operations for peer-3.
   response.set_responder_uuid("peer-3");
   last_sent = MakeOpId(1, 10);
   SetLastReceivedAndLastCommitted(&response, last_sent, MinimumOpId().index());
-
   queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
-  // The committed index moved so 'more_pending' should be true so that the peer is
-  // notified.
-  ASSERT_TRUE(more_pending);
 
-  // Majority replicated watermark should be the same
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), MakeOpId(0, 5));
+  // peer-3 now has all operations, and the commit index hasn't advanced.
+  EXPECT_FALSE(more_pending);
+
+  // Watermarks should remain the same as above: we still have not majority-replicated
+  // anything in the current term, so committed index cannot advance.
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 5);
+  ASSERT_EQ(queue_->GetCommittedIndexForTests(), 0);
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 0);
 
-  // Ack the remaining operations for peer-4
+  // Ack the remaining operations for peer-4.
   response.set_responder_uuid("peer-4");
   queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
-  ASSERT_TRUE(more_pending);
+  EXPECT_TRUE(more_pending);
 
   // Now that a majority of peers have replicated an operation in the queue's
   // term the committed index should advance.
-  queue_->observers_pool_->Wait();
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), MakeOpId(1, 10));
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 10);
+  ASSERT_EQ(queue_->GetCommittedIndexForTests(), 10);
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 5);
 }
 
 // In this test we append a sequence of operations to a log
@@ -461,26 +460,25 @@ TEST_F(ConsensusQueueTest, TestQueueLoadsOperationsForPeer) {
   }
   ASSERT_OK(log_->WaitUntilAllFlushed());
 
-  OpId queues_last_op = opid;
-  queues_last_op.set_index(queues_last_op.index() - 1);
+  OpId leader_last_op;
+  log_->GetLatestEntryOpId(&leader_last_op);
 
   // Now reset the queue so that we can pass a new committed index,
   // the last operation in the log.
   CloseAndReopenQueue();
 
-  OpId committed_index;
-  committed_index.set_term(1);
-  committed_index.set_index(100);
-  queue_->Init(committed_index);
-  queue_->SetLeaderMode(committed_index, committed_index.term(), BuildRaftConfigPBForTests(3));
+  queue_->Init(leader_last_op);
+  queue_->SetLeaderMode(leader_last_op.index(),
+                        leader_last_op.term(),
+                        BuildRaftConfigPBForTests(3));
 
   ConsensusRequestPB request;
   ConsensusResponsePB response;
   response.set_responder_uuid(kPeerUuid);
   bool more_pending = false;
 
-  // The peer will actually be behind the first operation in the queue
-  // in this case about 50 operations before.
+  // The peer will actually be behind the first operation in the queue.
+  // In this case about 50 operations before.
   OpId peers_last_op;
   peers_last_op.set_term(1);
   peers_last_op.set_index(50);
@@ -536,13 +534,16 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
   }
 
 
-  // Now reset the queue so that we can pass a new committed index,
-  // op, 2.15.
+  // Now reset the queue so that we can pass a new committed index (15).
   CloseAndReopenQueue();
 
-  OpId committed_index = MakeOpId(2, 15);
-  queue_->Init(MakeOpId(2, 20));
-  queue_->SetLeaderMode(committed_index, committed_index.term(), BuildRaftConfigPBForTests(3));
+  OpId last_in_log;
+  log_->GetLatestEntryOpId(&last_in_log);
+  int64_t committed_index = 15;
+  queue_->Init(last_in_log);
+  queue_->SetLeaderMode(committed_index,
+                        last_in_log.term(),
+                        BuildRaftConfigPBForTests(3));
 
   // Now get a request for a simulated old leader, which contains more operations
   // in term 1 than the new leader has.
@@ -563,7 +564,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
   ASSERT_FALSE(needs_tablet_copy);
   ASSERT_EQ(request.ops_size(), 0);
   ASSERT_OPID_EQ(request.preceding_id(), MakeOpId(2, 20));
-  ASSERT_OPID_EQ(request.committed_index(), committed_index);
+  ASSERT_EQ(request.committed_index(), committed_index);
 
   // The old leader was still in term 1 but it increased its term with our request.
   response.set_responder_term(2);
@@ -586,7 +587,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
 
  // We're waiting for two nodes. The all committed watermark should be
   // 0.0 since we haven't had a successful exchange with the 'remote' peer.
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), MinimumOpId());
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 0);
 
   // Test even when a correct peer responds (meaning we actually get to execute
  // watermark advancement) we still have the same all-replicated watermark.
@@ -594,7 +595,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
   ASSERT_OK(queue_->AppendOperation(make_scoped_refptr(new RefCountedReplicate(replicate))));
   WaitForLocalPeerToAckIndex(21);
 
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), MinimumOpId());
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 0);
 
   // Generate another request for the remote peer, which should include
   // all of the ops since the peer's last-known committed index.
@@ -609,7 +610,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
   queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
 
   // Now the watermark should have advanced.
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), MakeOpId(2, 21));
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 21);
 
   // The messages still belong to the queue so we have to release them.
   request.mutable_ops()->ExtractSubrange(0, request.ops().size(), nullptr);
@@ -624,12 +625,12 @@ TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
   // Append a bunch of messages.
   AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 10);
   log_->WaitUntilAllFlushed();
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), MakeOpId(1, 10));
+
   // Now rewrite some of the operations and wait for the log to append.
   Synchronizer synch;
   CHECK_OK(queue_->AppendOperations(
-        { make_scoped_refptr(new RefCountedReplicate(
-              CreateDummyReplicate(2, 5, clock_->Now(), 0).release())) },
+      { make_scoped_refptr(new RefCountedReplicate(
+          CreateDummyReplicate(2, 5, clock_->Now(), 0).release())) },
       synch.AsStatusCallback()));
 
   // Wait for the operation to be in the log.
@@ -639,15 +640,16 @@ TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
   // in log cache.
   synch.Reset();
   CHECK_OK(queue_->AppendOperations(
-        { make_scoped_refptr(new RefCountedReplicate(
-              CreateDummyReplicate(2, 6, clock_->Now(), 0).release())) },
+      { make_scoped_refptr(new RefCountedReplicate(
+          CreateDummyReplicate(2, 6, clock_->Now(), 0).release())) },
       synch.AsStatusCallback()));
 
   // Wait for the operation to be in the log.
   ASSERT_OK(synch.Wait());
 
-  // Now the all replicated watermark should have moved backward.
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), MakeOpId(2, 6));
+  // The replication watermark on a follower should not advance by virtue of appending
+  // entries to the log.
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), 0);
 }
 
 // Tests that we're advancing the watermarks properly and only when the peer
@@ -678,21 +680,22 @@ TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
 TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog) {
   FLAGS_consensus_max_batch_size_bytes = 1024 * 10;
 
+  const int kInitialCommittedIndex = 31;
   queue_->Init(MakeOpId(72, 30));
-  queue_->SetLeaderMode(MakeOpId(72, 31), 76, BuildRaftConfigPBForTests(3));
+  queue_->SetLeaderMode(kInitialCommittedIndex, 76, BuildRaftConfigPBForTests(3));
 
   ConsensusRequestPB request;
   ConsensusResponsePB response;
   vector<ReplicateRefPtr> refs;
 
   bool more_pending;
-  // We expect the majority replicated watermark to star at the committed index.
-  OpId expected_majority_replicated = MakeOpId(72, 31);
+  // We expect the majority replicated watermark to start at the committed index.
+  int64_t expected_majority_replicated = kInitialCommittedIndex;
   // We expect the all replicated watermark to be reset when we track a new peer.
-  OpId expected_all_replicated = MinimumOpId();
+  int64_t expected_all_replicated = 0;
 
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), expected_majority_replicated);
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), expected_all_replicated);
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), expected_majority_replicated);
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), expected_all_replicated);
 
   UpdatePeerWatermarkToOp(&request, &response, MakeOpId(75, 49), MinimumOpId(), 31, &more_pending);
   ASSERT_TRUE(more_pending);
@@ -727,11 +730,10 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
   ASSERT_TRUE(more_pending);
 
  // We've sent (and received an ack) up to 72.40 from the remote peer
-  expected_majority_replicated = MakeOpId(72, 40);
-  expected_all_replicated = MakeOpId(72, 40);
+  expected_majority_replicated = expected_all_replicated = 40;
 
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), expected_majority_replicated);
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), expected_all_replicated);
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), expected_majority_replicated);
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), expected_all_replicated);
 
   // Another request for this peer should get another page of messages. Still not
   // on the queue's term (and thus without advancing watermarks).
@@ -746,11 +748,10 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
   queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
 
  // We've now sent (and received an ack) up to 73.49
-  expected_majority_replicated = MakeOpId(73, 49);
-  expected_all_replicated = MakeOpId(73, 49);
+  expected_majority_replicated = expected_all_replicated = 49;
 
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), expected_majority_replicated);
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), expected_all_replicated);
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), expected_majority_replicated);
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), expected_all_replicated);
 
   // The last page of request should overwrite the peer's operations and the
   // response should finally advance the watermarks.
@@ -761,15 +762,13 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
   ASSERT_OPID_EQ(request.ops(0).id(), MakeOpId(73, 50));
 
   // We're done, both watermarks should be at the end.
-  expected_majority_replicated = MakeOpId(76, 53);
-  expected_all_replicated = MakeOpId(76, 53);
+  expected_majority_replicated = expected_all_replicated = 53;
 
-  SetLastReceivedAndLastCommitted(&response, expected_majority_replicated,
-                                  expected_majority_replicated, 31);
+  SetLastReceivedAndLastCommitted(&response, MakeOpId(76, 53), 31);
   queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
 
-  ASSERT_OPID_EQ(queue_->GetMajorityReplicatedOpIdForTests(), expected_majority_replicated);
-  ASSERT_OPID_EQ(queue_->GetAllReplicatedIndexForTests(), expected_all_replicated);
+  ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), expected_majority_replicated);
+  ASSERT_EQ(queue_->GetAllReplicatedIndexForTests(), expected_all_replicated);
 
   request.mutable_ops()->ExtractSubrange(0, request.ops().size(), nullptr);
 }
@@ -777,7 +776,7 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
 // Test that Tablet Copy is triggered when a "tablet not found" error occurs.
 TEST_F(ConsensusQueueTest, TestTriggerTabletCopyIfTabletNotFound) {
   queue_->Init(MinimumOpId());
-  queue_->SetLeaderMode(MinimumOpId(), MinimumOpId().term(), BuildRaftConfigPBForTests(3));
+  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(3));
   AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100);
 
   ConsensusRequestPB request;
@@ -824,13 +823,12 @@ TEST_F(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics) {
 
   // The committed_index should be MinimumOpId() since UpdateFollowerCommittedIndex
   // has not been called.
-  queue_->observers_pool_->Wait();
-  ASSERT_OPID_EQ(queue_->GetCommittedIndexForTests(), MinimumOpId());
+  ASSERT_EQ(queue_->GetCommittedIndexForTests(), 0);
 
   // Update the committed index. In real life, this would be done by the consensus
   // implementation when it receives an updated committed index from the leader.
-  queue_->UpdateFollowerCommittedIndex(MakeOpId(1, 10));
-  ASSERT_OPID_EQ(queue_->GetCommittedIndexForTests(), MakeOpId(1, 10));
+  queue_->UpdateFollowerCommittedIndex(10);
+  ASSERT_EQ(queue_->GetCommittedIndexForTests(), 10);
 
   // Check the metrics have the right values based on the updated committed index.
   ASSERT_EQ(queue_->metrics_.num_majority_done_ops->value(), 0);

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index a62a7e2..b7dff82 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -17,6 +17,8 @@
 #include "kudu/consensus/consensus_queue.h"
 
 #include <algorithm>
+#include <boost/optional.hpp>
+#include <boost/optional/optional_io.hpp>
 #include <gflags/gflags.h>
 #include <iostream>
 #include <mutex>
@@ -84,7 +86,7 @@ std::string PeerMessageQueue::TrackedPeer::ToString() const {
                     "Last known committed idx: $4, Last exchange result: $5, "
                     "Needs tablet copy: $6",
                     uuid, is_new, OpIdToString(last_received), next_index,
-                    last_known_committed_idx,
+                    last_known_committed_index,
                     is_last_exchange_successful ? "SUCCESS" : "ERROR",
                     needs_tablet_copy);
 }
@@ -107,10 +109,11 @@ PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_ent
       metrics_(metric_entity) {
   DCHECK(local_peer_pb_.has_permanent_uuid());
   DCHECK(local_peer_pb_.has_last_known_addr());
-  queue_state_.current_term = MinimumOpId().term();
-  queue_state_.committed_index = MinimumOpId();
-  queue_state_.all_replicated_opid = MinimumOpId();
-  queue_state_.majority_replicated_opid = MinimumOpId();
+  queue_state_.current_term = 0;
+  queue_state_.first_index_in_current_term = boost::none;
+  queue_state_.committed_index = 0;
+  queue_state_.all_replicated_index = 0;
+  queue_state_.majority_replicated_index = 0;
   queue_state_.state = kQueueConstructed;
   queue_state_.mode = NON_LEADER;
   queue_state_.majority_size_ = -1;
@@ -126,14 +129,18 @@ void PeerMessageQueue::Init(const OpId& last_locally_replicated) {
   TrackPeerUnlocked(local_peer_pb_.permanent_uuid());
 }
 
-void PeerMessageQueue::SetLeaderMode(const OpId& committed_index,
+void PeerMessageQueue::SetLeaderMode(int64_t committed_index,
                                      int64_t current_term,
                                      const RaftConfigPB& active_config) {
   std::lock_guard<simple_spinlock> lock(queue_lock_);
-  CHECK(committed_index.IsInitialized());
-  queue_state_.current_term = current_term;
+  if (current_term != queue_state_.current_term) {
+    CHECK_GT(current_term, queue_state_.current_term) << "Terms should only increase";
+    queue_state_.first_index_in_current_term = boost::none;
+    queue_state_.current_term = current_term;
+  }
+
   queue_state_.committed_index = committed_index;
-  queue_state_.majority_replicated_opid = committed_index;
+  queue_state_.majority_replicated_index = committed_index;
   queue_state_.active_config.reset(new RaftConfigPB(active_config));
   CHECK(IsRaftConfigVoter(local_peer_pb_.permanent_uuid(), *queue_state_.active_config))
       << local_peer_pb_.ShortDebugString() << " not a voter in config: "
@@ -186,8 +193,8 @@ void PeerMessageQueue::TrackPeerUnlocked(const string& uuid) {
   CheckPeersInActiveConfigIfLeaderUnlocked();
 
   // We don't know how far back this peer is, so set the all replicated watermark to
-  // MinimumOpId. We'll advance it when we know how far along the peer is.
-  queue_state_.all_replicated_opid = MinimumOpId();
+  // 0. We'll advance it when we know how far along the peer is.
+  queue_state_.all_replicated_index = 0;
 }
 
 void PeerMessageQueue::UntrackPeer(const string& uuid) {
@@ -229,7 +236,7 @@ void PeerMessageQueue::LocalPeerAppendFinished(const OpId& id,
   *fake_response.mutable_status()->mutable_last_received_current_leader() = id;
   {
     std::lock_guard<simple_spinlock> lock(queue_lock_);
-    fake_response.mutable_status()->set_last_committed_idx(queue_state_.committed_index.index());
+    fake_response.mutable_status()->set_last_committed_idx(queue_state_.committed_index);
   }
   bool junk;
   ResponseFromPeer(local_peer_pb_.permanent_uuid(), fake_response, &junk);
@@ -250,8 +257,22 @@ Status PeerMessageQueue::AppendOperations(const vector<ReplicateRefPtr>& msgs,
 
   OpId last_id = msgs.back()->get()->id();
 
-  if (last_id.term() > queue_state_.current_term) {
-    queue_state_.current_term = last_id.term();
+  // "Snoop" on the appended operations to watch for term changes (as follower)
+  // and to determine the first index in our term (as leader).
+  //
+  // TODO: it would be a cleaner design to explicitly set the first index in the
+  // leader term as part of SetLeaderMode(). However, we are currently also
+  // using that method to handle refreshing the peer list during configuration
+  // changes, so the refactor isn't trivial.
+  for (const auto& msg : msgs) {
+    const auto& id = msg->get()->id();
+    if (id.term() > queue_state_.current_term) {
+      queue_state_.current_term = id.term();
+      queue_state_.first_index_in_current_term = id.index();
+    } else if (id.term() == queue_state_.current_term &&
+               queue_state_.first_index_in_current_term == boost::none) {
+      queue_state_.first_index_in_current_term = id.index();
+    }
   }
 
   // Unlock ourselves during Append to prevent a deadlock: it's possible that
@@ -293,7 +314,7 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
     // This is initialized to the queue's last appended op but gets set to the id of the
     // log entry preceding the first one in 'messages' if messages are found for the peer.
     preceding_id = queue_state_.last_appended;
-    request->mutable_committed_index()->CopyFrom(queue_state_.committed_index);
+    request->set_committed_index(queue_state_.committed_index);
     request->set_caller_term(queue_state_.current_term);
   }
 
@@ -427,7 +448,7 @@ Status PeerMessageQueue::GetTabletCopyRequestForPeer(const string& uuid,
 }
 
 void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
-                                             OpId* watermark,
+                                             int64_t* watermark,
                                              const OpId& replicated_before,
                                              const OpId& replicated_after,
                                              int num_peers_required,
@@ -437,7 +458,7 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
     VLOG_WITH_PREFIX_UNLOCKED(2) << "Updating " << type << " watermark: "
         << "Peer (" << peer->ToString() << ") changed from "
         << replicated_before << " to " << replicated_after << ". "
-        << "Current value: " << watermark->ShortDebugString();
+                                 << "Current value: " << *watermark;
   }
 
   // Go through the peer's watermarks, we want the highest watermark that
@@ -447,10 +468,30 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
   // - Sort the vector
   // - Find the vector.size() - 'num_peers_required' position, this
   //   will be the new 'watermark'.
-  vector<const OpId*> watermarks;
+  vector<int64_t> watermarks;
   for (const PeersMap::value_type& peer : peers_map_) {
+    // TODO: The fact that we only consider peers whose last exchange was
+    // successful can cause the "all_replicated" watermark to lag behind
+    // farther than necessary. For example:
+    // - local peer has replicated opid 100
+    // - remote peer A has replicated opid 100
+    // - remote peer B has replication opid 10 and is catching up
+    // - remote peer A goes down
+    // Here we'd start getting 'is_last_exchange_successful == false' for peer A.
+    // In that case, the 'all_replicated_watermark', which requires 3 peers, would not
+    // be updateable, even once we've replicated peer 'B' up to opid 100. It would
+    // get "stuck" at 10. In fact, in this case, the 'majority_replicated_watermark' would
+    // also move *backwards* when peer A started getting errors.
+    //
+    // The issue with simply removing this condition is that 'last_received' does not
+    // perfectly correspond to the 'match_index' in Raft Figure 2. It is simply the
+    // highest operation in a peer's log, regardless of whether that peer currently
+    // holds a prefix of the leader's log. So, in the case that the last exchange
+    // was an error (LMP mismatch, for example), the 'last_received' is _not_ usable
+    // for watermark calculation. This could be fixed by separately storing the
+    // 'match_index' on a per-peer basis and using that for watermark calculation.
     if (peer.second->is_last_exchange_successful) {
-      watermarks.push_back(&peer.second->last_received);
+      watermarks.push_back(peer.second->last_received.index());
     }
   }
 
@@ -459,11 +500,11 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
     return;
   }
 
-  std::sort(watermarks.begin(), watermarks.end(), OpIdIndexLessThanPtrFunctor());
+  std::sort(watermarks.begin(), watermarks.end());
 
-  OpId new_watermark = *watermarks[watermarks.size() - num_peers_required];
-  OpId old_watermark = *watermark;
-  watermark->CopyFrom(new_watermark);
+  int64_t new_watermark = watermarks[watermarks.size() - num_peers_required];
+  int64_t old_watermark = *watermark;
+  *watermark = new_watermark;
 
   VLOG_WITH_PREFIX_UNLOCKED(1) << "Updated " << type << " watermark "
       << "from " << old_watermark << " to " << new_watermark;
@@ -473,24 +514,20 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
       VLOG_WITH_PREFIX_UNLOCKED(3) << "Peer: " << peer.second->ToString();
     }
     VLOG_WITH_PREFIX_UNLOCKED(3) << "Sorted watermarks:";
-    for (const OpId* watermark : watermarks) {
-      VLOG_WITH_PREFIX_UNLOCKED(3) << "Watermark: " << watermark->ShortDebugString();
+    for (int64_t watermark : watermarks) {
+      VLOG_WITH_PREFIX_UNLOCKED(3) << "Watermark: " << watermark;
     }
   }
 }
 
-void PeerMessageQueue::UpdateFollowerCommittedIndex(const OpId& committed_index) {
+void PeerMessageQueue::UpdateFollowerCommittedIndex(int64_t committed_index) {
   if (queue_state_.mode == NON_LEADER) {
     std::lock_guard<simple_spinlock> l(queue_lock_);
-    UpdateFollowerCommittedIndexUnlocked(committed_index);
+    queue_state_.committed_index = committed_index;
+    UpdateMetrics();
   }
 }
 
-void PeerMessageQueue::UpdateFollowerCommittedIndexUnlocked(const OpId& committed_index) {
-  queue_state_.committed_index.CopyFrom(committed_index);
-  UpdateMetrics();
-}
-
 void PeerMessageQueue::NotifyPeerIsResponsiveDespiteError(const std::string& peer_uuid) {
   std::lock_guard<simple_spinlock> l(queue_lock_);
   TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
@@ -504,7 +541,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
   DCHECK(response.IsInitialized()) << "Error: Uninitialized: "
       << response.InitializationErrorString() << ". Response: " << response.ShortDebugString();
 
-  OpId updated_majority_replicated_opid;
+  boost::optional<int64_t> updated_commit_index;
   Mode mode_copy;
   {
     std::lock_guard<simple_spinlock> scoped_lock(queue_lock_);
@@ -555,7 +592,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
 
     // Update the peer status based on the response.
     peer->is_new = false;
-    peer->last_known_committed_idx = status.last_committed_idx();
+    peer->last_known_committed_index = status.last_committed_idx();
     peer->last_successful_communication_time = MonoTime::Now();
 
     // If the reported last-received op for the replica is in our local log,
@@ -584,13 +621,13 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
       // stepping back one-by-one from the end until we no longer have an LMP
       // error, we jump back to the last committed op indicated by the peer with
       // the hope that doing so will result in a faster catch-up process.
-      DCHECK_GE(peer->last_known_committed_idx, 0);
-      peer->next_index = peer->last_known_committed_idx + 1;
+      DCHECK_GE(peer->last_known_committed_index, 0);
+      peer->next_index = peer->last_known_committed_index + 1;
       LOG_WITH_PREFIX_UNLOCKED(INFO)
           << "Peer " << peer_uuid << " log is divergent from this leader: "
           << "its last log entry " << OpIdToString(status.last_received()) << " is not in "
           << "this leader's log and it has not received anything from this leader yet. "
-          << "Falling back to committed index " << peer->last_known_committed_idx;
+          << "Falling back to committed index " << peer->last_known_committed_index;
     }
 
     if (PREDICT_FALSE(status.has_error())) {
@@ -640,39 +677,72 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
           << "Response: " << response.ShortDebugString();
     }
 
-    // If our log has the next request for the peer or if the peer's committed index is
-    // lower than our own, set 'more_pending' to true.
-    *more_pending = log_cache_.HasOpBeenWritten(peer->next_index) ||
-        (peer->last_known_committed_idx < queue_state_.committed_index.index());
-
     mode_copy = queue_state_.mode;
+
+    // If we're the leader, we can compute the new watermarks based on the progress
+    // of our followers.
+    // NOTE: it's possible this node might have lost its leadership (and the notification
+    // is just pending behind the lock we're holding), but any future leader will observe
+    // the same watermarks and make the same advancement, so this is safe.
     if (mode_copy == LEADER) {
       // Advance the majority replicated index.
       AdvanceQueueWatermark("majority_replicated",
-                            &queue_state_.majority_replicated_opid,
+                            &queue_state_.majority_replicated_index,
                             previous.last_received,
                             peer->last_received,
                             queue_state_.majority_size_,
                             peer);
 
-      updated_majority_replicated_opid = queue_state_.majority_replicated_opid;
+      // Advance the all replicated index.
+      AdvanceQueueWatermark("all_replicated",
+                            &queue_state_.all_replicated_index,
+                            previous.last_received,
+                            peer->last_received,
+                            peers_map_.size(),
+                            peer);
+
+      // If the majority-replicated index is in our current term,
+      // and it is above our current committed index, then
+      // we can advance the committed index.
+      //
+      // It would seem that the "it is above our current committed index"
+      // check is redundant (and could be a CHECK), but in fact the
+      // majority-replicated index can currently go down, since we don't
+      // consider peers whose last contact was an error in the watermark
+      // calculation. See the TODO in AdvanceQueueWatermark() for more details.
+      int64_t commit_index_before = queue_state_.committed_index;
+      if (queue_state_.first_index_in_current_term != boost::none &&
+          queue_state_.majority_replicated_index >= queue_state_.first_index_in_current_term &&
+          queue_state_.majority_replicated_index > queue_state_.committed_index) {
+        queue_state_.committed_index = queue_state_.majority_replicated_index;
+      } else {
+        VLOG_WITH_PREFIX_UNLOCKED(2) << "Cannot advance commit index, waiting for > "
+                                     << "first index in current leader term: "
+                                     << queue_state_.first_index_in_current_term;
+      }
+
+      // Only notify observers if the commit index actually changed.
+      if (queue_state_.committed_index != commit_index_before) {
+        DCHECK_GT(queue_state_.committed_index, commit_index_before);
+        updated_commit_index = queue_state_.committed_index;
+        VLOG_WITH_PREFIX_UNLOCKED(2) << "Commit index advanced from "
+                                     << commit_index_before << " to "
+                                     << *updated_commit_index;
+      }
     }
 
-    // Advance the all replicated index.
-    AdvanceQueueWatermark("all_replicated",
-                          &queue_state_.all_replicated_opid,
-                          previous.last_received,
-                          peer->last_received,
-                          peers_map_.size(),
-                          peer);
+    // If our log has the next request for the peer or if the peer's committed index is
+    // lower than our own, set 'more_pending' to true.
+    *more_pending = log_cache_.HasOpBeenWritten(peer->next_index) ||
+        (peer->last_known_committed_index < queue_state_.committed_index);
 
-    log_cache_.EvictThroughOp(queue_state_.all_replicated_opid.index());
+    log_cache_.EvictThroughOp(queue_state_.all_replicated_index);
 
     UpdateMetrics();
   }
 
-  if (mode_copy == LEADER) {
-    NotifyObserversOfMajorityReplOpChange(updated_majority_replicated_opid);
+  if (mode_copy == LEADER && updated_commit_index != boost::none) {
+    NotifyObserversOfCommitIndexChange(*updated_commit_index);
   }
 }
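
A minimal standalone sketch of the commit rule implemented above, with
simplified stand-in names for the QueueState fields (illustration only,
not part of the patch):

    #include <cstdint>
    #include <boost/optional.hpp>

    // A leader may only advance the commit index once the majority-replicated
    // watermark covers at least one entry from its own term; otherwise an
    // entry from an earlier term could be committed and later overwritten
    // (the Raft commitment rule, section 5.4.2 of the paper).
    int64_t MaybeAdvanceCommitIndex(
        const boost::optional<int64_t>& first_index_in_current_term,
        int64_t majority_replicated_index,
        int64_t committed_index) {
      if (first_index_in_current_term != boost::none &&
          majority_replicated_index >= *first_index_in_current_term &&
          majority_replicated_index > committed_index) {
        return majority_replicated_index;
      }
      return committed_index;  // Cannot safely advance yet.
    }

For example, a new leader in term 5 whose highest majority-replicated
entry (index 10) is from term 4 has first_index_in_current_term unset and
cannot advance the commit index; once it appends and majority-replicates
its own term-5 entry at index 11, the commit index jumps to 11, which
commits index 10 transitively.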
 
@@ -682,19 +752,19 @@ PeerMessageQueue::TrackedPeer PeerMessageQueue::GetTrackedPeerForTests(string uu
   return *tracked;
 }
 
-OpId PeerMessageQueue::GetAllReplicatedIndexForTests() const {
+int64_t PeerMessageQueue::GetAllReplicatedIndexForTests() const {
   std::lock_guard<simple_spinlock> lock(queue_lock_);
-  return queue_state_.all_replicated_opid;
+  return queue_state_.all_replicated_index;
 }
 
-OpId PeerMessageQueue::GetCommittedIndexForTests() const {
+int64_t PeerMessageQueue::GetCommittedIndexForTests() const {
   std::lock_guard<simple_spinlock> lock(queue_lock_);
   return queue_state_.committed_index;
 }
 
-OpId PeerMessageQueue::GetMajorityReplicatedOpIdForTests() const {
+int64_t PeerMessageQueue::GetMajorityReplicatedIndexForTests() const {
   std::lock_guard<simple_spinlock> lock(queue_lock_);
-  return queue_state_.majority_replicated_opid;
+  return queue_state_.majority_replicated_index;
 }
 
 
@@ -704,10 +774,10 @@ void PeerMessageQueue::UpdateMetrics() {
   // For non-leaders, majority_done_ops isn't meaningful because followers don't
   // track when an op is replicated to all peers.
   metrics_.num_majority_done_ops->set_value(queue_state_.mode == LEADER ?
-    queue_state_.committed_index.index() - queue_state_.all_replicated_opid.index()
+    queue_state_.committed_index - queue_state_.all_replicated_index
     : 0);
   metrics_.num_in_progress_ops->set_value(
-    queue_state_.last_appended.index() - queue_state_.committed_index.index());
+    queue_state_.last_appended.index() - queue_state_.committed_index);
 }
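
For illustration, with made-up numbers: on a leader with last_appended at
index 10, committed_index 7, and all_replicated_index 5, the metrics above
come out as num_majority_done_ops = 7 - 5 = 2 (committed but not yet on
every peer) and num_in_progress_ops = 10 - 7 = 3 (appended but not yet
committed).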
 
 void PeerMessageQueue::DumpToStrings(vector<string>* lines) const {
@@ -802,13 +872,12 @@ bool PeerMessageQueue::IsOpInLog(const OpId& desired_op) const {
   return false; // Unreachable; here to squelch GCC warning.
 }
 
-void PeerMessageQueue::NotifyObserversOfMajorityReplOpChange(
-    const OpId new_majority_replicated_op) {
+void PeerMessageQueue::NotifyObserversOfCommitIndexChange(int64_t commit_index) {
   WARN_NOT_OK(observers_pool_->SubmitClosure(
-      Bind(&PeerMessageQueue::NotifyObserversOfMajorityReplOpChangeTask,
-           Unretained(this), new_majority_replicated_op)),
+      Bind(&PeerMessageQueue::NotifyObserversOfCommitIndexChangeTask,
+           Unretained(this), commit_index)),
               LogPrefixUnlocked() + "Unable to notify RaftConsensus of "
-                                    "majority replicated op change.");
+                                    "commit index change.");
 }
 
 void PeerMessageQueue::NotifyObserversOfTermChange(int64_t term) {
@@ -818,27 +887,14 @@ void PeerMessageQueue::NotifyObserversOfTermChange(int64_t term) {
               LogPrefixUnlocked() + "Unable to notify RaftConsensus of term change.");
 }
 
-void PeerMessageQueue::NotifyObserversOfMajorityReplOpChangeTask(
-    const OpId new_majority_replicated_op) {
+void PeerMessageQueue::NotifyObserversOfCommitIndexChangeTask(int64_t new_commit_index) {
   std::vector<PeerMessageQueueObserver*> copy;
   {
     std::lock_guard<simple_spinlock> lock(queue_lock_);
     copy = observers_;
   }
-
-  // TODO move commit index advancement here so that the queue is not dependent on
-  // consensus at all, but that requires a bit more work.
-  OpId new_committed_index;
   for (PeerMessageQueueObserver* observer : copy) {
-    observer->UpdateMajorityReplicated(new_majority_replicated_op, &new_committed_index);
-  }
-
-  {
-    std::lock_guard<simple_spinlock> lock(queue_lock_);
-    if (new_committed_index.IsInitialized() &&
-        new_committed_index.index() > queue_state_.committed_index.index()) {
-      queue_state_.committed_index.CopyFrom(new_committed_index);
-    }
+    observer->NotifyCommitIndex(new_commit_index);
   }
 }
 
@@ -849,7 +905,6 @@ void PeerMessageQueue::NotifyObserversOfTermChangeTask(int64_t term) {
     std::lock_guard<simple_spinlock> lock(queue_lock_);
     copy = observers_;
   }
-  OpId new_committed_index;
   for (PeerMessageQueueObserver* observer : copy) {
     observer->NotifyTermChange(term);
   }
@@ -873,7 +928,6 @@ void PeerMessageQueue::NotifyObserversOfFailedFollowerTask(const string& uuid,
     std::lock_guard<simple_spinlock> lock(queue_lock_);
     observers_copy = observers_;
   }
-  OpId new_committed_index;
   for (PeerMessageQueueObserver* observer : observers_copy) {
     observer->NotifyFailedFollower(uuid, term, reason);
   }
@@ -895,11 +949,11 @@ string PeerMessageQueue::LogPrefixUnlocked() const {
 }
 
 string PeerMessageQueue::QueueState::ToString() const {
-  return Substitute("All replicated op: $0, Majority replicated op: $1, "
+  return Substitute("All replicated index: $0, Majority replicated index: $1, "
       "Committed index: $2, Last appended: $3, Current term: $4, Majority size: $5, "
       "State: $6, Mode: $7$8",
-      OpIdToString(all_replicated_opid), OpIdToString(majority_replicated_opid),
-      OpIdToString(committed_index), OpIdToString(last_appended), current_term,
+      all_replicated_index, majority_replicated_index,
+      committed_index, OpIdToString(last_appended), current_term,
       majority_size_, state, (mode == LEADER ? "LEADER" : "NON_LEADER"),
       active_config ? ", active raft config: " + active_config->ShortDebugString() : "");
 }
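
The NotifyObserversOf*Task methods above all follow the same copy-then-call
pattern: snapshot the observer list under the queue lock, then invoke the
callbacks with the lock released, so that observer code (which may take its
own locks, such as the ReplicaState lock) never runs inside the queue's
critical section. A minimal sketch of the pattern, assuming std::mutex in
place of kudu's simple_spinlock:

    #include <cstdint>
    #include <mutex>
    #include <vector>

    class Observer {
     public:
      virtual ~Observer() = default;
      virtual void NotifyCommitIndex(int64_t commit_index) = 0;
    };

    class Notifier {
     public:
      void NotifyCommitIndexChange(int64_t new_commit_index) {
        // Snapshot the observer list under the lock...
        std::vector<Observer*> copy;
        {
          std::lock_guard<std::mutex> l(lock_);
          copy = observers_;
        }
        // ...and invoke the callbacks with the lock released.
        for (Observer* o : copy) {
          o->NotifyCommitIndex(new_commit_index);
        }
      }

     private:
      std::mutex lock_;
      std::vector<Observer*> observers_;
    };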

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 6dbb477..59a570e 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -18,6 +18,7 @@
 #ifndef KUDU_CONSENSUS_CONSENSUS_QUEUE_H_
 #define KUDU_CONSENSUS_CONSENSUS_QUEUE_H_
 
+#include <boost/optional.hpp>
 #include <iosfwd>
 #include <map>
 #include <string>
@@ -72,7 +73,7 @@ class PeerMessageQueue {
           is_new(true),
           next_index(kInvalidOpIdIndex),
           last_received(MinimumOpId()),
-          last_known_committed_idx(MinimumOpId().index()),
+          last_known_committed_index(MinimumOpId().index()),
           is_last_exchange_successful(false),
           last_successful_communication_time(MonoTime::Now()),
           needs_tablet_copy(false),
@@ -102,7 +103,7 @@ class PeerMessageQueue {
     OpId last_received;
 
     // The last committed index this peer knows about.
-    int64_t last_known_committed_idx;
+    int64_t last_known_committed_index;
 
     // Whether the last exchange with this peer was successful.
     bool is_last_exchange_successful;
@@ -138,12 +139,13 @@ class PeerMessageQueue {
   // operations and notifies observers when those change.
   // 'committed_index' corresponds to the index of the last committed operation,
   // i.e. operations with indexes <= 'committed_index' should be considered committed.
+  //
   // 'current_term' corresponds to the leader's current term; it differs from
   // the term of the operation at 'committed_index' if the leader has not yet
   // committed an operation in the current term.
   // 'active_config' is the currently-active Raft config. This must always be
   // a superset of the tracked peers, and that is enforced with runtime CHECKs.
-  virtual void SetLeaderMode(const OpId& committed_index,
+  virtual void SetLeaderMode(int64_t committed_index,
                              int64_t current_term,
                              const RaftConfigPB& active_config);
 
@@ -219,7 +221,7 @@ class PeerMessageQueue {
 
   // Called by the consensus implementation to update the follower queue's
   // committed index, which is mostly used for metrics.
-  void UpdateFollowerCommittedIndex(const OpId& committed_index);
+  void UpdateFollowerCommittedIndex(int64_t committed_index);
 
   // Closes the queue, peers are still allowed to call UntrackPeer() and
   // ResponseFromPeer() but no additional peers can be tracked or messages
@@ -229,13 +231,14 @@ class PeerMessageQueue {
   virtual int64_t GetQueuedOperationsSizeBytesForTests() const;
 
   // Returns the index of the last operation replicated by all peers, for tests.
-  virtual OpId GetAllReplicatedIndexForTests() const;
-
+  virtual int64_t GetAllReplicatedIndexForTests() const;
 
-  virtual OpId GetCommittedIndexForTests() const;
+  // Returns the committed index. All operations with index less than or equal to
+  // this index have been committed.
+  virtual int64_t GetCommittedIndexForTests() const;
 
-  // Returns the current majority replicated OpId, for tests.
-  virtual OpId GetMajorityReplicatedOpIdForTests() const;
+  // Returns the current majority replicated index, for tests.
+  virtual int64_t GetMajorityReplicatedIndexForTests() const;
 
   // Returns a copy of the TrackedPeer with 'uuid' or crashes if the peer is
   // not being tracked.
@@ -288,15 +291,15 @@ class PeerMessageQueue {
 
     // The index of the last operation that has been replicated to all
     // currently tracked peers.
-    OpId all_replicated_opid;
+    int64_t all_replicated_index;
 
     // The index of the last operation replicated to a majority.
     // This is usually the same as 'committed_index' but might not
     // be if the terms changed.
-    OpId majority_replicated_opid;
+    int64_t majority_replicated_index;
 
     // The index of the last operation to be considered committed.
-    OpId committed_index;
+    int64_t committed_index;
 
     // The opid of the last operation appended to the queue.
     OpId last_appended;
@@ -305,10 +308,13 @@ class PeerMessageQueue {
     // Set by the last appended operation.
     // If the queue owner's term is less than the term observed
     // from another peer the queue owner must step down.
-    // TODO: it is likely to be cleaner to get this from the ConsensusMetadata
-    // rather than by snooping on what operations are appended to the queue.
     int64_t current_term;
 
+    // The index of the first operation appended in the current term.
+    // When the term advances this is reset to boost::none, and it is
+    // set again when the first operation in the new term is appended.
+    boost::optional<int64_t> first_index_in_current_term;
+
     // The size of the majority for the queue.
     int majority_size_;
 
@@ -329,8 +335,8 @@ class PeerMessageQueue {
   // fatal error.
   bool IsOpInLog(const OpId& desired_op) const;
 
-  void NotifyObserversOfMajorityReplOpChange(const OpId new_majority_replicated_op);
-  void NotifyObserversOfMajorityReplOpChangeTask(const OpId new_majority_replicated_op);
+  void NotifyObserversOfCommitIndexChange(int64_t new_commit_index);
+  void NotifyObserversOfCommitIndexChangeTask(int64_t new_commit_index);
 
   void NotifyObserversOfTermChange(int64_t term);
   void NotifyObserversOfTermChangeTask(int64_t term);
@@ -373,14 +379,12 @@ class PeerMessageQueue {
 
   // Advances 'watermark' to the highest index that at least
   // 'num_peers_required' of the tracked peers have replicated.
   void AdvanceQueueWatermark(const char* type,
-                             OpId* watermark,
+                             int64_t* watermark,
                              const OpId& replicated_before,
                              const OpId& replicated_after,
                              int num_peers_required,
                              const TrackedPeer* who_caused);
 
-  void UpdateFollowerCommittedIndexUnlocked(const OpId& committed_index);
-
   std::vector<PeerMessageQueueObserver*> observers_;
 
   // The pool which executes observer notifications.
@@ -412,22 +416,14 @@ class PeerMessageQueue {
 // The interface between RaftConsensus and the PeerMessageQueue.
 class PeerMessageQueueObserver {
  public:
-  // Called by the queue each time the response for a peer is handled with
-  // the resulting majority replicated index.
-  // The consensus implementation decides the commit index based on that
-  // and triggers the apply for pending transactions.
-  // 'committed_index' is set to the id of the last operation considered
-  // committed by consensus.
-  // The implementation is idempotent, i.e. independently of the ordering of
-  // calls to this method only non-triggered applys will be started.
-  virtual void UpdateMajorityReplicated(const OpId& majority_replicated,
-                                        OpId* committed_index) = 0;
-
-  // Notify the Consensus implementation that a follower replied with a term
+  // Notify the observer that the commit index has advanced to 'committed_index'.
+  virtual void NotifyCommitIndex(int64_t committed_index) = 0;
+
+  // Notify the observer that a follower replied with a term
   // higher than that established in the queue.
   virtual void NotifyTermChange(int64_t term) = 0;
 
-  // Notify Consensus that a peer is unable to catch up due to falling behind
+  // Notify the observer that a peer is unable to catch up due to falling behind
   // the leader's log GC threshold.
   virtual void NotifyFailedFollower(const std::string& peer_uuid,
                                     int64_t term,

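Regarding the AdvanceQueueWatermark() contract above: the watermark it
advances to is the highest index already present on at least
'num_peers_required' peers, i.e. the num_peers_required-th largest of the
per-peer last-received indexes. A minimal sketch, assuming a plain vector
of indexes in place of the tracked-peer map:

    #include <algorithm>
    #include <cstdint>
    #include <functional>
    #include <vector>

    // Assumes peer_indexes.size() >= num_peers_required >= 1.
    int64_t ComputeWatermark(std::vector<int64_t> peer_indexes,
                             int num_peers_required) {
      // Sort descending; the entry at position (num_peers_required - 1)
      // is the highest index that at least that many peers have.
      std::sort(peer_indexes.begin(), peer_indexes.end(),
                std::greater<int64_t>());
      return peer_indexes[num_peers_required - 1];
    }

With peers at indexes {12, 10, 7}, a majority size of 2 yields a watermark
of 10, while requiring all 3 peers yields 7.
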
http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/raft_consensus-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus-test.cc b/src/kudu/consensus/raft_consensus-test.cc
index 64fce69..3ba3d64 100644
--- a/src/kudu/consensus/raft_consensus-test.cc
+++ b/src/kudu/consensus/raft_consensus-test.cc
@@ -68,7 +68,7 @@ class MockQueue : public PeerMessageQueue {
   explicit MockQueue(const scoped_refptr<MetricEntity>& metric_entity, log::Log* log)
     : PeerMessageQueue(metric_entity, log, FakeRaftPeerPB(kLocalPeerUuid), kTestTablet) {}
   MOCK_METHOD1(Init, void(const OpId& locally_replicated_index));
-  MOCK_METHOD3(SetLeaderMode, void(const OpId& committed_opid,
+  MOCK_METHOD3(SetLeaderMode, void(int64_t committed_opid,
                                    int64_t current_term,
                                    const RaftConfigPB& active_config));
   MOCK_METHOD0(SetNonLeaderMode, void());
@@ -336,90 +336,6 @@ void RaftConsensusTest::AddNoOpToConsensusRequest(ConsensusRequestPB* request,
   noop_msg->mutable_noop_request();
 }
 
-// Tests that the committed index moves along with the majority replicated
-// index when the terms are the same.
-TEST_F(RaftConsensusTest, TestCommittedIndexWhenInSameTerm) {
-  SetUpConsensus();
-  SetUpGeneralExpectations();
-  EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_))
-      .Times(1)
-      .WillOnce(Return(Status::OK()));
-  EXPECT_CALL(*queue_, Init(_))
-      .Times(1);
-  EXPECT_CALL(*queue_, SetLeaderMode(_, _, _))
-      .Times(1);
-  EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
-      .Times(11);
-  EXPECT_CALL(*queue_, AppendOperationsMock(_, _))
-      .Times(11).WillRepeatedly(Return(Status::OK()));
-
-  ConsensusBootstrapInfo info;
-  ASSERT_OK(consensus_->Start(info));
-  ASSERT_OK(consensus_->EmulateElection());
-
-  // Commit the first noop round, created on EmulateElection();
-  OpId committed_index;
-  ASSERT_FALSE(rounds_.empty()) << "rounds_ is empty!";
-  consensus_->UpdateMajorityReplicated(rounds_[0]->id(), &committed_index);
-
-  ASSERT_OPID_EQ(rounds_[0]->id(), committed_index);
-
-  // Append 10 rounds
-  for (int i = 0; i < 10; i++) {
-    scoped_refptr<ConsensusRound> round = AppendNoOpRound();
-    // queue reports majority replicated index in the leader's term
-    // committed index should move accordingly.
-    consensus_->UpdateMajorityReplicated(round->id(), &committed_index);
-    ASSERT_OPID_EQ(round->id(), committed_index);
-  }
-}
-
-// Tests that, when terms change, the commit index only advances when the majority
-// replicated index is in the current term.
-TEST_F(RaftConsensusTest, TestCommittedIndexWhenTermsChange) {
-  SetUpConsensus();
-  SetUpGeneralExpectations();
-  EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_))
-      .Times(2)
-      .WillRepeatedly(Return(Status::OK()));
-  EXPECT_CALL(*queue_, Init(_))
-      .Times(1);
-  EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
-      .Times(3);
-  EXPECT_CALL(*queue_, AppendOperationsMock(_, _))
-      .Times(3).WillRepeatedly(Return(Status::OK()));;
-
-  ConsensusBootstrapInfo info;
-  ASSERT_OK(consensus_->Start(info));
-  ASSERT_OK(consensus_->EmulateElection());
-
-  OpId committed_index;
-  consensus_->UpdateMajorityReplicated(rounds_[0]->id(), &committed_index);
-  ASSERT_OPID_EQ(rounds_[0]->id(), committed_index);
-
-  // Append another round in the current term (besides the original config round).
-  scoped_refptr<ConsensusRound> round = AppendNoOpRound();
-
-  // Now emulate an election, the same guy will be leader but the term
-  // will change.
-  ASSERT_OK(consensus_->EmulateElection());
-
-  // Now tell consensus that 'round' has been majority replicated, this _shouldn't_
-  // advance the committed index, since that belongs to a previous term.
-  OpId new_committed_index;
-  consensus_->UpdateMajorityReplicated(round->id(), &new_committed_index);
-  ASSERT_OPID_EQ(committed_index, new_committed_index);
-
-  const scoped_refptr<ConsensusRound>& last_config_round = rounds_[2];
-
-  // Now notify that the last change config was committed, this should advance the
-  // commit index to the id of the last change config.
-  consensus_->UpdateMajorityReplicated(last_config_round->id(), &committed_index);
-
-  DumpRounds();
-  ASSERT_OPID_EQ(last_config_round->id(), committed_index);
-}
-
 // Asserts that a ConsensusRound has an OpId set in its ReplicateMsg.
 MATCHER(HasOpId, "") { return arg->id().IsInitialized(); }
 
@@ -453,7 +369,7 @@ TEST_F(RaftConsensusTest, TestPendingTransactions) {
 
   {
     InSequence dummy;
-    // On start we expect 10 NO_OPs to be enqueues, with 5 of those having
+    // On start we expect 10 NO_OPs to be enqueued, with 5 of those having
     // their commit continuation called immediately.
     EXPECT_CALL(*consensus_.get(), StartConsensusOnlyRoundUnlocked(_))
         .Times(10);
@@ -469,7 +385,6 @@ TEST_F(RaftConsensusTest, TestPendingTransactions) {
   ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(txn_factory_.get()));
   ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(peer_manager_));
   ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(consensus_.get()));
-
   // Now we test what this peer does with the pending operations once it's elected leader.
   {
     InSequence dummy;
@@ -504,25 +419,11 @@ TEST_F(RaftConsensusTest, TestPendingTransactions) {
   EXPECT_CALL(*queue_, Close())
       .Times(1);
 
-  // Now tell consensus all original orphaned replicates were majority replicated.
-  // This should not advance the committed index because we haven't replicated
-  // anything in the current term.
-  OpId committed_index;
-  consensus_->UpdateMajorityReplicated(info.orphaned_replicates.back()->id(),
-                                       &committed_index);
-  // Should still be the last committed in the the wal.
-  ASSERT_OPID_EQ(committed_index, info.last_committed_id);
-
   // Now mark the last operation (the no-op round) as committed.
   // This should advance the committed index, since that round is in our current term,
   // and we should be able to commit all previous rounds.
-  OpId cc_round_id = info.orphaned_replicates.back()->id();
-  cc_round_id.set_term(11);
-  cc_round_id.set_index(cc_round_id.index() + 1);
-  consensus_->UpdateMajorityReplicated(cc_round_id,
-                                       &committed_index);
-
-  ASSERT_OPID_EQ(committed_index, cc_round_id);
+  int64_t cc_round_index = info.orphaned_replicates.back()->id().index() + 1;
+  consensus_->NotifyCommitIndex(cc_round_index);
 }
 
 MATCHER_P2(RoundHasOpId, term, index, "") {
@@ -619,7 +520,7 @@ TEST_F(RaftConsensusTest, TestAbortOperations) {
     replicate->set_timestamp(clock_->Now().ToUint64());
   }
 
-  request.mutable_committed_index()->CopyFrom(MakeOpId(3, 6));
+  request.set_committed_index(6);
 
   ConsensusResponsePB response;
   ASSERT_OK(consensus_->Update(&request, &response));
@@ -635,7 +536,7 @@ TEST_F(RaftConsensusTest, TestAbortOperations) {
 
   request.mutable_ops()->Clear();
   request.mutable_preceding_id()->CopyFrom(MakeOpId(3, 9));
-  request.mutable_committed_index()->CopyFrom(MakeOpId(3, 9));
+  request.set_committed_index(9);
 
   ASSERT_OK(consensus_->Update(&request, &response));
   ASSERT_FALSE(response.has_error());

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 51af20a..6a4195a 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -270,7 +270,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
     state_->ClearLeaderUnlocked();
 
     RETURN_NOT_OK_PREPEND(state_->StartUnlocked(info.last_id),
-                          "Unable to start RAFT ReplicaState");
+                          "Unable to start Raft ReplicaState");
 
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Replica starting. Triggering "
                                    << info.orphaned_replicates.size()
@@ -281,8 +281,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
       RETURN_NOT_OK(StartReplicaTransactionUnlocked(replicate_ptr));
     }
 
-    bool committed_index_changed = false;
-    state_->AdvanceCommittedIndexUnlocked(info.last_committed_id, &committed_index_changed);
+    state_->SetInitialCommittedOpIdUnlocked(info.last_committed_id);
 
     queue_->Init(state_->GetLastReceivedOpIdUnlocked());
   }
@@ -574,38 +573,21 @@ Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<Consensu
   return Status::OK();
 }
 
-void RaftConsensus::UpdateMajorityReplicated(const OpId& majority_replicated,
-                                             OpId* committed_index) {
+void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
   ReplicaState::UniqueLock lock;
-  Status s = state_->LockForMajorityReplicatedIndexUpdate(&lock);
+  Status s = state_->LockForCommit(&lock);
   if (PREDICT_FALSE(!s.ok())) {
     LOG_WITH_PREFIX(WARNING)
         << "Unable to take state lock to update committed index: "
         << s.ToString();
     return;
   }
-  UpdateMajorityReplicatedUnlocked(majority_replicated, committed_index);
-}
 
-void RaftConsensus::UpdateMajorityReplicatedUnlocked(const OpId& majority_replicated,
-                                                     OpId* committed_index) {
-  VLOG_WITH_PREFIX_UNLOCKED(1) << "Marking majority replicated up to "
-      << majority_replicated.ShortDebugString();
-  TRACE("Marking majority replicated up to $0", majority_replicated.ShortDebugString());
-  bool committed_index_changed = false;
-  Status s = state_->UpdateMajorityReplicatedUnlocked(majority_replicated, committed_index,
-                                                      &committed_index_changed);
-  if (PREDICT_FALSE(!s.ok())) {
-    string msg = Substitute("Unable to mark committed up to $0: $1",
-                            majority_replicated.ShortDebugString(),
-                            s.ToString());
-    TRACE(msg);
-    LOG_WITH_PREFIX_UNLOCKED(WARNING) << msg;
-    return;
-  }
+  state_->AdvanceCommittedIndexUnlocked(commit_index);
 
-  if (committed_index_changed &&
-      state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
+  // TODO: is this right? The goal is to signal a new request
+  // whenever we have a new commit index to propagate.
+  if (state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
     peer_manager_->SignalRequest(false);
   }
 }
@@ -614,7 +596,7 @@ void RaftConsensus::NotifyTermChange(int64_t term) {
   ReplicaState::UniqueLock lock;
   Status s = state_->LockForConfigChange(&lock);
   if (PREDICT_FALSE(!s.ok())) {
-    LOG(WARNING) << state_->LogPrefixThreadSafe() << "Unable to lock ReplicaState for config change"
+    LOG(WARNING) << state_->LogPrefixThreadSafe() << "Unable to lock ReplicaState for term change"
                  << " when notified of new term " << term << ": " << s.ToString();
     return;
   }
@@ -767,7 +749,7 @@ std::string RaftConsensus::LeaderRequest::OpsRangeString() const {
 
 void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req,
                                                      LeaderRequest* deduplicated_req) {
-  const OpId& last_committed = state_->GetCommittedOpIdUnlocked();
+  int64_t last_committed_index = state_->GetCommittedIndexUnlocked();
 
   // The leader's preceding id.
   deduplicated_req->preceding_opid = &rpc_req->preceding_id();
@@ -781,7 +763,7 @@ void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req
   for (int i = 0; i < rpc_req->ops_size(); i++) {
     ReplicateMsg* leader_msg = rpc_req->mutable_ops(i);
 
-    if (leader_msg->id().index() <= last_committed.index()) {
+    if (leader_msg->id().index() <= last_committed_index) {
       VLOG_WITH_PREFIX_UNLOCKED(2) << "Skipping op id " << leader_msg->id()
                                    << " (already committed)";
       deduplicated_req->preceding_opid = &leader_msg->id();
@@ -793,7 +775,9 @@ void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req
       // pendings set.
       scoped_refptr<ConsensusRound> round =
           state_->GetPendingOpByIndexOrNullUnlocked(leader_msg->id().index());
-      DCHECK(round);
+      DCHECK(round) << "Could not find op with index " << leader_msg->id().index()
+                    << " in pending set. committed=" << last_committed_index
+                    << " dedup=" << dedup_up_to_index;
 
       // If the OpIds match, i.e. if they have the same term and index, then
       // this is just a duplicate and we skip it...
@@ -894,6 +878,12 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque
                                                  ConsensusResponsePB* response,
                                                  LeaderRequest* deduped_req) {
 
+  if (request->has_deprecated_committed_index()) {
+    return Status::InvalidArgument("Leader appears to be running an earlier version "
+                                   "of Kudu. Please shut down and upgrade all servers "
+                                   "before restarting.");
+  }
+
   ConsensusRequestPB* mutable_req = const_cast<ConsensusRequestPB*>(request);
   DeduplicateLeaderRequestUnlocked(mutable_req, deduped_req);
 
@@ -1099,16 +1089,14 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     // 1. As many pending transactions as we can, except...
     // 2. ...if we commit beyond the preceding index, we'd regress KUDU-639, and...
     // 3. ...the leader's committed index is always our upper bound.
-    OpId early_apply_up_to = state_->GetLastPendingTransactionOpIdUnlocked();
-    CopyIfOpIdLessThan(*deduped_req.preceding_opid, &early_apply_up_to);
-    CopyIfOpIdLessThan(request->committed_index(), &early_apply_up_to);
+    int64_t early_apply_up_to = std::min<int64_t>({
+        state_->GetLastPendingTransactionOpIdUnlocked().index(),
+        deduped_req.preceding_opid->index(),
+        request->committed_index()});
 
-    VLOG_WITH_PREFIX_UNLOCKED(1) << "Early marking committed up to " <<
-        early_apply_up_to.ShortDebugString();
-    TRACE("Early marking committed up to $0",
-          early_apply_up_to.ShortDebugString());
-    bool committed_index_changed = false;
-    CHECK_OK(state_->AdvanceCommittedIndexUnlocked(early_apply_up_to, &committed_index_changed));
+    VLOG_WITH_PREFIX_UNLOCKED(1) << "Early marking committed up to " << early_apply_up_to;
+    TRACE("Early marking committed up to index $0", early_apply_up_to);
+    CHECK_OK(state_->AdvanceCommittedIndexUnlocked(early_apply_up_to));
 
     // 2 - Enqueue the prepares
 
@@ -1202,10 +1190,10 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     // Choose the last operation to be applied. This will either be 'committed_index', if
     // no prepare enqueuing failed, or the minimum between 'committed_index' and
     // the index of the last successfully enqueued prepare, if some prepare failed
     // to enqueue.
-    OpId apply_up_to;
-    if (last_from_leader.index() < request->committed_index().index()) {
+    int64_t apply_up_to;
+    if (last_from_leader.index() < request->committed_index()) {
       // we should never apply anything later than what we received in this request
-      apply_up_to = last_from_leader;
+      apply_up_to = last_from_leader.index();
 
       VLOG_WITH_PREFIX_UNLOCKED(2) << "Received commit index "
           << request->committed_index() << " from the leader but only"
@@ -1214,9 +1202,9 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
       apply_up_to = request->committed_index();
     }
 
-    VLOG_WITH_PREFIX_UNLOCKED(1) << "Marking committed up to " << apply_up_to.ShortDebugString();
-    TRACE(Substitute("Marking committed up to $0", apply_up_to.ShortDebugString()));
-    CHECK_OK(state_->AdvanceCommittedIndexUnlocked(apply_up_to, &committed_index_changed));
+    VLOG_WITH_PREFIX_UNLOCKED(1) << "Marking committed up to " << apply_up_to;
+    TRACE("Marking committed up to $0", apply_up_to);
+    CHECK_OK(state_->AdvanceCommittedIndexUnlocked(apply_up_to));
     queue_->UpdateFollowerCommittedIndex(apply_up_to);
 
     // We can now update the last received watermark.
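
The apply_up_to computation above reduces to a one-line clamp now that the
watermarks are plain indexes: a replica never applies past the last
operation it actually received in this batch, even if the leader's
committed index is further ahead. A standalone sketch with hypothetical
parameter names:

    #include <algorithm>
    #include <cstdint>

    int64_t ApplyUpTo(int64_t last_received_from_leader,
                      int64_t leader_committed_index) {
      // Never apply beyond what this replica has actually received.
      return std::min(last_received_from_leader, leader_committed_index);
    }
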
@@ -1276,8 +1264,9 @@ void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* respons
       state_->GetLastReceivedOpIdUnlocked());
   response->mutable_status()->mutable_last_received_current_leader()->CopyFrom(
       state_->GetLastReceivedOpIdCurLeaderUnlocked());
+  // TODO: interrogate queue rather than state?
   response->mutable_status()->set_last_committed_idx(
-      state_->GetCommittedOpIdUnlocked().index());
+      state_->GetCommittedIndexUnlocked());
 }
 
 void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response,
@@ -1742,7 +1731,7 @@ Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
   // in the queue -- when the queue is in LEADER mode, it checks that all
   // registered peers are a part of the active config.
   peer_manager_->Close();
-  queue_->SetLeaderMode(state_->GetCommittedOpIdUnlocked(),
+  queue_->SetLeaderMode(state_->GetCommittedIndexUnlocked(),
                         state_->GetCurrentTermUnlocked(),
                         active_config);
   RETURN_NOT_OK(peer_manager_->UpdateRaftConfig(active_config));
@@ -1877,7 +1866,8 @@ Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) {
   if (type == RECEIVED_OPID) {
     *DCHECK_NOTNULL(id) = state_->GetLastReceivedOpIdUnlocked();
   } else if (type == COMMITTED_OPID) {
-    *DCHECK_NOTNULL(id) = state_->GetCommittedOpIdUnlocked();
+    id->set_term(state_->GetTermWithLastCommittedOpUnlocked());
+    id->set_index(state_->GetCommittedIndexUnlocked());
   } else {
     return Status::InvalidArgument("Unsupported OpIdType", OpIdType_Name(type));
   }
@@ -1986,8 +1976,9 @@ MonoDelta RaftConsensus::MinimumElectionTimeout() const {
 MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() {
   // Compute a backoff factor based on how many leader elections have
   // taken place since a leader was successfully elected.
-  int term_difference = state_->GetCurrentTermUnlocked() -
-    state_->GetCommittedOpIdUnlocked().term();
+  int term_difference = state_->GetCurrentTermUnlocked() -
+      state_->GetTermWithLastCommittedOpUnlocked();
+
   double backoff_factor = pow(1.1, term_difference);
   double min_timeout = MinimumElectionTimeout().ToMilliseconds();
   double max_timeout = std::min<double>(