Posted to commits@kudu.apache.org by jd...@apache.org on 2016/12/08 16:14:37 UTC

[5/6] kudu git commit: KUDU-798 (part 5) Correct safe time advancement

KUDU-798 (part 5) Correct safe time advancement

This patch fixes safe time advancement in general and allows for
safe time advancement in the absence of writes in particular.
The core of the patch is plugging in the new TimeManager class wherever
needed, but there is also a major cleanup of the waiting story in
TabletService (KUDU-1127) and a few new integration test features.

There is an instance of a TimeManager per tablet. It's used for:

- When replicating messages to other replicas, a leader uses the
  TimeManager to assign timestamps and to obtain a safe time to send,
  even when there are no writes.

- When receiving messages from a leader, consensus uses the TimeManager
  to update the clock and to unblock any waiters that might be waiting
  for a particular timestamp to become safe.

- Before a snapshot scan proceeds, it must first wait for the
  TimeManager to deem the scan's timestamp safe. It then waits for all
  transactions in the snapshot at that timestamp to be committed and,
  finally, proceeds with the scan.
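The three roles above can be sketched with a deliberately simplified, hypothetical `SimpleTimeManager` class. It is not Kudu's TimeManager: timestamps are plain `uint64_t` values rather than hybrid-clock `Timestamp`s, there is no leader/non-leader mode, the way an assigned-but-not-yet-replicated timestamp pins safe time is not modeled, and the MVCC wait that follows the safe-time wait is left out. It only illustrates the contract assumed here: safe time only moves forward, newly assigned timestamps land above it, and scanners block until their timestamp is safe.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical, heavily simplified model of a per-tablet time manager.
// Not Kudu's TimeManager API: timestamps are plain integers, there is no
// hybrid clock and no leader/non-leader mode.
class SimpleTimeManager {
 public:
  explicit SimpleTimeManager(uint64_t initial_safe_time)
      : safe_time_(initial_safe_time), last_assigned_(initial_safe_time) {}

  // Leader side: assign a timestamp strictly greater than any timestamp
  // assigned so far and than the current safe time.
  uint64_t AssignTimestamp() {
    std::lock_guard<std::mutex> l(lock_);
    return ++last_assigned_;
  }

  // Leader and follower side: move safe time forward (never backwards)
  // and wake any scanners waiting on it.
  void AdvanceSafeTime(uint64_t ts) {
    {
      std::lock_guard<std::mutex> l(lock_);
      if (ts <= safe_time_) return;
      safe_time_ = ts;
      // Keep future assignments above the new safe time.
      if (last_assigned_ < ts) last_assigned_ = ts;
    }
    cond_.notify_all();
  }

  // Scan side: block until 'ts' is safe or 'deadline' passes.
  // Returns true if 'ts' became safe, false on timeout.
  bool WaitUntilSafe(uint64_t ts, std::chrono::steady_clock::time_point deadline) {
    std::unique_lock<std::mutex> l(lock_);
    return cond_.wait_until(l, deadline, [&] { return safe_time_ >= ts; });
  }

  uint64_t GetSafeTime() {
    std::lock_guard<std::mutex> l(lock_);
    return safe_time_;
  }

 private:
  std::mutex lock_;
  std::condition_variable cond_;
  uint64_t safe_time_;      // every timestamp <= this is safe to read at
  uint64_t last_assigned_;  // highest timestamp handed out so far
};
```

In this sketch a scanner thread would call `WaitUntilSafe()` while the consensus thread calls `AdvanceSafeTime()` as heartbeats or committed messages arrive; `notify_all()` wakes every waiter so each one re-checks its own target timestamp.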

Put together, these changes ensure that snapshot scans are
repeatable in the large majority of cases. The one remaining "hole" in
safe time will be closed by leader leases. Until we have those, this
patch takes a conservative approach to safe time progress.

Fixing safe time broke a number of our tests that expected broken
snapshot scans. In particular, we used to return broken snapshots all
the time instead of waiting for the snapshot to be correct, so once
that was fixed, those tests started failing.

In order to address these test failures, I cleaned up our snapshot scan
waiting story in TabletServer::HandleScanAtSnapshot(). In particular:

- The client's deadline is no longer the only bound on how long the
  server waits for a snapshot to be repeatable. There is a hard
  (configurable) maximum that the server will wait, which "clamps" the
  client's deadline. The key point is that, when the client's deadline
  is clamped, we return Status::ServiceUnavailable() on timeout instead
  of Status::TimedOut(). Since HandleScanAtSnapshot() is called on
  KuduScanner::Open() and ServiceUnavailable is a retryable status,
  this enables the client to try somewhere else, perhaps where it won't
  have to wait as long.

- TimeManager now does a pre-flight check before waiting on safe time.
  In particular, it checks that: i) it has heard from the leader within
  a configurable amount of time (i.e. safe time _can_ make progress);
  and ii) safe time is not more than a configurable amount of time in
  the past, 30 seconds by default (i.e. safe time is likely to make
  progress up to the required timestamp).
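A minimal sketch of the two server-side rules above, assuming all times are milliseconds on a single monotonic clock. The function names, the `TimeoutStatus` enum, and the 10-second leader-contact and 60-second maximum-wait limits are hypothetical illustration values; only the 30-second safe-time lag default comes from this message.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the two statuses discussed above.
enum class TimeoutStatus { kServiceUnavailable, kTimedOut };

// All times are milliseconds on one monotonic clock (an assumption).
struct PreflightInputs {
  int64_t now_ms;
  int64_t last_leader_contact_ms;  // when we last heard from the leader
  int64_t safe_time_ms;            // physical part of the current safe time
};

// Illustrative limits. Only the 30 s safe-time lag matches the default
// quoted in the commit message; the other two values are made up.
constexpr int64_t kMaxMsSinceLeaderContact = 10 * 1000;
constexpr int64_t kMaxSafeTimeLagMs = 30 * 1000;
constexpr int64_t kMaxServerWaitMs = 60 * 1000;

// Pre-flight check: i) safe time can make progress (recent leader contact);
// ii) safe time is likely to reach the target (it is not lagging too far).
bool PreflightOk(const PreflightInputs& in) {
  if (in.now_ms - in.last_leader_contact_ms > kMaxMsSinceLeaderContact) return false;
  if (in.now_ms - in.safe_time_ms > kMaxSafeTimeLagMs) return false;
  return true;
}

// Clamp the client's deadline to the server's maximum wait and report
// whether clamping happened.
int64_t ClampDeadline(int64_t now_ms, int64_t client_deadline_ms, bool* clamped) {
  const int64_t server_deadline_ms = now_ms + kMaxServerWaitMs;
  *clamped = server_deadline_ms < client_deadline_ms;
  return std::min(client_deadline_ms, server_deadline_ms);
}

// Status returned when the wait times out: retryable if the server gave
// up before the client's own deadline, a plain timeout otherwise.
TimeoutStatus OnWaitTimedOut(bool clamped) {
  return clamped ? TimeoutStatus::kServiceUnavailable : TimeoutStatus::kTimedOut;
}
```

Returning a retryable status only in the clamped case keeps the client's own deadline authoritative: if the client asked for a short timeout, it gets a plain timeout, while a server-imposed cutoff tells the client it may succeed elsewhere.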

Finally, this patch adds two new integration test workloads that
prove that it works. It adds a new read workload to TestWorkload
that performs snapshot scans in the present while writes are
happening. This can be enabled in any test, but this patch enables
it for several tests in RaftConsensusItest, in particular the
*Churny* and *Crashy* ones with unique keys. This patch also enables
linked_list-test to perform snapshot scans in the present after
the verification.

Results:

I ran raft_consensus-itest with the new snapshot read workload
on dist-test, asan, with the following config:

KUDU_ALLOW_SLOW_TESTS=1 ../../build-support/dist_test.py \
--disable-sharding loop -n 1000 bin/raft_consensus-itest \
--gtest_filter=*Churny*:*Crashy*-*Duplicate*

I ran the new test against the code from before this patch, where it
failed 1000/1000 times on master. With this patch it passed 1000/1000:

http://dist-test.cloudera.org//job?job_id=david.alves.1481097865.18287

I ran linked_list-test with the new snapshot scans in dist-test,
asan, with the following config:

KUDU_ALLOW_SLOW_TESTS=1 ../../build-support/dist_test.py \
loop -n 1000 bin/linked_list-test --stress_cpu_threads=2 \
--gtest_filter=*TestLoadAndVerify*

The test passed 1000/1000 times, whereas before this patch it failed
427/1000 times. Results:

http://dist-test.cloudera.org//job?job_id=david.alves.1481104106.24665

I also ran the test in client-test that tests fault tolerance.
Run config:

KUDU_ALLOW_SLOW_TESTS=1 ../../build-support/dist_test.py \
--disable-sharding loop -n 1000 -- bin/client-test \
--gtest_filter=*ScanFaultTolerance* --stress_cpu_threads=2

The test passed 1000/1000 times. Results:

http://dist-test.cloudera.org//job?job_id=david.alves.1481171410.4460

Change-Id: I8532fdb069c8bee7f3e08ffe74cab0273885cc8e
Reviewed-on: http://gerrit.cloudera.org:8080/5240
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4d8fe6cf
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4d8fe6cf
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4d8fe6cf

Branch: refs/heads/master
Commit: 4d8fe6cf2a1804bae142ddfb5e672af37dad036e
Parents: cbe80ea
Author: David Alves <dr...@apache.org>
Authored: Fri Dec 2 04:01:55 2016 -0800
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Thu Dec 8 05:17:57 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/TestHybridTime.java  |   6 +
 src/kudu/client/client-test-util.cc             |   4 +-
 src/kudu/client/client-test.cc                  |  15 +-
 src/kudu/consensus/consensus.h                  |   3 +
 src/kudu/consensus/consensus.proto              |   7 +
 src/kudu/consensus/consensus_peers-test.cc      |   3 +
 src/kudu/consensus/consensus_queue-test.cc      |   5 +
 src/kudu/consensus/consensus_queue.cc           |  26 +++-
 src/kudu/consensus/consensus_queue.h            |   4 +
 src/kudu/consensus/raft_consensus.cc            |  40 ++---
 src/kudu/consensus/raft_consensus.h             |  10 +-
 .../consensus/raft_consensus_quorum-test.cc     |   5 +-
 src/kudu/consensus/raft_consensus_state.cc      |   9 +-
 src/kudu/consensus/raft_consensus_state.h       |   5 +-
 src/kudu/consensus/time_manager.cc              |  99 +++++++++++--
 src/kudu/consensus/time_manager.h               |  22 ++-
 src/kudu/integration-tests/alter_table-test.cc  |   8 +-
 src/kudu/integration-tests/cluster_verifier.cc  |   3 +-
 .../integration-tests/linked_list-test-util.h   | 122 ++++++++++------
 src/kudu/integration-tests/linked_list-test.cc  |   6 +-
 .../integration-tests/raft_consensus-itest.cc   |  15 +-
 .../tablet_history_gc-itest.cc                  |  10 +-
 src/kudu/integration-tests/test_workload.cc     |  73 ++++++++--
 src/kudu/integration-tests/test_workload.h      |  12 ++
 src/kudu/tablet/mvcc-test.cc                    |  29 ++--
 src/kudu/tablet/mvcc.cc                         |  55 +++----
 src/kudu/tablet/mvcc.h                          |  32 ++--
 src/kudu/tablet/tablet.cc                       |   2 +-
 src/kudu/tablet/tablet_bootstrap.cc             |   3 -
 src/kudu/tablet/tablet_peer.cc                  |  15 +-
 src/kudu/tablet/tablet_peer.h                   |   5 +
 .../transactions/alter_schema_transaction.cc    |   7 +-
 .../tablet/transactions/transaction_driver.cc   |  24 +--
 .../tablet/transactions/transaction_driver.h    |   8 +-
 .../transactions/transaction_tracker-test.cc    |   3 +-
 .../tablet/transactions/transaction_tracker.cc  |   1 -
 .../tablet/transactions/write_transaction.cc    |  12 +-
 .../tablet/transactions/write_transaction.h     |   5 +-
 src/kudu/tools/tool_action_remote_replica.cc    |   7 +-
 src/kudu/tserver/tablet_service.cc              | 145 +++++++++++++------
 src/kudu/tserver/tablet_service.h               |   2 +-
 41 files changed, 592 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
index 2b57f97..2655134 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
@@ -55,6 +55,12 @@ public class TestHybridTime extends BaseKuduTest {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    // Before starting the cluster, disable automatic safe time advancement in the
+    // absence of writes. This test does snapshot reads in the present and expects
+    // certain timestamps to be assigned to the scans. If safe time was allowed
+    // to move automatically the scans might not be assigned the expected timestamps.
+    miniClusterBuilder.addTserverFlag("--safe_time_advancement_without_writes=false");
+
     BaseKuduTest.setUpBeforeClass();
 
     // Using multiple tablets doesn't work with the current way this test works since we could

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/client/client-test-util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test-util.cc b/src/kudu/client/client-test-util.cc
index c3b6a86..814dc19 100644
--- a/src/kudu/client/client-test-util.cc
+++ b/src/kudu/client/client-test-util.cc
@@ -55,8 +55,10 @@ void LogSessionErrorsAndDie(const sp::shared_ptr<KuduSession>& session,
 void ScanTableToStrings(KuduTable* table, vector<string>* row_strings) {
   row_strings->clear();
   KuduScanner scanner(table);
+  // TODO(dralves) Change this to READ_AT_SNAPSHOT, fault tolerant scan and get rid
+  // of the retry code below.
   ASSERT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY));
-  ASSERT_OK(scanner.SetTimeoutMillis(60000));
+  ASSERT_OK(scanner.SetTimeoutMillis(5000));
   ScanToStrings(&scanner, row_strings);
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 1fc480f..4a6fcdb 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -74,10 +74,12 @@ DECLARE_bool(fail_dns_resolution);
 DECLARE_bool(log_inject_latency);
 DECLARE_bool(allow_unsafe_replication_factor);
 DECLARE_int32(heartbeat_interval_ms);
+DECLARE_int32(leader_failure_exp_backoff_max_delta_ms);
 DECLARE_int32(log_inject_latency_ms_mean);
 DECLARE_int32(log_inject_latency_ms_stddev);
 DECLARE_int32(master_inject_latency_on_tablet_lookups_ms);
 DECLARE_int32(max_create_tablets_per_ts);
+DECLARE_int32(raft_heartbeat_interval_ms);
 DECLARE_int32(scanner_gc_check_interval_us);
 DECLARE_int32(scanner_inject_latency_on_each_batch_ms);
 DECLARE_int32(scanner_max_batch_size_bytes);
@@ -1096,6 +1098,8 @@ static void DoScanWithCallback(KuduTable* table,
   // Initialize fault-tolerant snapshot scanner.
   KuduScanner scanner(table);
   ASSERT_OK(scanner.SetFaultTolerant());
+  // Set a long timeout as we'll be restarting nodes while performing snapshot scans.
+  ASSERT_OK(scanner.SetTimeoutMillis(60 * 1000 /* 60 seconds */));
   // Set a small batch size so it reads in multiple batches.
   ASSERT_OK(scanner.SetBatchSizeBytes(1));
 
@@ -1148,11 +1152,12 @@ TEST_F(ClientTest, TestScanFaultTolerance) {
   // Allow creating table with even replication factor.
   FLAGS_allow_unsafe_replication_factor = true;
 
-  // We use only two replicas in this test so that every write is fully replicated to both
-  // servers (the Raft majority is 2/2). This reduces potential flakiness if the scanner tries
-  // to read from a replica that is lagging for some reason. This won't be necessary once
-  // we implement full support for snapshot consistency (KUDU-430).
-  const int kNumReplicas = 2;
+  // Make elections faster, otherwise we can go a long time without a leader and thus without
+  // advancing safe time and unblocking scanners.
+  FLAGS_raft_heartbeat_interval_ms = 50;
+  FLAGS_leader_failure_exp_backoff_max_delta_ms = 1000;
+
+  const int kNumReplicas = 3;
   ASSERT_NO_FATAL_FAILURE(CreateTable(kScanTable, kNumReplicas, {}, {}, &table));
   ASSERT_NO_FATAL_FAILURE(InsertTestRows(table.get(), FLAGS_test_scan_num_rows));
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.h b/src/kudu/consensus/consensus.h
index 5d5447d..5999fde 100644
--- a/src/kudu/consensus/consensus.h
+++ b/src/kudu/consensus/consensus.h
@@ -60,6 +60,7 @@ namespace consensus {
 class ConsensusCommitContinuation;
 class ConsensusRound;
 class ReplicaTransactionFactory;
+class TimeManager;
 
 typedef int64_t ConsensusTerm;
 
@@ -259,6 +260,8 @@ class Consensus : public RefCountedThreadSafe<Consensus> {
   // Returns the id of the tablet whose updates this consensus instance helps coordinate.
   virtual std::string tablet_id() const = 0;
 
+  virtual scoped_refptr<TimeManager> time_manager() const = 0;
+
   // Returns a copy of the committed state of the Consensus system.
   virtual ConsensusStatePB ConsensusState(ConsensusConfigType type) const = 0;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/consensus.proto
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.proto b/src/kudu/consensus/consensus.proto
index c10183e..699ebcc 100644
--- a/src/kudu/consensus/consensus.proto
+++ b/src/kudu/consensus/consensus.proto
@@ -347,6 +347,13 @@ message ConsensusRequestPB {
   // the process of being added to the configuration but has not yet copied a snapshot,
   // this value may drop to 0.
   optional int64 all_replicated_index = 9;
+
+  // The safe timestamp on the leader.
+  // This is only set if the leader has no messages to send to the peer or if the last sent
+  // message is already (raft) committed. By setting this the leader allows followers to advance
+  // the "safe time" past the timestamp of the last committed message and answer snapshot scans
+  // in the present in the absense of writes.
+  optional fixed64 safe_timestamp = 10;
 }
 
 message ConsensusResponsePB {

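The rule in the `safe_timestamp` comment, as applied in `PeerMessageQueue::RequestForPeer()` (consensus_queue.cc) and `RaftConsensus::UpdateReplica()` in this patch, can be sketched as follows. `Request`, `BuildRequest()` and `ApplyRequest()` are hypothetical stand-ins for `ConsensusRequestPB` and the real leader/follower code. Following the queue change, the sketch attaches safe time only when there are no ops to send, and on the follower side it simply never lets safe time move backwards, which is an assumption of the sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical, stripped-down consensus request with only the fields
// relevant here; has_safe_timestamp mimics protobuf's has_ accessor.
struct Request {
  std::vector<uint64_t> op_timestamps;  // ops sent to the peer
  bool has_safe_timestamp = false;
  uint64_t safe_timestamp = 0;
};

// Leader side: attach safe time only when there are no ops to send and
// advancement without writes is enabled.
Request BuildRequest(const std::vector<uint64_t>& ops_to_send,
                     uint64_t leader_safe_time,
                     bool safe_time_advancement_without_writes) {
  Request req;
  req.op_timestamps = ops_to_send;
  if (ops_to_send.empty() && safe_time_advancement_without_writes) {
    req.has_safe_timestamp = true;
    req.safe_timestamp = leader_safe_time;
  }
  return req;
}

// Follower side: returns the follower's new safe time. Safe time never
// moves backwards.
uint64_t ApplyRequest(const Request& req, uint64_t follower_safe_time) {
  if (req.has_safe_timestamp && req.safe_timestamp > follower_safe_time) {
    return req.safe_timestamp;
  }
  return follower_safe_time;
}
```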
http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/consensus_peers-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc
index a41f8b8..626afeb 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -65,8 +65,11 @@ class ConsensusPeersTest : public KuduTest {
     clock_.reset(new server::HybridClock());
     ASSERT_OK(clock_->Init());
 
+    scoped_refptr<TimeManager> time_manager(new TimeManager(clock_, Timestamp::kMin));
+
     message_queue_.reset(new PeerMessageQueue(metric_entity_,
                                               log_.get(),
+                                              time_manager,
                                               FakeRaftPeerPB(kLeaderUuid),
                                               kTabletId));
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/consensus_queue-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc
index 7f5b464..bcf98af 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -27,6 +27,7 @@
 #include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/consensus/log_reader.h"
 #include "kudu/consensus/log_util.h"
+#include "kudu/consensus/time_manager.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/server/hybrid_clock.h"
 #include "kudu/util/metrics.h"
@@ -73,8 +74,12 @@ class ConsensusQueueTest : public KuduTest {
   }
 
   void CloseAndReopenQueue() {
+    scoped_refptr<server::Clock> clock(new server::HybridClock());
+    ASSERT_OK(clock->Init());
+    scoped_refptr<TimeManager> time_manager(new TimeManager(clock, Timestamp::kMin));
     queue_.reset(new PeerMessageQueue(metric_entity_,
                                       log_.get(),
+                                      time_manager,
                                       FakeRaftPeerPB(kLeaderUuid),
                                       kTestTablet));
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 69bc212..cdbbff0 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -32,6 +32,7 @@
 #include "kudu/consensus/opid_util.h"
 #include "kudu/consensus/quorum_util.h"
 #include "kudu/consensus/raft_consensus.h"
+#include "kudu/consensus/time_manager.h"
 #include "kudu/gutil/dynamic_annotations.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stl_util.h"
@@ -65,6 +66,8 @@ DEFINE_int32(consensus_inject_latency_ms_in_notifications, 0,
 TAG_FLAG(consensus_inject_latency_ms_in_notifications, hidden);
 TAG_FLAG(consensus_inject_latency_ms_in_notifications, unsafe);
 
+DECLARE_bool(safe_time_advancement_without_writes);
+
 namespace kudu {
 namespace consensus {
 
@@ -100,12 +103,14 @@ PeerMessageQueue::Metrics::Metrics(const scoped_refptr<MetricEntity>& metric_ent
 
 PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_entity,
                                    const scoped_refptr<log::Log>& log,
+                                   scoped_refptr<TimeManager> time_manager,
                                    const RaftPeerPB& local_peer_pb,
                                    const string& tablet_id)
     : local_peer_pb_(local_peer_pb),
       tablet_id_(tablet_id),
       log_cache_(metric_entity, log, local_peer_pb.permanent_uuid(), tablet_id),
-      metrics_(metric_entity) {
+      metrics_(metric_entity),
+      time_manager_(std::move(time_manager)) {
   DCHECK(local_peer_pb_.has_permanent_uuid());
   DCHECK(local_peer_pb_.has_last_known_addr());
   queue_state_.current_term = 0;
@@ -159,6 +164,7 @@ void PeerMessageQueue::SetLeaderMode(int64_t committed_index,
   for (const PeersMap::value_type& entry : peers_map_) {
     entry.second->last_successful_communication_time = now;
   }
+  time_manager_->SetLeaderMode();
 }
 
 void PeerMessageQueue::SetNonLeaderMode() {
@@ -168,6 +174,7 @@ void PeerMessageQueue::SetNonLeaderMode() {
   queue_state_.majority_size_ = -1;
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to NON_LEADER mode. State: "
       << queue_state_.ToString();
+  time_manager_->SetNonLeaderMode();
 }
 
 void PeerMessageQueue::TrackPeer(const string& uuid) {
@@ -276,6 +283,14 @@ Status PeerMessageQueue::AppendOperations(const vector<ReplicateRefPtr>& msgs,
     }
   }
 
+  // Update safe time in the TimeManager if we're leader.
+  // This will 'unpin' safe time advancement, which had stopped since we assigned a timestamp to
+  // the message.
+  // Until we have leader leases, replicas only call this when the message is committed.
+  if (queue_state_.mode == LEADER) {
+    time_manager_->AdvanceSafeTimeWithMessage(*msgs.back()->get());
+  }
+
   // Unlock ourselves during Append to prevent a deadlock: it's possible that
   // the log buffer is full, in which case AppendOperations would block. However,
   // for the log buffer to empty, it may need to call LocalPeerAppendFinished()
@@ -432,6 +447,15 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
           << (request->committed_index() - last_op_sent)
           << " ops behind the committed index " << THROTTLE_MSG;
     }
+  // If we're not sending ops to the follower, set the safe time on the request.
+  // TODO(dralves) When we have leader leases, send this all the time.
+  } else {
+    if (PREDICT_TRUE(FLAGS_safe_time_advancement_without_writes)) {
+      request->set_safe_timestamp(time_manager_->GetSafeTime().value());
+    } else {
+      KLOG_EVERY_N_SECS(WARNING, 300) << "Safe time advancement without writes is disabled. "
+            "Snapshot reads on non-leader replicas may stall if there are no writes in progress.";
+    }
   }
 
   if (PREDICT_FALSE(VLOG_IS_ON(2))) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 761e258..3d25553 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -49,6 +49,7 @@ class Log;
 
 namespace consensus {
 class PeerMessageQueueObserver;
+class TimeManager;
 
 // The id for the server-wide consensus queue MemTracker.
 extern const char kConsensusQueueParentTrackerId[];
@@ -129,6 +130,7 @@ class PeerMessageQueue {
 
   PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_entity,
                    const scoped_refptr<log::Log>& log,
+                   scoped_refptr<TimeManager> time_manager,
                    const RaftPeerPB& local_peer_pb,
                    const std::string& tablet_id);
 
@@ -429,6 +431,8 @@ class PeerMessageQueue {
   LogCache log_cache_;
 
   Metrics metrics_;
+
+  scoped_refptr<TimeManager> time_manager_;
 };
 
 // The interface between RaftConsensus and the PeerMessageQueue.

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 3ee0efc..0016ea3 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -163,7 +163,7 @@ scoped_refptr<RaftConsensus> RaftConsensus::Create(
     unique_ptr<ConsensusMetadata> cmeta,
     const RaftPeerPB& local_peer_pb,
     const scoped_refptr<MetricEntity>& metric_entity,
-    const scoped_refptr<server::Clock>& clock,
+    scoped_refptr<TimeManager> time_manager,
     ReplicaTransactionFactory* txn_factory,
     const shared_ptr<rpc::Messenger>& messenger,
     const scoped_refptr<log::Log>& log,
@@ -175,6 +175,7 @@ scoped_refptr<RaftConsensus> RaftConsensus::Create(
   // where.
   gscoped_ptr<PeerMessageQueue> queue(new PeerMessageQueue(metric_entity,
                                                            log,
+                                                           time_manager,
                                                            local_peer_pb,
                                                            options.tablet_id));
 
@@ -206,7 +207,7 @@ scoped_refptr<RaftConsensus> RaftConsensus::Create(
                               std::move(thread_pool),
                               metric_entity,
                               peer_uuid,
-                              clock,
+                              std::move(time_manager),
                               txn_factory,
                               log,
                               parent_mem_tracker,
@@ -222,19 +223,19 @@ RaftConsensus::RaftConsensus(
     gscoped_ptr<ThreadPool> thread_pool,
     const scoped_refptr<MetricEntity>& metric_entity,
     const std::string& peer_uuid,
-    const scoped_refptr<server::Clock>& clock,
+    scoped_refptr<TimeManager> time_manager,
     ReplicaTransactionFactory* txn_factory,
     const scoped_refptr<log::Log>& log,
     shared_ptr<MemTracker> parent_mem_tracker,
     Callback<void(const std::string& reason)> mark_dirty_clbk)
     : thread_pool_(std::move(thread_pool)),
       log_(log),
-      clock_(clock),
+      time_manager_(std::move(time_manager)),
       peer_proxy_factory_(std::move(peer_proxy_factory)),
       txn_factory_(txn_factory),
       peer_manager_(std::move(peer_manager)),
       queue_(std::move(queue)),
-      pending_(Substitute("T $0 P $1: ", options.tablet_id, peer_uuid)),
+      pending_(Substitute("T $0 P $1: ", options.tablet_id, peer_uuid), time_manager_),
       rng_(GetRandomSeed32()),
       failure_monitor_(GetRandomSeed32(), GetFailureMonitorCheckMeanMs(),
                        GetFailureMonitorCheckStddevMs()),
@@ -529,12 +530,7 @@ Status RaftConsensus::BecomeLeaderUnlocked() {
   auto replicate = new ReplicateMsg;
   replicate->set_op_type(NO_OP);
   replicate->mutable_noop_request(); // Define the no-op request field.
-
-  // TODO: We should have no-ops (?) and config changes be COMMIT_WAIT
-  // transactions. See KUDU-798.
-  // Note: This timestamp has no meaning from a serialization perspective
-  // because this method is not executed on the TabletPeer's prepare thread.
-  replicate->set_timestamp(clock_->Now().ToUint64());
+  CHECK_OK(time_manager_->AssignTimestamp(replicate));
 
   scoped_refptr<ConsensusRound> round(
       new ConsensusRound(this, make_scoped_refptr(new RefCountedReplicate(replicate))));
@@ -1174,8 +1170,6 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     auto iter = deduped_req.messages.begin();
 
     if (PREDICT_TRUE(!deduped_req.messages.empty())) {
-      // TODO(KUDU-798) Temporary until the leader explicitly propagates the safe timestamp.
-      clock_->Update(Timestamp(deduped_req.messages.back()->get()->timestamp()));
 
       // This request contains at least one message, and is likely to increase
       // our memory pressure.
@@ -1201,6 +1195,9 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
       if (PREDICT_FALSE(!prepare_status.ok())) {
         break;
       }
+      // TODO(dralves) Without leader leases this shouldn't be allowed to fail.
+      // Once we have that functionality we'll have to revisit this.
+      CHECK_OK(time_manager_->MessageReceivedFromLeader(*(*iter)->get()));
       ++iter;
     }
 
@@ -1237,6 +1234,14 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
       }
     }
 
+    // All transactions that are going to be prepared have been started; advance the safe timestamp.
+    // TODO(dralves) This is only correct because the queue only sets safe time when the request is
+    // an empty heartbeat. If we actually start setting this on a consensus request along with
+    // actual messages we need to be careful to ignore it if any of the messages fails to prepare.
+    if (request->has_safe_timestamp()) {
+      time_manager_->AdvanceSafeTime(Timestamp(request->safe_timestamp()));
+    }
+
     OpId last_from_leader;
     // 3 - Enqueue the writes.
     // Now that we've triggered the prepares enqueue the operations to be written
@@ -1307,6 +1312,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
       }
     } while (s.IsTimedOut());
     RETURN_NOT_OK(s);
+
     TRACE("finished");
   }
 
@@ -1772,12 +1778,7 @@ Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_conf
   cc_req->set_tablet_id(tablet_id());
   *cc_req->mutable_old_config() = old_config;
   *cc_req->mutable_new_config() = new_config;
-
-  // TODO: We should have no-ops (?) and config changes be COMMIT_WAIT
-  // transactions. See KUDU-798.
-  // Note: This timestamp has no meaning from a serialization perspective
-  // because this method is not executed on the TabletPeer's prepare thread.
-  cc_replicate->set_timestamp(clock_->Now().ToUint64());
+  CHECK_OK(time_manager_->AssignTimestamp(cc_replicate));
 
   scoped_refptr<ConsensusRound> round(
       new ConsensusRound(this, make_scoped_refptr(new RefCountedReplicate(cc_replicate))));
@@ -1909,6 +1910,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
     }
   }
 
+  // The vote was granted, become leader.
   ReplicaState::UniqueLock lock;
   Status s = state_->LockForConfigChange(&lock);
   if (PREDICT_FALSE(!s.ok())) {

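The ordering that `UpdateReplica()` relies on can be sketched as follows: each message first has its prepare started, then its timestamp is fed to the clock, and the request's safe timestamp is applied only after every prepare has started. `FollowerState` and `HandleUpdate()` are hypothetical, and the clock is reduced to "highest timestamp seen". The sketch also applies the caution from the TODO (skip the safe-time advance if any message failed to prepare), which this patch does not strictly need because safe time is only sent on empty heartbeats.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical follower state: the clock is reduced to "highest timestamp
// seen from the leader".
struct FollowerState {
  uint64_t clock_now = 0;
  uint64_t safe_time = 0;
};

// Processes one update from the leader and returns how many messages had
// their prepare started. 'start_prepare' stands in for starting a replica
// transaction and returns false if that fails.
std::size_t HandleUpdate(const std::vector<uint64_t>& msg_timestamps,
                         bool has_safe_timestamp, uint64_t safe_timestamp,
                         const std::function<bool(uint64_t)>& start_prepare,
                         FollowerState* state) {
  std::size_t started = 0;
  for (uint64_t ts : msg_timestamps) {
    if (!start_prepare(ts)) break;  // stop at the first failed prepare
    // Update the clock only for messages whose prepare started.
    state->clock_now = std::max(state->clock_now, ts);
    ++started;
  }
  // All prepares have been started: now it is safe to advance safe time,
  // unless a message failed to prepare.
  if (has_safe_timestamp && started == msg_timestamps.size()) {
    state->safe_time = std::max(state->safe_time, safe_timestamp);
  }
  return started;
}
```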
http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 3a67087..7777719 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -30,6 +30,7 @@
 #include "kudu/consensus/consensus_meta.h"
 #include "kudu/consensus/consensus_queue.h"
 #include "kudu/consensus/raft_consensus_state.h"
+#include "kudu/consensus/time_manager.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/failure_detector.h"
 
@@ -56,6 +57,7 @@ class ConsensusMetadata;
 class Peer;
 class PeerProxyFactory;
 class PeerManager;
+class TimeManager;
 struct ElectionResult;
 
 class RaftConsensus : public Consensus,
@@ -66,7 +68,7 @@ class RaftConsensus : public Consensus,
     std::unique_ptr<ConsensusMetadata> cmeta,
     const RaftPeerPB& local_peer_pb,
     const scoped_refptr<MetricEntity>& metric_entity,
-    const scoped_refptr<server::Clock>& clock,
+    scoped_refptr<TimeManager> time_manager,
     ReplicaTransactionFactory* txn_factory,
     const std::shared_ptr<rpc::Messenger>& messenger,
     const scoped_refptr<log::Log>& log,
@@ -81,7 +83,7 @@ class RaftConsensus : public Consensus,
                 gscoped_ptr<ThreadPool> thread_pool,
                 const scoped_refptr<MetricEntity>& metric_entity,
                 const std::string& peer_uuid,
-                const scoped_refptr<server::Clock>& clock,
+                scoped_refptr<TimeManager> time_manager,
                 ReplicaTransactionFactory* txn_factory,
                 const scoped_refptr<log::Log>& log,
                 std::shared_ptr<MemTracker> parent_mem_tracker,
@@ -131,6 +133,8 @@ class RaftConsensus : public Consensus,
 
   std::string tablet_id() const override;
 
+  scoped_refptr<TimeManager> time_manager() const override { return time_manager_; }
+
   ConsensusStatePB ConsensusState(ConsensusConfigType type) const override;
 
   RaftConfigPB CommittedConfig() const override;
@@ -441,7 +445,7 @@ class RaftConsensus : public Consensus,
   gscoped_ptr<ThreadPool> thread_pool_;
 
   scoped_refptr<log::Log> log_;
-  scoped_refptr<server::Clock> clock_;
+  scoped_refptr<TimeManager> time_manager_;
   gscoped_ptr<PeerProxyFactory> peer_proxy_factory_;
 
   // When we receive a message from a remote peer telling us to start a transaction, we use

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 433ef6e..79a7eb6 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -151,8 +151,11 @@ class RaftConsensusQuorumTest : public KuduTest {
 
       RaftPeerPB local_peer_pb;
       CHECK_OK(GetRaftConfigMember(config_, peer_uuid, &local_peer_pb));
+
+      scoped_refptr<TimeManager> time_manager(new TimeManager(clock_, Timestamp::kMin));
       gscoped_ptr<PeerMessageQueue> queue(new PeerMessageQueue(metric_entity_,
                                                                logs_[i],
+                                                               time_manager,
                                                                local_peer_pb,
                                                                kTestTablet));
 
@@ -177,7 +180,7 @@ class RaftConsensusQuorumTest : public KuduTest {
                             std::move(thread_pool),
                             metric_entity_,
                             config_.peers(i).permanent_uuid(),
-                            clock_,
+                            time_manager,
                             txn_factory,
                             logs_[i],
                             parent_mem_trackers_[i],

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/raft_consensus_state.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc
index af5943b..2c3e973 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -21,6 +21,7 @@
 #include "kudu/consensus/log_util.h"
 #include "kudu/consensus/quorum_util.h"
 #include "kudu/consensus/raft_consensus_state.h"
+#include "kudu/consensus/time_manager.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/strcat.h"
@@ -351,10 +352,11 @@ string ReplicaState::ToStringUnlocked() const {
 // TODO(todd): move to its own file
 //------------------------------------------------------------
 
-PendingRounds::PendingRounds(string log_prefix)
+PendingRounds::PendingRounds(string log_prefix, scoped_refptr<TimeManager> time_manager)
     : log_prefix_(std::move(log_prefix)),
-      last_committed_op_id_(MinimumOpId()) {
-}
+      last_committed_op_id_(MinimumOpId()),
+      time_manager_(std::move(time_manager)) {}
+
 PendingRounds::~PendingRounds() {
 }
 
@@ -485,6 +487,7 @@ Status PendingRounds::AdvanceCommittedIndex(int64_t committed_index) {
 
     pending_txns_.erase(iter++);
     last_committed_op_id_ = round->id();
+    time_manager_->AdvanceSafeTimeWithMessage(*round->replicate_msg());
     round->NotifyReplicationFinished(Status::OK());
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/raft_consensus_state.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.h b/src/kudu/consensus/raft_consensus_state.h
index db8af70..ead377f 100644
--- a/src/kudu/consensus/raft_consensus_state.h
+++ b/src/kudu/consensus/raft_consensus_state.h
@@ -44,6 +44,7 @@ class Messenger;
 }
 
 namespace consensus {
+class TimeManager;
 
 // Class that coordinates access to the persistent Raft state (independently of Role).
 // This has a 1-1 relationship with RaftConsensus and is essentially responsible for
@@ -252,7 +253,7 @@ class ReplicaState {
 // We should consolidate to "round".
 class PendingRounds {
  public:
-  explicit PendingRounds(std::string log_prefix);
+  PendingRounds(std::string log_prefix, scoped_refptr<TimeManager> time_manager);
   ~PendingRounds();
 
   // Set the committed op during startup. This should be done after
@@ -319,6 +320,8 @@ class PendingRounds {
 
   // The OpId of the round that was last committed. Initialized to MinimumOpId().
   OpId last_committed_op_id_;
+
+  scoped_refptr<TimeManager> time_manager_;
 };
 
 }  // namespace consensus

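With this change, PendingRounds::AdvanceCommittedIndex() hands each committed round's replicate message to the TimeManager, so committing in index order also moves safe time forward. The sketch below is a simplified model of that idea. It assumes that a committed round's timestamp can be taken directly as the new safe time. `PendingRoundsSketch` and its methods are hypothetical, and the exact rules of the real `TimeManager::AdvanceSafeTimeWithMessage()` are not modeled.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>

// Hypothetical model of PendingRounds after this patch: committing rounds in index
// order also advances "safe time". Here safe time is a plain member; Kudu's code calls
// TimeManager::AdvanceSafeTimeWithMessage(), whose exact rules are not modeled.
class PendingRoundsSketch {
 public:
  // Registers a replicated-but-uncommitted round with its assigned timestamp.
  void AddPendingRound(int64_t index, uint64_t timestamp) { pending_[index] = timestamp; }

  // Commits every pending round whose index is <= 'committed_index', in index order.
  void AdvanceCommittedIndex(int64_t committed_index) {
    auto iter = pending_.begin();
    while (iter != pending_.end() && iter->first <= committed_index) {
      last_committed_index_ = iter->first;
      // Safe time is monotonic: never let it move backwards.
      safe_time_ = std::max(safe_time_, iter->second);
      iter = pending_.erase(iter);
    }
  }

  uint64_t safe_time() const { return safe_time_; }
  int64_t last_committed_index() const { return last_committed_index_; }
  std::size_t num_pending() const { return pending_.size(); }

 private:
  std::map<int64_t, uint64_t> pending_;  // index -> timestamp, ordered by index
  int64_t last_committed_index_ = 0;
  uint64_t safe_time_ = 0;
};
```

Keeping the pending rounds in an ordered map makes the "commit everything up to index N" loop a simple walk from the front.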
http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/time_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/time_manager.cc b/src/kudu/consensus/time_manager.cc
index 39d627c..27c6791 100644
--- a/src/kudu/consensus/time_manager.cc
+++ b/src/kudu/consensus/time_manager.cc
@@ -24,10 +24,24 @@
 #include "kudu/util/flag_tags.h"
 
 DEFINE_bool(safe_time_advancement_without_writes, true,
-            "Whether to enable the advancement of \"safe\" time in "
-            "the absense of write operations");
+            "Whether to enable the advancement of \"safe\" time in the absence of write "
+            "operations");
 TAG_FLAG(safe_time_advancement_without_writes, advanced);
 
+DEFINE_double(missed_heartbeats_before_rejecting_snapshot_scans, 1.5,
+              "The maximum number of Raft heartbeat periods that may elapse without the tablet "
+              "seeing safe time advance before it refuses scans at snapshots that aren't yet "
+              "safe and forces clients to try again.");
+TAG_FLAG(missed_heartbeats_before_rejecting_snapshot_scans, experimental);
+
+DEFINE_int32(safe_time_max_lag_ms, 30 * 1000,
+             "The maximum amount of time, in milliseconds, that safe time may lag behind the "
+             "requested timestamp before the client is forced to retry.");
+TAG_FLAG(safe_time_max_lag_ms, experimental);
+
+DECLARE_int32(raft_heartbeat_interval_ms);
+DECLARE_int32(scanner_max_wait_ms);
+
 namespace kudu {
 namespace consensus {
 
@@ -50,6 +64,7 @@ ExternalConsistencyMode TimeManager::GetMessageConsistencyMode(const ReplicateMs
 TimeManager::TimeManager(scoped_refptr<Clock> clock, Timestamp initial_safe_time)
   : last_serial_ts_assigned_(initial_safe_time),
     last_safe_ts_(initial_safe_time),
+    last_advanced_safe_time_(MonoTime::Now()),
     mode_(NON_LEADER),
     clock_(std::move(clock)) {}
 
@@ -67,7 +82,9 @@ void TimeManager::SetNonLeaderMode() {
 Status TimeManager::AssignTimestamp(ReplicateMsg* message) {
   Lock l(lock_);
   if (PREDICT_FALSE(mode_ == NON_LEADER)) {
-    return Status::IllegalState("Cannot assign timestamp. TimeManager is not in Leader mode.");
+    return Status::IllegalState(Substitute("Cannot assign timestamp to transaction. Tablet is not "
+        "in leader mode. Last heard from a leader: $0 ago.",
+        (MonoTime::Now() - last_advanced_safe_time_).ToString()));
   }
   Timestamp t;
   switch (GetMessageConsistencyMode(*message)) {
@@ -122,7 +139,69 @@ void TimeManager::AdvanceSafeTime(Timestamp safe_time) {
   AdvanceSafeTimeAndWakeUpWaitersUnlocked(safe_time);
 }
 
+bool TimeManager::HasAdvancedSafeTimeRecentlyUnlocked(string* error_message) {
+  MonoDelta time_since_last_advance = MonoTime::Now() - last_advanced_safe_time_;
+  int64_t max_last_advanced = FLAGS_missed_heartbeats_before_rejecting_snapshot_scans *
+      FLAGS_raft_heartbeat_interval_ms;
+  // Clamp max_last_advanced to a minimum of 100 ms. Some tests set leader election
+  // timeouts really low and don't necessarily want to stress scanners.
+  max_last_advanced = std::max<int64_t>(max_last_advanced, 100LL);
+  MonoDelta max_delta = MonoDelta::FromMilliseconds(max_last_advanced);
+  if (time_since_last_advance > max_delta) {
+    *error_message = Substitute("Tablet hasn't heard from a leader, or there hasn't been a stable "
+                                "leader, for: $0 (max is $1)",
+                                time_since_last_advance.ToString(), max_delta.ToString());
+    return false;
+  }
+  return true;
+}
+
+bool TimeManager::IsSafeTimeLaggingUnlocked(Timestamp timestamp, string* error_message) {
+  // Can't calculate safe time lag for the logical clock.
+  if (PREDICT_FALSE(!clock_->HasPhysicalComponent())) return false;
+  MonoDelta safe_time_diff = clock_->GetPhysicalComponentDifference(timestamp,
+                                                                    last_safe_ts_);
+  if (safe_time_diff.ToMilliseconds() > FLAGS_safe_time_max_lag_ms) {
+    *error_message = Substitute("Tablet is lagging too much to be able to serve snapshot scan. "
+                                "Lagging by: $0 ms (max is $1 ms)",
+                                safe_time_diff.ToMilliseconds(),
+                                FLAGS_safe_time_max_lag_ms);
+    return true;
+  }
+  return false;
+}
+
+void TimeManager::MakeWaiterTimeoutMessageUnlocked(Timestamp timestamp, string* error_message) {
+  string mode = mode_ == LEADER ? "LEADER" : "NON-LEADER";
+  string clock_diff = clock_->HasPhysicalComponent() ? clock_->GetPhysicalComponentDifference(
+      timestamp, last_safe_ts_).ToString() : "None (Logical clock)";
+  *error_message = Substitute("Timed out waiting for ts: $0 to be safe (mode: $1). Current safe "
+                              "time: $2 Physical time difference: $3", clock_->Stringify(timestamp),
+                              mode, clock_->Stringify(last_safe_ts_), clock_diff);
+}
+
 Status TimeManager::WaitUntilSafe(Timestamp timestamp, const MonoTime& deadline) {
+  string error_message;
+
+  // Pre-flight checks:
+  // - If 'timestamp' is before the current safe time, return OK immediately.
+  // - If we're not the leader make sure we've heard from the leader recently.
+  // - If we're not the leader make sure safe time isn't lagging too much.
+  {
+    Lock l(lock_);
+    if (timestamp < GetSafeTimeUnlocked()) return Status::OK();
+
+    if (mode_ == NON_LEADER) {
+      if (IsSafeTimeLaggingUnlocked(timestamp, &error_message)) {
+        return Status::TimedOut(error_message);
+      }
+
+      if (!HasAdvancedSafeTimeRecentlyUnlocked(&error_message)) {
+        return Status::TimedOut(error_message);
+      }
+    }
+  }
+
   // First wait for the clock to be past 'timestamp'.
   RETURN_NOT_OK(clock_->WaitUntilAfterLocally(timestamp, deadline));
 
@@ -152,15 +231,9 @@ Status TimeManager::WaitUntilSafe(Timestamp timestamp, const MonoTime& deadline)
     if (waiter.latch->count() == 0) return Status::OK();
 
     waiters_.erase(std::find(waiters_.begin(), waiters_.end(), &waiter));
-    return Status::TimedOut(Substitute(
-        "Timed out waiting for ts: $0 to be safe (mode: $1). "
-        "Current safe time: $2 Physical time difference: $3",
-        clock_->Stringify(waiter.timestamp),
-        (mode_ == LEADER ? "LEADER" : "NON-LEADER"),
-        clock_->Stringify(last_safe_ts_),
-        (clock_->HasPhysicalComponent() ?
-         clock_->GetPhysicalComponentDifference(timestamp, last_safe_ts_).ToString() :
-         "None (Logical clock)")));
+
+    MakeWaiterTimeoutMessageUnlocked(waiter.timestamp, &error_message);
+    return Status::TimedOut(error_message);
   }
 }
 
@@ -169,6 +242,7 @@ void TimeManager::AdvanceSafeTimeAndWakeUpWaitersUnlocked(Timestamp safe_time) {
     return;
   }
   last_safe_ts_ = safe_time;
+  last_advanced_safe_time_ = MonoTime::Now();
 
   if (PREDICT_FALSE(!waiters_.empty())) {
     auto iter = waiters_.begin();
@@ -216,6 +290,7 @@ Timestamp TimeManager::GetSafeTimeUnlocked() {
       // leader will never assign a new timestamp lower than it.
       if (PREDICT_TRUE(last_serial_ts_assigned_ <= last_safe_ts_)) {
         last_safe_ts_ = clock_->Now();
+        last_advanced_safe_time_ = MonoTime::Now();
         return last_safe_ts_;
       }
       // If the current state is b), then there might be transaction with a timestamp that is lower

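The pre-flight checks added to TimeManager::WaitUntilSafe() decide whether a snapshot scan should wait at all. A follower refuses to wait if safe time lags the requested timestamp by more than `--safe_time_max_lag_ms`, or if safe time has not advanced within `--missed_heartbeats_before_rejecting_snapshot_scans` heartbeat periods (floored at 100 ms). The sketch below models only that decision as a pure function. It assumes that a hybrid timestamp stores physical microseconds above a 12-bit logical counter, as Kudu's HybridClock does. All names here are hypothetical, and the real code returns Status::TimedOut for both rejection cases.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical model of WaitUntilSafe()'s pre-flight checks; not Kudu's API.
// Assumption: the low 12 bits of a hybrid timestamp are the logical counter.
constexpr int kLogicalBits = 12;

uint64_t MakeHybrid(uint64_t physical_micros, uint64_t logical) {
  return (physical_micros << kLogicalBits) | logical;
}

int64_t PhysicalMicros(uint64_t hybrid_ts) {
  return static_cast<int64_t>(hybrid_ts >> kLogicalBits);
}

// Mirrors HasAdvancedSafeTimeRecentlyUnlocked(): the allowed silence is
// missed_heartbeats * heartbeat_interval_ms, but never less than 100 ms.
int64_t MaxMsWithoutAdvance(double missed_heartbeats, int32_t heartbeat_interval_ms) {
  int64_t max_ms = static_cast<int64_t>(missed_heartbeats * heartbeat_interval_ms);
  return std::max<int64_t>(max_ms, 100);
}

enum class Preflight { kAlreadySafe, kLagging, kNoRecentAdvance, kMustWait };

Preflight CheckBeforeWaiting(bool is_leader, uint64_t requested_ts, uint64_t safe_ts,
                             int64_t ms_since_last_advance, double missed_heartbeats,
                             int32_t heartbeat_interval_ms, int64_t max_lag_ms) {
  // Already safe: no need to wait.
  if (requested_ts < safe_ts) return Preflight::kAlreadySafe;
  // A leader assigns timestamps itself, so these checks only apply to non-leaders.
  if (!is_leader) {
    // Mirrors IsSafeTimeLaggingUnlocked(): compare the physical components only.
    int64_t lag_ms = (PhysicalMicros(requested_ts) - PhysicalMicros(safe_ts)) / 1000;
    if (lag_ms > max_lag_ms) return Preflight::kLagging;
    if (ms_since_last_advance > MaxMsWithoutAdvance(missed_heartbeats, heartbeat_interval_ms)) {
      return Preflight::kNoRecentAdvance;
    }
  }
  // Otherwise the caller goes on to wait for the clock and then for safe time.
  return Preflight::kMustWait;
}
```

For example, with the flag default of 1.5 missed heartbeats and an assumed 500 ms heartbeat interval, a follower that has not advanced safe time for more than 750 ms rejects the scan instead of blocking until the client's deadline.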
http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/time_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/time_manager.h b/src/kudu/consensus/time_manager.h
index c8568d5..980e4ad 100644
--- a/src/kudu/consensus/time_manager.h
+++ b/src/kudu/consensus/time_manager.h
@@ -16,6 +16,7 @@
 // under the License.
 #pragma once
 
+#include <algorithm>
 #include <string>
 #include <vector>
 
@@ -113,6 +114,7 @@ class TimeManager : public RefCountedThreadSafe<TimeManager> {
   //
   // Returns Status::OK() if it safe time advanced past 'timestamp' before 'deadline'
   // Returns Status::TimeOut() if deadline elapsed without safe time moving enough.
+  // Returns Status::ServiceUnavailable() if the request should be retried somewhere else.
   //
   // TODO(KUDU-1127) make this return another status if safe time is too far back in the past
   // or hasn't moved in a long time.
@@ -128,6 +130,20 @@ class TimeManager : public RefCountedThreadSafe<TimeManager> {
   FRIEND_TEST(TimeManagerTest, TestTimeManagerNonLeaderMode);
   FRIEND_TEST(TimeManagerTest, TestTimeManagerLeaderMode);
 
+  // Returns whether we've advanced safe time recently.
+  // If this returns false we might be partitioned or there might be election churn.
+  // The client should try again.
+  // If this returns false, sets error information in 'error_message'.
+  bool HasAdvancedSafeTimeRecentlyUnlocked(std::string* error_message);
+
+  // Returns whether safe time is lagging too much behind 'timestamp' and the client
+  // should be forced to retry.
+  // If this returns true, sets error information in 'error_message'.
+  bool IsSafeTimeLaggingUnlocked(Timestamp timestamp, std::string* error_message);
+
+  // Helper to build the final error message of WaitUntilSafe().
+  void MakeWaiterTimeoutMessageUnlocked(Timestamp timestamp, std::string* error_message);
+
   // Helper to return the external consistency mode of 'message'.
   static ExternalConsistencyMode GetMessageConsistencyMode(const ReplicateMsg& message);
 
@@ -141,7 +157,7 @@ class TimeManager : public RefCountedThreadSafe<TimeManager> {
   struct WaitingState {
     // The timestamp the waiter requires be safe.
     Timestamp timestamp;
-    // Latch that will be count down once 'timestamp' is safe, unblocking the waiter.
+    // Latch that will be counted down once 'timestamp' is safe, unblocking the waiter.
     CountDownLatch* latch;
   };
 
@@ -177,6 +193,10 @@ class TimeManager : public RefCountedThreadSafe<TimeManager> {
   // the last serial timestamp appended to the queue.
   Timestamp last_safe_ts_;
 
+  // The last time we advanced safe time.
+  // Used in the decision of whether we should have waiters wait or try again.
+  MonoTime last_advanced_safe_time_;
+
   // The current mode of the TimeManager.
   Mode mode_;
 

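The header describes waiters as WaitingState entries, each holding a CountDownLatch that is counted down once the waiter's timestamp becomes safe. The sketch below shows the same contract under a swapped-in technique: a single std::condition_variable that every waiter re-checks, instead of one latch per waiter. `SafeTimeWaiter` is hypothetical and omits the real code's first step of waiting for the local clock to pass the timestamp.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical sketch of the WaitUntilSafe()/AdvanceSafeTime() contract. Kudu keeps a
// list of per-waiter CountDownLatches; this uses one condition variable instead.
class SafeTimeWaiter {
 public:
  explicit SafeTimeWaiter(uint64_t initial_safe_ts) : safe_ts_(initial_safe_ts) {}

  // Moves safe time forward (never backwards) and wakes all waiters so they re-check.
  void AdvanceSafeTime(uint64_t safe_ts) {
    {
      std::lock_guard<std::mutex> l(mu_);
      if (safe_ts <= safe_ts_) return;
      safe_ts_ = safe_ts;
    }
    cv_.notify_all();
  }

  // Blocks until safe time has advanced past 'ts' or 'timeout' elapses.
  // Returns true if 'ts' became safe, false on timeout.
  bool WaitUntilSafe(uint64_t ts, std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> l(mu_);
    return cv_.wait_for(l, timeout, [&] { return ts < safe_ts_; });
  }

  uint64_t safe_time() {
    std::lock_guard<std::mutex> l(mu_);
    return safe_ts_;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  uint64_t safe_ts_;
};
```

Per-waiter latches, as in the header, let the TimeManager wake only the waiters whose timestamps became safe. The single condition variable wakes everyone and is simpler, at the cost of spurious re-checks.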
http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/integration-tests/alter_table-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc
index 5e8757a..4b901dd 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -854,7 +854,7 @@ TEST_F(AlterTableTest, TestMajorCompactDeltasAfterUpdatingRemovedColumn) {
             "---------------------------\n"
             "MRS memrowset:\n"
             "RowSet RowSet(0):\n"
-            "RowIdxInBlock: 0; Base: (int32 c0=0, int32 c2=12345); Undo Mutations: [@3(DELETE)]; "
+            "RowIdxInBlock: 0; Base: (int32 c0=0, int32 c2=12345); Undo Mutations: [@4(DELETE)]; "
                 "Redo Mutations: [];",
             JoinStrings(rows, "\n"));
 
@@ -903,9 +903,9 @@ TEST_F(AlterTableTest, TestMajorCompactDeltasIntoMissingBaseData) {
             "MRS memrowset:\n"
             "RowSet RowSet(0):\n"
             "RowIdxInBlock: 0; Base: (int32 c0=0, int32 c1=0, int32 c2=54321); Undo Mutations: "
-                "[@5(SET c2=12345), @2(DELETE)]; Redo Mutations: [];\n"
+                "[@6(SET c2=12345), @3(DELETE)]; Redo Mutations: [];\n"
             "RowIdxInBlock: 1; Base: (int32 c0=16777216, int32 c1=1, int32 c2=12345); "
-                "Undo Mutations: [@3(DELETE)]; Redo Mutations: [];",
+                "Undo Mutations: [@4(DELETE)]; Redo Mutations: [];",
             JoinStrings(rows, "\n"));
 }
 
@@ -958,7 +958,7 @@ TEST_F(AlterTableTest, TestMajorCompactDeltasAfterAddUpdateRemoveColumn) {
             "---------------------------\n"
             "MRS memrowset:\n"
             "RowSet RowSet(0):\n"
-            "RowIdxInBlock: 0; Base: (int32 c0=0, int32 c1=0); Undo Mutations: [@2(DELETE)]; "
+            "RowIdxInBlock: 0; Base: (int32 c0=0, int32 c1=0); Undo Mutations: [@3(DELETE)]; "
                 "Redo Mutations: [];",
             JoinStrings(rows, "\n"));
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/integration-tests/cluster_verifier.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_verifier.cc b/src/kudu/integration-tests/cluster_verifier.cc
index d027ba5..1b38951 100644
--- a/src/kudu/integration-tests/cluster_verifier.cc
+++ b/src/kudu/integration-tests/cluster_verifier.cc
@@ -125,7 +125,8 @@ Status ClusterVerifier::DoCheckRowCount(const std::string& table_name,
   client::KuduScanner scanner(table.get());
   CHECK_OK(scanner.SetReadMode(client::KuduScanner::READ_AT_SNAPSHOT));
   CHECK_OK(scanner.SetFaultTolerant());
-  CHECK_OK(scanner.SetTimeoutMillis(5000));
+  // Allow a long scan timeout for verification.
+  CHECK_OK(scanner.SetTimeoutMillis(60 * 1000));
   CHECK_OK(scanner.SetProjectedColumns({}));
   RETURN_NOT_OK_PREPEND(scanner.Open(), "Unable to open scanner");
   int count = 0;

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/integration-tests/linked_list-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/linked_list-test-util.h b/src/kudu/integration-tests/linked_list-test-util.h
index c37c4ed..9e6ac01 100644
--- a/src/kudu/integration-tests/linked_list-test-util.h
+++ b/src/kudu/integration-tests/linked_list-test-util.h
@@ -51,7 +51,8 @@ static const char* const kKeyColumnName = "rand_key";
 static const char* const kLinkColumnName = "link_to";
 static const char* const kInsertTsColumnName = "insert_ts";
 static const char* const kUpdatedColumnName = "updated";
-static const int64_t kNoSnapshot = -1;
+static const int64_t kNoSnapshot = -2;
+static const int64_t kSnapshotAtNow = -1;
 static const int64_t kNoParticularCountExpected = -1;
 
 // Vector of snapshot timestamp, count pairs.
@@ -83,6 +84,15 @@ class LinkedListTester {
     CHECK_OK(b.Build(&schema_));
   }
 
+  // The modes for WaitAndVerify().
+  enum WaitAndVerifyMode {
+    // Perform snapshot scans in the past but finish with a READ_LATEST scan.
+    // This should be used when a majority of the cluster is down.
+    FINISH_WITH_SCAN_LATEST,
+    // Perform the snapshot scans in the past and finish with a snapshot scan in the present.
+    FINISH_WITH_SNAPSHOT_SCAN
+  };
+
   // Create the table.
   Status CreateLinkedListTable();
 
@@ -108,14 +118,26 @@ class LinkedListTester {
                                   verified_count);
   }
 
-  // Variant of VerifyLinkedListRemote that verifies without specifying a snapshot timestamp.
-  Status VerifyLinkedListNoSnapshotRemote(const int64_t expected,
-                                          const bool log_errors,
-                                          int64_t* verified_count) {
+  Status VerifyLinkedListAtLatestRemote(const int64_t expected,
+                                        const bool log_errors,
+                                        const boost::function<Status(const std::string&)>& cb,
+                                        int64_t* verified_count) {
     return VerifyLinkedListRemote(kNoSnapshot,
                                   expected,
                                   log_errors,
-                                  boost::bind(&LinkedListTester::ReturnOk, this, _1),
+                                  cb,
+                                  verified_count);
+  }
+
+  // Variant of VerifyLinkedListRemote that verifies with a snapshot scan at 'now'.
+  Status VerifyLinkedListAtNowSnapshotRemote(const int64_t expected,
+                                             const bool log_errors,
+                                             const boost::function<Status(const std::string&)>& cb,
+                                             int64_t* verified_count) {
+    return VerifyLinkedListRemote(kSnapshotAtNow,
+                                  expected,
+                                  log_errors,
+                                  cb,
                                   verified_count);
   }
 
@@ -135,16 +157,19 @@ class LinkedListTester {
   // A variant of VerifyLinkedListRemote that is more robust towards ongoing
   // bootstrapping and replication.
   Status WaitAndVerify(int seconds_to_run,
-                       int64_t expected) {
+                       int64_t expected,
+                       WaitAndVerifyMode mode = FINISH_WITH_SNAPSHOT_SCAN) {
     return WaitAndVerify(seconds_to_run,
                          expected,
-                         boost::bind(&LinkedListTester::ReturnOk, this, _1));
+                         boost::bind(&LinkedListTester::ReturnOk, this, _1),
+                         mode);
   }
 
   // A variant of WaitAndVerify that also takes a callback to be run once during verification.
   Status WaitAndVerify(int seconds_to_run,
                        int64_t expected,
-                       const boost::function<Status(const std::string&)>& cb);
+                       const boost::function<Status(const std::string&)>& cb,
+                       WaitAndVerifyMode mode = FINISH_WITH_SNAPSHOT_SCAN);
 
   // Generates a vector of keys for the table such that each tablet is
   // responsible for an equal fraction of the int64 key space.
@@ -439,14 +464,6 @@ Status LinkedListTester::LoadLinkedList(
   RETURN_NOT_OK_PREPEND(client_->OpenTable(table_name_, &table),
                         "Could not open table " + table_name_);
 
-  // Instantiate a hybrid clock so that we can collect timestamps since we're running the
-  // tablet servers in an external mini cluster.
-  // TODO when they become available (KUDU-420), use client-propagated timestamps
-  // instead of reading from the clock directly. This will allow to run this test
-  // against a "real" cluster and not force the client to be synchronized.
-  scoped_refptr<server::Clock> ht_clock(new server::HybridClock());
-  RETURN_NOT_OK(ht_clock->Init());
-
   MonoTime start = MonoTime::Now();
   MonoTime deadline = start + run_for;
 
@@ -477,9 +494,9 @@ Status LinkedListTester::LoadLinkedList(
 
     MonoTime now = MonoTime::Now();
     if (next_sample < now) {
-      Timestamp now = ht_clock->Now();
+      Timestamp now(client_->GetLatestObservedTimestamp());
       sampled_timestamps_and_counts_.push_back(
-          pair<uint64_t,int64_t>(now.ToUint64(), *written_count));
+          pair<uint64_t,int64_t>(now.ToUint64() + 1, *written_count));
       next_sample += sample_interval;
       LOG(INFO) << "Sample at HT timestamp: " << now.ToString()
                 << " Inserted count: " << *written_count;
@@ -566,8 +583,8 @@ Status LinkedListTester::VerifyLinkedListRemote(
   RETURN_NOT_OK(client_->OpenTable(table_name_, &table));
 
   string snapshot_str;
-  if (snapshot_timestamp == kNoSnapshot) {
-    snapshot_str = "LATEST";
+  if (snapshot_timestamp == kSnapshotAtNow) {
+    snapshot_str = "NOW";
   } else {
     snapshot_str = server::HybridClock::StringifyTimestamp(Timestamp(snapshot_timestamp));
   }
@@ -575,11 +592,16 @@ Status LinkedListTester::VerifyLinkedListRemote(
   client::KuduScanner scanner(table.get());
   RETURN_NOT_OK_PREPEND(scanner.SetProjectedColumns(verify_projection_), "Bad projection");
   RETURN_NOT_OK(scanner.SetBatchSizeBytes(0)); // Force at least one NextBatch RPC.
+  RETURN_NOT_OK(scanner.SetTimeoutMillis(20 * 1000 /* 20 seconds */));
 
   if (snapshot_timestamp != kNoSnapshot) {
     RETURN_NOT_OK(scanner.SetReadMode(client::KuduScanner::READ_AT_SNAPSHOT));
     RETURN_NOT_OK(scanner.SetFaultTolerant());
-    RETURN_NOT_OK(scanner.SetSnapshotRaw(snapshot_timestamp));
+    if (snapshot_timestamp != kSnapshotAtNow) {
+      RETURN_NOT_OK(scanner.SetSnapshotRaw(snapshot_timestamp));
+    }
+  } else {
+    RETURN_NOT_OK(scanner.SetReadMode(client::KuduScanner::READ_LATEST));
   }
 
   LOG(INFO) << "Verifying Snapshot: " << snapshot_str << " Expected Rows: " << expected;
@@ -597,7 +619,7 @@ Status LinkedListTester::VerifyLinkedListRemote(
   while (scanner.HasMoreRows()) {
     // If we're doing a snapshot scan with a big enough cluster, call the callback on the scanner's
     // tserver. Do this only once.
-    if (snapshot_timestamp != kNoSnapshot && !cb_called) {
+    if (snapshot_timestamp != kSnapshotAtNow && !cb_called) {
       client::KuduTabletServer* kts_ptr;
       scanner.GetCurrentServer(&kts_ptr);
       gscoped_ptr<client::KuduTabletServer> kts(kts_ptr);
@@ -618,7 +640,7 @@ Status LinkedListTester::VerifyLinkedListRemote(
       // for snapshot reads as updates are performed by their own thread. This means
       // that there is no guarantee that, for any snapshot timestamp that comes before
       // all writes are completed, all rows will be updated.
-      if (snapshot_timestamp == kNoSnapshot) {
+      if (snapshot_timestamp == kSnapshotAtNow) {
         RETURN_NOT_OK(row.GetBool(2, &updated));
       } else {
         updated = enable_mutation_;
@@ -672,7 +694,8 @@ Status LinkedListTester::VerifyLinkedListLocal(const tablet::Tablet* tablet,
 
 Status LinkedListTester::WaitAndVerify(int seconds_to_run,
                                        int64_t expected,
-                                       const boost::function<Status(const std::string&)>& cb) {
+                                       const boost::function<Status(const std::string&)>& cb,
+                                       WaitAndVerifyMode mode) {
 
   std::list<pair<int64_t, int64_t> > samples_as_list(sampled_timestamps_and_counts_.begin(),
                                                      sampled_timestamps_and_counts_.end());
@@ -726,28 +749,37 @@ Status LinkedListTester::WaitAndVerify(int seconds_to_run,
       // even if a later snapshot or the final verification failed.
       iter = samples_as_list.erase(iter);
     }
-    if (s.ok()) {
-      s = VerifyLinkedListNoSnapshotRemote(expected, last_attempt, &seen);
-    }
 
-    // TODO: when we enable hybridtime consistency for the scans,
-    // then we should not allow !s.ok() here. But, with READ_LATEST
-    // scans, we could have a lagging replica of one tablet, with an
-    // up-to-date replica of another tablet, and end up with broken links
-    // in the chain.
-
-    if (!s.ok()) {
-      LOG(INFO) << "Table not yet ready: " << seen << "/" << expected << " rows"
-                << " (status: " << s.ToString() << ")";
-      if (last_attempt) {
-        // We'll give it an equal amount of time to re-load the data as it took
-        // to write it in. Typically it completes much faster than that.
-        return Status::TimedOut("Timed out waiting for table to be accessible again",
-                                s.ToString());
-      }
+    // Perform the last scan with the required mode.
+    switch (mode) {
+      case FINISH_WITH_SNAPSHOT_SCAN:
+        if (s.ok()) {
+          RETURN_NOT_OK(VerifyLinkedListAtNowSnapshotRemote(
+              expected, last_attempt, boost::bind(&LinkedListTester::ReturnOk, this, _1), &seen));
+        }
+        break;
+      case FINISH_WITH_SCAN_LATEST:
+        // Scans in READ_LATEST mode can, by design, return a stale view of the tablet,
+        // so in this case, retry.
+        if (s.ok()) {
+          s = VerifyLinkedListAtLatestRemote(
+              expected, last_attempt, boost::bind(&LinkedListTester::ReturnOk, this, _1), &seen);
+        }
 
-      // Sleep and retry until timeout.
-      SleepFor(MonoDelta::FromMilliseconds(20));
+        if (!s.ok()) {
+          LOG(INFO) << "Table not yet ready: " << seen << "/" << expected << " rows"
+                    << " (status: " << s.ToString() << ")";
+          if (last_attempt) {
+            // We'll give it an equal amount of time to re-load the data as it took
+            // to write it in. Typically it completes much faster than that.
+            return Status::TimedOut("Timed out waiting for table to be accessible again",
+                                    s.ToString());
+          }
+
+          // Sleep and retry until timeout.
+          SleepFor(MonoDelta::FromMilliseconds(20));
+        }
+        break;
     }
   } while (!s.ok());
 

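After this change VerifyLinkedListRemote() recognizes two sentinel values for its snapshot timestamp: kNoSnapshot (-2) selects a READ_LATEST scan, and kSnapshotAtNow (-1) selects a READ_AT_SNAPSHOT scan without an explicit timestamp. Any other value is passed to SetSnapshotRaw(). The sketch below restates that branching as a pure function. The sentinel values are copied from linked_list-test-util.h, while `ReadMode`, `ScanPlan`, and `PlanScan` are hypothetical illustration types, not part of the Kudu client API.

```cpp
#include <cassert>
#include <cstdint>

// Sentinels as defined in linked_list-test-util.h by this patch.
constexpr int64_t kNoSnapshot = -2;     // verify with a READ_LATEST scan
constexpr int64_t kSnapshotAtNow = -1;  // READ_AT_SNAPSHOT with no explicit timestamp

// Hypothetical illustration types; not the Kudu client API.
enum class ReadMode { kReadLatest, kReadAtSnapshot };

struct ScanPlan {
  ReadMode mode;
  bool fault_tolerant;    // SetFaultTolerant() is only called for snapshot scans
  bool set_snapshot_raw;  // whether SetSnapshotRaw(snapshot_timestamp) is called
};

// Mirrors the branching in VerifyLinkedListRemote().
ScanPlan PlanScan(int64_t snapshot_timestamp) {
  if (snapshot_timestamp == kNoSnapshot) {
    return {ReadMode::kReadLatest, false, false};
  }
  return {ReadMode::kReadAtSnapshot, true, snapshot_timestamp != kSnapshotAtNow};
}
```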
http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/integration-tests/linked_list-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/linked_list-test.cc b/src/kudu/integration-tests/linked_list-test.cc
index 04092a1..a41846d 100644
--- a/src/kudu/integration-tests/linked_list-test.cc
+++ b/src/kudu/integration-tests/linked_list-test.cc
@@ -271,6 +271,7 @@ TEST_F(LinkedListTest, TestLoadWhileOneServerDownAndVerify) {
 
   FLAGS_num_tablet_servers = 3;
   FLAGS_num_tablets = 1;
+
   ASSERT_NO_FATAL_FAILURE(BuildAndStart());
 
   // Load the data with one of the three servers down.
@@ -298,7 +299,10 @@ TEST_F(LinkedListTest, TestLoadWhileOneServerDownAndVerify) {
 
   cluster_->tablet_server(1)->Shutdown();
   cluster_->tablet_server(2)->Shutdown();
-  ASSERT_OK(tester_->WaitAndVerify(FLAGS_seconds_to_run, written));
+
+  ASSERT_OK(tester_->WaitAndVerify(FLAGS_seconds_to_run,
+                                   written,
+                                   LinkedListTester::FINISH_WITH_SCAN_LATEST));
 }
 
 } // namespace kudu

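TestLoadWhileOneServerDownAndVerify now finishes with FINISH_WITH_SCAN_LATEST. Because a READ_LATEST scan may hit a stale replica, WaitAndVerify() treats a failed final verification as retryable: it sleeps briefly and tries again, and only reports a timeout after its last attempt. The sketch below shows that retry pattern in generic form. `RetryVerify` is a hypothetical helper, and how the real test computes `last_attempt` is not shown in this diff.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical, generic version of the FINISH_WITH_SCAN_LATEST retry loop: call
// 'verify' until it succeeds; once past the deadline, make one final attempt and
// report failure if it also fails.
bool RetryVerify(const std::function<bool(bool last_attempt)>& verify,
                 std::chrono::milliseconds budget,
                 std::chrono::milliseconds backoff) {
  const auto deadline = std::chrono::steady_clock::now() + budget;
  while (true) {
    const bool last_attempt = std::chrono::steady_clock::now() > deadline;
    if (verify(last_attempt)) return true;
    if (last_attempt) return false;  // the caller turns this into a timeout
    std::this_thread::sleep_for(backoff);
  }
}
```

Passing `last_attempt` into the callback follows the test's approach of logging errors only on the final attempt, so that transient failures from a stale replica don't flood the log.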
http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 995a9c4..cf83164 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -874,6 +874,7 @@ void RaftConsensusITest::DoTestCrashyNodes(TestWorkload* workload, int max_rows_
   // can verify an exact number of rows in the end, thanks to exactly once semantics.
   workload->set_write_timeout_millis(60 * 1000 /* 60 seconds */);
   workload->set_num_write_threads(10);
+  workload->set_num_read_threads(2);
   workload->Setup();
   workload->Start();
 
@@ -960,6 +961,7 @@ void RaftConsensusITest::CreateClusterForChurnyElectionsTests(
   ts_flags.push_back("--leader_failure_monitor_check_mean_ms=1");
   ts_flags.push_back("--leader_failure_monitor_check_stddev_ms=1");
   ts_flags.push_back("--never_fsync");
+
   ts_flags.insert(ts_flags.end(), extra_ts_flags.cbegin(), extra_ts_flags.cend());
 
   CreateCluster("raft_consensus-itest-cluster", ts_flags, {});
@@ -1006,6 +1008,7 @@ TEST_F(RaftConsensusITest, TestChurnyElections) {
   CreateClusterForChurnyElectionsTests({});
   TestWorkload workload(cluster_.get());
   workload.set_write_batch_size(1);
+  workload.set_num_read_threads(2);
   DoTestChurnyElections(&workload, kNumWrites);
 }
 
@@ -1017,6 +1020,7 @@ TEST_F(RaftConsensusITest, TestChurnyElections_WithNotificationLatency) {
   const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
   TestWorkload workload(cluster_.get());
   workload.set_write_batch_size(1);
+  workload.set_num_read_threads(2);
   DoTestChurnyElections(&workload, kNumWrites);
 }
 
@@ -1250,6 +1254,8 @@ void RaftConsensusITest::SetupSingleReplicaTest(TServerDetails** replica_ts) {
   FLAGS_num_replicas = 3;
   FLAGS_num_tablet_servers = 3;
   vector<string> ts_flags, master_flags;
+  // Don't use the hybrid clock as we set logical timestamps on ops.
+  ts_flags.push_back("--use_hybrid_clock=false");
   ts_flags.push_back("--enable_leader_failure_detection=false");
   master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
   BuildAndStart(ts_flags, master_flags);
@@ -1520,7 +1526,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
     ScanRequestPB req;
     ScanResponsePB resp;
     RpcController rpc;
-    rpc.set_timeout(MonoDelta::FromMilliseconds(5000));
+    rpc.set_timeout(MonoDelta::FromMilliseconds(100));
     NewScanRequestPB* scan = req.mutable_new_scan_request();
     scan->set_tablet_id(tablet_id_);
     scan->set_read_mode(READ_AT_SNAPSHOT);
@@ -1533,8 +1539,8 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
     ASSERT_OK(replica_ts->tserver_proxy->Scan(req, &resp, &rpc));
     SCOPED_TRACE(resp.DebugString());
     string err_str = StatusFromPB(resp.error().status()).ToString();
-    ASSERT_STR_CONTAINS(err_str, "Timed out waiting for all transactions");
-    ASSERT_STR_CONTAINS(err_str, "to commit");
+    ASSERT_STR_CONTAINS(err_str, "Timed out waiting for ts:");
+    ASSERT_STR_CONTAINS(err_str, "to be safe");
   }
 
   resp.Clear();
@@ -2378,6 +2384,7 @@ TEST_F(RaftConsensusITest, TestAutoCreateReplica) {
   workload.set_table_name(kTableId);
   workload.set_num_replicas(FLAGS_num_replicas);
   workload.set_num_write_threads(10);
+  workload.set_num_read_threads(2);
   workload.set_write_batch_size(100);
   workload.Setup();
 
@@ -2502,6 +2509,7 @@ TEST_F(RaftConsensusITest, TestSlowLeader) {
 
   TestWorkload workload(cluster_.get());
   workload.set_table_name(kTableId);
+  workload.set_num_read_threads(2);
   workload.Setup();
   workload.Start();
   SleepFor(MonoDelta::FromSeconds(60));
@@ -2652,6 +2660,7 @@ TEST_F(RaftConsensusITest, TestSlowFollower) {
 
   TestWorkload workload(cluster_.get());
   workload.set_table_name(kTableId);
+  workload.set_num_read_threads(2);
   workload.Setup();
   workload.Start();
   SleepFor(MonoDelta::FromSeconds(60));
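
The updated assertion in TestReplicaBehaviorViaRPC expects a snapshot scan that cannot make progress to time out while waiting for its timestamp to become safe ("Timed out waiting for ts:" ... "to be safe"), rather than while waiting for transactions to commit. A minimal sketch of that two-phase wait follows. It assumes a hypothetical SnapshotWaiter class in place of the real TimeManager/MvccManager pair, plain integer timestamps, and two watermarks fed independently; the full wording of the timeout messages is an assumption built around the substrings the test checks. In this sketch both phases share one absolute deadline.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <string>

// Hypothetical stand-in for a per-tablet TimeManager + MvccManager pair.
// Not Kudu's API; timestamps are plain integers.
class SnapshotWaiter {
 public:
  using Clock = std::chrono::steady_clock;

  // No op with a timestamp <= 'safe' can still arrive. Never moves back.
  void AdvanceSafeTime(uint64_t safe) {
    std::lock_guard<std::mutex> l(mu_);
    if (safe > safe_time_) safe_time_ = safe;
    cv_.notify_all();
  }

  // Every op with a timestamp < 'clean' has committed. Never moves back.
  void AdvanceCleanTime(uint64_t clean) {
    std::lock_guard<std::mutex> l(mu_);
    if (clean > clean_time_) clean_time_ = clean;
    cv_.notify_all();
  }

  // Phase 1: wait until 'ts' is safe. Phase 2: wait until every op with a
  // timestamp < 'ts' has committed. Returns "" on success, else an error.
  std::string WaitForSnapshot(uint64_t ts, Clock::time_point deadline) {
    std::unique_lock<std::mutex> l(mu_);
    if (!cv_.wait_until(l, deadline, [&] { return ts <= safe_time_; })) {
      return "Timed out waiting for ts: " + std::to_string(ts) + " to be safe";
    }
    if (!cv_.wait_until(l, deadline, [&] { return ts <= clean_time_; })) {
      return "Timed out waiting for all transactions with ts < " +
             std::to_string(ts) + " to commit";
    }
    return "";
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  uint64_t safe_time_ = 0;   // no new op can have a timestamp <= safe_time_
  uint64_t clean_time_ = 0;  // every op with a timestamp < clean_time_ committed
};
```

Putting the safe-time wait first follows the order described in the commit message: a snapshot scan first waits for its timestamp to be safe, then for all transactions at that snapshot to commit, and only then scans.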

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/integration-tests/tablet_history_gc-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_history_gc-itest.cc b/src/kudu/integration-tests/tablet_history_gc-itest.cc
index 0bce2c3..66753db 100644
--- a/src/kudu/integration-tests/tablet_history_gc-itest.cc
+++ b/src/kudu/integration-tests/tablet_history_gc-itest.cc
@@ -55,6 +55,8 @@ DECLARE_int32(scanner_ttl_ms);
 DECLARE_int32(tablet_history_max_age_sec);
 DECLARE_string(block_manager);
 DECLARE_bool(enable_maintenance_manager);
+DECLARE_double(missed_heartbeats_before_rejecting_snapshot_scans);
+DECLARE_int32(safe_time_max_lag_ms);
 
 DEFINE_int32(test_num_rounds, 200, "Number of rounds to loop "
                                    "RandomizedTabletHistoryGcITest.TestRandomHistoryGCWorkload");
@@ -112,6 +114,11 @@ class RandomizedTabletHistoryGcITest : public TabletHistoryGcITest {
     FLAGS_enable_maintenance_manager = false;
     FLAGS_use_mock_wall_clock = true;
     FLAGS_tablet_history_max_age_sec = 100;
+
+    // Set these really high since we're using the mock clock.
+    // This allows the TimeManager to still work.
+    FLAGS_safe_time_max_lag_ms = 30 * 1000 * 1000;
+    FLAGS_missed_heartbeats_before_rejecting_snapshot_scans = 100.0;
   }
 
  protected:
@@ -661,7 +668,8 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) {
 
         unique_ptr<client::KuduScanner> scanner(new KuduScanner(table.get()));
         ASSERT_OK(scanner->SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
-        ASSERT_OK(scanner->SetOrderMode(KuduScanner::ORDERED));
+        ASSERT_OK(scanner->SetFaultTolerant());
+        ASSERT_OK(scanner->SetTimeoutMillis(60 * 1000));
         ASSERT_OK(scanner->SetSnapshotRaw(snapshot_ts.ToUint64()));
         ASSERT_OK(scanner->Open());
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/integration-tests/test_workload.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc
index 6989223..b674b17 100644
--- a/src/kudu/integration-tests/test_workload.cc
+++ b/src/kudu/integration-tests/test_workload.cc
@@ -33,6 +33,8 @@
 namespace kudu {
 
 using client::KuduInsert;
+using client::KuduScanBatch;
+using client::KuduScanner;
 using client::KuduSchema;
 using client::KuduSchemaFromSchema;
 using client::KuduSession;
@@ -48,6 +50,7 @@ TestWorkload::TestWorkload(MiniClusterBase* cluster)
     rng_(SeedRandom()),
     payload_bytes_(11),
     num_write_threads_(4),
+    num_read_threads_(0),
     write_batch_size_(50),
     write_timeout_millis_(20000),
     timeout_allowed_(false),
@@ -69,13 +72,12 @@ TestWorkload::~TestWorkload() {
   StopAndJoin();
 }
 
-void TestWorkload::WriteThread() {
-  shared_ptr<KuduTable> table;
+void TestWorkload::OpenTable(shared_ptr<KuduTable>* table) {
   // Loop trying to open up the table. In some tests we set up very
   // low RPC timeouts to test those behaviors, so this might fail and
   // need retrying.
   while (should_run_.Load()) {
-    Status s = client_->OpenTable(table_name_, &table);
+    Status s = client_->OpenTable(table_name_, table);
     if (s.ok()) {
       break;
     }
@@ -86,20 +88,26 @@ void TestWorkload::WriteThread() {
     CHECK_OK(s);
   }
 
-  shared_ptr<KuduSession> session = client_->NewSession();
-  session->SetTimeoutMillis(write_timeout_millis_);
-  CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
-
   // Wait for all of the workload threads to be ready to go. This maximizes the chance
   // that they all send a flood of requests at exactly the same time.
   //
   // This also minimizes the chance that we see failures to call OpenTable() if
-  // a late-starting thread overlaps with the flood of outbound traffic from the
-  // ones that are already writing data.
+  // a late-starting thread overlaps with the flood of traffic from the ones that are
+  // already writing/reading data.
   start_latch_.CountDown();
   start_latch_.Wait();
+}
+
+void TestWorkload::WriteThread() {
+  shared_ptr<KuduTable> table;
+  OpenTable(&table);
+
+  shared_ptr<KuduSession> session = client_->NewSession();
+  session->SetTimeoutMillis(write_timeout_millis_);
+  CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
 
   while (should_run_.Load()) {
+    int inserted = 0;
     for (int i = 0; i < write_batch_size_; i++) {
       if (write_pattern_ == UPDATE_ONE_ROW) {
         gscoped_ptr<KuduUpdate> update(table->NewUpdate());
@@ -128,11 +136,10 @@ void TestWorkload::WriteThread() {
         }
         CHECK_OK(row->SetStringCopy(2, test_payload));
         CHECK_OK(session->Apply(insert.release()));
+        inserted++;
       }
     }
 
-    int inserted = write_batch_size_;
-
     Status s = session->Flush();
 
     if (PREDICT_FALSE(!s.ok())) {
@@ -158,18 +165,46 @@ void TestWorkload::WriteThread() {
           continue;
         }
 
-        CHECK(e->status().ok()) << "Unexpected status: " << e->status().ToString();
+        CHECK_OK(e->status());
       }
       inserted -= errors.size();
     }
 
-    rows_inserted_.IncrementBy(inserted);
     if (inserted > 0) {
+      rows_inserted_.IncrementBy(inserted);
       batches_completed_.Increment();
     }
   }
 }
 
+void TestWorkload::ReadThread() {
+  shared_ptr<KuduTable> table;
+  OpenTable(&table);
+
+  while (should_run_.Load()) {
+    // Slow the scanners down to avoid imposing too much stress on already stressful tests.
+    SleepFor(MonoDelta::FromMilliseconds(150));
+
+    KuduScanner scanner(table.get());
+    // Set a high scanner timeout so that we're likely to have a chance to scan, even in
+    // high-stress workloads.
+    CHECK_OK(scanner.SetTimeoutMillis(60 * 1000 /* 60 seconds */));
+    CHECK_OK(scanner.SetFaultTolerant());
+
+    int64_t expected_row_count = rows_inserted_.Load();
+    size_t row_count = 0;
+
+    CHECK_OK(scanner.Open());
+    while (scanner.HasMoreRows()) {
+      KuduScanBatch batch;
+      CHECK_OK(scanner.NextBatch(&batch));
+      row_count += batch.NumRows();
+    }
+
+    CHECK_GE(row_count, expected_row_count);
+  }
+}
+
 void TestWorkload::Setup() {
   CHECK_OK(cluster_->CreateClient(&client_builder_, &client_));
 
@@ -225,13 +260,14 @@ void TestWorkload::Setup() {
     CHECK_OK(row->SetInt32(1, 0));
     CHECK_OK(row->SetStringCopy(2, "hello world"));
     CHECK_OK(session->Apply(insert.release()));
+    rows_inserted_.Store(1);
   }
 }
 
 void TestWorkload::Start() {
   CHECK(!should_run_.Load()) << "Already started";
   should_run_.Store(true);
-  start_latch_.Reset(num_write_threads_);
+  start_latch_.Reset(num_write_threads_ + num_read_threads_);
   for (int i = 0; i < num_write_threads_; i++) {
     scoped_refptr<kudu::Thread> new_thread;
     CHECK_OK(kudu::Thread::Create("test", strings::Substitute("test-writer-$0", i),
@@ -239,6 +275,15 @@ void TestWorkload::Start() {
                                   &new_thread));
     threads_.push_back(new_thread);
   }
+  // Start the read threads. Order matters here: the read threads are started last so that
+  // we'll have a chance to do some scans after all writers are done.
+  for (int i = 0; i < num_read_threads_; i++) {
+    scoped_refptr<kudu::Thread> new_thread;
+    CHECK_OK(kudu::Thread::Create("test", strings::Substitute("test-reader-$0", i),
+                                  &TestWorkload::ReadThread, this,
+                                  &new_thread));
+    threads_.push_back(new_thread);
+  }
 }
 
 void TestWorkload::StopAndJoin() {
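
TestWorkload::Start() now sizes start_latch_ to num_write_threads_ + num_read_threads_, because every reader also calls CountDown() and Wait() through the shared OpenTable() helper. The sketch below re-implements a minimal count-down latch with std::mutex and std::condition_variable (a hypothetical stand-in, not Kudu's CountDownLatch) and a hypothetical StartWorkload() driver. It checks that, with the latch sized to all threads, no thread is released before every writer and reader has arrived.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Minimal count-down latch; a sketch, not Kudu's CountDownLatch.
class CountDownLatch {
 public:
  explicit CountDownLatch(int count) : count_(count) { assert(count >= 0); }

  void Reset(int count) {
    assert(count >= 0);
    std::lock_guard<std::mutex> l(mu_);
    count_ = count;
  }

  // Decrements the count; wakes all waiters when it reaches zero.
  void CountDown() {
    std::lock_guard<std::mutex> l(mu_);
    if (count_ > 0 && --count_ == 0) cv_.notify_all();
  }

  // Blocks until the count reaches zero.
  void Wait() {
    std::unique_lock<std::mutex> l(mu_);
    cv_.wait(l, [&] { return count_ == 0; });
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  int count_;
};

// Starts 'writers' + 'readers' threads that rendezvous on one latch. Returns
// how many threads saw every thread already arrived when they were released.
int StartWorkload(int writers, int readers) {
  const int total = writers + readers;
  CountDownLatch latch(0);
  // Must cover readers too: sized to the writers only, the latch could open
  // before every reader has arrived.
  latch.Reset(total);
  std::atomic<int> arrived{0};
  std::atomic<int> released_after_all_arrived{0};
  std::vector<std::thread> threads;
  for (int i = 0; i < total; i++) {
    threads.emplace_back([&] {
      arrived.fetch_add(1);
      latch.CountDown();
      latch.Wait();
      if (arrived.load() == total) released_after_all_arrived.fetch_add(1);
    });
  }
  for (auto& t : threads) t.join();
  return released_after_all_arrived.load();
}
```

Each thread increments 'arrived' before counting down, and Wait() only returns once all 'total' count-downs have happened under the latch's mutex, so every released thread observes the full count.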

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/integration-tests/test_workload.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.h b/src/kudu/integration-tests/test_workload.h
index 7cd4ddf..fdab495 100644
--- a/src/kudu/integration-tests/test_workload.h
+++ b/src/kudu/integration-tests/test_workload.h
@@ -38,6 +38,11 @@ class Thread;
 // The actual data inserted is random, and thus can't be verified for
 // integrity. However, this is still useful in conjunction with ClusterVerifier
 // to verify that replicas do not diverge.
+//
+// The read workload essentially tests read-your-writes. It constantly
+// issues snapshot scans in the present and asserts that we see at least as
+// many rows as we have written, independently of which replica we choose
+// to scan.
 class TestWorkload {
  public:
   static const char* const kDefaultTableName;
@@ -53,6 +58,10 @@ class TestWorkload {
     num_write_threads_ = n;
   }
 
+  void set_num_read_threads(int n) {
+    num_read_threads_ = n;
+  }
+
   void set_write_batch_size(int s) {
     write_batch_size_ = s;
   }
@@ -170,7 +179,9 @@ class TestWorkload {
   }
 
  private:
+  void OpenTable(client::sp::shared_ptr<client::KuduTable>* table);
   void WriteThread();
+  void ReadThread();
 
   MiniClusterBase* cluster_;
   client::KuduClientBuilder client_builder_;
@@ -179,6 +190,7 @@ class TestWorkload {
 
   int payload_bytes_;
   int num_write_threads_;
+  int num_read_threads_;
   int write_batch_size_;
   int write_timeout_millis_;
   bool timeout_allowed_;
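
The read workload checks read-your-writes: rows_inserted_ is incremented only after a flush succeeds (minus any per-row errors), and a reader loads it before opening its scanner, so the scan must return at least that many rows. This is also why WriteThread() now counts only the rows it actually inserted, and why Setup() stores 1 for the initial row. The following is a single-process model of that invariant, assuming a hypothetical InMemoryTable and RunReadYourWrites() driver in which a mutex plays the role that safe time and snapshot waits play on a real tablet; it illustrates the check, not the cluster's consistency mechanism.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical in-memory table standing in for a tablet.
class InMemoryTable {
 public:
  void Insert(int64_t key) {
    std::lock_guard<std::mutex> l(mu_);
    rows_.push_back(key);
  }
  size_t ScanCount() const {
    std::lock_guard<std::mutex> l(mu_);
    return rows_.size();
  }

 private:
  mutable std::mutex mu_;
  std::vector<int64_t> rows_;
};

// Runs 'writers' threads inserting 'rows_per_writer' rows each, plus one reader
// that repeatedly checks read-your-writes. Returns the number of violations.
int RunReadYourWrites(int writers, int rows_per_writer) {
  InMemoryTable table;
  std::atomic<int64_t> rows_inserted{0};
  std::atomic<bool> should_run{true};
  std::atomic<int> violations{0};

  std::thread reader([&] {
    while (should_run.load()) {
      // Load the counter *before* scanning: every row it counts has already
      // been acknowledged, so the scan must see at least that many.
      int64_t expected = rows_inserted.load();
      size_t seen = table.ScanCount();
      if (static_cast<int64_t>(seen) < expected) violations.fetch_add(1);
    }
  });

  std::vector<std::thread> ws;
  for (int w = 0; w < writers; w++) {
    ws.emplace_back([&, w] {
      for (int i = 0; i < rows_per_writer; i++) {
        table.Insert(static_cast<int64_t>(w) * rows_per_writer + i);
        rows_inserted.fetch_add(1);  // count only after the write succeeded
      }
    });
  }
  for (auto& t : ws) t.join();
  should_run.store(false);
  reader.join();
  return violations.load();
}
```

Loading the counter before the scan matters: loading it afterwards could include rows acknowledged after the scan's snapshot was taken and report a false violation.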

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/mvcc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc-test.cc b/src/kudu/tablet/mvcc-test.cc
index d24c108..543d806 100644
--- a/src/kudu/tablet/mvcc-test.cc
+++ b/src/kudu/tablet/mvcc-test.cc
@@ -43,7 +43,7 @@ class MvccTest : public KuduTest {
 
   void WaitForSnapshotAtTSThread(MvccManager* mgr, Timestamp ts) {
     MvccSnapshot s;
-    CHECK_OK(mgr->WaitForCleanSnapshotAtTimestamp(ts, &s, MonoTime::Max()));
+    CHECK_OK(mgr->WaitForSnapshotWithAllCommitted(ts, &s, MonoTime::Max()));
     CHECK(s.is_clean()) << "verifying postcondition";
     std::lock_guard<simple_spinlock> lock(lock_);
     result_snapshot_.reset(new MvccSnapshot(s));
@@ -378,6 +378,7 @@ TEST_F(MvccTest, TestAreAllTransactionsCommitted) {
   mgr.StartTransaction(tx2);
   Timestamp tx3 = clock_->Now();
   mgr.StartTransaction(tx3);
+  mgr.AdjustSafeTime(clock_->Now());
 
   ASSERT_FALSE(mgr.AreAllTransactionsCommitted(Timestamp(1)));
   ASSERT_FALSE(mgr.AreAllTransactionsCommitted(Timestamp(2)));
@@ -409,14 +410,16 @@ TEST_F(MvccTest, TestAreAllTransactionsCommitted) {
 
 TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapWithNoInflights) {
   MvccManager mgr(clock_.get());
-  thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this, &mgr, clock_->Now());
+  Timestamp to_wait_for = clock_->Now();
+  mgr.AdjustSafeTime(clock_->Now());
+  thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this, &mgr, to_wait_for);
 
   // join immediately.
   waiting_thread.join();
   ASSERT_TRUE(HasResultSnapshot());
 }
 
-TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapWithInFlights) {
+TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapBeforeSafeTimeWithInFlights) {
 
   MvccManager mgr(clock_.get());
 
@@ -425,8 +428,14 @@ TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapWithInFlights) {
   Timestamp tx2 = clock_->Now();
   mgr.StartTransaction(tx2);
   mgr.AdjustSafeTime(tx2);
+  Timestamp to_wait_for = clock_->Now();
 
-  thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this, &mgr, clock_->Now());
+  // Select a safe time that is after all transactions and after the timestamp we'll wait for
+  // and adjust it on the MvccManager. This will cause "clean time" to move when tx1 and tx2 commit.
+  Timestamp safe_time = clock_->Now();
+  mgr.AdjustSafeTime(safe_time);
+
+  thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this, &mgr, to_wait_for);
 
   ASSERT_FALSE(HasResultSnapshot());
   mgr.StartApplyingTransaction(tx1);
@@ -438,9 +447,8 @@ TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapWithInFlights) {
   ASSERT_TRUE(HasResultSnapshot());
 }
 
-TEST_F(MvccTest, TestWaitForApplyingTransactionsToCommit) {
+TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapAfterSafeTimeWithInFlights) {
   MvccManager mgr(clock_.get());
-
   Timestamp tx1 = clock_->Now();
   mgr.StartTransaction(tx1);
   Timestamp tx2 = clock_->Now();
@@ -480,7 +488,6 @@ TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapAtTimestampWithInFlights) {
   mgr.StartTransaction(tx2);
   Timestamp tx3 = clock_->Now();
   mgr.StartTransaction(tx3);
-  mgr.AdjustSafeTime(tx3);
 
   // Start a thread waiting for transactions with ts <= 2 to commit
   thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this, &mgr, tx2);
@@ -498,9 +505,13 @@ TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapAtTimestampWithInFlights) {
   SleepFor(MonoDelta::FromMilliseconds(1));
   ASSERT_FALSE(HasResultSnapshot());
 
-  // Commit tx 2 - thread can now continue
+  // Commit tx 2 - thread should still wait.
   mgr.StartApplyingTransaction(tx2);
   mgr.CommitTransaction(tx2);
+  ASSERT_FALSE(HasResultSnapshot());
+
+  // Advance safe time, thread should continue.
+  mgr.AdjustSafeTime(tx3);
   waiting_thread.join();
   ASSERT_TRUE(HasResultSnapshot());
 }
@@ -636,7 +647,7 @@ TEST_F(MvccTest, TestWaitUntilCleanDeadline) {
   // transaction isn't committed yet.
   MonoTime deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(10);
   MvccSnapshot snap;
-  Status s = mgr.WaitForCleanSnapshotAtTimestamp(tx1, &snap, deadline);
+  Status s = mgr.WaitForSnapshotWithAllCommitted(tx1, &snap, deadline);
   ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/mvcc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc.cc b/src/kudu/tablet/mvcc.cc
index f6316e8..bfa8a79 100644
--- a/src/kudu/tablet/mvcc.cc
+++ b/src/kudu/tablet/mvcc.cc
@@ -34,6 +34,8 @@
 
 namespace kudu { namespace tablet {
 
+using strings::Substitute;
+
 MvccManager::MvccManager(const scoped_refptr<server::Clock>& clock)
   : safe_time_(Timestamp::kMin),
     earliest_in_flight_(Timestamp::kMax),
@@ -72,7 +74,7 @@ void MvccManager::StartApplyingTransaction(Timestamp timestamp) {
 
 bool MvccManager::InitTransactionUnlocked(const Timestamp& timestamp) {
   // Ensure that we didn't mark the given timestamp as "safe".
-  if (PREDICT_FALSE(safe_time_ >= timestamp)) {
+  if (PREDICT_FALSE(timestamp <= safe_time_)) {
     return false;
   }
 
@@ -164,10 +166,17 @@ void MvccManager::AdjustSafeTime(Timestamp safe_time) {
 
   // No more transactions will start with a ts that is lower than or equal
   // to 'safe_time', so we adjust the snapshot accordingly.
-  if (PREDICT_TRUE(safe_time_ < safe_time)) {
+  if (PREDICT_TRUE(safe_time_ <= safe_time)) {
     safe_time_ = safe_time;
   } else {
-    // If we couldn't adjust "safe" time don't bother adjusting "clean" time.
+    // TODO(dralves) This shouldn't happen: the safe time passed to MvccManager should be
+    // monotonically increasing. If it does, though, the impact is on scan snapshot correctness,
+    // not on corruption of state, and some test-only code (LocalTabletWriter) sets it back.
+    // Note that we will still crash if a transaction comes in with a timestamp that is lower
+    // than 'cur_snap_.all_committed_before_'.
+    LOG_EVERY_N(ERROR, 10) << Substitute("Tried to move safe_time back from $0 to $1. "
+                                         "Current Snapshot: $2", safe_time_.ToString(),
+                                         safe_time.ToString(), cur_snap_.ToString());
     return;
   }
 
@@ -226,8 +235,7 @@ void MvccManager::AdjustCleanTime() {
   }
 }
 
-Status MvccManager::WaitUntil(WaitFor wait_for, Timestamp ts,
-                              const MonoTime& deadline) const {
+Status MvccManager::WaitUntil(WaitFor wait_for, Timestamp ts, const MonoTime& deadline) const {
   TRACE_EVENT2("tablet", "MvccManager::WaitUntil",
                "wait_for", wait_for == ALL_COMMITTED ? "all_committed" : "none_applying",
                "ts", ts.ToUint64())
@@ -256,10 +264,9 @@ Status MvccManager::WaitUntil(WaitFor wait_for, Timestamp ts,
   }
 
   waiters_.erase(std::find(waiters_.begin(), waiters_.end(), &waiting_state));
-  return Status::TimedOut(strings::Substitute(
-      "Timed out waiting for all transactions with ts < $0 to $1",
-      clock_->Stringify(ts),
-      wait_for == ALL_COMMITTED ? "commit" : "finish applying"));
+  return Status::TimedOut(Substitute("Timed out waiting for all transactions with ts < $0 to $1",
+                                     ts.ToString(),
+                                     wait_for == ALL_COMMITTED ? "commit" : "finish applying"));
 }
 
 bool MvccManager::IsDoneWaitingUnlocked(const WaitingState& waiter) const {
@@ -273,16 +280,18 @@ bool MvccManager::IsDoneWaitingUnlocked(const WaitingState& waiter) const {
 }
 
 bool MvccManager::AreAllTransactionsCommittedUnlocked(Timestamp ts) const {
-  if (timestamps_in_flight_.empty()) {
-    // If nothing is in-flight, then check the clock. If the timestamp is in the past,
-    // we know that no new uncommitted transactions may start before this ts.
-    return ts <= clock_->Now();
-  }
-  // If some transactions are in flight, then check the in-flight list.
-  return !cur_snap_.MayHaveUncommittedTransactionsAtOrBefore(ts);
+  // If ts is before the 'all_committed_before_' watermark on the current snapshot then
+  // all transactions before it are committed.
+  if (ts < cur_snap_.all_committed_before_) return true;
+
+  // We might not have moved 'cur_snap_.all_committed_before_' (the clean time) but 'ts'
+  // might still come before any possible in-flights.
+  return ts < earliest_in_flight_;
 }
 
 bool MvccManager::AnyApplyingAtOrBeforeUnlocked(Timestamp ts) const {
+  // TODO(todd) this is not actually checking on the applying txns, it's checking on
+  // _all in-flight_. Is this a bug?
   for (const InFlightMap::value_type entry : timestamps_in_flight_) {
     if (entry.first <= ts.value()) {
       return true;
@@ -296,20 +305,16 @@ void MvccManager::TakeSnapshot(MvccSnapshot *snap) const {
   *snap = cur_snap_;
 }
 
-Status MvccManager::WaitForCleanSnapshotAtTimestamp(Timestamp timestamp,
-                                                    MvccSnapshot *snap,
+Status MvccManager::WaitForSnapshotWithAllCommitted(Timestamp timestamp,
+                                                    MvccSnapshot* snapshot,
                                                     const MonoTime& deadline) const {
-  TRACE_EVENT0("tablet", "MvccManager::WaitForCleanSnapshotAtTimestamp");
-  RETURN_NOT_OK(clock_->WaitUntilAfterLocally(timestamp, deadline));
+  TRACE_EVENT0("tablet", "MvccManager::WaitForSnapshotWithAllCommitted");
+
   RETURN_NOT_OK(WaitUntil(ALL_COMMITTED, timestamp, deadline));
-  *snap = MvccSnapshot(timestamp);
+  *snapshot = MvccSnapshot(timestamp);
   return Status::OK();
 }
 
-void MvccManager::WaitForCleanSnapshot(MvccSnapshot* snap) const {
-  CHECK_OK(WaitForCleanSnapshotAtTimestamp(clock_->Now(), snap, MonoTime::Max()));
-}
-
 void MvccManager::WaitForApplyingTransactionsToCommit() const {
   TRACE_EVENT0("tablet", "MvccManager::WaitForApplyingTransactionsToCommit");
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/mvcc.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc.h b/src/kudu/tablet/mvcc.h
index 94f3789..bd106a8 100644
--- a/src/kudu/tablet/mvcc.h
+++ b/src/kudu/tablet/mvcc.h
@@ -218,6 +218,10 @@ class MvccManager {
   //
   // This must only be called when there is a guarantee that there won't be
   // any more transactions with timestamps equal to or lower than 'safe_time'.
+  //
+  // TODO(dralves) Until leader leases are implemented this should only be called
+  // with the timestamps of consensus committed transactions, not with the safe
+  // time received from the leader (which can go back without leader leases).
   void AdjustSafeTime(Timestamp safe_time);
 
   // Take a snapshot of the current MVCC state, which indicates which
@@ -228,31 +232,14 @@ class MvccManager {
   // all transactions which have a lower timestamp)
   //
   // If there are any in-flight transactions at a lower timestamp, waits for
-  // them to complete before returning. Hence, we guarantee that, upon return,
-  // snapshot->is_clean().
-  //
-  // TODO(KUDU-689): this may currently block forever, stalling scanner threads
-  // and potentially blocking tablet shutdown.
+  // them to complete before returning.
   //
-  // REQUIRES: 'timestamp' must be in the past according to the configured
-  // clock.
-  Status WaitForCleanSnapshotAtTimestamp(Timestamp timestamp,
+  // If 'timestamp' was marked safe before the call to this method (e.g. by TimeManager)
+  // then the returned snapshot is repeatable.
+  Status WaitForSnapshotWithAllCommitted(Timestamp timestamp,
                                          MvccSnapshot* snapshot,
                                          const MonoTime& deadline) const WARN_UNUSED_RESULT;
 
-  // Take a snapshot at the current timestamp, and then wait for any
-  // currently running transactions at an earlier timestamp to finish.
-  //
-  // The returned snapshot acts as a "barrier":
-  // - all transactions which started prior to this call are included in
-  //   snapshot
-  // - no transactions which start after the call returns will be included
-  //   in snapshot
-  // - snapshot->is_clean() is guaranteed
-  //
-  // Note that transactions are not blocked during this call.
-  void WaitForCleanSnapshot(MvccSnapshot* snapshot) const;
-
   // Wait for all operations that are currently APPLYING to commit.
   //
   // NOTE: this does _not_ guarantee that no transactions are APPLYING upon
@@ -286,6 +273,7 @@ class MvccManager {
   FRIEND_TEST(MvccTest, TestTxnAbort);
   FRIEND_TEST(MvccTest, TestAutomaticCleanTimeMoveToSafeTimeOnCommit);
   FRIEND_TEST(MvccTest, TestWaitForApplyingTransactionsToCommit);
+  FRIEND_TEST(MvccTest, TestWaitForCleanSnapshot_SnapAfterSafeTimeWithInFlights);
 
   enum TxnState {
     RESERVED,
@@ -326,7 +314,7 @@ class MvccManager {
   // Commits the given transaction.
   // Sets *was_earliest to true if this was the earliest in-flight transaction.
   void CommitTransactionUnlocked(Timestamp timestamp,
-                                 bool* was_earliest);
+                                 bool* was_earliest_in_flight);
 
   // Remove the timestamp 'ts' from the in-flight map.
   // FATALs if the ts is not in the in-flight map.
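
The renamed tests in mvcc-test.cc pin down the rule these MvccManager changes encode: committing the in-flight transactions at or before a timestamp is no longer enough; safe time must also have moved past it before a snapshot there is clean. The following is a deliberately simplified model, assuming integer timestamps and a hypothetical MvccModel class. It is not Kudu's MvccManager and does not reproduce its exact checks; the inclusive/exclusive boundary choices are the model's own. It mirrors two behaviors from the diff: a transaction at or below safe time is rejected, and safe time never moves back.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <limits>
#include <set>

// Simplified model of how safe time and in-flight transactions bound the
// "clean" watermark. Not Kudu's MvccManager.
class MvccModel {
 public:
  // A timestamp at or below safe time is rejected: safe time promised that no
  // such transaction would arrive.
  bool StartTransaction(uint64_t ts) {
    if (ts <= safe_time_) return false;
    return in_flight_.insert(ts).second;
  }

  void CommitTransaction(uint64_t ts) {
    size_t erased = in_flight_.erase(ts);
    assert(erased == 1 && "committing an unknown transaction");
    (void)erased;
  }

  // Safe time only moves forward; an attempt to move it back is ignored.
  void AdjustSafeTime(uint64_t safe_time) {
    safe_time_ = std::max(safe_time_, safe_time);
  }

  // Exclusive watermark: every timestamp below it is committed, and no new
  // transaction can appear there.
  uint64_t CleanWatermark() const {
    const uint64_t kMax = std::numeric_limits<uint64_t>::max();
    uint64_t earliest = in_flight_.empty() ? kMax : *in_flight_.begin();
    uint64_t above_safe = (safe_time_ == kMax) ? kMax : safe_time_ + 1;
    return std::min(earliest, above_safe);
  }

  // True if a snapshot covering all timestamps <= ts is repeatable.
  bool AllCommittedAtOrBefore(uint64_t ts) const { return ts < CleanWatermark(); }

 private:
  uint64_t safe_time_ = 0;        // no transaction may start at or below this
  std::set<uint64_t> in_flight_;  // started but not yet committed
};
```

With tx1..tx3 in flight and safe time never advanced, committing tx1 and tx2 leaves a snapshot at tx2 unclean; advancing safe time to tx3 makes it clean, which is the sequence TestWaitForCleanSnapshot_SnapAtTimestampWithInFlights now asserts.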

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 5fe288f..aa5d39f 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -391,7 +391,7 @@ void Tablet::StartTransaction(WriteTransactionState* tx_state) {
   gscoped_ptr<ScopedTransaction> mvcc_tx;
   DCHECK(tx_state->has_timestamp());
   mvcc_tx.reset(new ScopedTransaction(&mvcc_, tx_state->timestamp()));
-  tx_state->SetMvccTxAndTimestamp(std::move(mvcc_tx));
+  tx_state->SetMvccTx(std::move(mvcc_tx));
 }
 
 Status Tablet::InsertOrUpsertUnlocked(WriteTransactionState *tx_state,

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 95a167e..c8cf788 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -498,9 +498,6 @@ Status TabletBootstrap::Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
                                            tablet_id));
   }
 
-  // Before playing any segments we set the safe and clean times to 'kMin' so that
-  // the MvccManager will accept all transactions that we replay as uncommitted.
-  tablet_->mvcc_manager()->AdjustSafeTime(Timestamp::kMin);
   RETURN_NOT_OK_PREPEND(PlaySegments(consensus_info), "Failed log replay. Reason");
 
   // Flush the consensus metadata once at the end to persist our changes, if any.