You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2016/10/03 21:50:36 UTC

[2/8] kudu git commit: consensus: properly truncate all state when aborting operations

consensus: properly truncate all state when aborting operations

This fixes a consensus bug which was causing exactly_once_writes-itest
to be slightly flaky. The issue was the following sequence:

- a node A is a follower, and has some operations appended (eg 10.5 through
  10.7)
- a node B is elected for term 11, and sends node 'A' a status-only request
  containing preceding_op_id=11.6
-- node 'A' aborts operations 10.6 and 10.7
-- HOWEVER: it was not explicitly removing these operations from the
   LogCache or the Log. Removal was only happening on an actual
   operation _replacement_.
- node 'B' loses its leadership before it is able to replicate anything
  to a majority
- node 'A' gets elected for term 12
-- it calls Queue::SetLeaderMode()
-- this triggers the first requests to be sent to the peer
-- we hit a race where the first request is being constructed _before_
   the leader appends its initial NO_OP to the queue
--- because we never truncated the log cache or queue, we see operations
    10.6 and 10.7 in the queue, and send them to a follower
-- we now append the NO_OP 12.6 which replaces the aborted 10.6.

In this case, the peer that received the faulty request from the leader
may end up committing those operations whereas the rest of the nodes
commit operations from term 12.

The fix in this patch is to explicitly truncate the queue and the
LogCache state when we are aborting operations. WIP because it needs a
few more comments.

To test, I looped exactly_once_writes-itest --gtest_filter=\*Churny\*
1000 times before and after.

Without the patch[1], I got 17 failures, 16 of which were verification
errors that one of the committed op terms did not match.

With the patch[2], I got 5 failures, all of which were checksum
errors while verifying the logs. Since seeing those failures, I fixed
the verifier to run only after shutting down the cluster.

[1] http://dist-test.cloudera.org/job?job_id=todd.1473812577.12216
[2] http://dist-test.cloudera.org/job?job_id=todd.1473811112.9830

Change-Id: I2fb95b447991b7cadc2c403bc2596fead0bd31ad
Reviewed-on: http://gerrit.cloudera.org:8080/4409
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
(cherry picked from commit 1eb24183a540f4e3bbbc8a399e440ecf905f6129)
Reviewed-on: http://gerrit.cloudera.org:8080/4602
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Dan Burkert <da...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c4d3fb6c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c4d3fb6c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c4d3fb6c

Branch: refs/heads/branch-1.0.x
Commit: c4d3fb6ca0f22de18e8589fb55acd90e9a1ad336
Parents: 908f020
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Sep 13 15:35:49 2016 -0700
Committer: Dan Burkert <da...@cloudera.com>
Committed: Mon Oct 3 21:39:04 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_queue.cc           |  10 ++
 src/kudu/consensus/consensus_queue.h            |   4 +
 src/kudu/consensus/log_cache-test.cc            |  40 +++++
 src/kudu/consensus/log_cache.cc                 |  33 ++--
 src/kudu/consensus/log_cache.h                  |  12 ++
 src/kudu/consensus/raft_consensus-test.cc       |   6 +
 src/kudu/consensus/raft_consensus.cc            |  12 +-
 src/kudu/consensus/raft_consensus.h             |   4 +
 src/kudu/consensus/raft_consensus_state.cc      |  14 +-
 src/kudu/consensus/raft_consensus_state.h       |   2 +-
 src/kudu/integration-tests/CMakeLists.txt       |   1 +
 src/kudu/integration-tests/cluster_verifier.cc  |  10 ++
 .../exactly_once_writes-itest.cc                |   8 +
 src/kudu/integration-tests/log_verifier.cc      | 157 +++++++++++++++++++
 src/kudu/integration-tests/log_verifier.h       |  60 +++++++
 15 files changed, 351 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 228d723..0cafec4 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -292,6 +292,16 @@ Status PeerMessageQueue::AppendOperations(const vector<ReplicateRefPtr>& msgs,
   return Status::OK();
 }
 
