You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/02/04 06:33:32 UTC

incubator-kudu git commit: Fix flakiness of client_failover-itest

Repository: incubator-kudu
Updated Branches:
  refs/heads/branch-0.7.0 6e6ffa750 -> 8b3c65068


Fix flakiness of client_failover-itest

The reason this test was flaky is that there is a race between
determining that all of the replicas in a config have replicated an opid
and that opid being committed. We were also missing the
--master_add_server_when_underreplicated=false flag on the test, causing
a race between the master and the test driver.

This patch exposes getting the committed OpId from Consensus as an RPC
(previously, only the last replicated opid was exposed). It also adds a
helper function for testing purposes to wait until a specific OpId has
been committed.

Looped 100x and they all passed:
http://dist-test.cloudera.org/job?job_id=mpercy.1454486819.10566

Change-Id: I56ea7523e212c1b0f67b1498ba5139e4c5537519
Reviewed-on: http://gerrit.cloudera.org:8080/1952
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
(cherry picked from commit 1a24338ad60a8842d1ae5e227f8f03e58faea8c0)
Reviewed-on: http://gerrit.cloudera.org:8080/2032


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/8b3c6506
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/8b3c6506
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/8b3c6506

Branch: refs/heads/branch-0.7.0
Commit: 8b3c65068aa7ad7de2a379e27e3ea58f31a5765f
Parents: 6e6ffa7
Author: Mike Percy <mp...@apache.org>
Authored: Thu Jan 28 15:01:50 2016 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Feb 4 05:33:00 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus.h                  |  9 ++--
 src/kudu/consensus/consensus.proto              |  9 ++++
 src/kudu/consensus/raft_consensus-test.cc       |  2 +-
 src/kudu/consensus/raft_consensus.cc            | 10 +++-
 src/kudu/consensus/raft_consensus.h             |  2 +-
 .../integration-tests/client_failover-itest.cc  | 14 ++++--
 .../integration-tests/cluster_itest_util.cc     | 48 +++++++++++++++++---
 src/kudu/integration-tests/cluster_itest_util.h | 15 +++++-
 .../integration-tests/raft_consensus-itest.cc   | 41 +++++++++--------
 .../tablet_replacement-itest.cc                 |  2 +-
 src/kudu/integration-tests/ts_itest-base.h      |  3 +-
 src/kudu/tserver/tablet_service.cc              |  8 +++-
 12 files changed, 119 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8b3c6506/src/kudu/consensus/consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.h b/src/kudu/consensus/consensus.h