+void PeerMessageQueue::TruncateOpsAfter(const OpId& op) {
+  DFAKE_SCOPED_LOCK(append_fake_lock_); // should not race with append.
+  // Rewind 'last_appended' to 'op' under 'queue_lock_', then drop cached ops after it.
+  {
+    std::unique_lock<simple_spinlock> lock(queue_lock_);
+    queue_state_.last_appended = op;
+  }
+  log_cache_.TruncateOpsAfter(op.index()); // LogCache takes its own lock internally.
+}
+
 Status PeerMessageQueue::RequestForPeer(const string& uuid,
                                         ConsensusRequestPB* request,
                                         vector<ReplicateRefPtr>* msg_refs,

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 7b39a1a..43d73d7 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -181,6 +181,10 @@ class PeerMessageQueue {
   virtual Status AppendOperations(const std::vector<ReplicateRefPtr>& msgs,
                                   const StatusCallback& log_append_callback);
 
+  // Truncate all operations coming after 'op'. Following this, the 'last_appended'
+  // operation is reset to 'op', and the log cache will be truncated accordingly.
+  virtual void TruncateOpsAfter(const OpId& op);
+
   // Assembles a request for a peer, adding entries past 'op_id' up to
   // 'consensus_max_batch_size_bytes'.
   // Returns OK if the request was assembled, or Status::NotFound() if the

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/log_cache-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_cache-test.cc b/src/kudu/consensus/log_cache-test.cc
index 2f764e0..f19783b 100644
--- a/src/kudu/consensus/log_cache-test.cc
+++ b/src/kudu/consensus/log_cache-test.cc
@@ -312,5 +312,45 @@ TEST_F(LogCacheTest, TestReplaceMessages) {
             cache_->ToString());
 }
 
+// Test that the cache truncates any future messages when either explicitly
+// truncated or replacing any earlier message.
+TEST_F(LogCacheTest, TestTruncation) {
+  enum {  // Ways of triggering the truncation; each is exercised once below.
+    TRUNCATE_BY_APPEND,
+    TRUNCATE_EXPLICITLY
+  };
+
+  // Append 1 through 3.
+  AppendReplicateMessagesToCache(1, 3, 100);
+
+  for (auto mode : {TRUNCATE_BY_APPEND, TRUNCATE_EXPLICITLY}) {
+    SCOPED_TRACE(mode == TRUNCATE_BY_APPEND ? "by append" : "explicitly");
+    // Append messages 4 through 10 (re-adding whatever a previous iteration truncated).
+    AppendReplicateMessagesToCache(4, 7, 100);
+    ASSERT_EQ(10, cache_->metrics_.log_cache_num_ops->value());
+    // Truncate back to op 3, either by re-appending op 3 or by an explicit call.
+    switch (mode) {
+      case TRUNCATE_BY_APPEND:
+        AppendReplicateMessagesToCache(3, 1, 100);  // Replacing op 3 drops ops 4-10.
+        break;
+      case TRUNCATE_EXPLICITLY:
+        cache_->TruncateOpsAfter(3);
+        break;
+    }
+    // Either way, only ops 1 through 3 should remain.
+    ASSERT_EQ(3, cache_->metrics_.log_cache_num_ops->value());
+
+    // Op 3 should still be in the cache.
+    OpId op;
+    ASSERT_OK(cache_->LookupOpId(3, &op));
+    ASSERT_TRUE(cache_->HasOpBeenWritten(3));
+
+    // Op 4 should have been removed.
+    Status s = cache_->LookupOpId(4, &op);
+    ASSERT_TRUE(s.IsIncomplete()) << "should be truncated, but got: " << s.ToString();
+    ASSERT_FALSE(cache_->HasOpBeenWritten(4));
+  }
+}
+
 } // namespace consensus
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/log_cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_cache.cc b/src/kudu/consensus/log_cache.cc
index 914d3da..25ecd1c 100644
--- a/src/kudu/consensus/log_cache.cc
+++ b/src/kudu/consensus/log_cache.cc
@@ -114,6 +114,27 @@ void LogCache::Init(const OpId& preceding_op) {
   min_pinned_op_index_ = next_sequential_op_index_;
 }
 
+void LogCache::TruncateOpsAfter(int64_t index) {
+  std::unique_lock<simple_spinlock> l(lock_); // Held for the whole truncation.
+  TruncateOpsAfterUnlocked(index); // Also used by AppendOperations(), which holds 'lock_'.
+}
+
+void LogCache::TruncateOpsAfterUnlocked(int64_t index) {
+  int64_t first_to_truncate = index + 1;
+  // Caller must hold 'lock_'. Only appended ops can be truncated, so the first index
+  // to drop is at most one past the last appended op (in which case nothing is removed).
+  CHECK_LE(first_to_truncate, next_sequential_op_index_);
+
+  // Remove every cached op in [first_to_truncate, next_sequential_op_index_).
+  for (int64_t i = first_to_truncate; i < next_sequential_op_index_; ++i) {
+    ReplicateRefPtr msg = EraseKeyReturnValuePtr(&cache_, i);
+    if (msg != nullptr) { // Indexes missing from 'cache_' have nothing to account for.
+      AccountForMessageRemovalUnlocked(msg);
+    }
+  }
+  next_sequential_op_index_ = index + 1; // Subsequent appends continue after 'index'.
+}
+
 Status LogCache::AppendOperations(const vector<ReplicateRefPtr>& msgs,
                                   const StatusCallback& callback) {
   std::unique_lock<simple_spinlock> l(lock_);
@@ -127,17 +148,7 @@ Status LogCache::AppendOperations(const vector<ReplicateRefPtr>& msgs,
   int64_t last_idx_in_batch = msgs.back()->get()->id().index();
 
   if (first_idx_in_batch != next_sequential_op_index_) {
-    // If the index is not consecutive then it must be lower than or equal
-    // to the last index, i.e. we're overwriting.
-    CHECK_LE(first_idx_in_batch, next_sequential_op_index_);
-
-    // Now remove the overwritten operations.
-    for (int64_t i = first_idx_in_batch; i < next_sequential_op_index_; ++i) {
-      ReplicateRefPtr msg = EraseKeyReturnValuePtr(&cache_, i);
-      if (msg != nullptr) {
-        AccountForMessageRemovalUnlocked(msg);
-      }
-    }
+    TruncateOpsAfterUnlocked(first_idx_in_batch - 1);
   }
 
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/log_cache.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_cache.h b/src/kudu/consensus/log_cache.h
index ba6739c..77f4d3d 100644
--- a/src/kudu/consensus/log_cache.h
+++ b/src/kudu/consensus/log_cache.h
@@ -97,6 +97,15 @@ class LogCache {
   Status AppendOperations(const std::vector<ReplicateRefPtr>& msgs,
                           const StatusCallback& callback);
 
+  // Truncate any operations with index > 'index'.
+  //
+  // Following this, reads of truncated indexes using ReadOps(), LookupOpId(),
+  // HasOpBeenWritten(), etc, will return as if the operations were never appended.
+  //
+  // NOTE: unless a new operation is appended following 'index', this truncation does
+  // not persist across server restarts.
+  void TruncateOpsAfter(int64_t index);
+
   // Return true if an operation with the given index has been written through
   // the cache. The operation may not necessarily be durable yet -- it could still be
   // en route to the log.
@@ -137,6 +146,7 @@ class LogCache {
   FRIEND_TEST(LogCacheTest, TestAppendAndGetMessages);
   FRIEND_TEST(LogCacheTest, TestGlobalMemoryLimit);
   FRIEND_TEST(LogCacheTest, TestReplaceMessages);
+  FRIEND_TEST(LogCacheTest, TestTruncation);
   friend class LogCacheTest;
 
   // Try to evict the oldest operations from the queue, stopping either when
@@ -148,6 +158,8 @@ class LogCache {
   // given message.
   void AccountForMessageRemovalUnlocked(const ReplicateRefPtr& msg);
 
+  void TruncateOpsAfterUnlocked(int64_t index);
+
   // Return a string with stats
   std::string StatsStringUnlocked() const;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/raft_consensus-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus-test.cc b/src/kudu/consensus/raft_consensus-test.cc
index cbbf333..0aa77b1 100644
--- a/src/kudu/consensus/raft_consensus-test.cc
+++ b/src/kudu/consensus/raft_consensus-test.cc
@@ -78,6 +78,7 @@ class MockQueue : public PeerMessageQueue {
   }
   MOCK_METHOD2(AppendOperationsMock, Status(const vector<ReplicateRefPtr>& msgs,
                                             const StatusCallback& callback));
+  MOCK_METHOD1(TruncateOpsAfter, void(const OpId& op_id));
   MOCK_METHOD1(TrackPeer, void(const string&));
   MOCK_METHOD1(UntrackPeer, void(const string&));
   MOCK_METHOD4(RequestForPeer, Status(const std::string& uuid,
@@ -432,6 +433,10 @@ MATCHER_P2(RoundHasOpId, term, index, "") {
   return arg->id().term() == term && arg->id().index() == index;
 }
 
+MATCHER_P2(EqOpId, term, index, "") { // Matches an OpId (by value) equal to term.index.
+  return arg.term() == term && arg.index() == index;
+}
+
 // Tests the case where a a leader is elected and pushed a sequence of
 // operations of which some never get committed. Eventually a new leader in a higher
 // term pushes operations that overwrite some of the original indexes.
@@ -488,6 +493,7 @@ TEST_F(RaftConsensusTest, TestAbortOperations) {
   }
   EXPECT_CALL(*consensus_.get(),
               NonTxRoundReplicationFinished(RoundHasOpId(3, 6), _, IsOk())).Times(1);
+  EXPECT_CALL(*queue_, TruncateOpsAfter(EqOpId(2, 5))).Times(1);
 
   // Nothing's committed so far, so now just send an Update() message
   // emulating another guy got elected leader and is overwriting a suffix

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 534a594..5e88be7 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -868,12 +868,20 @@ Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequ
   // why this is actually critical to do here, as opposed to just on requests that
   // append some ops.
   if (term_mismatch) {
-    return state_->AbortOpsAfterUnlocked(req.preceding_opid->index() - 1);
+    TruncateAndAbortOpsAfterUnlocked(req.preceding_opid->index() - 1);
   }
 
   return Status::OK();
 }
 
+void RaftConsensus::TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_index) {
+  state_->AbortOpsAfterUnlocked(truncate_after_index); // Aborts pending ops past the index.
+  // AbortOpsAfterUnlocked() resets 'last received' to the op at 'truncate_after_index'.
+  OpId new_last_received = state_->GetLastReceivedOpIdUnlocked();
+  DCHECK_EQ(truncate_after_index, new_last_received.index());
+  queue_->TruncateOpsAfter(new_last_received); // Keep queue + LogCache from resending them.
+}
+
 Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* request,
                                                  ConsensusResponsePB* response,
                                                  LeaderRequest* deduped_req) {
@@ -942,7 +950,7 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque
     // If the index is in our log but the terms are not the same abort down to the leader's
     // preceding id.
     if (term_mismatch) {
-      RETURN_NOT_OK(state_->AbortOpsAfterUnlocked(deduped_req->preceding_opid->index()));
+      TruncateAndAbortOpsAfterUnlocked(deduped_req->preceding_opid->index());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 02d6593..d0f9fe0 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -284,6 +284,10 @@ class RaftConsensus : public Consensus,
                                     ConsensusResponsePB* response,
                                     LeaderRequest* deduped_req);
 
+  // Abort any pending operations after the given op index,
+  // and also truncate the LogCache accordingly.
+  void TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_index);
+
   // Pushes a new Raft configuration to a majority of peers. Contrary to write operations,
   // this actually waits for the commit round to reach a majority of peers, so that we know
   // we can proceed. If this returns Status::OK(), a majority of peers have accepted the new

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/raft_consensus_state.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc
index 394d6e5..92222bf 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -382,23 +382,23 @@ void ReplicaState::GetUncommittedPendingOperationsUnlocked(
   }
 }
 
-Status ReplicaState::AbortOpsAfterUnlocked(int64_t new_preceding_idx) {
+void ReplicaState::AbortOpsAfterUnlocked(int64_t index) {
   DCHECK(update_lock_.is_locked());
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Aborting all transactions after (but not including): "
-      << new_preceding_idx << ". Current State: " << ToStringUnlocked();
+      << index << ". Current State: " << ToStringUnlocked();
 
-  DCHECK_GE(new_preceding_idx, 0);
+  DCHECK_GE(index, 0);
   OpId new_preceding;
 
-  auto iter = pending_txns_.lower_bound(new_preceding_idx);
+  auto iter = pending_txns_.lower_bound(index);
 
   // Either the new preceding id is in the pendings set or it must be equal to the
   // committed index since we can't truncate already committed operations.
-  if (iter != pending_txns_.end() && (*iter).first == new_preceding_idx) {
+  if (iter != pending_txns_.end() && (*iter).first == index) {
     new_preceding = (*iter).second->replicate_msg()->id();
     ++iter;
   } else {
-    CHECK_EQ(new_preceding_idx, last_committed_op_id_.index());
+    CHECK_EQ(index, last_committed_op_id_.index());
     new_preceding = last_committed_op_id_;
   }
 
@@ -429,8 +429,6 @@ Status ReplicaState::AbortOpsAfterUnlocked(int64_t new_preceding_idx) {
     // Erase the entry from pendings.
     pending_txns_.erase(iter++);
   }
-
-  return Status::OK();
 }
 
 Status ReplicaState::AddPendingOperation(const scoped_refptr<ConsensusRound>& round) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/consensus/raft_consensus_state.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.h b/src/kudu/consensus/raft_consensus_state.h
index aa30ead..efcaff2 100644
--- a/src/kudu/consensus/raft_consensus_state.h
+++ b/src/kudu/consensus/raft_consensus_state.h
@@ -232,7 +232,7 @@ class ReplicaState {
   // Aborts pending operations after, but not including 'index'. The OpId with 'index'
   // will become our new last received id. If there are pending operations with indexes
   // higher than 'index' those operations are aborted.
-  Status AbortOpsAfterUnlocked(int64_t index);
+  void AbortOpsAfterUnlocked(int64_t index);
 
   // Returns the the ConsensusRound with the provided index, if there is any, or NULL
   // if there isn't.

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index b786ecf..dad1533 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -20,6 +20,7 @@ set(INTEGRATION_TESTS_SRCS
   cluster_verifier.cc
   external_mini_cluster.cc
   external_mini_cluster_fs_inspector.cc
+  log_verifier.cc
   mini_cluster.cc
   test_workload.cc
 )

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/integration-tests/cluster_verifier.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_verifier.cc b/src/kudu/integration-tests/cluster_verifier.cc
index 4617b54..fd448d6 100644
--- a/src/kudu/integration-tests/cluster_verifier.cc
+++ b/src/kudu/integration-tests/cluster_verifier.cc
@@ -24,6 +24,7 @@
 #include "kudu/client/row_result.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_verifier.h"
+#include "kudu/integration-tests/log_verifier.h"
 #include "kudu/integration-tests/external_mini_cluster.h"
 #include "kudu/tools/ksck_remote.h"
 #include "kudu/util/monotime.h"
@@ -74,6 +75,15 @@ void ClusterVerifier::CheckCluster() {
     SleepFor(MonoDelta::FromSeconds(sleep_time));
   }
   ASSERT_OK(s);
+
+  // Verify that the committed op indexes match up across the servers.
+  // We have to use "AssertEventually" here because many tests verify clusters
+  // while they are still running, and the verification can fail spuriously if it
+  // reads a WAL segment that is still being written.
+  LogVerifier lv(cluster_);
+  AssertEventually([&]() {
+      ASSERT_OK(lv.VerifyCommittedOpIdsMatch());
+    });
 }
 
 Status ClusterVerifier::DoKsck() {

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/integration-tests/exactly_once_writes-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/exactly_once_writes-itest.cc b/src/kudu/integration-tests/exactly_once_writes-itest.cc
index f74da83..c2eed00 100644
--- a/src/kudu/integration-tests/exactly_once_writes-itest.cc
+++ b/src/kudu/integration-tests/exactly_once_writes-itest.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/integration-tests/log_verifier.h"
 #include "kudu/integration-tests/ts_itest-base.h"
 #include "kudu/util/barrier.h"
 #include "kudu/util/logging.h"
@@ -223,6 +224,13 @@ void ExactlyOnceSemanticsITest::DoTestWritesWithExactlyOnceSemantics(
   if (mismatched) {
     FAIL() << "Got mismatched responses";
   }
+
+  // Check that the servers have matching commit indexes. We shut down first because otherwise
+  // they keep appending to the logs, and the verifier can hit checksum issues trying to
+  // read from a log which is in the process of being written.
+  cluster_->Shutdown();
+  LogVerifier lv(cluster_.get());
+  ASSERT_OK(lv.VerifyCommittedOpIdsMatch());
 }
 
 // This tests exactly once semantics by starting a cluster with multiple replicas and attempting

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/integration-tests/log_verifier.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/log_verifier.cc b/src/kudu/integration-tests/log_verifier.cc
new file mode 100644
index 0000000..cbd1933
--- /dev/null
+++ b/src/kudu/integration-tests/log_verifier.cc
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/integration-tests/log_verifier.h"
+
+#include <boost/optional.hpp>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <vector>
+
+#include <glog/stl_logging.h>
+
+#include "kudu/consensus/log_index.h"
+#include "kudu/consensus/log_reader.h"
+#include "kudu/consensus/log_util.h"
+#include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/external_mini_cluster.h"
+#include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/status.h"
+
+using std::map;
+using std::set;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+using log::LogReader;
+using itest::ExternalMiniClusterFsInspector;
+
+LogVerifier::LogVerifier(ExternalMiniCluster* cluster)
+    : cluster_(cluster) { // Borrowed pointer: the destructor below does not free it.
+}
+
+LogVerifier::~LogVerifier() {
+}
+
+Status LogVerifier::ScanForCommittedOpIds(FsManager* fs, const string& tablet_id,
+                                          map<int64_t, int64_t>* index_to_term) {
+  // Read every WAL segment of the tablet and record the OpId of each COMMIT entry.
+  shared_ptr<LogReader> reader;
+  RETURN_NOT_OK(LogReader::Open(fs, scoped_refptr<log::LogIndex>(), tablet_id,
+                                scoped_refptr<MetricEntity>(), &reader));
+  log::SegmentSequence segs;
+  RETURN_NOT_OK(reader->GetSegmentsSnapshot(&segs));
+  log::LogEntryPB entry;
+  for (const auto& seg : segs) {
+    log::LogEntryReader entry_reader(seg.get());
+    while (true) {
+      Status s = entry_reader.ReadNextEntry(&entry);
+      if (s.IsEndOfFile() || s.IsCorruption()) break; // Treat corruption like EOF.
+      RETURN_NOT_OK(s);
+      if (entry.type() != log::COMMIT) continue;
+      const auto& op_id = entry.commit().commited_op_id();
+      // A given index must be committed at most once within one replica's WAL.
+      if (!InsertIfNotPresent(index_to_term, op_id.index(), op_id.term())) {
+        return Status::Corruption(Substitute(
+            "Index $0 had two COMMIT messages: $1.$0 and $2.$0",
+            op_id.index(), op_id.term(), (*index_to_term)[op_id.index()]));
+      }
+    }
+  }
+
+  return Status::OK();
+}
+
+Status LogVerifier::VerifyCommittedOpIdsMatch() {
+  ExternalMiniClusterFsInspector inspect(cluster_); // Only used to list tablets on disk.
+  Env* env = Env::Default();
+  // For each tablet, compare the committed [index->term] maps of all tablet servers.
+  for (const string& tablet_id : inspect.ListTablets()) {
+    LOG(INFO) << "Checking tablet " << tablet_id;
+
+    // Union set of the op indexes seen on any server.
+    set<int64_t> all_op_indexes;
+    // For each server in the cluster, a map of [index->term].
+    vector<map<int64_t, int64_t>> maps_by_ts(cluster_->num_tablet_servers());
+
+    // Gather the [index->term] map for each of the tablet servers
+    // hosting this tablet.
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      const string& data_dir = cluster_->tablet_server(i)->data_dir();
+      // Open the server's filesystem read-only so the WALs can be inspected.
+      FsManagerOpts fs_opts;
+      fs_opts.read_only = true;
+      fs_opts.wal_path = data_dir;
+      FsManager fs(env, fs_opts);
+      RETURN_NOT_OK_PREPEND(fs.Open(),
+                            Substitute("Couldn't initialize FS Manager for $0", data_dir));
+      const string& wal_dir = fs.GetTabletWalDir(tablet_id);
+      if (!env->FileExists(wal_dir)) continue; // This server has no WAL for the tablet.
+      map<int64_t, int64_t> index_to_term;
+      RETURN_NOT_OK_PREPEND(ScanForCommittedOpIds(&fs, tablet_id, &index_to_term),
+                            Substitute("Couldn't scan log for TS $0", i));
+      for (const auto& index_term : index_to_term) {
+        all_op_indexes.insert(index_term.first);
+      }
+      maps_by_ts[i] = std::move(index_to_term);
+    }
+
+    // Check that the terms match up across servers.
+    vector<int64_t> committed_terms;
+    // Indicates that the op is not on this server.
+    const int64_t kNotOnThisServer = -1;
+    for (int64_t index : all_op_indexes) {
+      committed_terms.clear();
+      for (int ts = 0; ts < cluster_->num_tablet_servers(); ts++) {
+        committed_terms.push_back(FindWithDefault(maps_by_ts[ts], index, kNotOnThisServer));
+      }
+      // 'committed_terms' entries should all be kNotOnThisServer or the same as each other.
+      boost::optional<int64_t> expected_term;
+      for (int ts = 0; ts < cluster_->num_tablet_servers(); ts++) {
+        int64_t this_ts_term = committed_terms[ts];
+        if (this_ts_term == kNotOnThisServer) continue; // this TS doesn't have the op
+        if (expected_term == boost::none) {
+          expected_term = this_ts_term;
+        } else if (this_ts_term != expected_term) { // Two servers disagree on this index.
+          string err = Substitute("Mismatch found for index $0, [", index);
+          for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+            if (i != 0) err += ", ";
+            strings::SubstituteAndAppend(&err, "T $0=$1",
+                                         cluster_->tablet_server(i)->uuid(),
+                                         committed_terms[i]);
+          }
+          err += "]";
+          return Status::Corruption(err);
+        }
+      }
+    }
+
+    LOG(INFO) << "Verified matching terms for " << all_op_indexes.size() << " ops in tablet "
+              << tablet_id;
+  }
+  return Status::OK();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/c4d3fb6c/src/kudu/integration-tests/log_verifier.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/log_verifier.h b/src/kudu/integration-tests/log_verifier.h
new file mode 100644
index 0000000..cb33958
--- /dev/null
+++ b/src/kudu/integration-tests/log_verifier.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <map>
+#include <string>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class ExternalMiniCluster;
+class FsManager;
+
+// Verifies cross-replica consistency of the WALs in an external mini-cluster.
+class LogVerifier {
+ public:
+  explicit LogVerifier(ExternalMiniCluster* cluster); // 'cluster' is not owned.
+  ~LogVerifier();
+
+  // Verify that, for every tablet in the cluster, the logs of each of that tablet's replicas
+  // have matching committed operations. In other words, if any replica has a log entry
+  // 'COMMIT term.index', then verifies that no other replica has a COMMIT entry for the
+  // same index with a different term. Returns Status::Corruption on a mismatch.
+  //
+  // This is the most basic correctness condition of Raft: all replicas should commit the
+  // same operations.
+  //
+  // NOTE: if the cluster is not shut down, it is possible for this method to fail spuriously
+  // trying to read a WAL that is currently being written. In this case, it's advisable to
+  // loop and retry on failure.
+  Status VerifyCommittedOpIdsMatch();
+
+ private:
+  // Scan the WALs for tablet 'tablet_id' on the given 'fs', recording each COMMIT entry's
+  // term in '*index_to_term'. Returns Corruption if an index is committed twice.
+  Status ScanForCommittedOpIds(FsManager* fs, const std::string& tablet_id,
+                               std::map<int64_t, int64_t>* index_to_term);
+
+  ExternalMiniCluster* const cluster_; // Not owned.
+
+  DISALLOW_COPY_AND_ASSIGN(LogVerifier);
+};
+
+} // namespace kudu