index db44d4b..4b966f4 100644
--- a/src/kudu/consensus/consensus.h
+++ b/src/kudu/consensus/consensus.h
@@ -248,9 +248,12 @@ class Consensus : public RefCountedThreadSafe<Consensus> {
   // Stops running the consensus algorithm.
   virtual void Shutdown() = 0;
 
-  // TEMPORARY: Allows to get the last received OpId by this replica
-  // TODO Remove once we have solid election.
-  virtual Status GetLastReceivedOpId(OpId* id) { return Status::NotFound("Not implemented."); }
+  // Returns the last OpId (either received or committed, depending on the
+  // 'type' argument) that the Consensus implementation knows about.
+  // Primarily used for testing purposes.
+  virtual Status GetLastOpId(OpIdType type, OpId* id) {
+    return Status::NotFound("Not implemented.");
+  }
 
  protected:
   friend class RefCountedThreadSafe<Consensus>;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8b3c6506/src/kudu/consensus/consensus.proto
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.proto b/src/kudu/consensus/consensus.proto
index cb10bde..517d06b 100644
--- a/src/kudu/consensus/consensus.proto
+++ b/src/kudu/consensus/consensus.proto
@@ -401,12 +401,21 @@ message LeaderStepDownResponsePB {
   optional tserver.TabletServerErrorPB error = 1;
 }
 
+enum OpIdType {
+  UNKNOWN_OPID_TYPE = 0;
+  RECEIVED_OPID = 1;
+  COMMITTED_OPID = 2;
+}
+
 message GetLastOpIdRequestPB {
   // UUID of server this request is addressed to.
   optional bytes dest_uuid = 2;
 
   // the id of the tablet
   required bytes tablet_id = 1;
+
+  // Whether to return the last-received or last-committed OpId.
+  optional OpIdType opid_type = 3 [ default = RECEIVED_OPID ];
 }
 
 message GetLastOpIdResponsePB {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8b3c6506/src/kudu/consensus/raft_consensus-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus-test.cc b/src/kudu/consensus/raft_consensus-test.cc
index 2761844..76e19ad 100644
--- a/src/kudu/consensus/raft_consensus-test.cc
+++ b/src/kudu/consensus/raft_consensus-test.cc
@@ -642,7 +642,7 @@ TEST_F(RaftConsensusTest, TestAbortOperations) {
 TEST_F(RaftConsensusTest, TestReceivedIdIsInittedBeforeStart) {
   SetUpConsensus();
   OpId opid;
-  ASSERT_OK(consensus_->GetLastReceivedOpId(&opid));
+  ASSERT_OK(consensus_->GetLastOpId(RECEIVED_OPID, &opid));
   ASSERT_TRUE(opid.IsInitialized());
   ASSERT_OPID_EQ(opid, MinimumOpId());
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8b3c6506/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 58909f8..6e15c84 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -1839,10 +1839,16 @@ void RaftConsensus::DoElectionCallback(const ElectionResult& result) {
   CHECK_OK(BecomeLeaderUnlocked());
 }
 
-Status RaftConsensus::GetLastReceivedOpId(OpId* id) {
+Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) {
   ReplicaState::UniqueLock lock;
   RETURN_NOT_OK(state_->LockForRead(&lock));
-  DCHECK_NOTNULL(id)->CopyFrom(state_->GetLastReceivedOpIdUnlocked());
+  if (type == RECEIVED_OPID) {
+    *DCHECK_NOTNULL(id) = state_->GetLastReceivedOpIdUnlocked();
+  } else if (type == COMMITTED_OPID) {
+    *DCHECK_NOTNULL(id) = state_->GetCommittedOpIdUnlocked();
+  } else {
+    return Status::InvalidArgument("Unsupported OpIdType", OpIdType_Name(type));
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8b3c6506/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 2f84fde..6f5e377 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -160,7 +160,7 @@ class RaftConsensus : public Consensus,
                                     int64_t term,
                                     const std::string& reason) OVERRIDE;
 
-  virtual Status GetLastReceivedOpId(OpId* id) OVERRIDE;
+  virtual Status GetLastOpId(OpIdType type, OpId* id) OVERRIDE;
 
  protected:
   // Trigger that a non-Transaction ConsensusRound has finished replication.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8b3c6506/src/kudu/integration-tests/client_failover-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/client_failover-itest.cc b/src/kudu/integration-tests/client_failover-itest.cc
index c3e3d0a..535d394 100644
--- a/src/kudu/integration-tests/client_failover-itest.cc
+++ b/src/kudu/integration-tests/client_failover-itest.cc
@@ -49,7 +49,8 @@ TEST_F(ClientFailoverITest, TestDeleteLeaderWhileScanning) {
 
   vector<string> ts_flags = { "--enable_leader_failure_detection=false",
                               "--enable_remote_bootstrap=false" };
-  vector<string> master_flags = { "--catalog_manager_wait_for_new_tablets_to_elect_leader=false" };
+  vector<string> master_flags = { "--master_add_server_when_underreplicated=false",
+                                  "--catalog_manager_wait_for_new_tablets_to_elect_leader=false" };
 
   // Start up with 4 tablet servers.
   NO_FATALS(StartCluster(ts_flags, master_flags, 4));
@@ -118,12 +119,15 @@ TEST_F(ClientFailoverITest, TestDeleteLeaderWhileScanning) {
 
   // We need to elect a new leader to remove the old node.
   ASSERT_OK(itest::StartElection(leader, tablet_id, kTimeout));
-  ASSERT_OK(WaitForServersToAgree(kTimeout, active_ts_map, tablet_id,
-                                  workload.batches_completed() + 2));
+  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(workload.batches_completed() + 2, leader, tablet_id,
+                                          kTimeout));
 
   // Do a config change to remove the old replica and add a new one.
   // Cause the new replica to become leader, then do the scan again.
   ASSERT_OK(RemoveServer(leader, tablet_id, old_leader, boost::none, kTimeout));
+  // Wait until the config is committed, otherwise AddServer() will fail.
+  ASSERT_OK(WaitUntilCommittedConfigOpIdIndexIs(workload.batches_completed() + 3, leader, tablet_id,
+                                                kTimeout));
 
   TServerDetails* to_add = ts_map_[cluster_->tablet_server(missing_replica_index)->uuid()];
   ASSERT_OK(AddServer(leader, tablet_id, to_add, consensus::RaftPeerPB::VOTER,
@@ -142,8 +146,8 @@ TEST_F(ClientFailoverITest, TestDeleteLeaderWhileScanning) {
   leader_index = missing_replica_index;
   leader = ts_map_[cluster_->tablet_server(leader_index)->uuid()];
   ASSERT_OK(itest::StartElection(leader, tablet_id, kTimeout));
-  ASSERT_OK(WaitForServersToAgree(kTimeout, active_ts_map, tablet_id,
-                                  workload.batches_completed() + 5));
+  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(workload.batches_completed() + 5, leader, tablet_id,
+                                          kTimeout));
 
   ASSERT_EQ(workload.rows_inserted(), CountTableRows(table.get()));
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8b3c6506/src/kudu/integration-tests/cluster_itest_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc b/src/kudu/integration-tests/cluster_itest_util.cc
index 5d892e4..f7fecf1 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -14,6 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+#include "kudu/integration-tests/cluster_itest_util.h"
 
 #include <algorithm>
 #include <boost/optional.hpp>
@@ -30,7 +31,6 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/master/master.proxy.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/server/server_base.proxy.h"
@@ -106,6 +106,8 @@ client::KuduSchema SimpleIntKeyKuduSchema() {
 
 Status GetLastOpIdForEachReplica(const string& tablet_id,
                                  const vector<TServerDetails*>& replicas,
+                                 consensus::OpIdType opid_type,
+                                 const MonoDelta& timeout,
                                  vector<OpId>* op_ids) {
   GetLastOpIdRequestPB opid_req;
   GetLastOpIdResponsePB opid_resp;
@@ -115,10 +117,11 @@ Status GetLastOpIdForEachReplica(const string& tablet_id,
   op_ids->clear();
   for (TServerDetails* ts : replicas) {
     controller.Reset();
-    controller.set_timeout(MonoDelta::FromSeconds(3));
+    controller.set_timeout(timeout);
     opid_resp.Clear();
     opid_req.set_dest_uuid(ts->uuid());
     opid_req.set_tablet_id(tablet_id);
+    opid_req.set_opid_type(opid_type);
     RETURN_NOT_OK_PREPEND(
       ts->consensus_proxy->GetLastOpId(opid_req, &opid_resp, &controller),
       Substitute("Failed to fetch last op id from $0",
@@ -131,11 +134,13 @@ Status GetLastOpIdForEachReplica(const string& tablet_id,
 
 Status GetLastOpIdForReplica(const std::string& tablet_id,
                              TServerDetails* replica,
+                             consensus::OpIdType opid_type,
+                             const MonoDelta& timeout,
                              consensus::OpId* op_id) {
   vector<TServerDetails*> replicas;
   replicas.push_back(replica);
   vector<OpId> op_ids;
-  RETURN_NOT_OK(GetLastOpIdForEachReplica(tablet_id, replicas, &op_ids));
+  RETURN_NOT_OK(GetLastOpIdForEachReplica(tablet_id, replicas, opid_type, timeout, &op_ids));
   CHECK_EQ(1, op_ids.size());
   *op_id = op_ids[0];
   return Status::OK();
@@ -153,7 +158,8 @@ Status WaitForServersToAgree(const MonoDelta& timeout,
     vector<TServerDetails*> servers;
     AppendValuesFromMap(tablet_servers, &servers);
     vector<OpId> ids;
-    Status s = GetLastOpIdForEachReplica(tablet_id, servers, &ids);
+    Status s = GetLastOpIdForEachReplica(tablet_id, servers, consensus::RECEIVED_OPID, timeout,
+                                         &ids);
     if (s.ok()) {
       bool any_behind = false;
       bool any_disagree = false;
@@ -196,7 +202,8 @@ Status WaitUntilAllReplicasHaveOp(const int64_t log_index,
   MonoDelta passed = MonoDelta::FromMilliseconds(0);
   while (true) {
     vector<OpId> op_ids;
-    Status s = GetLastOpIdForEachReplica(tablet_id, replicas, &op_ids);
+    Status s = GetLastOpIdForEachReplica(tablet_id, replicas, consensus::RECEIVED_OPID, timeout,
+                                         &op_ids);
     if (s.ok()) {
       bool any_behind = false;
       for (const OpId& op_id : op_ids) {
@@ -317,7 +324,7 @@ Status WaitUntilCommittedConfigNumVotersIs(int config_size,
                                      cstate.ShortDebugString(), s.ToString()));
 }
 
-Status WaitUntilCommittedConfigOpidIndexIs(int64_t opid_index,
+Status WaitUntilCommittedConfigOpIdIndexIs(int64_t opid_index,
                                            const TServerDetails* replica,
                                            const std::string& tablet_id,
                                            const MonoDelta& timeout) {
@@ -337,7 +344,7 @@ Status WaitUntilCommittedConfigOpidIndexIs(int64_t opid_index,
     if (MonoTime::Now(MonoTime::FINE).GetDeltaSince(start).MoreThan(timeout)) break;
     SleepFor(MonoDelta::FromMilliseconds(10));
   }
-  return Status::TimedOut(Substitute("Committed consensus opid_index does not equal $0 "
+  return Status::TimedOut(Substitute("Committed config opid_index does not equal $0 "
                                      "after waiting for $1. "
                                      "Last consensus state: $2. Last status: $3",
                                      opid_index,
@@ -345,6 +352,33 @@ Status WaitUntilCommittedConfigOpidIndexIs(int64_t opid_index,
                                      cstate.ShortDebugString(), s.ToString()));
 }
 
+Status WaitUntilCommittedOpIdIndexIs(int64_t opid_index,
+                                     TServerDetails* replica,
+                                     const string& tablet_id,
+                                     const MonoDelta& timeout) {
+  MonoTime start = MonoTime::Now(MonoTime::FINE);
+  MonoTime deadline = start;
+  deadline.AddDelta(timeout);
+
+  Status s;
+  OpId op_id;
+  while (true) {
+    MonoDelta remaining_timeout = deadline.GetDeltaSince(MonoTime::Now(MonoTime::FINE));
+    s = GetLastOpIdForReplica(tablet_id, replica, consensus::COMMITTED_OPID, remaining_timeout,
+                              &op_id);
+    if (s.ok() && op_id.index() == opid_index) {
+      return Status::OK();
+    }
+    if (MonoTime::Now(MonoTime::FINE).GetDeltaSince(start).MoreThan(timeout)) break;
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  return Status::TimedOut(Substitute("Committed consensus opid_index does not equal $0 "
+                                     "after waiting for $1. Last status: $2",
+                                     opid_index,
+                                     MonoTime::Now(MonoTime::FINE).GetDeltaSince(start).ToString(),
+                                     s.ToString()));
+}
+
 Status GetReplicaStatusAndCheckIfLeader(const TServerDetails* replica,
                                         const string& tablet_id,
                                         const MonoDelta& timeout) {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8b3c6506/src/kudu/integration-tests/cluster_itest_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.h b/src/kudu/integration-tests/cluster_itest_util.h
index c908dc4..596e4a2 100644
--- a/src/kudu/integration-tests/cluster_itest_util.h
+++ b/src/kudu/integration-tests/cluster_itest_util.h
@@ -35,6 +35,7 @@
 
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus.proxy.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master.proxy.h"
@@ -105,11 +106,15 @@ Status CreateTabletServerMap(master::MasterServiceProxy* master_proxy,
 // Returns a bad Status if any replica cannot be reached.
 Status GetLastOpIdForEachReplica(const std::string& tablet_id,
                                  const std::vector<TServerDetails*>& replicas,
+                                 consensus::OpIdType opid_type,
+                                 const MonoDelta& timeout,
                                  std::vector<consensus::OpId>* op_ids);
 
 // Like the above, but for a single replica.
 Status GetLastOpIdForReplica(const std::string& tablet_id,
                              TServerDetails* replica,
+                             consensus::OpIdType opid_type,
+                             const MonoDelta& timeout,
                              consensus::OpId* op_id);
 
 // Wait until all of the servers have converged on the same log index.
@@ -144,13 +149,19 @@ Status WaitUntilCommittedConfigNumVotersIs(int config_size,
                                            const std::string& tablet_id,
                                            const MonoDelta& timeout);
 
-// Wait until the the opid_index of the committed consensus config on the
+// Wait until the opid_index of the committed consensus config on the
 // specified tablet is 'opid_index'.
-Status WaitUntilCommittedConfigOpidIndexIs(int64_t opid_index,
+Status WaitUntilCommittedConfigOpIdIndexIs(int64_t opid_index,
                                            const TServerDetails* replica,
                                            const std::string& tablet_id,
                                            const MonoDelta& timeout);
 
+// Wait until the last committed OpId has index exactly 'opid_index'.
+Status WaitUntilCommittedOpIdIndexIs(int64_t opid_index,
+                                     TServerDetails* replica,
+                                     const std::string& tablet_id,
+                                     const MonoDelta& timeout);
+
 // Returns:
 // Status::OK() if the replica is alive and leader of the consensus configuration.
 // Status::NotFound() if the replica is not part of the consensus configuration or is dead.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8b3c6506/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index b49dc24..76c1a56 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -661,10 +661,10 @@ TEST_F(RaftConsensusITest, TestCatchupAfterOpsEvicted) {
 void RaftConsensusITest::CauseFollowerToFallBehindLogGC(string* leader_uuid,
                                                         int64_t* orig_term,
                                                         string* fell_behind_uuid) {
+  MonoDelta kTimeout = MonoDelta::FromSeconds(10);
   // Wait for all of the replicas to have acknowledged the elected
   // leader and logged the first NO_OP.
-  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_,
-                                  tablet_id_, 1));
+  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1));
 
   // Pause one server. This might be the leader, but pausing it will cause
   // a leader election to happen.
@@ -718,7 +718,8 @@ void RaftConsensusITest::CauseFollowerToFallBehindLogGC(string* leader_uuid,
   // before we resume the follower.
   {
     OpId op_id;
-    ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader, &op_id));
+    ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader, consensus::RECEIVED_OPID, kTimeout,
+                                    &op_id));
     *orig_term = op_id.term();
     LOG(INFO) << "Servers converged with original term " << *orig_term;
   }
@@ -776,7 +777,8 @@ TEST_F(RaftConsensusITest, TestFollowerFallsBehindLeaderGC) {
     SleepFor(MonoDelta::FromSeconds(5));
     OpId op_id;
     TServerDetails* leader = tablet_servers_[leader_uuid];
-    ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader, &op_id));
+    ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader, consensus::RECEIVED_OPID,
+                                    MonoDelta::FromSeconds(10), &op_id));
     ASSERT_EQ(orig_term, op_id.term())
       << "expected the leader to have not advanced terms but has op " << op_id;
   }
@@ -1529,6 +1531,7 @@ void RaftConsensusITest::WaitForReplicasReportedToMaster(
 
 // Basic test of adding and removing servers from a configuration.
 TEST_F(RaftConsensusITest, TestAddRemoveServer) {
+  MonoDelta kTimeout = MonoDelta::FromSeconds(10);
   FLAGS_num_tablet_servers = 3;
   FLAGS_num_replicas = 3;
   vector<string> ts_flags = { "--enable_leader_failure_detection=false" };
@@ -1543,19 +1546,17 @@ TEST_F(RaftConsensusITest, TestAddRemoveServer) {
   // Elect server 0 as leader and wait for log index 1 to propagate to all servers.
   TServerDetails* leader_tserver = tservers[0];
   const string& leader_uuid = tservers[0]->uuid();
-  ASSERT_OK(StartElection(leader_tserver, tablet_id_, MonoDelta::FromSeconds(10)));
-  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1));
+  ASSERT_OK(StartElection(leader_tserver, tablet_id_, kTimeout));
+  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1));
 
   // Make sure the server rejects removal of itself from the configuration.
-  Status s = RemoveServer(leader_tserver, tablet_id_, leader_tserver, boost::none,
-                          MonoDelta::FromSeconds(10));
+  Status s = RemoveServer(leader_tserver, tablet_id_, leader_tserver, boost::none, kTimeout);
   ASSERT_TRUE(s.IsInvalidArgument()) << "Should not be able to remove self from config: "
                                      << s.ToString();
 
   // Insert the row that we will update throughout the test.
   ASSERT_OK(WriteSimpleTestRow(leader_tserver, tablet_id_, RowOperationsPB::INSERT,
-                               kTestRowKey, kTestRowIntVal, "initial insert",
-                               MonoDelta::FromSeconds(10)));
+                               kTestRowKey, kTestRowIntVal, "initial insert", kTimeout));
 
   // Kill the master, so we can change the config without interference.
   cluster_->master()->Shutdown();
@@ -1565,7 +1566,8 @@ TEST_F(RaftConsensusITest, TestAddRemoveServer) {
   // Do majority correctness check for 3 servers.
   NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid));
   OpId opid;
-  ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader_tserver, &opid));
+  ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout,
+                                  &opid));
   int64_t cur_log_index = opid.index();
 
   // Go from 3 tablet servers down to 1 in the configuration.
@@ -1576,15 +1578,14 @@ TEST_F(RaftConsensusITest, TestAddRemoveServer) {
 
     TServerDetails* tserver_to_remove = tservers[to_remove_idx];
     LOG(INFO) << "Removing tserver with uuid " << tserver_to_remove->uuid();
-    ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tserver_to_remove, boost::none,
-                           MonoDelta::FromSeconds(10)));
+    ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tserver_to_remove, boost::none, kTimeout));
     ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_remove->uuid()));
-    ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
-                                    active_tablet_servers, tablet_id_, ++cur_log_index));
+    ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, ++cur_log_index));
 
     // Do majority correctness check for each incremental decrease.
     NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid));
-    ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader_tserver, &opid));
+    ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout,
+                                    &opid));
     cur_log_index = opid.index();
   }
 
@@ -1597,14 +1598,14 @@ TEST_F(RaftConsensusITest, TestAddRemoveServer) {
     TServerDetails* tserver_to_add = tservers[to_add_idx];
     LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid();
     ASSERT_OK(AddServer(leader_tserver, tablet_id_, tserver_to_add, RaftPeerPB::VOTER, boost::none,
-                        MonoDelta::FromSeconds(10)));
+                        kTimeout));
     InsertOrDie(&active_tablet_servers, tserver_to_add->uuid(), tserver_to_add);
-    ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
-                                    active_tablet_servers, tablet_id_, ++cur_log_index));
+    ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, ++cur_log_index));
 
     // Do majority correctness check for each incremental increase.
     NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid));
-    ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader_tserver, &opid));
+    ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout,
+                                    &opid));
     cur_log_index = opid.index();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8b3c6506/src/kudu/integration-tests/tablet_replacement-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_replacement-itest.cc b/src/kudu/integration-tests/tablet_replacement-itest.cc
index 4cab317..0a1d350 100644
--- a/src/kudu/integration-tests/tablet_replacement-itest.cc
+++ b/src/kudu/integration-tests/tablet_replacement-itest.cc
@@ -209,7 +209,7 @@ TEST_F(TabletReplacementITest, TestEvictAndReplaceDeadFollower) {
   cluster_->tablet_server(kFollowerIndex)->Shutdown();
 
   // With a RemoveServer and AddServer, the opid_index of the committed config will be 3.
-  ASSERT_OK(itest::WaitUntilCommittedConfigOpidIndexIs(3, leader_ts, tablet_id, timeout));
+  ASSERT_OK(itest::WaitUntilCommittedConfigOpIdIndexIs(3, leader_ts, tablet_id, timeout));
   ASSERT_OK(cluster_->tablet_server(kFollowerIndex)->Restart());
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8b3c6506/src/kudu/integration-tests/ts_itest-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_itest-base.h b/src/kudu/integration-tests/ts_itest-base.h
index e1c422a..ff99c57 100644
--- a/src/kudu/integration-tests/ts_itest-base.h
+++ b/src/kudu/integration-tests/ts_itest-base.h
@@ -324,7 +324,8 @@ class TabletServerIntegrationTestBase : public TabletServerTestBase {
   int64_t GetFurthestAheadReplicaIdx(const std::string& tablet_id,
                                      const std::vector<TServerDetails*>& replicas) {
     std::vector<OpId> op_ids;
-    CHECK_OK(GetLastOpIdForEachReplica(tablet_id, replicas, &op_ids));
+    CHECK_OK(GetLastOpIdForEachReplica(tablet_id, replicas, consensus::RECEIVED_OPID,
+                                       MonoDelta::FromSeconds(10), &op_ids));
 
     int64 max_index = 0;
     int max_replica_index = -1;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/8b3c6506/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 57f404c..10d8f3b 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -93,6 +93,7 @@ using consensus::Consensus;
 using consensus::ConsensusConfigType;
 using consensus::ConsensusRequestPB;
 using consensus::ConsensusResponsePB;
+using consensus::GetLastOpIdRequestPB;
 using consensus::GetNodeInstanceRequestPB;
 using consensus::GetNodeInstanceResponsePB;
 using consensus::LeaderStepDownRequestPB;
@@ -908,7 +909,12 @@ void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *re
   }
   scoped_refptr<Consensus> consensus;
   if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus)) return;
-  Status s = consensus->GetLastReceivedOpId(resp->mutable_opid());
+  if (PREDICT_FALSE(req->opid_type() == consensus::UNKNOWN_OPID_TYPE)) {
+    HandleUnknownError(Status::InvalidArgument("Invalid opid_type specified to GetLastOpId()"),
+                       resp, context);
+    return;
+  }
+  Status s = consensus->GetLastOpId(req->opid_type(), resp->mutable_opid());
   if (PREDICT_FALSE(!s.ok())) {
     SetupErrorAndRespond(resp->mutable_error(), s,
                          TabletServerErrorPB::UNKNOWN_ERROR,