You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/12/08 16:14:33 UTC

[1/6] kudu git commit: KUDU-1793: only update lbm container bookkeeping on success

Repository: kudu
Updated Branches:
  refs/heads/master 1b3eeb3c7 -> 839bd6f9a


KUDU-1793: only update lbm container bookkeeping on success

This patch reverts part of commit abea8c6 by moving some bookkeeping out of
containers and into writable blocks, ensuring that the container's
accounting structures are only modified when a block has been written
successfully.

Additionally, since the aforementioned bug shipped in Kudu 1.1.0, the LBM
invariant of "all blocks are contiguous on disk" has been relaxed somewhat:
if we encounter an unexpected "hole", we'll place the next block after it.

The new test relies on Env fault injection to simulate out-of-disk-space
errors. It triggered a crash 100% of the time without the fix.

I also snuck in a change to relax the behavior of the LBM at startup if it
encounters an unknown record type (for future-proofing), and changed
several LOG(FATAL) callers to propagate a bad Status up the stack instead.

Change-Id: I49bc98c9f8b7dce0333f88cec85757fe122acfa4
Reviewed-on: http://gerrit.cloudera.org:8080/5399
Tested-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/faa587c6
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/faa587c6
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/faa587c6

Branch: refs/heads/master
Commit: faa587c639aa9e5dcf3fac04259f46ba1921140a
Parents: 1b3eeb3
Author: Adar Dembo <ad...@cloudera.com>
Authored: Wed Dec 7 02:52:32 2016 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Dec 8 04:07:39 2016 +0000

----------------------------------------------------------------------
 src/kudu/fs/block_manager-test.cc | 107 ++++++++++++++++++++++
 src/kudu/fs/log_block_manager.cc  | 161 +++++++++++++++++++++------------
 src/kudu/fs/log_block_manager.h   |  10 +-
 src/kudu/master/sys_catalog.cc    |   2 +-
 src/kudu/util/env.h               |   6 ++
 src/kudu/util/env_posix.cc        |  19 ++++
 6 files changed, 242 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/faa587c6/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index 2e19afd..74ff728 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -16,6 +16,9 @@
 // under the License.
 
 #include <memory>
+#include <unordered_set>
+#include <string>
+#include <vector>
 
 #include "kudu/fs/file_block_manager.h"
 #include "kudu/fs/fs.pb.h"
@@ -39,9 +42,12 @@ using kudu::pb_util::ReadablePBContainerFile;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
+using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
+DECLARE_bool(never_fsync);
+
 DECLARE_uint64(log_container_preallocate_bytes);
 DECLARE_uint64(log_container_max_size);
 
@@ -49,8 +55,11 @@ DECLARE_int64(fs_data_dirs_reserved_bytes);
 DECLARE_int64(disk_reserved_bytes_free_for_testing);
 
 DECLARE_int32(fs_data_dirs_full_disk_cache_seconds);
+
 DECLARE_string(block_manager);
 
+DECLARE_double(env_inject_io_error_on_write_or_preallocate);
+
 // Generic block manager metrics.
 METRIC_DECLARE_gauge_uint64(block_manager_blocks_open_reading);
 METRIC_DECLARE_gauge_uint64(block_manager_blocks_open_writing);
@@ -1126,5 +1135,103 @@ TYPED_TEST(BlockManagerTest, TestDiskSpaceCheck) {
   }
 }
 
+// Regression test for KUDU-1793.
+TYPED_TEST(BlockManagerTest, TestMetadataOkayDespiteFailedWrites) {
+  const int kNumTries = 3;
+  const int kNumBlockTries = 1000;
+  const int kNumAppends = 4;
+  const string kTestData = "asdf";
+
+  // Speed up the test.
+  FLAGS_never_fsync = true;
+
+  // Since we're appending so little data, reconfigure these to ensure quite a
+  // few containers and a good amount of preallocating.
+  FLAGS_log_container_max_size = 256 * 1024;
+  FLAGS_log_container_preallocate_bytes = 8 * 1024;
+
+  // Force some file operations to fail.
+  FLAGS_env_inject_io_error_on_write_or_preallocate = 0.2;
+
+  // Creates a block, writing the result to 'out' on success.
+  auto create_a_block = [&](BlockId* out) -> Status {
+    gscoped_ptr<WritableBlock> block;
+    RETURN_NOT_OK(this->bm_->CreateBlock(&block));
+    for (int i = 0; i < kNumAppends; i++) {
+      RETURN_NOT_OK(block->Append(kTestData));
+    }
+    RETURN_NOT_OK(block->Close());
+    *out = block->id();
+    return Status::OK();
+  };
+
+  // Reads block 'id' and checks it holds exactly kNumAppends copies of kTestData.
+  auto read_a_block = [&](const BlockId& id) -> Status {
+    gscoped_ptr<ReadableBlock> block;
+    RETURN_NOT_OK(this->bm_->OpenBlock(id, &block));
+    uint64_t size;
+    RETURN_NOT_OK(block->Size(&size));
+    CHECK_EQ(kNumAppends * kTestData.size(), size);
+
+    for (int i = 0; i < kNumAppends; i++) {
+      vector<uint8_t> buf(kTestData.size());
+      Slice s;
+      RETURN_NOT_OK(block->Read(i * kTestData.size(), buf.size(), &s, buf.data()));
+      CHECK_EQ(kTestData, s);
+    }
+    return Status::OK();
+  };
+
+  // For each iteration:
+  // 1. Try to create kNumBlockTries new blocks.
+  // 2. Try to delete every other block.
+  // 3. Read and test every block.
+  // 4. Restart the block manager, forcing the on-disk metadata to be reloaded.
+  unordered_set<BlockId, BlockIdHash> ids;
+  for (int attempt = 0; attempt < kNumTries; attempt++) {
+    int num_created = 0;
+    for (int i = 0; i < kNumBlockTries; i++) {
+      BlockId id;
+      Status s = create_a_block(&id);
+      if (s.ok()) {
+        InsertOrDie(&ids, id);
+        num_created++;
+      }
+    }
+    LOG(INFO) << Substitute("Successfully created $0 blocks on $1 attempts",
+                            num_created, kNumBlockTries);
+
+    int num_deleted = 0;
+    int num_deleted_attempts = 0;
+    for (auto it = ids.begin(); it != ids.end();) {
+      // TODO(adar): the lbm removes a block from its block map even if the
+      // on-disk deletion fails. When that's fixed, update this code to
+      // erase() only if s.ok().
+      Status s = this->bm_->DeleteBlock(*it);
+      it = ids.erase(it);
+      if (s.ok()) {
+        num_deleted++;
+      }
+      num_deleted_attempts++;
+
+      // Skip every other block.
+      if (it != ids.end()) {
+        it++;
+      }
+    }
+    LOG(INFO) << Substitute("Successfully deleted $0 blocks on $1 attempts",
+                            num_deleted, num_deleted_attempts);
+
+    for (const auto& id : ids) {
+      ASSERT_OK(read_a_block(id));
+    }
+
+    ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                                       shared_ptr<MemTracker>(),
+                                       { GetTestDataDirectory() },
+                                       false));
+  }
+}
+
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/faa587c6/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 7877cc4..129d9cc 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -196,10 +196,19 @@ class LogBlockContainer {
   // The on-disk effects of this call are made durable only after SyncData().
   Status DeleteBlock(int64_t offset, int64_t length);
 
-  // Appends 'data' to this container's data file.
+  // Preallocate enough space to ensure that an append of 'next_append_length'
+  // can be satisfied by this container. The offset of the beginning of this
+  // block must be provided in 'block_start_offset' (since container
+  // bookkeeping is only updated when a block is finished).
+  //
+  // Does nothing if preallocation is disabled.
+  Status EnsurePreallocated(int64_t block_start_offset,
+                            size_t next_append_length);
+
+  // Writes 'data' to this container's data file at offset 'offset'.
   //
   // The on-disk effects of this call are made durable only after SyncData().
-  Status AppendData(const Slice& data);
+  Status WriteData(int64_t offset, const Slice& data);
 
   // See RWFile::Read().
   Status ReadData(int64_t offset, size_t length,
@@ -254,7 +263,7 @@ class LogBlockContainer {
   // will round up the container data file's position.
   //
   // This function is thread unsafe.
-  void UpdateBytesWritten(int64_t more_bytes);
+  void UpdateBytesWritten(int64_t block_offset, size_t block_length);
 
   // Run a task on this container's data directory thread pool.
   //
@@ -282,14 +291,9 @@ class LogBlockContainer {
                     shared_ptr<RWFile> data_file);
 
   // Performs sanity checks on a block record.
-  void CheckBlockRecord(const BlockRecordPB& record,
-                        uint64_t data_file_size) const;
-
-  // Preallocate enough space to ensure that an append of 'next_append_bytes'
-  // can be satisfied by this container.
-  //
-  // Does nothing if preallocation is disabled.
-  Status EnsurePreallocated(size_t next_append_bytes);
+  Status CheckBlockRecord(const BlockRecordPB& record,
+                          uint64_t data_file_size,
+                          uint64_t fs_block_size) const;
 
   // The owning block manager. Must outlive the container itself.
   LogBlockManager* const block_manager_;
@@ -506,6 +510,8 @@ Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) co
 
   uint64_t data_file_size;
   RETURN_NOT_OK(data_file_->Size(&data_file_size));
+  uint64_t fs_block_size =
+      data_dir_->instance()->metadata()->filesystem_block_size_bytes();
   deque<BlockRecordPB> local_records;
   Status read_status;
   while (true) {
@@ -526,7 +532,7 @@ Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) co
       // that additional data has been written to the file.
       RETURN_NOT_OK(data_file_->Size(&data_file_size));
     }
-    CheckBlockRecord(record, data_file_size);
+    RETURN_NOT_OK(CheckBlockRecord(record, data_file_size, fs_block_size));
   }
   // NOTE: 'read_status' will never be OK here.
   if (PREDICT_TRUE(read_status.IsEndOfFile())) {
@@ -564,18 +570,37 @@ Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) co
   return read_status;
 }
 
-void LogBlockContainer::CheckBlockRecord(const BlockRecordPB& record,
-                                         uint64_t data_file_size) const {
-  if (record.op_type() == CREATE &&
-      (!record.has_offset()  ||
-       !record.has_length()  ||
+Status LogBlockContainer::CheckBlockRecord(const BlockRecordPB& record,
+                                           uint64_t data_file_size,
+                                           uint64_t fs_block_size) const {
+  if (record.op_type() == CREATE) {
+    if (!record.has_offset() ||
+        !record.has_length() ||
         record.offset() < 0  ||
-        record.length() < 0  ||
-        record.offset() + record.length() > data_file_size)) {
-    LOG(FATAL) << "Found malformed block record in data file: " << data_file_->filename()
-               << "\nRecord: " << record.DebugString()
-               << "\nData file size: " << data_file_size;
+        record.length() < 0) {
+      return Status::Corruption(Substitute(
+          "Found malformed block record in data file: $0\nRecord: $1",
+          data_file_->filename(), record.DebugString()));
+    }
+    if (record.length() > data_file_size || record.offset() > data_file_size - record.length()) {
+      return Status::Corruption(Substitute(
+          "Found block extending beyond the end of data file: $0\n"
+          "Record: $1\nData file size: $2",
+          data_file_->filename(), record.DebugString(), data_file_size));
+    }
+
+    // We could also check that the record's offset is aligned with the
+    // underlying filesystem's block size, an invariant maintained by the log
+    // block manager. However, due to KUDU-1793, that invariant may have been
+    // broken, so we'll LOG but otherwise allow it.
+    if (record.offset() % fs_block_size != 0) {
+      LOG(WARNING) << Substitute(
+          "Found misaligned block in data file: $0\nRecord: $1\n"
+          "This is likely because of KUDU-1793",
+          data_file_->filename(), record.DebugString());
+    }
   }
+  return Status::OK();
 }
 
 Status LogBlockContainer::FinishBlock(const Status& s, WritableBlock* block) {
@@ -596,13 +621,10 @@ Status LogBlockContainer::FinishBlock(const Status& s, WritableBlock* block) {
   // will have written some garbage that can be expunged during a GC.
   RETURN_NOT_OK(block_manager()->SyncContainer(*this));
 
-  // Each call to AppendData() updated 'total_bytes_written_' to reflect the
-  // new block. Nevertheless, we must call UpdateBytesWritten() whenever a
-  // block is finished in order to prepare for the next block.
   CHECK(block_manager()->AddLogBlock(this, block->id(),
-                                     total_bytes_written_ - block->BytesAppended(),
+                                     total_bytes_written_,
                                      block->BytesAppended()));
-  UpdateBytesWritten(0);
+  UpdateBytesWritten(total_bytes_written_, block->BytesAppended());
 
   // Truncate the container if it's now full; any left over preallocated space
   // is no longer needed.
@@ -638,15 +660,15 @@ Status LogBlockContainer::DeleteBlock(int64_t offset, int64_t length) {
   return Status::OK();
 }
 
-Status LogBlockContainer::AppendData(const Slice& data) {
-  RETURN_NOT_OK(EnsurePreallocated(data.size()));
-  RETURN_NOT_OK(data_file_->Write(total_bytes_written_, data));
-  total_bytes_written_ += data.size();
+Status LogBlockContainer::WriteData(int64_t offset, const Slice& data) {
+  DCHECK_GE(offset, total_bytes_written_);
+
+  RETURN_NOT_OK(data_file_->Write(offset, data));
 
   // This append may have changed the container size if:
   // 1. It was large enough that it blew out the preallocated space.
   // 2. Preallocation was disabled.
-  if (total_bytes_written_ > preallocated_offset_) {
+  if (offset + data.size() > preallocated_offset_) {
     RETURN_NOT_OK(data_dir_->RefreshIsFull(DataDir::RefreshMode::ALWAYS));
   }
   return Status::OK();
@@ -689,16 +711,19 @@ Status LogBlockContainer::SyncMetadata() {
   return Status::OK();
 }
 
-Status LogBlockContainer::EnsurePreallocated(size_t next_append_bytes) {
+Status LogBlockContainer::EnsurePreallocated(int64_t block_start_offset,
+                                             size_t next_append_length) {
+  DCHECK_GE(block_start_offset, 0);
+
   if (!FLAGS_log_container_preallocate_bytes) {
     return Status::OK();
   }
 
   // If the last write blew out the preallocation window, or if the next write
   // exceeds it, we need to preallocate another chunk.
-  if (total_bytes_written_ > preallocated_offset_ ||
-      next_append_bytes > preallocated_offset_ - total_bytes_written_) {
-    int64_t off = std::max(preallocated_offset_, total_bytes_written_);
+  if (block_start_offset > preallocated_offset_ ||
+      next_append_length > preallocated_offset_ - block_start_offset) {
+    int64_t off = std::max(preallocated_offset_, block_start_offset);
     int64_t len = FLAGS_log_container_preallocate_bytes;
     RETURN_NOT_OK(data_file_->PreAllocate(off, len));
     RETURN_NOT_OK(data_dir_->RefreshIsFull(DataDir::RefreshMode::ALWAYS));
@@ -711,17 +736,28 @@ Status LogBlockContainer::EnsurePreallocated(size_t next_append_bytes) {
   return Status::OK();
 }
 
-void LogBlockContainer::UpdateBytesWritten(int64_t more_bytes) {
-  DCHECK_GE(more_bytes, 0);
-
-  total_bytes_written_ += more_bytes;
+void LogBlockContainer::UpdateBytesWritten(int64_t block_offset, size_t block_length) {
+  DCHECK_GE(block_offset, 0);
 
+  // The log block manager maintains block contiguity as an invariant, which
+  // means accounting for the new block should be as simple as adding its
+  // length to 'total_bytes_written_'. However, due to KUDU-1793, some
+  // containers may have developed extra "holes" between blocks. We'll account
+  // for that by considering both the block's offset and its length.
+  //
   // The number of bytes is rounded up to the nearest filesystem block so
   // that each Kudu block is guaranteed to be on a filesystem block
   // boundary. This guarantees that the disk space can be reclaimed when
   // the block is deleted.
-  total_bytes_written_ = KUDU_ALIGN_UP(total_bytes_written_,
-                                       instance()->filesystem_block_size_bytes());
+  int64_t new_total_bytes = KUDU_ALIGN_UP(
+      block_offset + block_length, instance()->filesystem_block_size_bytes());
+  if (new_total_bytes < total_bytes_written_) {
+    LOG(WARNING) << Substitute(
+        "Container $0 unexpectedly tried to lower its size (from $1 to $2 "
+        "bytes), ignoring", ToString(), total_bytes_written_, new_total_bytes);
+  }
+  total_bytes_written_ = std::max(total_bytes_written_, new_total_bytes);
+
   if (full()) {
     VLOG(1) << Substitute(
         "Container $0 with size $1 is now full, max size is $2",
@@ -941,8 +977,11 @@ Status LogWritableBlock::Append(const Slice& data) {
   // whichever comes first. We can't do it now because the block's
   // length is still in flux.
 
+  int64_t cur_block_offset = block_offset_ + block_length_;
+  RETURN_NOT_OK(container_->EnsurePreallocated(cur_block_offset, data.size()));
+
   MicrosecondsInt64 start_time = GetMonoTimeMicros();
-  RETURN_NOT_OK(container_->AppendData(data));
+  RETURN_NOT_OK(container_->WriteData(cur_block_offset, data));
   MicrosecondsInt64 end_time = GetMonoTimeMicros();
 
   int64_t dur = end_time - start_time;
@@ -1561,7 +1600,12 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
     UntrackedBlockMap blocks_in_container;
     uint64_t max_block_id = 0;
     for (const BlockRecordPB& r : records) {
-      ProcessBlockRecord(r, container.get(), &blocks_in_container);
+      s = ProcessBlockRecord(r, container.get(), &blocks_in_container);
+      if (!s.ok()) {
+        *result_status = s.CloneAndPrepend(Substitute(
+            "Could not process record in container $0", container->ToString()));
+        return;
+      }
       max_block_id = std::max(max_block_id, r.block_id().id());
     }
 
@@ -1606,19 +1650,18 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
   *result_status = Status::OK();
 }
 
-void LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
-                                         LogBlockContainer* container,
-                                         UntrackedBlockMap* block_map) {
+Status LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
+                                           LogBlockContainer* container,
+                                           UntrackedBlockMap* block_map) {
   BlockId block_id(BlockId::FromPB(record.block_id()));
   switch (record.op_type()) {
     case CREATE: {
       scoped_refptr<LogBlock> lb(new LogBlock(container, block_id,
                                               record.offset(), record.length()));
       if (!InsertIfNotPresent(block_map, block_id, lb)) {
-        LOG(FATAL) << "Found duplicate CREATE record for block "
-                   << block_id.ToString() << " in container "
-                   << container->ToString() << ": "
-                   << record.DebugString();
+        return Status::Corruption(Substitute(
+            "found duplicate CREATE record for block $0 in container $1: $2",
+            block_id.ToString(), container->ToString(), record.DebugString()));
       }
 
       VLOG(2) << Substitute("Found CREATE block $0 at offset $1 with length $2",
@@ -1632,22 +1675,24 @@ void LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
       //
       // If we ignored deleted blocks, we would end up reusing the space
       // belonging to the last deleted block in the container.
-      container->UpdateBytesWritten(record.length());
+      container->UpdateBytesWritten(record.offset(), record.length());
       break;
     }
     case DELETE:
       if (block_map->erase(block_id) != 1) {
-        LOG(FATAL) << "Found DELETE record for invalid block "
-                   << block_id.ToString() << " in container "
-                   << container->ToString() << ": "
-                   << record.DebugString();
+        return Status::Corruption(Substitute(
+            "Found DELETE record for invalid block $0 in container $1: $2",
+            block_id.ToString(), container->ToString(), record.DebugString()));
       }
       VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString());
       break;
     default:
-      LOG(FATAL) << "Found unknown op type in block record: "
-                 << record.DebugString();
+      return Status::Corruption(Substitute(
+          "Found unknown op type in container $0: $1",
+          container->ToString(), record.DebugString()));
   }
+
+  return Status::OK();
 }
 
 std::string LogBlockManager::ContainerPathForTests(internal::LogBlockContainer* container) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/faa587c6/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 817d8af..aaa983f 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -248,11 +248,13 @@ class LogBlockManager : public BlockManager {
   // already gone.
   scoped_refptr<internal::LogBlock> RemoveLogBlock(const BlockId& block_id);
 
-  // Parse a block record, adding or removing it in 'block_map', and
+  // Parses a block record, adding or removing it in 'block_map', and
   // accounting for it in the metadata for 'container'.
-  void ProcessBlockRecord(const BlockRecordPB& record,
-                          internal::LogBlockContainer* container,
-                          UntrackedBlockMap* block_map);
+  //
+  // Returns a bad status if the record is malformed in some way.
+  Status ProcessBlockRecord(const BlockRecordPB& record,
+                            internal::LogBlockContainer* container,
+                            UntrackedBlockMap* block_map);
 
   // Open a particular data directory belonging to the block manager.
   //

http://git-wip-us.apache.org/repos/asf/kudu/blob/faa587c6/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 27e1ce8..e6b01f9 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -55,7 +55,7 @@
 
 DEFINE_double(sys_catalog_fail_during_write, 0.0,
               "Fraction of the time when system table writes will fail");
-TAG_FLAG(sys_catalog_fail_during_write, unsafe);
+TAG_FLAG(sys_catalog_fail_during_write, hidden);
 
 using kudu::consensus::CONSENSUS_CONFIG_COMMITTED;
 using kudu::consensus::ConsensusMetadata;

http://git-wip-us.apache.org/repos/asf/kudu/blob/faa587c6/src/kudu/util/env.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/env.h b/src/kudu/util/env.h
index 2748413..a9b8751 100644
--- a/src/kudu/util/env.h
+++ b/src/kudu/util/env.h
@@ -296,6 +296,12 @@ class Env {
   // kernel) to be equal to the hard limit.
   virtual void IncreaseOpenFileLimit() = 0;
 
+  // Special string injected into file-growing operations' random failures
+  // (if enabled).
+  //
+  // Only useful for tests.
+  static const char* const kInjectedFailureStatusMsg;
+
  private:
   // No copying allowed
   Env(const Env&);

http://git-wip-us.apache.org/repos/asf/kudu/blob/faa587c6/src/kudu/util/env_posix.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc
index 458cdd0..715207f 100644
--- a/src/kudu/util/env_posix.cc
+++ b/src/kudu/util/env_posix.cc
@@ -39,6 +39,7 @@
 #include "kudu/util/env.h"
 #include "kudu/util/errno.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/fault_injection.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/malloc.h"
 #include "kudu/util/monotime.h"
@@ -106,6 +107,10 @@ DEFINE_bool(never_fsync, false,
 TAG_FLAG(never_fsync, advanced);
 TAG_FLAG(never_fsync, unsafe);
 
+DEFINE_double(env_inject_io_error_on_write_or_preallocate, 0.0,
+              "Fraction of the time that write or preallocate operations will fail");
+TAG_FLAG(env_inject_io_error_on_write_or_preallocate, hidden);
+
 using base::subtle::Atomic64;
 using base::subtle::Barrier_AtomicIncrement;
 using std::string;
@@ -118,6 +123,8 @@ static Atomic64 cur_thread_local_id_;
 
 namespace kudu {
 
+const char* const Env::kInjectedFailureStatusMsg = "INJECTED FAILURE";
+
 namespace {
 
 #if defined(__APPLE__)
@@ -360,6 +367,9 @@ class PosixWritableFile : public WritableFile {
   }
 
   virtual Status PreAllocate(uint64_t size) OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error_on_write_or_preallocate,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+
     TRACE_EVENT1("io", "PosixWritableFile::PreAllocate", "path", filename_);
     ThreadRestrictions::AssertIOAllowed();
     uint64_t offset = std::max(filesize_, pre_allocated_size_);
@@ -453,6 +463,9 @@ class PosixWritableFile : public WritableFile {
 
   Status DoWritev(const vector<Slice>& data_vector,
                   size_t offset, size_t n) {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error_on_write_or_preallocate,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+
     ThreadRestrictions::AssertIOAllowed();
 #if defined(__linux__)
     DCHECK_LE(n, IOV_MAX);
@@ -560,6 +573,9 @@ class PosixRWFile : public RWFile {
   }
 
   virtual Status Write(uint64_t offset, const Slice& data) OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error_on_write_or_preallocate,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+
     ThreadRestrictions::AssertIOAllowed();
     ssize_t written;
     RETRY_ON_EINTR(written, pwrite(fd_, data.data(), data.size(), offset));
@@ -581,6 +597,9 @@ class PosixRWFile : public RWFile {
   }
 
   virtual Status PreAllocate(uint64_t offset, size_t length) OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error_on_write_or_preallocate,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+
     TRACE_EVENT1("io", "PosixRWFile::PreAllocate", "path", filename_);
     ThreadRestrictions::AssertIOAllowed();
     if (fallocate(fd_, 0, offset, length) < 0) {


[2/6] kudu git commit: [docs] - Fix python development section location

Posted by jd...@apache.org.
[docs] - Fix python development section location

The Python Client section of the developer docs is wedged between
two of the Spark sub-sections; this patch fixes that issue.

Change-Id: Ia437a3c139f81540e9f6883347afd90cdc565a0a
Reviewed-on: http://gerrit.cloudera.org:8080/5414
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/db4d0a65
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/db4d0a65
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/db4d0a65

Branch: refs/heads/master
Commit: db4d0a653caea6f9be47072555c70ba601d6004b
Parents: faa587c
Author: Jordan Birdsell <jt...@apache.org>
Authored: Wed Dec 7 22:58:05 2016 -0500
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Dec 8 04:09:55 2016 +0000

----------------------------------------------------------------------
 docs/developing.adoc | 31 ++++++++++++++++---------------
 1 file changed, 16 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/db4d0a65/docs/developing.adoc
----------------------------------------------------------------------
diff --git a/docs/developing.adoc b/docs/developing.adoc
index 03c8cdc..4bfea40 100644
--- a/docs/developing.adoc
+++ b/docs/developing.adoc
@@ -155,7 +155,22 @@ kuduContext.tableExists("another_table")
 kuduContext.deleteTable("unwanted_table")
 ----
 
-=== Kudu Python Client
+=== Spark Integration Known Issues and Limitations
+
+- The Kudu Spark integration is tested and developed against Spark 1.6 and Scala
+  2.10.
+- Kudu tables with a name containing upper case or non-ascii characters must be
+  assigned an alternate name when registered as a temporary table.
+- Kudu tables with a column name containing upper case or non-ascii characters
+  may not be used with SparkSQL. Non-primary key columns may be renamed in Kudu
+  to work around this issue.
+- `NULL`, `NOT NULL`, `<>`, `OR`, `LIKE`, and `IN` predicates are not pushed to
+  Kudu, and instead will be evaluated by the Spark task.
+- Kudu does not support all types supported by Spark SQL, such as `Date`,
+  `Decimal` and complex types.
+
+
+== Kudu Python Client
 The Kudu Python client provides a Python friendly interface to the C++ client API.
 The sample below demonstrates the use of part of the Python client.
 [source,python]
@@ -216,20 +231,6 @@ scanner.add_predicate(table['ts_val'] == datetime(2017, 1, 1))
 result = scanner.open().read_all_tuples()
 ----
 
-=== Spark Integration Known Issues and Limitations
-
-- The Kudu Spark integration is tested and developed against Spark 1.6 and Scala
-  2.10.
-- Kudu tables with a name containing upper case or non-ascii characters must be
-  assigned an alternate name when registered as a temporary table.
-- Kudu tables with a column name containing upper case or non-ascii characters
-  may not be used with SparkSQL. Non-primary key columns may be renamed in Kudu
-  to work around this issue.
-- `NULL`, `NOT NULL`, `<>`, `OR`, `LIKE`, and `IN` predicates are not pushed to
-  Kudu, and instead will be evaluated by the Spark task.
-- Kudu does not support all types supported by Spark SQL, such as `Date`,
-  `Decimal` and complex types.
-
 == Integration with MapReduce, YARN, and Other Frameworks
 
 Kudu was designed to integrate with MapReduce, YARN, Spark, and other frameworks in


[5/6] kudu git commit: KUDU-798 (part 5) Correct safe time advancement

Posted by jd...@apache.org.
KUDU-798 (part 5) Correct safe time advancement

This patch fixes safe time advancement in general and allows for
safe time advancement in the absence of writes in particular.
The core of the patch is to plug in the new TimeManager class wherever
needed. But there is also a major cleanup of the waiting story in
TabletService (KUDU-1127) and a few new integration test features.

There is an instance of a TimeManager per tablet. It's used for:

- When replicating messages to other replicas a leader uses the
  TimeManager to assign timestamps and to obtain a safe time to send,
  even when there are no writes.

- When receiving messages from a leader, consensus uses the TimeManager
  to update the clock and to unblock any waiters that might be waiting
  for a particular timestamp to be safe.

- Before a snapshot scan proceeds to scan, it must first wait for the
  TimeManager to deem whatever timestamp it has safe. Then it proceeds
  to wait for the snapshot at that timestamp to have all its transactions
  committed and, finally, proceeds with the scan.

Put together, these changes make sure that snapshot scans are
repeatable in the large majority of cases. The one remaining "hole" in
safe time will be closed by leader leases. Until we have those, this
patch takes a conservative approach to safe time progress.

Fixing safe time broke a bunch of our tests that were expecting broken
snapshot scans. In particular we would return broken snapshots all the
time instead of waiting for the snapshot to be correct. Of course
when these errors were fixed the tests started failing.

In order to address these test failures I cleaned up our snapshot scan
waiting story in TabletServer::HandleScanAtSnapshot(). In particular:

- The client's deadline is no longer taken into account when deciding
  how long to wait for a snapshot to be repeatable. There is a hard
  (configurable) max that the server will wait for, "clamping" the
  client's deadline. The key here is that, when the client deadline
  is clamped, we return Status::ServiceUnavailable on timeout
  instead of Status::TimedOut(). Since HandleScanAtSnapshot() is called
  on KuduScanner::Open() and ServiceUnavailable is a retryable status,
  this enables the client to try somewhere else, perhaps where it won't
  have to wait as long.

- TimeManager now does a pre-flight check before waiting on safe time.
  In particular it checks that: i) it has heard from the leader within
  a configurable amount of time (i.e. that safe time _can_ make progress);
  ii) the safe time is not more than a configurable amount of time in
  the past, 30 seconds by default (i.e. that safe time is likely to make
  progress to the required timestamp).

Finally, this patch adds two new integration test workloads that
prove that it works. It adds a new read workload to TestWorkload
that performs snapshot scans in the present, while writes are
happening. This can be enabled anywhere but this patch enables
it for a bunch of tests in RaftConsensusItest, in particular the
*Churny* and *Crashy* ones with unique keys. This patch also enables
linked_list-test to perform snapshot scans in the present after
the verification.

Results:

I ran raft_consensus-itest with the new snapshot read workload
on dist-test, asan, with the following config:

KUDU_ALLOW_SLOW_TESTS=1 ../../build-support/dist_test.py \
--disable-sharding loop -n 1000 bin/raft_consensus-itest \
--gtest_filter=*Churny*:*Crashy*-*Duplicate*

I backported the test to the commit before this patch. It failed 1000/1000 on
master. With this patch it passed 1000/1000:

http://dist-test.cloudera.org//job?job_id=david.alves.1481097865.18287

I ran linked_list-test with the new snapshot scans in dist-test,
asan, with the following config:

KUDU_ALLOW_SLOW_TESTS=1 ../../build-support/dist_test.py \
loop -n 1000 bin/linked_list-test --stress_cpu_threads=2 \
--gtest_filter=*TestLoadAndVerify*

The test passed 1000/1000 whereas before it would fail 427/1000.
Results:

http://dist-test.cloudera.org//job?job_id=david.alves.1481104106.24665

I also ran the test in client-test that tests fault tolerance.
Run config:

KUDU_ALLOW_SLOW_TESTS=1 ../../build-support/dist_test.py \
--disable-sharding loop -n 1000 -- bin/client-test \
--gtest_filter=*ScanFaultTolerance* --stress_cpu_threads=2

The test passed 1000/1000 times. Results:

http://dist-test.cloudera.org//job?job_id=david.alves.1481171410.4460

Change-Id: I8532fdb069c8bee7f3e08ffe74cab0273885cc8e
Reviewed-on: http://gerrit.cloudera.org:8080/5240
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4d8fe6cf
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4d8fe6cf
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4d8fe6cf

Branch: refs/heads/master
Commit: 4d8fe6cf2a1804bae142ddfb5e672af37dad036e
Parents: cbe80ea
Author: David Alves <dr...@apache.org>
Authored: Fri Dec 2 04:01:55 2016 -0800
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Thu Dec 8 05:17:57 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/TestHybridTime.java  |   6 +
 src/kudu/client/client-test-util.cc             |   4 +-
 src/kudu/client/client-test.cc                  |  15 +-
 src/kudu/consensus/consensus.h                  |   3 +
 src/kudu/consensus/consensus.proto              |   7 +
 src/kudu/consensus/consensus_peers-test.cc      |   3 +
 src/kudu/consensus/consensus_queue-test.cc      |   5 +
 src/kudu/consensus/consensus_queue.cc           |  26 +++-
 src/kudu/consensus/consensus_queue.h            |   4 +
 src/kudu/consensus/raft_consensus.cc            |  40 ++---
 src/kudu/consensus/raft_consensus.h             |  10 +-
 .../consensus/raft_consensus_quorum-test.cc     |   5 +-
 src/kudu/consensus/raft_consensus_state.cc      |   9 +-
 src/kudu/consensus/raft_consensus_state.h       |   5 +-
 src/kudu/consensus/time_manager.cc              |  99 +++++++++++--
 src/kudu/consensus/time_manager.h               |  22 ++-
 src/kudu/integration-tests/alter_table-test.cc  |   8 +-
 src/kudu/integration-tests/cluster_verifier.cc  |   3 +-
 .../integration-tests/linked_list-test-util.h   | 122 ++++++++++------
 src/kudu/integration-tests/linked_list-test.cc  |   6 +-
 .../integration-tests/raft_consensus-itest.cc   |  15 +-
 .../tablet_history_gc-itest.cc                  |  10 +-
 src/kudu/integration-tests/test_workload.cc     |  73 ++++++++--
 src/kudu/integration-tests/test_workload.h      |  12 ++
 src/kudu/tablet/mvcc-test.cc                    |  29 ++--
 src/kudu/tablet/mvcc.cc                         |  55 +++----
 src/kudu/tablet/mvcc.h                          |  32 ++--
 src/kudu/tablet/tablet.cc                       |   2 +-
 src/kudu/tablet/tablet_bootstrap.cc             |   3 -
 src/kudu/tablet/tablet_peer.cc                  |  15 +-
 src/kudu/tablet/tablet_peer.h                   |   5 +
 .../transactions/alter_schema_transaction.cc    |   7 +-
 .../tablet/transactions/transaction_driver.cc   |  24 +--
 .../tablet/transactions/transaction_driver.h    |   8 +-
 .../transactions/transaction_tracker-test.cc    |   3 +-
 .../tablet/transactions/transaction_tracker.cc  |   1 -
 .../tablet/transactions/write_transaction.cc    |  12 +-
 .../tablet/transactions/write_transaction.h     |   5 +-
 src/kudu/tools/tool_action_remote_replica.cc    |   7 +-
 src/kudu/tserver/tablet_service.cc              | 145 +++++++++++++------
 src/kudu/tserver/tablet_service.h               |   2 +-
 41 files changed, 592 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
index 2b57f97..2655134 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
@@ -55,6 +55,12 @@ public class TestHybridTime extends BaseKuduTest {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    // Before starting the cluster, disable automatic safe time advancement in the
+    // absence of writes. This test does snapshot reads in the present and expects
+    // certain timestamps to be assigned to the scans. If safe time was allowed
+    // to move automatically the scans might not be assigned the expected timestamps.
+    miniClusterBuilder.addTserverFlag("--safe_time_advancement_without_writes=false");
+
     BaseKuduTest.setUpBeforeClass();
 
     // Using multiple tablets doesn't work with the current way this test works since we could

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/client/client-test-util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test-util.cc b/src/kudu/client/client-test-util.cc
index c3b6a86..814dc19 100644
--- a/src/kudu/client/client-test-util.cc
+++ b/src/kudu/client/client-test-util.cc
@@ -55,8 +55,10 @@ void LogSessionErrorsAndDie(const sp::shared_ptr<KuduSession>& session,
 void ScanTableToStrings(KuduTable* table, vector<string>* row_strings) {
   row_strings->clear();
   KuduScanner scanner(table);
+  // TODO(dralves) Change this to READ_AT_SNAPSHOT, fault tolerant scan and get rid
+  // of the retry code below.
   ASSERT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY));
-  ASSERT_OK(scanner.SetTimeoutMillis(60000));
+  ASSERT_OK(scanner.SetTimeoutMillis(5000));
   ScanToStrings(&scanner, row_strings);
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 1fc480f..4a6fcdb 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -74,10 +74,12 @@ DECLARE_bool(fail_dns_resolution);
 DECLARE_bool(log_inject_latency);
 DECLARE_bool(allow_unsafe_replication_factor);
 DECLARE_int32(heartbeat_interval_ms);
+DECLARE_int32(leader_failure_exp_backoff_max_delta_ms);
 DECLARE_int32(log_inject_latency_ms_mean);
 DECLARE_int32(log_inject_latency_ms_stddev);
 DECLARE_int32(master_inject_latency_on_tablet_lookups_ms);
 DECLARE_int32(max_create_tablets_per_ts);
+DECLARE_int32(raft_heartbeat_interval_ms);
 DECLARE_int32(scanner_gc_check_interval_us);
 DECLARE_int32(scanner_inject_latency_on_each_batch_ms);
 DECLARE_int32(scanner_max_batch_size_bytes);
@@ -1096,6 +1098,8 @@ static void DoScanWithCallback(KuduTable* table,
   // Initialize fault-tolerant snapshot scanner.
   KuduScanner scanner(table);
   ASSERT_OK(scanner.SetFaultTolerant());
+  // Set a long timeout as we'll be restarting nodes while performing snapshot scans.
+  ASSERT_OK(scanner.SetTimeoutMillis(60 * 1000 /* 60 seconds */))
   // Set a small batch size so it reads in multiple batches.
   ASSERT_OK(scanner.SetBatchSizeBytes(1));
 
@@ -1148,11 +1152,12 @@ TEST_F(ClientTest, TestScanFaultTolerance) {
   // Allow creating table with even replication factor.
   FLAGS_allow_unsafe_replication_factor = true;
 
-  // We use only two replicas in this test so that every write is fully replicated to both
-  // servers (the Raft majority is 2/2). This reduces potential flakiness if the scanner tries
-  // to read from a replica that is lagging for some reason. This won't be necessary once
-  // we implement full support for snapshot consistency (KUDU-430).
-  const int kNumReplicas = 2;
+  // Make elections faster, otherwise we can go a long time without a leader and thus without
+  // advancing safe time and unblocking scanners.
+  FLAGS_raft_heartbeat_interval_ms = 50;
+  FLAGS_leader_failure_exp_backoff_max_delta_ms = 1000;
+
+  const int kNumReplicas = 3;
   ASSERT_NO_FATAL_FAILURE(CreateTable(kScanTable, kNumReplicas, {}, {}, &table));
   ASSERT_NO_FATAL_FAILURE(InsertTestRows(table.get(), FLAGS_test_scan_num_rows));
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.h b/src/kudu/consensus/consensus.h
index 5d5447d..5999fde 100644
--- a/src/kudu/consensus/consensus.h
+++ b/src/kudu/consensus/consensus.h
@@ -60,6 +60,7 @@ namespace consensus {
 class ConsensusCommitContinuation;
 class ConsensusRound;
 class ReplicaTransactionFactory;
+class TimeManager;
 
 typedef int64_t ConsensusTerm;
 
@@ -259,6 +260,8 @@ class Consensus : public RefCountedThreadSafe<Consensus> {
   // Returns the id of the tablet whose updates this consensus instance helps coordinate.
   virtual std::string tablet_id() const = 0;
 
+  virtual scoped_refptr<TimeManager> time_manager() const = 0;
+
   // Returns a copy of the committed state of the Consensus system.
   virtual ConsensusStatePB ConsensusState(ConsensusConfigType type) const = 0;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/consensus.proto
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.proto b/src/kudu/consensus/consensus.proto
index c10183e..699ebcc 100644
--- a/src/kudu/consensus/consensus.proto
+++ b/src/kudu/consensus/consensus.proto
@@ -347,6 +347,13 @@ message ConsensusRequestPB {
   // the process of being added to the configuration but has not yet copied a snapshot,
   // this value may drop to 0.
   optional int64 all_replicated_index = 9;
+
+  // The safe timestamp on the leader.
+  // This is only set if the leader has no messages to send to the peer or if the last sent
+  // message is already (raft) committed. By setting this the leader allows followers to advance
+  // the "safe time" past the timestamp of the last committed message and answer snapshot scans
+  // in the present in the absence of writes.
+  optional fixed64 safe_timestamp = 10;
 }
 
 message ConsensusResponsePB {

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/consensus_peers-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc
index a41f8b8..626afeb 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -65,8 +65,11 @@ class ConsensusPeersTest : public KuduTest {
     clock_.reset(new server::HybridClock());
     ASSERT_OK(clock_->Init());
 
+    scoped_refptr<TimeManager> time_manager(new TimeManager(clock_, Timestamp::kMin));
+
     message_queue_.reset(new PeerMessageQueue(metric_entity_,
                                               log_.get(),
+                                              time_manager,
                                               FakeRaftPeerPB(kLeaderUuid),
                                               kTabletId));
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/consensus_queue-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc
index 7f5b464..bcf98af 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -27,6 +27,7 @@
 #include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/consensus/log_reader.h"
 #include "kudu/consensus/log_util.h"
+#include "kudu/consensus/time_manager.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/server/hybrid_clock.h"
 #include "kudu/util/metrics.h"
@@ -73,8 +74,12 @@ class ConsensusQueueTest : public KuduTest {
   }
 
   void CloseAndReopenQueue() {
+    scoped_refptr<server::Clock> clock(new server::HybridClock());
+    ASSERT_OK(clock->Init());
+    scoped_refptr<TimeManager> time_manager(new TimeManager(clock, Timestamp::kMin));
     queue_.reset(new PeerMessageQueue(metric_entity_,
                                       log_.get(),
+                                      time_manager,
                                       FakeRaftPeerPB(kLeaderUuid),
                                       kTestTablet));
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 69bc212..cdbbff0 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -32,6 +32,7 @@
 #include "kudu/consensus/opid_util.h"
 #include "kudu/consensus/quorum_util.h"
 #include "kudu/consensus/raft_consensus.h"
+#include "kudu/consensus/time_manager.h"
 #include "kudu/gutil/dynamic_annotations.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stl_util.h"
@@ -65,6 +66,8 @@ DEFINE_int32(consensus_inject_latency_ms_in_notifications, 0,
 TAG_FLAG(consensus_inject_latency_ms_in_notifications, hidden);
 TAG_FLAG(consensus_inject_latency_ms_in_notifications, unsafe);
 
+DECLARE_bool(safe_time_advancement_without_writes);
+
 namespace kudu {
 namespace consensus {
 
@@ -100,12 +103,14 @@ PeerMessageQueue::Metrics::Metrics(const scoped_refptr<MetricEntity>& metric_ent
 
 PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_entity,
                                    const scoped_refptr<log::Log>& log,
+                                   scoped_refptr<TimeManager> time_manager,
                                    const RaftPeerPB& local_peer_pb,
                                    const string& tablet_id)
     : local_peer_pb_(local_peer_pb),
       tablet_id_(tablet_id),
       log_cache_(metric_entity, log, local_peer_pb.permanent_uuid(), tablet_id),
-      metrics_(metric_entity) {
+      metrics_(metric_entity),
+      time_manager_(std::move(time_manager)) {
   DCHECK(local_peer_pb_.has_permanent_uuid());
   DCHECK(local_peer_pb_.has_last_known_addr());
   queue_state_.current_term = 0;
@@ -159,6 +164,7 @@ void PeerMessageQueue::SetLeaderMode(int64_t committed_index,
   for (const PeersMap::value_type& entry : peers_map_) {
     entry.second->last_successful_communication_time = now;
   }
+  time_manager_->SetLeaderMode();
 }
 
 void PeerMessageQueue::SetNonLeaderMode() {
@@ -168,6 +174,7 @@ void PeerMessageQueue::SetNonLeaderMode() {
   queue_state_.majority_size_ = -1;
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to NON_LEADER mode. State: "
       << queue_state_.ToString();
+  time_manager_->SetNonLeaderMode();
 }
 
 void PeerMessageQueue::TrackPeer(const string& uuid) {
@@ -276,6 +283,14 @@ Status PeerMessageQueue::AppendOperations(const vector<ReplicateRefPtr>& msgs,
     }
   }
 
+  // Update safe time in the TimeManager if we're leader.
+  // This will 'unpin' safe time advancement, which had stopped since we assigned a timestamp to
+  // the message.
+  // Until we have leader leases, replicas only call this when the message is committed.
+  if (queue_state_.mode == LEADER) {
+    time_manager_->AdvanceSafeTimeWithMessage(*msgs.back()->get());
+  }
+
   // Unlock ourselves during Append to prevent a deadlock: it's possible that
   // the log buffer is full, in which case AppendOperations would block. However,
   // for the log buffer to empty, it may need to call LocalPeerAppendFinished()
@@ -432,6 +447,15 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
           << (request->committed_index() - last_op_sent)
           << " ops behind the committed index " << THROTTLE_MSG;
     }
+  // If we're not sending ops to the follower, set the safe time on the request.
+  // TODO(dralves) When we have leader leases, send this all the time.
+  } else {
+    if (PREDICT_TRUE(FLAGS_safe_time_advancement_without_writes)) {
+      request->set_safe_timestamp(time_manager_->GetSafeTime().value());
+    } else {
+      KLOG_EVERY_N_SECS(WARNING, 300) << "Safe time advancement without writes is disabled. "
+            "Snapshot reads on non-leader replicas may stall if there are no writes in progress.";
+    }
   }
 
   if (PREDICT_FALSE(VLOG_IS_ON(2))) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 761e258..3d25553 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -49,6 +49,7 @@ class Log;
 
 namespace consensus {
 class PeerMessageQueueObserver;
+class TimeManager;
 
 // The id for the server-wide consensus queue MemTracker.
 extern const char kConsensusQueueParentTrackerId[];
@@ -129,6 +130,7 @@ class PeerMessageQueue {
 
   PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_entity,
                    const scoped_refptr<log::Log>& log,
+                   scoped_refptr<TimeManager> time_manager,
                    const RaftPeerPB& local_peer_pb,
                    const std::string& tablet_id);
 
@@ -429,6 +431,8 @@ class PeerMessageQueue {
   LogCache log_cache_;
 
   Metrics metrics_;
+
+  scoped_refptr<TimeManager> time_manager_;
 };
 
 // The interface between RaftConsensus and the PeerMessageQueue.

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 3ee0efc..0016ea3 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -163,7 +163,7 @@ scoped_refptr<RaftConsensus> RaftConsensus::Create(
     unique_ptr<ConsensusMetadata> cmeta,
     const RaftPeerPB& local_peer_pb,
     const scoped_refptr<MetricEntity>& metric_entity,
-    const scoped_refptr<server::Clock>& clock,
+    scoped_refptr<TimeManager> time_manager,
     ReplicaTransactionFactory* txn_factory,
     const shared_ptr<rpc::Messenger>& messenger,
     const scoped_refptr<log::Log>& log,
@@ -175,6 +175,7 @@ scoped_refptr<RaftConsensus> RaftConsensus::Create(
   // where.
   gscoped_ptr<PeerMessageQueue> queue(new PeerMessageQueue(metric_entity,
                                                            log,
+                                                           time_manager,
                                                            local_peer_pb,
                                                            options.tablet_id));
 
@@ -206,7 +207,7 @@ scoped_refptr<RaftConsensus> RaftConsensus::Create(
                               std::move(thread_pool),
                               metric_entity,
                               peer_uuid,
-                              clock,
+                              std::move(time_manager),
                               txn_factory,
                               log,
                               parent_mem_tracker,
@@ -222,19 +223,19 @@ RaftConsensus::RaftConsensus(
     gscoped_ptr<ThreadPool> thread_pool,
     const scoped_refptr<MetricEntity>& metric_entity,
     const std::string& peer_uuid,
-    const scoped_refptr<server::Clock>& clock,
+    scoped_refptr<TimeManager> time_manager,
     ReplicaTransactionFactory* txn_factory,
     const scoped_refptr<log::Log>& log,
     shared_ptr<MemTracker> parent_mem_tracker,
     Callback<void(const std::string& reason)> mark_dirty_clbk)
     : thread_pool_(std::move(thread_pool)),
       log_(log),
-      clock_(clock),
+      time_manager_(std::move(time_manager)),
       peer_proxy_factory_(std::move(peer_proxy_factory)),
       txn_factory_(txn_factory),
       peer_manager_(std::move(peer_manager)),
       queue_(std::move(queue)),
-      pending_(Substitute("T $0 P $1: ", options.tablet_id, peer_uuid)),
+      pending_(Substitute("T $0 P $1: ", options.tablet_id, peer_uuid), time_manager_),
       rng_(GetRandomSeed32()),
       failure_monitor_(GetRandomSeed32(), GetFailureMonitorCheckMeanMs(),
                        GetFailureMonitorCheckStddevMs()),
@@ -529,12 +530,7 @@ Status RaftConsensus::BecomeLeaderUnlocked() {
   auto replicate = new ReplicateMsg;
   replicate->set_op_type(NO_OP);
   replicate->mutable_noop_request(); // Define the no-op request field.
-
-  // TODO: We should have no-ops (?) and config changes be COMMIT_WAIT
-  // transactions. See KUDU-798.
-  // Note: This timestamp has no meaning from a serialization perspective
-  // because this method is not executed on the TabletPeer's prepare thread.
-  replicate->set_timestamp(clock_->Now().ToUint64());
+  CHECK_OK(time_manager_->AssignTimestamp(replicate));
 
   scoped_refptr<ConsensusRound> round(
       new ConsensusRound(this, make_scoped_refptr(new RefCountedReplicate(replicate))));
@@ -1174,8 +1170,6 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     auto iter = deduped_req.messages.begin();
 
     if (PREDICT_TRUE(!deduped_req.messages.empty())) {
-      // TODO(KUDU-798) Temporary until the leader explicitly propagates the safe timestamp.
-      clock_->Update(Timestamp(deduped_req.messages.back()->get()->timestamp()));
 
       // This request contains at least one message, and is likely to increase
       // our memory pressure.
@@ -1201,6 +1195,9 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
       if (PREDICT_FALSE(!prepare_status.ok())) {
         break;
       }
+      // TODO(dralves) Without leader leases this shouldn't be allowed to fail.
+      // Once we have that functionality we'll have to revisit this.
+      CHECK_OK(time_manager_->MessageReceivedFromLeader(*(*iter)->get()));
       ++iter;
     }
 
@@ -1237,6 +1234,14 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
       }
     }
 
+    // All transactions that are going to be prepared were started, advance the safe timestamp.
+    // TODO(dralves) This is only correct because the queue only sets safe time when the request is
+    // an empty heartbeat. If we actually start setting this on a consensus request along with
+    // actual messages we need to be careful to ignore it if any of the messages fails to prepare.
+    if (request->has_safe_timestamp()) {
+      time_manager_->AdvanceSafeTime(Timestamp(request->safe_timestamp()));
+    }
+
     OpId last_from_leader;
     // 3 - Enqueue the writes.
     // Now that we've triggered the prepares enqueue the operations to be written
@@ -1307,6 +1312,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
       }
     } while (s.IsTimedOut());
     RETURN_NOT_OK(s);
+
     TRACE("finished");
   }
 
@@ -1772,12 +1778,7 @@ Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_conf
   cc_req->set_tablet_id(tablet_id());
   *cc_req->mutable_old_config() = old_config;
   *cc_req->mutable_new_config() = new_config;
-
-  // TODO: We should have no-ops (?) and config changes be COMMIT_WAIT
-  // transactions. See KUDU-798.
-  // Note: This timestamp has no meaning from a serialization perspective
-  // because this method is not executed on the TabletPeer's prepare thread.
-  cc_replicate->set_timestamp(clock_->Now().ToUint64());
+  CHECK_OK(time_manager_->AssignTimestamp(cc_replicate));
 
   scoped_refptr<ConsensusRound> round(
       new ConsensusRound(this, make_scoped_refptr(new RefCountedReplicate(cc_replicate))));
@@ -1909,6 +1910,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
     }
   }
 
+  // The vote was granted, become leader.
   ReplicaState::UniqueLock lock;
   Status s = state_->LockForConfigChange(&lock);
   if (PREDICT_FALSE(!s.ok())) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 3a67087..7777719 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -30,6 +30,7 @@
 #include "kudu/consensus/consensus_meta.h"
 #include "kudu/consensus/consensus_queue.h"
 #include "kudu/consensus/raft_consensus_state.h"
+#include "kudu/consensus/time_manager.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/failure_detector.h"
 
@@ -56,6 +57,7 @@ class ConsensusMetadata;
 class Peer;
 class PeerProxyFactory;
 class PeerManager;
+class TimeManager;
 struct ElectionResult;
 
 class RaftConsensus : public Consensus,
@@ -66,7 +68,7 @@ class RaftConsensus : public Consensus,
     std::unique_ptr<ConsensusMetadata> cmeta,
     const RaftPeerPB& local_peer_pb,
     const scoped_refptr<MetricEntity>& metric_entity,
-    const scoped_refptr<server::Clock>& clock,
+    scoped_refptr<TimeManager> time_manager,
     ReplicaTransactionFactory* txn_factory,
     const std::shared_ptr<rpc::Messenger>& messenger,
     const scoped_refptr<log::Log>& log,
@@ -81,7 +83,7 @@ class RaftConsensus : public Consensus,
                 gscoped_ptr<ThreadPool> thread_pool,
                 const scoped_refptr<MetricEntity>& metric_entity,
                 const std::string& peer_uuid,
-                const scoped_refptr<server::Clock>& clock,
+                scoped_refptr<TimeManager> time_manager,
                 ReplicaTransactionFactory* txn_factory,
                 const scoped_refptr<log::Log>& log,
                 std::shared_ptr<MemTracker> parent_mem_tracker,
@@ -131,6 +133,8 @@ class RaftConsensus : public Consensus,
 
   std::string tablet_id() const override;
 
+  scoped_refptr<TimeManager> time_manager() const override { return time_manager_; }
+
   ConsensusStatePB ConsensusState(ConsensusConfigType type) const override;
 
   RaftConfigPB CommittedConfig() const override;
@@ -441,7 +445,7 @@ class RaftConsensus : public Consensus,
   gscoped_ptr<ThreadPool> thread_pool_;
 
   scoped_refptr<log::Log> log_;
-  scoped_refptr<server::Clock> clock_;
+  scoped_refptr<TimeManager> time_manager_;
   gscoped_ptr<PeerProxyFactory> peer_proxy_factory_;
 
   // When we receive a message from a remote peer telling us to start a transaction, we use

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 433ef6e..79a7eb6 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -151,8 +151,11 @@ class RaftConsensusQuorumTest : public KuduTest {
 
       RaftPeerPB local_peer_pb;
       CHECK_OK(GetRaftConfigMember(config_, peer_uuid, &local_peer_pb));
+
+      scoped_refptr<TimeManager> time_manager(new TimeManager(clock_, Timestamp::kMin));
       gscoped_ptr<PeerMessageQueue> queue(new PeerMessageQueue(metric_entity_,
                                                                logs_[i],
+                                                               time_manager,
                                                                local_peer_pb,
                                                                kTestTablet));
 
@@ -177,7 +180,7 @@ class RaftConsensusQuorumTest : public KuduTest {
                             std::move(thread_pool),
                             metric_entity_,
                             config_.peers(i).permanent_uuid(),
-                            clock_,
+                            time_manager,
                             txn_factory,
                             logs_[i],
                             parent_mem_trackers_[i],

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/raft_consensus_state.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc
index af5943b..2c3e973 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -21,6 +21,7 @@
 #include "kudu/consensus/log_util.h"
 #include "kudu/consensus/quorum_util.h"
 #include "kudu/consensus/raft_consensus_state.h"
+#include "kudu/consensus/time_manager.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/strcat.h"
@@ -351,10 +352,11 @@ string ReplicaState::ToStringUnlocked() const {
 // TODO(todd): move to its own file
 //------------------------------------------------------------
 
-PendingRounds::PendingRounds(string log_prefix)
+PendingRounds::PendingRounds(string log_prefix, scoped_refptr<TimeManager> time_manager)
     : log_prefix_(std::move(log_prefix)),
-      last_committed_op_id_(MinimumOpId()) {
-}
+      last_committed_op_id_(MinimumOpId()),
+      time_manager_(std::move(time_manager)) {}
+
 PendingRounds::~PendingRounds() {
 }
 
@@ -485,6 +487,7 @@ Status PendingRounds::AdvanceCommittedIndex(int64_t committed_index) {
 
     pending_txns_.erase(iter++);
     last_committed_op_id_ = round->id();
+    time_manager_->AdvanceSafeTimeWithMessage(*round->replicate_msg());
     round->NotifyReplicationFinished(Status::OK());
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/raft_consensus_state.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.h b/src/kudu/consensus/raft_consensus_state.h
index db8af70..ead377f 100644
--- a/src/kudu/consensus/raft_consensus_state.h
+++ b/src/kudu/consensus/raft_consensus_state.h
@@ -44,6 +44,7 @@ class Messenger;
 }
 
 namespace consensus {
+class TimeManager;
 
 // Class that coordinates access to the persistent Raft state (independently of Role).
 // This has a 1-1 relationship with RaftConsensus and is essentially responsible for
@@ -252,7 +253,7 @@ class ReplicaState {
 // We should consolidate to "round".
 class PendingRounds {
  public:
-  explicit PendingRounds(std::string log_prefix);
+  PendingRounds(std::string log_prefix, scoped_refptr<TimeManager> time_manager);
   ~PendingRounds();
 
   // Set the committed op during startup. This should be done after
@@ -319,6 +320,8 @@ class PendingRounds {
 
   // The OpId of the round that was last committed. Initialized to MinimumOpId().
   OpId last_committed_op_id_;
+
+  scoped_refptr<TimeManager> time_manager_;
 };
 
 }  // namespace consensus

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/time_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/time_manager.cc b/src/kudu/consensus/time_manager.cc
index 39d627c..27c6791 100644
--- a/src/kudu/consensus/time_manager.cc
+++ b/src/kudu/consensus/time_manager.cc
@@ -24,10 +24,24 @@
 #include "kudu/util/flag_tags.h"
 
 DEFINE_bool(safe_time_advancement_without_writes, true,
-            "Whether to enable the advancement of \"safe\" time in "
-            "the absense of write operations");
+            "Whether to enable the advancement of \"safe\" time in the absence of write "
+            "operations");
 TAG_FLAG(safe_time_advancement_without_writes, advanced);
 
+DEFINE_double(missed_heartbeats_before_rejecting_snapshot_scans, 1.5,
+              "The maximum raft heartbeat periods since the tablet has seen safe time advanced "
+              "before refusing scans at snapshots that aren't yet safe and forcing clients to "
+              "try again.");
+TAG_FLAG(missed_heartbeats_before_rejecting_snapshot_scans, experimental);
+
+DEFINE_int32(safe_time_max_lag_ms, 30 * 1000,
+             "The maximum amount of time we allow safe time to lag behind the requested timestamp "
+             "before forcing the client to retry, in milliseconds.");
+TAG_FLAG(safe_time_max_lag_ms, experimental);
+
+DECLARE_int32(raft_heartbeat_interval_ms);
+DECLARE_int32(scanner_max_wait_ms);
+
 namespace kudu {
 namespace consensus {
 
@@ -50,6 +64,7 @@ ExternalConsistencyMode TimeManager::GetMessageConsistencyMode(const ReplicateMs
 TimeManager::TimeManager(scoped_refptr<Clock> clock, Timestamp initial_safe_time)
   : last_serial_ts_assigned_(initial_safe_time),
     last_safe_ts_(initial_safe_time),
+    last_advanced_safe_time_(MonoTime::Now()),
     mode_(NON_LEADER),
     clock_(std::move(clock)) {}
 
@@ -67,7 +82,9 @@ void TimeManager::SetNonLeaderMode() {
 Status TimeManager::AssignTimestamp(ReplicateMsg* message) {
   Lock l(lock_);
   if (PREDICT_FALSE(mode_ == NON_LEADER)) {
-    return Status::IllegalState("Cannot assign timestamp. TimeManager is not in Leader mode.");
+    return Status::IllegalState(Substitute(
+        "Cannot assign timestamp to transaction. Tablet is not in leader mode. Last heard "
+        "from a leader: $0 ago.", (MonoTime::Now() - last_advanced_safe_time_).ToString()));
   }
   Timestamp t;
   switch (GetMessageConsistencyMode(*message)) {
@@ -122,7 +139,69 @@ void TimeManager::AdvanceSafeTime(Timestamp safe_time) {
   AdvanceSafeTimeAndWakeUpWaitersUnlocked(safe_time);
 }
 
+bool TimeManager::HasAdvancedSafeTimeRecentlyUnlocked(string* error_message) {
+  MonoDelta time_since_last_advance = MonoTime::Now() - last_advanced_safe_time_;
+  int64_t max_last_advanced = FLAGS_missed_heartbeats_before_rejecting_snapshot_scans *
+      FLAGS_raft_heartbeat_interval_ms;
+  // Clamp max_last_advanced to at least 100 ms. Some tests set leader election timeouts really
+  // low and don't necessarily want to stress scanners.
+  max_last_advanced = std::max<int64_t>(max_last_advanced, 100LL);
+  MonoDelta max_delta = MonoDelta::FromMilliseconds(max_last_advanced);
+  if (time_since_last_advance > max_delta) {
+    *error_message = Substitute("Tablet hasn't heard from leader, or there hasn't been a stable "
+                                "leader for: $0, (max is $1):",
+                                time_since_last_advance.ToString(), max_delta.ToString());
+    return false;
+  }
+  return true;
+}
+
+bool TimeManager::IsSafeTimeLaggingUnlocked(Timestamp timestamp, string* error_message) {
+  // Can't calculate safe time lag for the logical clock.
+  if (PREDICT_FALSE(!clock_->HasPhysicalComponent())) return false;
+  MonoDelta safe_time_diff = clock_->GetPhysicalComponentDifference(timestamp,
+                                                                    last_safe_ts_);
+  if (safe_time_diff.ToMilliseconds() > FLAGS_safe_time_max_lag_ms) {
+    *error_message = Substitute("Tablet is lagging too much to be able to serve snapshot scan. "
+                                "Lagging by: $0 ms, (max is $1 ms):",
+                                safe_time_diff.ToMilliseconds(),
+                                FLAGS_safe_time_max_lag_ms);
+    return true;
+  }
+  return false;
+}
+
+void TimeManager::MakeWaiterTimeoutMessageUnlocked(Timestamp timestamp, string* error_message) {
+  string mode = mode_ == LEADER ? "LEADER" : "NON-LEADER";
+  string clock_diff = clock_->HasPhysicalComponent() ? clock_->GetPhysicalComponentDifference(
+      timestamp, last_safe_ts_).ToString() : "None (Logical clock)";
+  *error_message = Substitute("Timed out waiting for ts: $0 to be safe (mode: $1). Current safe "
+                              "time: $2 Physical time difference: $3", clock_->Stringify(timestamp),
+                              mode, clock_->Stringify(last_safe_ts_), clock_diff);
+}
+
 Status TimeManager::WaitUntilSafe(Timestamp timestamp, const MonoTime& deadline) {
+  string error_message;
+
+  // Pre-flight checks:
+  // - If this timestamp is before the last safe time return.
+  // - If we're not the leader make sure we've heard from the leader recently.
+  // - If we're not the leader make sure safe time isn't lagging too much.
+  {
+    Lock l(lock_);
+    if (timestamp < GetSafeTimeUnlocked()) return Status::OK();
+
+    if (mode_ == NON_LEADER) {
+      if (IsSafeTimeLaggingUnlocked(timestamp, &error_message)) {
+        return Status::TimedOut(error_message);
+      }
+
+      if (!HasAdvancedSafeTimeRecentlyUnlocked(&error_message)) {
+        return Status::TimedOut(error_message);
+      }
+    }
+  }
+
   // First wait for the clock to be past 'timestamp'.
   RETURN_NOT_OK(clock_->WaitUntilAfterLocally(timestamp, deadline));
 
@@ -152,15 +231,9 @@ Status TimeManager::WaitUntilSafe(Timestamp timestamp, const MonoTime& deadline)
     if (waiter.latch->count() == 0) return Status::OK();
 
     waiters_.erase(std::find(waiters_.begin(), waiters_.end(), &waiter));
-    return Status::TimedOut(Substitute(
-        "Timed out waiting for ts: $0 to be safe (mode: $1). "
-        "Current safe time: $2 Physical time difference: $3",
-        clock_->Stringify(waiter.timestamp),
-        (mode_ == LEADER ? "LEADER" : "NON-LEADER"),
-        clock_->Stringify(last_safe_ts_),
-        (clock_->HasPhysicalComponent() ?
-         clock_->GetPhysicalComponentDifference(timestamp, last_safe_ts_).ToString() :
-         "None (Logical clock)")));
+
+    MakeWaiterTimeoutMessageUnlocked(waiter.timestamp, &error_message);
+    return Status::TimedOut(error_message);
   }
 }
 
@@ -169,6 +242,7 @@ void TimeManager::AdvanceSafeTimeAndWakeUpWaitersUnlocked(Timestamp safe_time) {
     return;
   }
   last_safe_ts_ = safe_time;
+  last_advanced_safe_time_ = MonoTime::Now();
 
   if (PREDICT_FALSE(!waiters_.empty())) {
     auto iter = waiters_.begin();
@@ -216,6 +290,7 @@ Timestamp TimeManager::GetSafeTimeUnlocked() {
       // leader will never assign a new timestamp lower than it.
       if (PREDICT_TRUE(last_serial_ts_assigned_ <= last_safe_ts_)) {
         last_safe_ts_ = clock_->Now();
+        last_advanced_safe_time_ = MonoTime::Now();
         return last_safe_ts_;
       }
       // If the current state is b), then there might be transaction with a timestamp that is lower

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/consensus/time_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/time_manager.h b/src/kudu/consensus/time_manager.h
index c8568d5..980e4ad 100644
--- a/src/kudu/consensus/time_manager.h
+++ b/src/kudu/consensus/time_manager.h
@@ -16,6 +16,7 @@
 // under the License.
 #pragma once
 
+#include <algorithm>
 #include <string>
 #include <vector>
 
@@ -113,6 +114,7 @@ class TimeManager : public RefCountedThreadSafe<TimeManager> {
   //
   // Returns Status::OK() if it safe time advanced past 'timestamp' before 'deadline'
   // Returns Status::TimeOut() if deadline elapsed without safe time moving enough.
+  // Also returns Status::TimedOut() immediately if a non-leader's safe time is stale or lagging.
   //
   // TODO(KUDU-1127) make this return another status if safe time is too far back in the past
   // or hasn't moved in a long time.
@@ -128,6 +130,20 @@ class TimeManager : public RefCountedThreadSafe<TimeManager> {
   FRIEND_TEST(TimeManagerTest, TestTimeManagerNonLeaderMode);
   FRIEND_TEST(TimeManagerTest, TestTimeManagerLeaderMode);
 
+  // Returns whether we've advanced safe time recently.
+  // If this returns false we might be partitioned or there might be election churn.
+  // The client should try again.
+  // If this returns false, sets error information in 'error_message'.
+  bool HasAdvancedSafeTimeRecentlyUnlocked(std::string* error_message);
+
+  // Returns whether safe time is lagging too much behind 'timestamp' and the client
+  // should be forced to retry.
+  // If this returns true, sets error information in 'error_message'.
+  bool IsSafeTimeLaggingUnlocked(Timestamp timestamp, std::string* error_message);
+
+  // Helper to build the final error message of WaitUntilSafe().
+  void MakeWaiterTimeoutMessageUnlocked(Timestamp timestamp, std::string* error_message);
+
   // Helper to return the external consistency mode of 'message'.
   static ExternalConsistencyMode GetMessageConsistencyMode(const ReplicateMsg& message);
 
@@ -141,7 +157,7 @@ class TimeManager : public RefCountedThreadSafe<TimeManager> {
   struct WaitingState {
     // The timestamp the waiter requires be safe.
     Timestamp timestamp;
-    // Latch that will be count down once 'timestamp' is safe, unblocking the waiter.
+    // Latch that will be counted down once 'timestamp' is safe, unblocking the waiter.
     CountDownLatch* latch;
   };
 
@@ -177,6 +193,10 @@ class TimeManager : public RefCountedThreadSafe<TimeManager> {
   // the last serial timestamp appended to the queue.
   Timestamp last_safe_ts_;
 
+  // The last time we advanced safe time.
+  // Used in the decision of whether we should have waiters wait or try again.
+  MonoTime last_advanced_safe_time_;
+
   // The current mode of the TimeManager.
   Mode mode_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/integration-tests/alter_table-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc
index 5e8757a..4b901dd 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -854,7 +854,7 @@ TEST_F(AlterTableTest, TestMajorCompactDeltasAfterUpdatingRemovedColumn) {
             "---------------------------\n"
             "MRS memrowset:\n"
             "RowSet RowSet(0):\n"
-            "RowIdxInBlock: 0; Base: (int32 c0=0, int32 c2=12345); Undo Mutations: [@3(DELETE)]; "
+            "RowIdxInBlock: 0; Base: (int32 c0=0, int32 c2=12345); Undo Mutations: [@4(DELETE)]; "
                 "Redo Mutations: [];",
             JoinStrings(rows, "\n"));
 
@@ -903,9 +903,9 @@ TEST_F(AlterTableTest, TestMajorCompactDeltasIntoMissingBaseData) {
             "MRS memrowset:\n"
             "RowSet RowSet(0):\n"
             "RowIdxInBlock: 0; Base: (int32 c0=0, int32 c1=0, int32 c2=54321); Undo Mutations: "
-                "[@5(SET c2=12345), @2(DELETE)]; Redo Mutations: [];\n"
+                "[@6(SET c2=12345), @3(DELETE)]; Redo Mutations: [];\n"
             "RowIdxInBlock: 1; Base: (int32 c0=16777216, int32 c1=1, int32 c2=12345); "
-                "Undo Mutations: [@3(DELETE)]; Redo Mutations: [];",
+                "Undo Mutations: [@4(DELETE)]; Redo Mutations: [];",
             JoinStrings(rows, "\n"));
 }
 
@@ -958,7 +958,7 @@ TEST_F(AlterTableTest, TestMajorCompactDeltasAfterAddUpdateRemoveColumn) {
             "---------------------------\n"
             "MRS memrowset:\n"
             "RowSet RowSet(0):\n"
-            "RowIdxInBlock: 0; Base: (int32 c0=0, int32 c1=0); Undo Mutations: [@2(DELETE)]; "
+            "RowIdxInBlock: 0; Base: (int32 c0=0, int32 c1=0); Undo Mutations: [@3(DELETE)]; "
                 "Redo Mutations: [];",
             JoinStrings(rows, "\n"));
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/integration-tests/cluster_verifier.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_verifier.cc b/src/kudu/integration-tests/cluster_verifier.cc
index d027ba5..1b38951 100644
--- a/src/kudu/integration-tests/cluster_verifier.cc
+++ b/src/kudu/integration-tests/cluster_verifier.cc
@@ -125,7 +125,8 @@ Status ClusterVerifier::DoCheckRowCount(const std::string& table_name,
   client::KuduScanner scanner(table.get());
   CHECK_OK(scanner.SetReadMode(client::KuduScanner::READ_AT_SNAPSHOT));
   CHECK_OK(scanner.SetFaultTolerant());
-  CHECK_OK(scanner.SetTimeoutMillis(5000));
+  // Allow a long scan timeout for verification.
+  CHECK_OK(scanner.SetTimeoutMillis(60 * 1000));
   CHECK_OK(scanner.SetProjectedColumns({}));
   RETURN_NOT_OK_PREPEND(scanner.Open(), "Unable to open scanner");
   int count = 0;

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/integration-tests/linked_list-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/linked_list-test-util.h b/src/kudu/integration-tests/linked_list-test-util.h
index c37c4ed..9e6ac01 100644
--- a/src/kudu/integration-tests/linked_list-test-util.h
+++ b/src/kudu/integration-tests/linked_list-test-util.h
@@ -51,7 +51,8 @@ static const char* const kKeyColumnName = "rand_key";
 static const char* const kLinkColumnName = "link_to";
 static const char* const kInsertTsColumnName = "insert_ts";
 static const char* const kUpdatedColumnName = "updated";
-static const int64_t kNoSnapshot = -1;
+static const int64_t kNoSnapshot = -2;
+static const int64_t kSnapshotAtNow = -1;
 static const int64_t kNoParticularCountExpected = -1;
 
 // Vector of snapshot timestamp, count pairs.
@@ -83,6 +84,15 @@ class LinkedListTester {
     CHECK_OK(b.Build(&schema_));
   }
 
+  // The modes for WaitAndVerify
+  enum WaitAndVerifyMode {
+    // Perform snapshots scans in the past but finish with a "READ_LATEST" scan.
+    // This should be used when a majority of the cluster is down.
+    FINISH_WITH_SCAN_LATEST,
+    // Perform the snapshot scans in the past and finish with a snapshot scan in the present.
+    FINISH_WITH_SNAPSHOT_SCAN
+  };
+
   // Create the table.
   Status CreateLinkedListTable();
 
@@ -108,14 +118,26 @@ class LinkedListTester {
                                   verified_count);
   }
 
-  // Variant of VerifyLinkedListRemote that verifies without specifying a snapshot timestamp.
-  Status VerifyLinkedListNoSnapshotRemote(const int64_t expected,
-                                          const bool log_errors,
-                                          int64_t* verified_count) {
+  Status VerifyLinkedListAtLatestRemote(const int64_t expected,
+                                        const bool log_errors,
+                                        const boost::function<Status(const std::string&)>& cb,
+                                        int64_t* verified_count) {
     return VerifyLinkedListRemote(kNoSnapshot,
                                   expected,
                                   log_errors,
-                                  boost::bind(&LinkedListTester::ReturnOk, this, _1),
+                                  cb,
+                                  verified_count);
+  }
+
+  // Variant of VerifyLinkedListRemote that verifies with a snapshot scan at 'now'.
+  Status VerifyLinkedListAtNowSnapshotRemote(const int64_t expected,
+                                             const bool log_errors,
+                                             const boost::function<Status(const std::string&)>& cb,
+                                             int64_t* verified_count) {
+    return VerifyLinkedListRemote(kSnapshotAtNow,
+                                  expected,
+                                  log_errors,
+                                  cb,
                                   verified_count);
   }
 
@@ -135,16 +157,19 @@ class LinkedListTester {
   // A variant of VerifyLinkedListRemote that is more robust towards ongoing
   // bootstrapping and replication.
   Status WaitAndVerify(int seconds_to_run,
-                       int64_t expected) {
+                       int64_t expected,
+                       WaitAndVerifyMode mode = FINISH_WITH_SNAPSHOT_SCAN) {
     return WaitAndVerify(seconds_to_run,
                          expected,
-                         boost::bind(&LinkedListTester::ReturnOk, this, _1));
+                         boost::bind(&LinkedListTester::ReturnOk, this, _1),
+                         mode);
   }
 
   // A variant of WaitAndVerify that also takes a callback to be run once during verification.
   Status WaitAndVerify(int seconds_to_run,
                        int64_t expected,
-                       const boost::function<Status(const std::string&)>& cb);
+                       const boost::function<Status(const std::string&)>& cb,
+                       WaitAndVerifyMode mode = FINISH_WITH_SNAPSHOT_SCAN);
 
   // Generates a vector of keys for the table such that each tablet is
   // responsible for an equal fraction of the int64 key space.
@@ -439,14 +464,6 @@ Status LinkedListTester::LoadLinkedList(
   RETURN_NOT_OK_PREPEND(client_->OpenTable(table_name_, &table),
                         "Could not open table " + table_name_);
 
-  // Instantiate a hybrid clock so that we can collect timestamps since we're running the
-  // tablet servers in an external mini cluster.
-  // TODO when they become available (KUDU-420), use client-propagated timestamps
-  // instead of reading from the clock directly. This will allow to run this test
-  // against a "real" cluster and not force the client to be synchronized.
-  scoped_refptr<server::Clock> ht_clock(new server::HybridClock());
-  RETURN_NOT_OK(ht_clock->Init());
-
   MonoTime start = MonoTime::Now();
   MonoTime deadline = start + run_for;
 
@@ -477,9 +494,9 @@ Status LinkedListTester::LoadLinkedList(
 
     MonoTime now = MonoTime::Now();
     if (next_sample < now) {
-      Timestamp now = ht_clock->Now();
+      Timestamp now(client_->GetLatestObservedTimestamp());
       sampled_timestamps_and_counts_.push_back(
-          pair<uint64_t,int64_t>(now.ToUint64(), *written_count));
+          pair<uint64_t,int64_t>(now.ToUint64() + 1, *written_count));
       next_sample += sample_interval;
       LOG(INFO) << "Sample at HT timestamp: " << now.ToString()
                 << " Inserted count: " << *written_count;
@@ -566,8 +583,8 @@ Status LinkedListTester::VerifyLinkedListRemote(
   RETURN_NOT_OK(client_->OpenTable(table_name_, &table));
 
   string snapshot_str;
-  if (snapshot_timestamp == kNoSnapshot) {
-    snapshot_str = "LATEST";
+  if (snapshot_timestamp == kSnapshotAtNow) {
+    snapshot_str = "NOW";
   } else {
     snapshot_str = server::HybridClock::StringifyTimestamp(Timestamp(snapshot_timestamp));
   }
@@ -575,11 +592,16 @@ Status LinkedListTester::VerifyLinkedListRemote(
   client::KuduScanner scanner(table.get());
   RETURN_NOT_OK_PREPEND(scanner.SetProjectedColumns(verify_projection_), "Bad projection");
   RETURN_NOT_OK(scanner.SetBatchSizeBytes(0)); // Force at least one NextBatch RPC.
+  RETURN_NOT_OK(scanner.SetTimeoutMillis(20 * 1000 /* 20 seconds */));
 
   if (snapshot_timestamp != kNoSnapshot) {
     RETURN_NOT_OK(scanner.SetReadMode(client::KuduScanner::READ_AT_SNAPSHOT));
     RETURN_NOT_OK(scanner.SetFaultTolerant());
-    RETURN_NOT_OK(scanner.SetSnapshotRaw(snapshot_timestamp));
+    if (snapshot_timestamp != kSnapshotAtNow) {
+      RETURN_NOT_OK(scanner.SetSnapshotRaw(snapshot_timestamp));
+    }
+  } else {
+    RETURN_NOT_OK(scanner.SetReadMode(client::KuduScanner::READ_LATEST));
   }
 
   LOG(INFO) << "Verifying Snapshot: " << snapshot_str << " Expected Rows: " << expected;
@@ -597,7 +619,7 @@ Status LinkedListTester::VerifyLinkedListRemote(
   while (scanner.HasMoreRows()) {
     // If we're doing a snapshot scan with a big enough cluster, call the callback on the scanner's
     // tserver. Do this only once.
-    if (snapshot_timestamp != kNoSnapshot && !cb_called) {
+    if (snapshot_timestamp != kSnapshotAtNow && !cb_called) {
       client::KuduTabletServer* kts_ptr;
       scanner.GetCurrentServer(&kts_ptr);
       gscoped_ptr<client::KuduTabletServer> kts(kts_ptr);
@@ -618,7 +640,7 @@ Status LinkedListTester::VerifyLinkedListRemote(
       // for snapshot reads as updates are performed by their own thread. This means
       // that there is no guarantee that, for any snapshot timestamp that comes before
       // all writes are completed, all rows will be updated.
-      if (snapshot_timestamp == kNoSnapshot) {
+      if (snapshot_timestamp == kSnapshotAtNow) {
         RETURN_NOT_OK(row.GetBool(2, &updated));
       } else {
         updated = enable_mutation_;
@@ -672,7 +694,8 @@ Status LinkedListTester::VerifyLinkedListLocal(const tablet::Tablet* tablet,
 
 Status LinkedListTester::WaitAndVerify(int seconds_to_run,
                                        int64_t expected,
-                                       const boost::function<Status(const std::string&)>& cb) {
+                                       const boost::function<Status(const std::string&)>& cb,
+                                       WaitAndVerifyMode mode) {
 
   std::list<pair<int64_t, int64_t> > samples_as_list(sampled_timestamps_and_counts_.begin(),
                                                      sampled_timestamps_and_counts_.end());
@@ -726,28 +749,37 @@ Status LinkedListTester::WaitAndVerify(int seconds_to_run,
       // even if a later snapshot or the final verification failed.
       iter = samples_as_list.erase(iter);
     }
-    if (s.ok()) {
-      s = VerifyLinkedListNoSnapshotRemote(expected, last_attempt, &seen);
-    }
 
-    // TODO: when we enable hybridtime consistency for the scans,
-    // then we should not allow !s.ok() here. But, with READ_LATEST
-    // scans, we could have a lagging replica of one tablet, with an
-    // up-to-date replica of another tablet, and end up with broken links
-    // in the chain.
-
-    if (!s.ok()) {
-      LOG(INFO) << "Table not yet ready: " << seen << "/" << expected << " rows"
-                << " (status: " << s.ToString() << ")";
-      if (last_attempt) {
-        // We'll give it an equal amount of time to re-load the data as it took
-        // to write it in. Typically it completes much faster than that.
-        return Status::TimedOut("Timed out waiting for table to be accessible again",
-                                s.ToString());
-      }
+    // Perform the last scan with the required mode, but only if all past samples verified.
+    if (s.ok()) {
+      switch (mode) {
+        case FINISH_WITH_SNAPSHOT_SCAN:
+          // A failed snapshot scan at 'now' is not retried: its error is returned immediately.
+          RETURN_NOT_OK(VerifyLinkedListAtNowSnapshotRemote(
+              expected, last_attempt, boost::bind(&LinkedListTester::ReturnOk, this, _1), &seen));
+          break;
+        case FINISH_WITH_SCAN_LATEST:
+          // Scans in READ_LATEST mode will, by design, likely return a stale view of the tablet
+          // so, in this case, retry.
+          s = VerifyLinkedListAtLatestRemote(
+              expected, last_attempt, boost::bind(&LinkedListTester::ReturnOk, this, _1), &seen);
+          break;
+      }
+    }
 
-      // Sleep and retry until timeout.
-      SleepFor(MonoDelta::FromMilliseconds(20));
+    // Any remaining failure (a past sample or the READ_LATEST scan) is retried until timeout.
+    if (!s.ok()) {
+      LOG(INFO) << "Table not yet ready: " << seen << "/" << expected << " rows"
+                << " (status: " << s.ToString() << ")";
+      if (last_attempt) {
+        // We'll give it an equal amount of time to re-load the data as it took
+        // to write it in. Typically it completes much faster than that.
+        return Status::TimedOut("Timed out waiting for table to be accessible again",
+                                s.ToString());
+      }
+
+      // Sleep and retry until timeout.
+      SleepFor(MonoDelta::FromMilliseconds(20));
     }
   } while (!s.ok());
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/integration-tests/linked_list-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/linked_list-test.cc b/src/kudu/integration-tests/linked_list-test.cc
index 04092a1..a41846d 100644
--- a/src/kudu/integration-tests/linked_list-test.cc
+++ b/src/kudu/integration-tests/linked_list-test.cc
@@ -271,6 +271,7 @@ TEST_F(LinkedListTest, TestLoadWhileOneServerDownAndVerify) {
 
   FLAGS_num_tablet_servers = 3;
   FLAGS_num_tablets = 1;
+
   ASSERT_NO_FATAL_FAILURE(BuildAndStart());
 
   // Load the data with one of the three servers down.
@@ -298,7 +299,10 @@ TEST_F(LinkedListTest, TestLoadWhileOneServerDownAndVerify) {
 
   cluster_->tablet_server(1)->Shutdown();
   cluster_->tablet_server(2)->Shutdown();
-  ASSERT_OK(tester_->WaitAndVerify(FLAGS_seconds_to_run, written));
+
+  ASSERT_OK(tester_->WaitAndVerify(FLAGS_seconds_to_run,
+                                   written,
+                                   LinkedListTester::FINISH_WITH_SCAN_LATEST));
 }
 
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 995a9c4..cf83164 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -874,6 +874,7 @@ void RaftConsensusITest::DoTestCrashyNodes(TestWorkload* workload, int max_rows_
   // can verify an exact number of rows in the end, thanks to exactly once semantics.
   workload->set_write_timeout_millis(60 * 1000 /* 60 seconds */);
   workload->set_num_write_threads(10);
+  workload->set_num_read_threads(2);
   workload->Setup();
   workload->Start();
 
@@ -960,6 +961,7 @@ void RaftConsensusITest::CreateClusterForChurnyElectionsTests(
   ts_flags.push_back("--leader_failure_monitor_check_mean_ms=1");
   ts_flags.push_back("--leader_failure_monitor_check_stddev_ms=1");
   ts_flags.push_back("--never_fsync");
+
   ts_flags.insert(ts_flags.end(), extra_ts_flags.cbegin(), extra_ts_flags.cend());
 
   CreateCluster("raft_consensus-itest-cluster", ts_flags, {});
@@ -1006,6 +1008,7 @@ TEST_F(RaftConsensusITest, TestChurnyElections) {
   CreateClusterForChurnyElectionsTests({});
   TestWorkload workload(cluster_.get());
   workload.set_write_batch_size(1);
+  workload.set_num_read_threads(2);
   DoTestChurnyElections(&workload, kNumWrites);
 }
 
@@ -1017,6 +1020,7 @@ TEST_F(RaftConsensusITest, TestChurnyElections_WithNotificationLatency) {
   const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
   TestWorkload workload(cluster_.get());
   workload.set_write_batch_size(1);
+  workload.set_num_read_threads(2);
   DoTestChurnyElections(&workload, kNumWrites);
 }
 
@@ -1250,6 +1254,8 @@ void RaftConsensusITest::SetupSingleReplicaTest(TServerDetails** replica_ts) {
   FLAGS_num_replicas = 3;
   FLAGS_num_tablet_servers = 3;
   vector<string> ts_flags, master_flags;
+  // Don't use the hybrid clock as we set logical timestamps on ops.
+  ts_flags.push_back("--use_hybrid_clock=false");
   ts_flags.push_back("--enable_leader_failure_detection=false");
   master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
   BuildAndStart(ts_flags, master_flags);
@@ -1520,7 +1526,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
     ScanRequestPB req;
     ScanResponsePB resp;
     RpcController rpc;
-    rpc.set_timeout(MonoDelta::FromMilliseconds(5000));
+    rpc.set_timeout(MonoDelta::FromMilliseconds(100));
     NewScanRequestPB* scan = req.mutable_new_scan_request();
     scan->set_tablet_id(tablet_id_);
     scan->set_read_mode(READ_AT_SNAPSHOT);
@@ -1533,8 +1539,8 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
     ASSERT_OK(replica_ts->tserver_proxy->Scan(req, &resp, &rpc));
     SCOPED_TRACE(resp.DebugString());
     string err_str = StatusFromPB(resp.error().status()).ToString();
-    ASSERT_STR_CONTAINS(err_str, "Timed out waiting for all transactions");
-    ASSERT_STR_CONTAINS(err_str, "to commit");
+    ASSERT_STR_CONTAINS(err_str, "Timed out waiting for ts:");
+    ASSERT_STR_CONTAINS(err_str, "to be safe");
   }
 
   resp.Clear();
@@ -2378,6 +2384,7 @@ TEST_F(RaftConsensusITest, TestAutoCreateReplica) {
   workload.set_table_name(kTableId);
   workload.set_num_replicas(FLAGS_num_replicas);
   workload.set_num_write_threads(10);
+  workload.set_num_read_threads(2);
   workload.set_write_batch_size(100);
   workload.Setup();
 
@@ -2502,6 +2509,7 @@ TEST_F(RaftConsensusITest, TestSlowLeader) {
 
   TestWorkload workload(cluster_.get());
   workload.set_table_name(kTableId);
+  workload.set_num_read_threads(2);
   workload.Setup();
   workload.Start();
   SleepFor(MonoDelta::FromSeconds(60));
@@ -2652,6 +2660,7 @@ TEST_F(RaftConsensusITest, TestSlowFollower) {
 
   TestWorkload workload(cluster_.get());
   workload.set_table_name(kTableId);
+  workload.set_num_read_threads(2);
   workload.Setup();
   workload.Start();
   SleepFor(MonoDelta::FromSeconds(60));

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/integration-tests/tablet_history_gc-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_history_gc-itest.cc b/src/kudu/integration-tests/tablet_history_gc-itest.cc
index 0bce2c3..66753db 100644
--- a/src/kudu/integration-tests/tablet_history_gc-itest.cc
+++ b/src/kudu/integration-tests/tablet_history_gc-itest.cc
@@ -55,6 +55,8 @@ DECLARE_int32(scanner_ttl_ms);
 DECLARE_int32(tablet_history_max_age_sec);
 DECLARE_string(block_manager);
 DECLARE_bool(enable_maintenance_manager);
+DECLARE_double(missed_heartbeats_before_rejecting_snapshot_scans);
+DECLARE_int32(safe_time_max_lag_ms);
 
 DEFINE_int32(test_num_rounds, 200, "Number of rounds to loop "
                                    "RandomizedTabletHistoryGcITest.TestRandomHistoryGCWorkload");
@@ -112,6 +114,11 @@ class RandomizedTabletHistoryGcITest : public TabletHistoryGcITest {
     FLAGS_enable_maintenance_manager = false;
     FLAGS_use_mock_wall_clock = true;
     FLAGS_tablet_history_max_age_sec = 100;
+
+    // Set these really high since we're using the mock clock.
+    // This allows the TimeManager to still work.
+    FLAGS_safe_time_max_lag_ms = 30 * 1000 * 1000;
+    FLAGS_missed_heartbeats_before_rejecting_snapshot_scans = 100.0;
   }
 
  protected:
@@ -661,7 +668,8 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) {
 
         unique_ptr<client::KuduScanner> scanner(new KuduScanner(table.get()));
         ASSERT_OK(scanner->SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
-        ASSERT_OK(scanner->SetOrderMode(KuduScanner::ORDERED));
+        ASSERT_OK(scanner->SetFaultTolerant());
+        ASSERT_OK(scanner->SetTimeoutMillis(60 * 1000));
         ASSERT_OK(scanner->SetSnapshotRaw(snapshot_ts.ToUint64()));
         ASSERT_OK(scanner->Open());
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/integration-tests/test_workload.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc
index 6989223..b674b17 100644
--- a/src/kudu/integration-tests/test_workload.cc
+++ b/src/kudu/integration-tests/test_workload.cc
@@ -33,6 +33,8 @@
 namespace kudu {
 
 using client::KuduInsert;
+using client::KuduScanBatch;
+using client::KuduScanner;
 using client::KuduSchema;
 using client::KuduSchemaFromSchema;
 using client::KuduSession;
@@ -48,6 +50,7 @@ TestWorkload::TestWorkload(MiniClusterBase* cluster)
     rng_(SeedRandom()),
     payload_bytes_(11),
     num_write_threads_(4),
+    num_read_threads_(0),
     write_batch_size_(50),
     write_timeout_millis_(20000),
     timeout_allowed_(false),
@@ -69,13 +72,12 @@ TestWorkload::~TestWorkload() {
   StopAndJoin();
 }
 
-void TestWorkload::WriteThread() {
-  shared_ptr<KuduTable> table;
+void TestWorkload::OpenTable(shared_ptr<KuduTable>* table) {
   // Loop trying to open up the table. In some tests we set up very
   // low RPC timeouts to test those behaviors, so this might fail and
   // need retrying.
   while (should_run_.Load()) {
-    Status s = client_->OpenTable(table_name_, &table);
+    Status s = client_->OpenTable(table_name_, table);
     if (s.ok()) {
       break;
     }
@@ -86,20 +88,26 @@ void TestWorkload::WriteThread() {
     CHECK_OK(s);
   }
 
-  shared_ptr<KuduSession> session = client_->NewSession();
-  session->SetTimeoutMillis(write_timeout_millis_);
-  CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
-
   // Wait for all of the workload threads to be ready to go. This maximizes the chance
   // that they all send a flood of requests at exactly the same time.
   //
   // This also minimizes the chance that we see failures to call OpenTable() if
-  // a late-starting thread overlaps with the flood of outbound traffic from the
-  // ones that are already writing data.
+  // a late-starting thread overlaps with the flood of traffic from the ones that are
+  // already writing/reading data.
   start_latch_.CountDown();
   start_latch_.Wait();
+}
+
+void TestWorkload::WriteThread() {
+  shared_ptr<KuduTable> table;
+  OpenTable(&table);
+
+  shared_ptr<KuduSession> session = client_->NewSession();
+  session->SetTimeoutMillis(write_timeout_millis_);
+  CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
 
   while (should_run_.Load()) {
+    int inserted = 0;
     for (int i = 0; i < write_batch_size_; i++) {
       if (write_pattern_ == UPDATE_ONE_ROW) {
         gscoped_ptr<KuduUpdate> update(table->NewUpdate());
@@ -128,11 +136,10 @@ void TestWorkload::WriteThread() {
         }
         CHECK_OK(row->SetStringCopy(2, test_payload));
         CHECK_OK(session->Apply(insert.release()));
+        inserted++;
       }
     }
 
-    int inserted = write_batch_size_;
-
     Status s = session->Flush();
 
     if (PREDICT_FALSE(!s.ok())) {
@@ -158,18 +165,46 @@ void TestWorkload::WriteThread() {
           continue;
         }
 
-        CHECK(e->status().ok()) << "Unexpected status: " << e->status().ToString();
+        CHECK_OK(s);
       }
       inserted -= errors.size();
     }
 
-    rows_inserted_.IncrementBy(inserted);
     if (inserted > 0) {
+      rows_inserted_.IncrementBy(inserted);
       batches_completed_.Increment();
     }
   }
 }
 
+void TestWorkload::ReadThread() {
+  shared_ptr<KuduTable> table;
+  OpenTable(&table);
+
+  while (should_run_.Load()) {
+    // Slow the scanners down to avoid imposing too much stress on already stressful tests.
+    SleepFor(MonoDelta::FromMilliseconds(150));
+
+    KuduScanner scanner(table.get());
+    // Set a high scanner timeout so that we're likely to have a chance to scan, even in
+    // high-stress workloads.
+    CHECK_OK(scanner.SetTimeoutMillis(60 * 1000 /* 60 seconds */));
+    CHECK_OK(scanner.SetFaultTolerant());
+
+    int64_t expected_row_count = rows_inserted_.Load();
+    size_t row_count = 0;
+
+    CHECK_OK(scanner.Open());
+    while (scanner.HasMoreRows()) {
+      KuduScanBatch batch;
+      CHECK_OK(scanner.NextBatch(&batch));
+      row_count += batch.NumRows();
+    }
+
+    CHECK_GE(row_count, expected_row_count);
+  }
+}
+
 void TestWorkload::Setup() {
   CHECK_OK(cluster_->CreateClient(&client_builder_, &client_));
 
@@ -225,13 +260,14 @@ void TestWorkload::Setup() {
     CHECK_OK(row->SetInt32(1, 0));
     CHECK_OK(row->SetStringCopy(2, "hello world"));
     CHECK_OK(session->Apply(insert.release()));
+    rows_inserted_.Store(1);
   }
 }
 
 void TestWorkload::Start() {
   CHECK(!should_run_.Load()) << "Already started";
   should_run_.Store(true);
-  start_latch_.Reset(num_write_threads_);
+  start_latch_.Reset(num_write_threads_ + num_read_threads_);
   for (int i = 0; i < num_write_threads_; i++) {
     scoped_refptr<kudu::Thread> new_thread;
     CHECK_OK(kudu::Thread::Create("test", strings::Substitute("test-writer-$0", i),
@@ -239,6 +275,15 @@ void TestWorkload::Start() {
                                   &new_thread));
     threads_.push_back(new_thread);
   }
+  // Start the read threads. Order matters here, the read threads are last so that
+  // we'll have a chance to do some scans after all writers are done.
+  for (int i = 0; i < num_read_threads_; i++) {
+    scoped_refptr<kudu::Thread> new_thread;
+    CHECK_OK(kudu::Thread::Create("test", strings::Substitute("test-reader-$0", i),
+                                  &TestWorkload::ReadThread, this,
+                                  &new_thread));
+    threads_.push_back(new_thread);
+  }
 }
 
 void TestWorkload::StopAndJoin() {

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/integration-tests/test_workload.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.h b/src/kudu/integration-tests/test_workload.h
index 7cd4ddf..fdab495 100644
--- a/src/kudu/integration-tests/test_workload.h
+++ b/src/kudu/integration-tests/test_workload.h
@@ -38,6 +38,11 @@ class Thread;
 // The actual data inserted is random, and thus can't be verified for
 // integrity. However, this is still useful in conjunction with ClusterVerifier
 // to verify that replicas do not diverge.
+//
+// The read workload essentially tests read-your-writes. It constantly
+// issues snapshot scans in the present and asserts that we see at least as
+// many rows as we have written, independently of which replica we choose
+// to scan.
 class TestWorkload {
  public:
   static const char* const kDefaultTableName;
@@ -53,6 +58,10 @@ class TestWorkload {
     num_write_threads_ = n;
   }
 
+  void set_num_read_threads(int n) {
+    num_read_threads_ = n;
+  }
+
   void set_write_batch_size(int s) {
     write_batch_size_ = s;
   }
@@ -170,7 +179,9 @@ class TestWorkload {
   }
 
  private:
+  void OpenTable(client::sp::shared_ptr<client::KuduTable>* table);
   void WriteThread();
+  void ReadThread();
 
   MiniClusterBase* cluster_;
   client::KuduClientBuilder client_builder_;
@@ -179,6 +190,7 @@ class TestWorkload {
 
   int payload_bytes_;
   int num_write_threads_;
+  int num_read_threads_;
   int write_batch_size_;
   int write_timeout_millis_;
   bool timeout_allowed_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/mvcc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc-test.cc b/src/kudu/tablet/mvcc-test.cc
index d24c108..543d806 100644
--- a/src/kudu/tablet/mvcc-test.cc
+++ b/src/kudu/tablet/mvcc-test.cc
@@ -43,7 +43,7 @@ class MvccTest : public KuduTest {
 
   void WaitForSnapshotAtTSThread(MvccManager* mgr, Timestamp ts) {
     MvccSnapshot s;
-    CHECK_OK(mgr->WaitForCleanSnapshotAtTimestamp(ts, &s, MonoTime::Max()));
+    CHECK_OK(mgr->WaitForSnapshotWithAllCommitted(ts, &s, MonoTime::Max()));
     CHECK(s.is_clean()) << "verifying postcondition";
     std::lock_guard<simple_spinlock> lock(lock_);
     result_snapshot_.reset(new MvccSnapshot(s));
@@ -378,6 +378,7 @@ TEST_F(MvccTest, TestAreAllTransactionsCommitted) {
   mgr.StartTransaction(tx2);
   Timestamp tx3 = clock_->Now();
   mgr.StartTransaction(tx3);
+  mgr.AdjustSafeTime(clock_->Now());
 
   ASSERT_FALSE(mgr.AreAllTransactionsCommitted(Timestamp(1)));
   ASSERT_FALSE(mgr.AreAllTransactionsCommitted(Timestamp(2)));
@@ -409,14 +410,16 @@ TEST_F(MvccTest, TestAreAllTransactionsCommitted) {
 
 TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapWithNoInflights) {
   MvccManager mgr(clock_.get());
-  thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this, &mgr, clock_->Now());
+  Timestamp to_wait_for = clock_->Now();
+  mgr.AdjustSafeTime(clock_->Now());
+  thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this, &mgr, to_wait_for);
 
   // join immediately.
   waiting_thread.join();
   ASSERT_TRUE(HasResultSnapshot());
 }
 
-TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapWithInFlights) {
+TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapBeforeSafeTimeWithInFlights) {
 
   MvccManager mgr(clock_.get());
 
@@ -425,8 +428,14 @@ TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapWithInFlights) {
   Timestamp tx2 = clock_->Now();
   mgr.StartTransaction(tx2);
   mgr.AdjustSafeTime(tx2);
+  Timestamp to_wait_for = clock_->Now();
 
-  thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this, &mgr, clock_->Now());
+  // Select a safe time that is after all transactions and after the the timestamp we'll wait for
+  // and adjust it on the MvccManager. This will cause "clean time" to move when tx1 and tx2 commit.
+  Timestamp safe_time = clock_->Now();
+  mgr.AdjustSafeTime(safe_time);
+
+  thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this, &mgr, to_wait_for);
 
   ASSERT_FALSE(HasResultSnapshot());
   mgr.StartApplyingTransaction(tx1);
@@ -438,9 +447,8 @@ TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapWithInFlights) {
   ASSERT_TRUE(HasResultSnapshot());
 }
 
-TEST_F(MvccTest, TestWaitForApplyingTransactionsToCommit) {
+TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapAfterSafeTimeWithInFlights) {
   MvccManager mgr(clock_.get());
-
   Timestamp tx1 = clock_->Now();
   mgr.StartTransaction(tx1);
   Timestamp tx2 = clock_->Now();
@@ -480,7 +488,6 @@ TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapAtTimestampWithInFlights) {
   mgr.StartTransaction(tx2);
   Timestamp tx3 = clock_->Now();
   mgr.StartTransaction(tx3);
-  mgr.AdjustSafeTime(tx3);
 
   // Start a thread waiting for transactions with ts <= 2 to commit
   thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this, &mgr, tx2);
@@ -498,9 +505,13 @@ TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapAtTimestampWithInFlights) {
   SleepFor(MonoDelta::FromMilliseconds(1));
   ASSERT_FALSE(HasResultSnapshot());
 
-  // Commit tx 2 - thread can now continue
+  // Commit tx 2 - thread should still wait.
   mgr.StartApplyingTransaction(tx2);
   mgr.CommitTransaction(tx2);
+  ASSERT_FALSE(HasResultSnapshot());
+
+  // Advance safe time, thread should continue.
+  mgr.AdjustSafeTime(tx3);
   waiting_thread.join();
   ASSERT_TRUE(HasResultSnapshot());
 }
@@ -636,7 +647,7 @@ TEST_F(MvccTest, TestWaitUntilCleanDeadline) {
   // transaction isn't committed yet.
   MonoTime deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(10);
   MvccSnapshot snap;
-  Status s = mgr.WaitForCleanSnapshotAtTimestamp(tx1, &snap, deadline);
+  Status s = mgr.WaitForSnapshotWithAllCommitted(tx1, &snap, deadline);
   ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/mvcc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc.cc b/src/kudu/tablet/mvcc.cc
index f6316e8..bfa8a79 100644
--- a/src/kudu/tablet/mvcc.cc
+++ b/src/kudu/tablet/mvcc.cc
@@ -34,6 +34,8 @@
 
 namespace kudu { namespace tablet {
 
+using strings::Substitute;
+
 MvccManager::MvccManager(const scoped_refptr<server::Clock>& clock)
   : safe_time_(Timestamp::kMin),
     earliest_in_flight_(Timestamp::kMax),
@@ -72,7 +74,7 @@ void MvccManager::StartApplyingTransaction(Timestamp timestamp) {
 
 bool MvccManager::InitTransactionUnlocked(const Timestamp& timestamp) {
   // Ensure that we didn't mark the given timestamp as "safe".
-  if (PREDICT_FALSE(safe_time_ >= timestamp)) {
+  if (PREDICT_FALSE(timestamp <= safe_time_)) {
     return false;
   }
 
@@ -164,10 +166,17 @@ void MvccManager::AdjustSafeTime(Timestamp safe_time) {
 
   // No more transactions will start with a ts that is lower than or equal
   // to 'safe_time', so we adjust the snapshot accordingly.
-  if (PREDICT_TRUE(safe_time_ < safe_time)) {
+  if (PREDICT_TRUE(safe_time_ <= safe_time)) {
     safe_time_ = safe_time;
   } else {
-    // If we couldn't adjust "safe" time don't bother adjusting "clean" time.
+    // TODO(dralves) This shouldn't happen, the safe time passed to MvccManager should be
+    // monotically increasing. If if does though, the impact is on scan snapshot correctness,
+    // not on corruption of state and some test-only code sets this back (LocalTabletWriter).
+    // Note that we will still crash if a transaction comes in with a timestamp that is lower
+    // than 'cur_snap_.all_committed_before_'.
+    LOG_EVERY_N(ERROR, 10) << Substitute("Tried to move safe_time back from $0 to $1. "
+                                         "Current Snapshot: $2", safe_time_.ToString(),
+                                         safe_time.ToString(), cur_snap_.ToString());
     return;
   }
 
@@ -226,8 +235,7 @@ void MvccManager::AdjustCleanTime() {
   }
 }
 
-Status MvccManager::WaitUntil(WaitFor wait_for, Timestamp ts,
-                              const MonoTime& deadline) const {
+Status MvccManager::WaitUntil(WaitFor wait_for, Timestamp ts, const MonoTime& deadline) const {
   TRACE_EVENT2("tablet", "MvccManager::WaitUntil",
                "wait_for", wait_for == ALL_COMMITTED ? "all_committed" : "none_applying",
                "ts", ts.ToUint64())
@@ -256,10 +264,9 @@ Status MvccManager::WaitUntil(WaitFor wait_for, Timestamp ts,
   }
 
   waiters_.erase(std::find(waiters_.begin(), waiters_.end(), &waiting_state));
-  return Status::TimedOut(strings::Substitute(
-      "Timed out waiting for all transactions with ts < $0 to $1",
-      clock_->Stringify(ts),
-      wait_for == ALL_COMMITTED ? "commit" : "finish applying"));
+  return Status::TimedOut(Substitute("Timed out waiting for all transactions with ts < $0 to $1",
+                                     ts.ToString(),
+                                     wait_for == ALL_COMMITTED ? "commit" : "finish applying"));
 }
 
 bool MvccManager::IsDoneWaitingUnlocked(const WaitingState& waiter) const {
@@ -273,16 +280,18 @@ bool MvccManager::IsDoneWaitingUnlocked(const WaitingState& waiter) const {
 }
 
 bool MvccManager::AreAllTransactionsCommittedUnlocked(Timestamp ts) const {
-  if (timestamps_in_flight_.empty()) {
-    // If nothing is in-flight, then check the clock. If the timestamp is in the past,
-    // we know that no new uncommitted transactions may start before this ts.
-    return ts <= clock_->Now();
-  }
-  // If some transactions are in flight, then check the in-flight list.
-  return !cur_snap_.MayHaveUncommittedTransactionsAtOrBefore(ts);
+  // If ts is before the 'all_committed_before_' watermark on the current snapshot then
+  // all transactions before it are committed.
+  if (ts < cur_snap_.all_committed_before_) return true;
+
+  // We might not have moved 'cur_snap_.all_committed_before_' (the clean time) but 'ts'
+  // might still come before any possible in-flights.
+  return ts < earliest_in_flight_;
 }
 
 bool MvccManager::AnyApplyingAtOrBeforeUnlocked(Timestamp ts) const {
+  // TODO(todd) this is not actually checking on the applying txns, it's checking on
+  // _all in-flight_. Is this a bug?
   for (const InFlightMap::value_type entry : timestamps_in_flight_) {
     if (entry.first <= ts.value()) {
       return true;
@@ -296,20 +305,16 @@ void MvccManager::TakeSnapshot(MvccSnapshot *snap) const {
   *snap = cur_snap_;
 }
 
-Status MvccManager::WaitForCleanSnapshotAtTimestamp(Timestamp timestamp,
-                                                    MvccSnapshot *snap,
+Status MvccManager::WaitForSnapshotWithAllCommitted(Timestamp timestamp,
+                                                    MvccSnapshot* snapshot,
                                                     const MonoTime& deadline) const {
-  TRACE_EVENT0("tablet", "MvccManager::WaitForCleanSnapshotAtTimestamp");
-  RETURN_NOT_OK(clock_->WaitUntilAfterLocally(timestamp, deadline));
+  TRACE_EVENT0("tablet", "MvccManager::WaitForSnapshotWithAllCommitted");
+
   RETURN_NOT_OK(WaitUntil(ALL_COMMITTED, timestamp, deadline));
-  *snap = MvccSnapshot(timestamp);
+  *snapshot = MvccSnapshot(timestamp);
   return Status::OK();
 }
 
-void MvccManager::WaitForCleanSnapshot(MvccSnapshot* snap) const {
-  CHECK_OK(WaitForCleanSnapshotAtTimestamp(clock_->Now(), snap, MonoTime::Max()));
-}
-
 void MvccManager::WaitForApplyingTransactionsToCommit() const {
   TRACE_EVENT0("tablet", "MvccManager::WaitForApplyingTransactionsToCommit");
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/mvcc.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc.h b/src/kudu/tablet/mvcc.h
index 94f3789..bd106a8 100644
--- a/src/kudu/tablet/mvcc.h
+++ b/src/kudu/tablet/mvcc.h
@@ -218,6 +218,10 @@ class MvccManager {
   //
   // This must only be called when there is a guarantee that there won't be
   // any more transactions with timestamps equal to or lower than 'safe_time'.
+  //
+  // TODO(dralves) Until leader leases is implemented this should only be called
+  // with the timestamps of consensus committed transactions, not with the safe
+  // time received from the leader (which can go back without leader leases).
   void AdjustSafeTime(Timestamp safe_time);
 
   // Take a snapshot of the current MVCC state, which indicates which
@@ -228,31 +232,14 @@ class MvccManager {
   // all transactions which have a lower timestamp)
   //
   // If there are any in-flight transactions at a lower timestamp, waits for
-  // them to complete before returning. Hence, we guarantee that, upon return,
-  // snapshot->is_clean().
-  //
-  // TODO(KUDU-689): this may currently block forever, stalling scanner threads
-  // and potentially blocking tablet shutdown.
+  // them to complete before returning.
   //
-  // REQUIRES: 'timestamp' must be in the past according to the configured
-  // clock.
-  Status WaitForCleanSnapshotAtTimestamp(Timestamp timestamp,
+  // If 'timestamp' was marked safe before the call to this method (e.g. by TimeManager)
+  // then the returned snapshot is repeatable.
+  Status WaitForSnapshotWithAllCommitted(Timestamp timestamp,
                                          MvccSnapshot* snapshot,
                                          const MonoTime& deadline) const WARN_UNUSED_RESULT;
 
-  // Take a snapshot at the current timestamp, and then wait for any
-  // currently running transactions at an earlier timestamp to finish.
-  //
-  // The returned snapshot acts as a "barrier":
-  // - all transactions which started prior to this call are included in
-  //   snapshot
-  // - no transactions which start after the call returns will be included
-  //   in snapshot
-  // - snapshot->is_clean() is guaranteed
-  //
-  // Note that transactions are not blocked during this call.
-  void WaitForCleanSnapshot(MvccSnapshot* snapshot) const;
-
   // Wait for all operations that are currently APPLYING to commit.
   //
   // NOTE: this does _not_ guarantee that no transactions are APPLYING upon
@@ -286,6 +273,7 @@ class MvccManager {
   FRIEND_TEST(MvccTest, TestTxnAbort);
   FRIEND_TEST(MvccTest, TestAutomaticCleanTimeMoveToSafeTimeOnCommit);
   FRIEND_TEST(MvccTest, TestWaitForApplyingTransactionsToCommit);
+  FRIEND_TEST(MvccTest, TestWaitForCleanSnapshot_SnapAfterSafeTimeWithInFlights);
 
   enum TxnState {
     RESERVED,
@@ -326,7 +314,7 @@ class MvccManager {
   // Commits the given transaction.
   // Sets *was_earliest to true if this was the earliest in-flight transaction.
   void CommitTransactionUnlocked(Timestamp timestamp,
-                                 bool* was_earliest);
+                                 bool* was_earliest_in_flight);
 
   // Remove the timestamp 'ts' from the in-flight map.
   // FATALs if the ts is not in the in-flight map.

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 5fe288f..aa5d39f 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -391,7 +391,7 @@ void Tablet::StartTransaction(WriteTransactionState* tx_state) {
   gscoped_ptr<ScopedTransaction> mvcc_tx;
   DCHECK(tx_state->has_timestamp());
   mvcc_tx.reset(new ScopedTransaction(&mvcc_, tx_state->timestamp()));
-  tx_state->SetMvccTxAndTimestamp(std::move(mvcc_tx));
+  tx_state->SetMvccTx(std::move(mvcc_tx));
 }
 
 Status Tablet::InsertOrUpsertUnlocked(WriteTransactionState *tx_state,

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 95a167e..c8cf788 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -498,9 +498,6 @@ Status TabletBootstrap::Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
                                            tablet_id));
   }
 
-  // Before playing any segments we set the safe and clean times to 'kMin' so that
-  // the MvccManager will accept all transactions that we replay as uncommitted.
-  tablet_->mvcc_manager()->AdjustSafeTime(Timestamp::kMin);
   RETURN_NOT_OK_PREPEND(PlaySegments(consensus_info), "Failed log replay. Reason");
 
   // Flush the consensus metadata once at the end to persist our changes, if any.


[6/6] kudu git commit: KUDU-1796: Use last REPLICATE OpId instead of last COMMIT OpId to tombstone a tablet replica

Posted by jd...@apache.org.
KUDU-1796: Use last REPLICATE OpId instead of last COMMIT OpId to tombstone a tablet replica

The kudu-tool-test has been flaky because of the following failure:

kudu-tool-test.cc:1203: Failure
Value of: tombstoned_opid.index()
  Actual: 205
  Expected: last_logged_opid.index()
  Which is: 206

The issue is that the offline CLI tool "local_replica delete" stored the
last COMMIT OpId in the tablet superblock instead of the last REPLICATE
OpId when tombstoning the tablet replica. As a result, kudu-tool-test's
ToolTest.TestLocalReplicaTombstoneDelete sometimes observed that
tombstoned_opid (COMMIT) lagged behind last_logged_opid (REPLICATE).

This patch fixes the tool to use the last REPLICATE OpId from the WAL.
Testing: Ran 5000 iterations of the currently failing test.

Change-Id: I02e95f05fab80e94b1afd078b23e5e03eca6e42a
Reviewed-on: http://gerrit.cloudera.org:8080/5416
Reviewed-by: Mike Percy <mp...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/839bd6f9
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/839bd6f9
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/839bd6f9

Branch: refs/heads/master
Commit: 839bd6f9abe3dedf439628e80fd7111763c11b52
Parents: 4d8fe6c
Author: Dinesh Bhat <di...@cloudera.com>
Authored: Thu Dec 8 13:03:41 2016 +0530
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Dec 8 15:49:41 2016 +0000

----------------------------------------------------------------------
 src/kudu/tools/tool_action_local_replica.cc | 30 ++++++++++++------------
 1 file changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/839bd6f9/src/kudu/tools/tool_action_local_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc
index b788748..7e0cd22 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -162,18 +162,18 @@ Status ParseHostPortString(const string& hostport_str, HostPort* hostport) {
   return Status::OK();
 }
 
-// Find the last committed OpId for the tablet_id from the WAL.
-Status FindLastCommittedOpId(FsManager* fs, const string& tablet_id,
-                             OpId* last_committed_opid) {
+// Find the last replicated OpId for the tablet_id from the WAL.
+Status FindLastLoggedOpId(FsManager* fs, const string& tablet_id,
+                          OpId* last_logged_opid) {
   shared_ptr<LogReader> reader;
   RETURN_NOT_OK(LogReader::Open(fs, scoped_refptr<log::LogIndex>(), tablet_id,
                                 scoped_refptr<MetricEntity>(), &reader));
   SegmentSequence segs;
   RETURN_NOT_OK(reader->GetSegmentsSnapshot(&segs));
-  // Reverse iterate segments to find the first 'last committed' entry.
+  // Reverse iterate the segments to find the 'last replicated' entry quickly.
   // Note that we still read the entries within a segment in sequential
-  // fashion, so the last entry within the 'found' segment will
-  // give us the last_committed_opid.
+  // fashion, so the last entry within the first 'found' segment will
+  // give us the last_logged_opid.
   vector<scoped_refptr<ReadableLogSegment>>::reverse_iterator seg;
   bool found = false;
   for (seg = segs.rbegin(); seg != segs.rend(); ++seg) {
@@ -183,13 +183,13 @@ Status FindLastCommittedOpId(FsManager* fs, const string& tablet_id,
       Status s = reader.ReadNextEntry(&entry);
       if (s.IsEndOfFile()) break;
       RETURN_NOT_OK_PREPEND(s, "Error in log segment");
-      if (entry.type() != log::COMMIT) continue;
-      *last_committed_opid = entry.commit().commited_op_id();
+      if (entry.type() != log::REPLICATE) continue;
+      *last_logged_opid = entry.replicate().id();
       found = true;
     }
     if (found) return Status::OK();
   }
-  return Status::NotFound("Committed OpId not found in the log");
+  return Status::NotFound("No entries found in the write-ahead log");
 }
 
 // Parses a colon-delimited string containing a uuid, hostname or IP address,
@@ -300,7 +300,7 @@ Status DeleteLocalReplica(const RunnerContext& context) {
   const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
   FsManager fs_manager(Env::Default(), FsManagerOpts());
   RETURN_NOT_OK(fs_manager.Open());
-  boost::optional<OpId> last_committed_opid = boost::none;
+  boost::optional<OpId> last_logged_opid = boost::none;
   TabletDataState state = TabletDataState::TABLET_DATA_DELETED;
   if (!FLAGS_clean_unsafe) {
     state = TabletDataState::TABLET_DATA_TOMBSTONED;
@@ -308,14 +308,14 @@ Status DeleteLocalReplica(const RunnerContext& context) {
     // the log, it's not an error. But if we receive any other error,
     // indicate the user to delete with --clean_unsafe flag.
     OpId opid;
-    Status s = FindLastCommittedOpId(&fs_manager, tablet_id, &opid);
+    Status s = FindLastLoggedOpId(&fs_manager, tablet_id, &opid);
     if (s.ok()) {
-      last_committed_opid = opid;
+      last_logged_opid = opid;
     } else if (s.IsNotFound()) {
-      LOG(INFO) << "Could not find last committed OpId from WAL, "
+      LOG(INFO) << "Could not find any replicated OpId from WAL, "
                 << "but proceeding with tablet tombstone: " << s.ToString();
     } else {
-      LOG(ERROR) << "Error attempting to find last committed OpId in WAL: " << s.ToString();
+      LOG(ERROR) << "Error attempting to find last replicated OpId from WAL: " << s.ToString();
       LOG(ERROR) << "Cannot delete (tombstone) the tablet, use --clean_unsafe to delete"
                  << " the tablet permanently from this server.";
       return s;
@@ -325,7 +325,7 @@ Status DeleteLocalReplica(const RunnerContext& context) {
   // Force the specified tablet on this node to be in 'state'.
   scoped_refptr<TabletMetadata> meta;
   RETURN_NOT_OK(TabletMetadata::Load(&fs_manager, tablet_id, &meta));
-  RETURN_NOT_OK(TSTabletManager::DeleteTabletData(meta, state, last_committed_opid));
+  RETURN_NOT_OK(TSTabletManager::DeleteTabletData(meta, state, last_logged_opid));
   return Status::OK();
 }
 


[3/6] kudu git commit: [docs] Add missing krb deps to the SLES12 and Ubuntu instructions

Posted by jd...@apache.org.
[docs] Add missing krb deps to the SLES12 and Ubuntu instructions

Change-Id: I9f98443a923edb6687427a5863e969883266bd30
Reviewed-on: http://gerrit.cloudera.org:8080/5130
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cbe80ea8
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cbe80ea8
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cbe80ea8

Branch: refs/heads/master
Commit: cbe80ea81b319616df4054094afe990a6f32c5ba
Parents: db4d0a6
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Thu Nov 17 11:36:57 2016 -0800
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Dec 8 04:31:05 2016 +0000

----------------------------------------------------------------------
 docs/installation.adoc | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/cbe80ea8/docs/installation.adoc
----------------------------------------------------------------------
diff --git a/docs/installation.adoc b/docs/installation.adoc
index 433983b..c8de165 100644
--- a/docs/installation.adoc
+++ b/docs/installation.adoc
@@ -362,7 +362,7 @@ make -j4
 +
 ----
 $ sudo apt-get install autoconf automake curl g++ gcc gdb git \
-  krb5-admin-server krb5-kdc krb5-user libsasl2-dev libsasl2-modules \
+  krb5-admin-server krb5-kdc krb5-user libkrb5-dev libsasl2-dev libsasl2-modules \
   libsasl2-modules-gssapi-mit libssl-dev libtool lsb-release make ntp openssl \
   patch pkg-config python rsync unzip vim-common
 ----
@@ -427,7 +427,7 @@ the steps marked *Optional* above.
 #!/bin/bash
 
 sudo apt-get -y install autoconf automake curl g++ gcc gdb git \
-  krb5-admin-server krb5-kdc krb5-user libsasl2-dev libsasl2-modules \
+  krb5-admin-server krb5-kdc krb5-user libkrb5-dev libsasl2-dev libsasl2-modules \
   libsasl2-modules-gssapi-mit libssl-dev libtool lsb-release make ntp openssl \
   patch pkg-config python rsync unzip vim-common
 git clone https://github.com/apache/kudu
@@ -449,7 +449,7 @@ make -j4
 +
 ----
 $ sudo zypper install autoconf automake curl cyrus-sasl-devel \
-  cyrus-sasl-gssapi gcc gcc-c++ gdb git libtool lsb-release make ntp \
+  cyrus-sasl-gssapi gcc gcc-c++ gdb git krb5-devel libtool lsb-release make ntp \
   openssl-devel patch pkg-config python rsync unzip vim
 ----
 
@@ -502,7 +502,7 @@ the steps marked *Optional* above.
 #!/bin/bash
 
 sudo zypper install -y autoconf automake curl cyrus-sasl-devel \
-  cyrus-sasl-gssapi gcc gcc-c++ gdb git libtool lsb-release make ntp \
+  cyrus-sasl-gssapi gcc gcc-c++ gdb git krb5-devel libtool lsb-release make ntp \
   openssl-devel patch pkg-config python rsync unzip vim
 git clone https://github.com/apache/kudu
 cd kudu


[4/6] kudu git commit: KUDU-798 (part 5) Correct safe time advancement

Posted by jd...@apache.org.
http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/tablet_peer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer.cc b/src/kudu/tablet/tablet_peer.cc
index 0eaaefe..5e901a0 100644
--- a/src/kudu/tablet/tablet_peer.cc
+++ b/src/kudu/tablet/tablet_peer.cc
@@ -92,6 +92,7 @@ using consensus::OpId;
 using consensus::RaftConfigPB;
 using consensus::RaftPeerPB;
 using consensus::RaftConsensus;
+using consensus::TimeManager;
 using consensus::ALTER_SCHEMA_OP;
 using consensus::WRITE_OP;
 using log::Log;
@@ -162,11 +163,14 @@ Status TabletPeer::Init(const shared_ptr<Tablet>& tablet,
     RETURN_NOT_OK(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id_,
                                           meta_->fs_manager()->uuid(), &cmeta));
 
+    scoped_refptr<TimeManager> time_manager(new TimeManager(
+        clock, tablet_->mvcc_manager()->GetCleanTimestamp()));
+
     consensus_ = RaftConsensus::Create(options,
                                        std::move(cmeta),
                                        local_peer_pb_,
                                        metric_entity,
-                                       clock_,
+                                       time_manager,
                                        this,
                                        messenger_,
                                        log_.get(),
@@ -532,9 +536,6 @@ Status TabletPeer::StartReplicaTransaction(const scoped_refptr<ConsensusRound>&
   // TODO(todd) Look at wiring the stuff below on the driver
   TransactionState* state = transaction->state();
   state->set_consensus_round(round);
-  Timestamp ts(replicate_msg->timestamp());
-  state->set_timestamp(ts);
-  clock_->Update(ts);
 
   scoped_refptr<TransactionDriver> driver;
   RETURN_NOT_OK(NewReplicaTransactionDriver(std::move(transaction), &driver));
@@ -555,8 +556,7 @@ Status TabletPeer::NewLeaderTransactionDriver(gscoped_ptr<Transaction> transacti
     log_.get(),
     prepare_pool_.get(),
     apply_pool_,
-    &txn_order_verifier_,
-    clock_);
+    &txn_order_verifier_);
   RETURN_NOT_OK(tx_driver->Init(std::move(transaction), consensus::LEADER));
   driver->swap(tx_driver);
 
@@ -571,8 +571,7 @@ Status TabletPeer::NewReplicaTransactionDriver(gscoped_ptr<Transaction> transact
     log_.get(),
     prepare_pool_.get(),
     apply_pool_,
-    &txn_order_verifier_,
-    clock_);
+    &txn_order_verifier_);
   RETURN_NOT_OK(tx_driver->Init(std::move(transaction), consensus::REPLICA));
   driver->swap(tx_driver);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/tablet_peer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer.h b/src/kudu/tablet/tablet_peer.h
index 762925b..d622626 100644
--- a/src/kudu/tablet/tablet_peer.h
+++ b/src/kudu/tablet/tablet_peer.h
@@ -26,6 +26,7 @@
 
 #include "kudu/consensus/consensus.h"
 #include "kudu/consensus/log.h"
+#include "kudu/consensus/time_manager.h"
 #include "kudu/gutil/callback.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/tablet/tablet.h"
@@ -148,6 +149,10 @@ class TabletPeer : public RefCountedThreadSafe<TabletPeer>,
     return tablet_.get();
   }
 
+  scoped_refptr<consensus::TimeManager> time_manager() const {
+    return consensus_->time_manager();
+  }
+
   std::shared_ptr<Tablet> shared_tablet() const {
     std::lock_guard<simple_spinlock> lock(lock_);
     return tablet_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/transactions/alter_schema_transaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/alter_schema_transaction.cc b/src/kudu/tablet/transactions/alter_schema_transaction.cc
index 4ee5f0f..bb262fd 100644
--- a/src/kudu/tablet/transactions/alter_schema_transaction.cc
+++ b/src/kudu/tablet/transactions/alter_schema_transaction.cc
@@ -62,7 +62,6 @@ void AlterSchemaTransactionState::ReleaseSchemaLock() {
   TRACE("Released schema lock");
 }
 
-
 AlterSchemaTransaction::AlterSchemaTransaction(unique_ptr<AlterSchemaTransactionState> state,
                                                DriverType type)
     : Transaction(state.get(), type, Transaction::ALTER_SCHEMA_TXN),
@@ -96,9 +95,9 @@ Status AlterSchemaTransaction::Prepare() {
 }
 
 Status AlterSchemaTransaction::Start() {
-  if (!state_->has_timestamp()) {
-    state_->set_timestamp(state_->tablet_peer()->clock()->Now());
-  }
+  DCHECK(!state_->has_timestamp());
+  DCHECK(state_->consensus_round()->replicate_msg()->has_timestamp());
+  state_->set_timestamp(Timestamp(state_->consensus_round()->replicate_msg()->timestamp()));
   TRACE("START. Timestamp: $0", server::HybridClock::GetPhysicalValueMicros(state_->timestamp()));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/transactions/transaction_driver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc
index ab0606d..8c5b4b9 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -20,6 +20,7 @@
 #include <mutex>
 
 #include "kudu/consensus/consensus.h"
+#include "kudu/consensus/time_manager.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/rpc/result_tracker.h"
 #include "kudu/tablet/tablet_peer.h"
@@ -76,7 +77,6 @@ class FollowerTransactionCompletionCallback : public TransactionCompletionCallba
   scoped_refptr<ResultTracker> result_tracker_;
 };
 
-
 ////////////////////////////////////////////////////////////
 // TransactionDriver
 ////////////////////////////////////////////////////////////
@@ -86,15 +86,13 @@ TransactionDriver::TransactionDriver(TransactionTracker *txn_tracker,
                                      Log* log,
                                      ThreadPool* prepare_pool,
                                      ThreadPool* apply_pool,
-                                     TransactionOrderVerifier* order_verifier,
-                                     scoped_refptr<server::Clock> clock)
+                                     TransactionOrderVerifier* order_verifier)
     : txn_tracker_(txn_tracker),
       consensus_(consensus),
       log_(log),
       prepare_pool_(prepare_pool),
       apply_pool_(apply_pool),
       order_verifier_(order_verifier),
-      clock_(std::move(clock)),
       trace_(new Trace()),
       start_time_(MonoTime::Now()),
       replication_state_(NOT_REPLICATING),
@@ -285,21 +283,9 @@ Status TransactionDriver::Prepare() {
   switch (repl_state_copy) {
     case NOT_REPLICATING:
     {
-      // Assign the timestamp just before submitting the transaction to consensus, if
-      // it doesn't have one.
-      // This is a placeholder since in the near future the timestamp will be assigned.
-      // within consensus.
-      // TODO(dralves) Remove this when the new TimeManager class gets in (part of KUDU-798)
-      DCHECK(!transaction_->state()->has_timestamp());
-      if (transaction_->state()->external_consistency_mode() == COMMIT_WAIT) {
-        transaction_->state()->set_timestamp(clock_->NowLatest());
-      } else {
-        transaction_->state()->set_timestamp(clock_->Now());
-      }
-
-      transaction_->state()->consensus_round()->replicate_msg()->set_timestamp(
-          transaction_->state()->timestamp().ToUint64());
-
+      // Assign a timestamp to the transaction before we Start() it.
+      RETURN_NOT_OK(consensus_->time_manager()->AssignTimestamp(
+                        mutable_state()->consensus_round()->replicate_msg()));
       RETURN_NOT_OK(transaction_->Start());
       VLOG_WITH_PREFIX(4) << "Triggering consensus replication.";
       // Trigger consensus replication.

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/transactions/transaction_driver.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.h b/src/kudu/tablet/transactions/transaction_driver.h
index df62a20..044edda 100644
--- a/src/kudu/tablet/transactions/transaction_driver.h
+++ b/src/kudu/tablet/transactions/transaction_driver.h
@@ -224,8 +224,7 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
                     log::Log* log,
                     ThreadPool* prepare_pool,
                     ThreadPool* apply_pool,
-                    TransactionOrderVerifier* order_verifier,
-                    scoped_refptr<server::Clock> clock);
+                    TransactionOrderVerifier* order_verifier);
 
   // Perform any non-constructor initialization. Sets the transaction
   // that will be executed.
@@ -355,11 +354,6 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
   // Lock that synchronizes access to the transaction's state.
   mutable simple_spinlock lock_;
 
-  // Temporarily have the clock on the driver so that we can assign timestamps to
-  // transactions.
-  // TODO(dralves) Remove this when the new TimeManager class gets in (part of KUDU-798).
-  scoped_refptr<server::Clock> clock_;
-
   // A copy of the transaction's OpId, set when the transaction first
   // receives one from Consensus and uninitialized until then.
   // TODO(todd): we have three separate copies of this now -- in TransactionState,

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/transactions/transaction_tracker-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_tracker-test.cc b/src/kudu/tablet/transactions/transaction_tracker-test.cc
index 54ef6fe..695dd6a 100644
--- a/src/kudu/tablet/transactions/transaction_tracker-test.cc
+++ b/src/kudu/tablet/transactions/transaction_tracker-test.cc
@@ -94,8 +94,7 @@ class TransactionTrackerTest : public KuduTest {
                                 nullptr,
                                 nullptr,
                                 nullptr,
-                                nullptr,
-                                scoped_refptr<server::Clock>()));
+                                nullptr));
       gscoped_ptr<NoOpTransaction> tx(new NoOpTransaction(new NoOpTransactionState));
       RETURN_NOT_OK(driver->Init(tx.PassAs<Transaction>(), consensus::LEADER));
       local_drivers.push_back(driver);

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/transactions/transaction_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_tracker.cc b/src/kudu/tablet/transactions/transaction_tracker.cc
index 95f9f1e..3661747 100644
--- a/src/kudu/tablet/transactions/transaction_tracker.cc
+++ b/src/kudu/tablet/transactions/transaction_tracker.cc
@@ -21,7 +21,6 @@
 #include <limits>
 #include <vector>
 
-
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/tablet_peer.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/transactions/write_transaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/write_transaction.cc b/src/kudu/tablet/transactions/write_transaction.cc
index d7dce19..d9c7347 100644
--- a/src/kudu/tablet/transactions/write_transaction.cc
+++ b/src/kudu/tablet/transactions/write_transaction.cc
@@ -20,8 +20,8 @@
 #include <algorithm>
 #include <vector>
 
-#include "kudu/common/wire_protocol.h"
 #include "kudu/common/row_operations.h"
+#include "kudu/common/wire_protocol.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/walltime.h"
@@ -108,6 +108,9 @@ void WriteTransaction::AbortPrepare() {
 Status WriteTransaction::Start() {
   TRACE_EVENT0("txn", "WriteTransaction::Start");
   TRACE("Start()");
+  DCHECK(!state_->has_timestamp());
+  DCHECK(state_->consensus_round()->replicate_msg()->has_timestamp());
+  state_->set_timestamp(Timestamp(state_->consensus_round()->replicate_msg()->timestamp()));
   state_->tablet_peer()->tablet()->StartTransaction(state_.get());
   TRACE("Timestamp: $0", state_->tablet_peer()->clock()->Stringify(state_->timestamp()));
   return Status::OK();
@@ -224,13 +227,8 @@ WriteTransactionState::WriteTransactionState(TabletPeer* tablet_peer,
   }
 }
 
-void WriteTransactionState::SetMvccTxAndTimestamp(gscoped_ptr<ScopedTransaction> mvcc_tx) {
+void WriteTransactionState::SetMvccTx(gscoped_ptr<ScopedTransaction> mvcc_tx) {
   DCHECK(!mvcc_tx_) << "Mvcc transaction already started/set.";
-  if (has_timestamp()) {
-    DCHECK_EQ(timestamp(), mvcc_tx->timestamp());
-  } else {
-    set_timestamp(mvcc_tx->timestamp());
-  }
   mvcc_tx_ = std::move(mvcc_tx);
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tablet/transactions/write_transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/write_transaction.h b/src/kudu/tablet/transactions/write_transaction.h
index 5cb8f23..2d35d27 100644
--- a/src/kudu/tablet/transactions/write_transaction.h
+++ b/src/kudu/tablet/transactions/write_transaction.h
@@ -100,11 +100,10 @@ class WriteTransactionState : public TransactionState {
   }
 
   // Set the MVCC transaction associated with this Write operation.
-  // This must be called exactly once, during the PREPARE phase just
-  // after the MvccManager has assigned a timestamp.
+  // This must be called exactly once, after the timestamp was acquired.
   // This also copies the timestamp from the MVCC transaction into the
   // WriteTransactionState object.
-  void SetMvccTxAndTimestamp(gscoped_ptr<ScopedTransaction> mvcc_tx);
+  void SetMvccTx(gscoped_ptr<ScopedTransaction> mvcc_tx);
 
   // Set the Tablet components that this transaction will write into.
   // Called exactly once at the beginning of Apply, before applying its

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tools/tool_action_remote_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_remote_replica.cc b/src/kudu/tools/tool_action_remote_replica.cc
index ec8c6ea..57fa19d 100644
--- a/src/kudu/tools/tool_action_remote_replica.cc
+++ b/src/kudu/tools/tool_action_remote_replica.cc
@@ -95,14 +95,17 @@ class ReplicaDumper {
     ScanRequestPB req;
     ScanResponsePB resp;
 
+    // Scan and dump the tablet.
+    // Note that we do a READ_LATEST scan, as we might be scanning a tablet that has lost its
+    // majority and thus cannot do snapshot scans.
+    // TODO(dralves): When KUDU-1704 is in, change this to perform stale snapshot reads, which
+    // can be ordered.
     NewScanRequestPB* new_req = req.mutable_new_scan_request();
     RETURN_NOT_OK(SchemaToColumnPBs(
         schema, new_req->mutable_projected_columns(),
         SCHEMA_PB_WITHOUT_IDS | SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES));
     new_req->set_tablet_id(tablet_id);
     new_req->set_cache_blocks(false);
-    new_req->set_order_mode(ORDERED);
-    new_req->set_read_mode(READ_AT_SNAPSHOT);
 
     do {
       RpcController rpc;

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 84312f3..684209e 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -28,6 +28,7 @@
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/consensus.h"
+#include "kudu/consensus/time_manager.h"
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/stl_util.h"
@@ -79,6 +80,12 @@ DEFINE_bool(scanner_allow_snapshot_scans_with_logical_timestamps, false,
             "If set, the server will support snapshot scans with logical timestamps.");
 TAG_FLAG(scanner_allow_snapshot_scans_with_logical_timestamps, unsafe);
 
+DEFINE_int32(scanner_max_wait_ms, 1000,
+             "The maximum amount of time (in milliseconds) we'll hang a scanner thread waiting for "
+             "safe time to advance or transactions to commit, even if its deadline allows waiting "
+             "longer.");
+TAG_FLAG(scanner_max_wait_ms, advanced);
+
 // Fault injection flags.
 DEFINE_int32(scanner_inject_latency_on_each_batch_ms, 0,
              "If set, the scanner will pause the specified number of milliesconds "
@@ -1374,6 +1381,26 @@ static Status SetupScanSpec(const NewScanRequestPB& scan_pb,
   return Status::OK();
 }
 
+namespace {
+// Checks whether 'timestamp' precedes the ancient history mark (AHM) of 'tablet' when
+// 'read_mode' is READ_AT_SNAPSHOT. Returns Status::OK() if not, Status::InvalidArgument() if so.
+Status VerifyNotAncientHistory(Tablet* tablet, ReadMode read_mode, Timestamp timestamp) {
+  tablet::HistoryGcOpts history_gc_opts = tablet->GetHistoryGcOpts();
+  if (read_mode == READ_AT_SNAPSHOT && history_gc_opts.IsAncientHistory(timestamp)) {
+    return Status::InvalidArgument(
+        Substitute("Snapshot timestamp is earlier than the ancient history mark. Consider "
+                       "increasing the value of the configuration parameter "
+                       "--tablet_history_max_age_sec. Snapshot timestamp: $0 "
+                       "Ancient History Mark: $1 Physical time difference: $2",
+                   tablet->clock()->Stringify(timestamp),
+                   tablet->clock()->Stringify(history_gc_opts.ancient_history_mark()),
+                   tablet->clock()->GetPhysicalComponentDifference(
+                       timestamp, history_gc_opts.ancient_history_mark()).ToString()));
+  }
+  return Status::OK();
+}
+} // anonymous namespace
+
 // Start a new scan.
 Status TabletServiceImpl::HandleNewScanRequest(TabletPeer* tablet_peer,
                                                const ScanRequestPB* req,
@@ -1483,13 +1510,21 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletPeer* tablet_peer,
         break;
       }
       case READ_AT_SNAPSHOT: {
-        s = HandleScanAtSnapshot(scan_pb, rpc_context, projection, tablet, &iter, snap_timestamp);
+        s = HandleScanAtSnapshot(scan_pb, rpc_context, projection, tablet_peer,
+                                 &iter, snap_timestamp);
+        // If we got a Status::ServiceUnavailable() from HandleScanAtSnapshot() it might
+        // mean we're just behind so let the client try again.
+        if (s.IsServiceUnavailable()) {
+          *error_code = TabletServerErrorPB::THROTTLED;
+          return s;
+        }
+
         if (!s.ok()) {
           tmp_error_code = TabletServerErrorPB::INVALID_SNAPSHOT;
         }
         break;
       }
-      TRACE("Iterator created");
+        TRACE("Iterator created");
     }
   }
 
@@ -1535,23 +1570,15 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletPeer* tablet_peer,
   // end up with a valid snapshot in that case. It would be more correct to
   // initialize the row iterator and then select the latest timestamp
   // represented by those open files in that case.
-  tablet::HistoryGcOpts history_gc_opts = tablet->GetHistoryGcOpts();
-  if (scan_pb.read_mode() == READ_AT_SNAPSHOT &&
-      history_gc_opts.IsAncientHistory(*snap_timestamp)) {
-    // Now that we have initialized our row iterator at a snapshot, return an
-    // error if the snapshot timestamp was prior to the ancient history mark.
-    // We have to check after we open the iterator in order to avoid a TOCTOU
-    // error.
+  //
+  // Now that we have initialized our row iterator at a snapshot, return an
+  // error if the snapshot timestamp was prior to the ancient history mark.
+  // We have to check after we open the iterator in order to avoid a TOCTOU
+  // error.
+  s = VerifyNotAncientHistory(tablet.get(), scan_pb.read_mode(), *snap_timestamp);
+  if (!s.ok()) {
     *error_code = TabletServerErrorPB::INVALID_SNAPSHOT;
-    return Status::InvalidArgument(
-        Substitute("Snapshot timestamp is earlier than the ancient history mark. Consider "
-                   "increasing the value of the configuration parameter "
-                   "--tablet_history_max_age_sec. Snapshot timestamp: $0 "
-                   "Ancient History Mark: $1 Physical time difference: $2",
-                   server_->clock()->Stringify(*snap_timestamp),
-                   server_->clock()->Stringify(history_gc_opts.ancient_history_mark()),
-                   server_->clock()->GetPhysicalComponentDifference(
-                       *snap_timestamp, history_gc_opts.ancient_history_mark()).ToString()));
+    return s;
   }
 
   *has_more_results = iter->HasNext();
@@ -1726,16 +1753,25 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
   return Status::OK();
 }
 
+namespace {
+// Helper to clamp a client deadline for a scan to the max supported by the server.
+MonoTime ClampScanDeadlineForWait(const MonoTime& deadline, bool* was_clamped) {
+  MonoTime now = MonoTime::Now();
+  if (deadline.GetDeltaSince(now).ToMilliseconds() > FLAGS_scanner_max_wait_ms) {
+    *was_clamped = true;
+    return now + MonoDelta::FromMilliseconds(FLAGS_scanner_max_wait_ms);
+  }
+  *was_clamped = false;
+  return deadline;
+}
+} // anonymous namespace
+
 Status TabletServiceImpl::HandleScanAtSnapshot(const NewScanRequestPB& scan_pb,
                                                const RpcContext* rpc_context,
                                                const Schema& projection,
-                                               const shared_ptr<Tablet>& tablet,
+                                               TabletPeer* tablet_peer,
                                                gscoped_ptr<RowwiseIterator>* iter,
                                                Timestamp* snap_timestamp) {
-
-  // TODO check against the earliest boundary (i.e. how early can we go) right
-  // now we're keeping all undos/redos forever!
-
   // If the client sent a timestamp update our clock with it.
   if (scan_pb.has_propagated_timestamp()) {
     Timestamp propagated_timestamp(scan_pb.propagated_timestamp());
@@ -1776,32 +1812,51 @@ Status TabletServiceImpl::HandleScanAtSnapshot(const NewScanRequestPB& scan_pb,
     }
   }
 
-  tablet::MvccSnapshot snap;
+  // Before waiting on anything, check that the timestamp is after the AHM.
+  // This is not the final check; we'll check again after the iterators are open, but
+  // there is no point in waiting if we can't actually scan afterwards.
+  RETURN_NOT_OK(VerifyNotAncientHistory(tablet_peer->tablet(),
+                                        ReadMode::READ_AT_SNAPSHOT,
+                                        tmp_snap_timestamp));
 
-  // Wait for the in-flights in the snapshot to be finished.
-  // We'll use the client-provided deadline, but not if it's more than 5 seconds from
-  // now -- it's better to make the client retry than hold RPC threads busy.
-  //
-  // TODO(KUDU-1127): even this may not be sufficient -- perhaps we should check how long it
-  // has been since the MVCC manager was able to advance its safe time. If it has been
-  // a long time, it's likely that the majority of voters for this tablet are down
-  // and some writes are "stuck" and therefore won't be committed.
-  // Subtract a little bit from the client deadline so that it's more likely we actually
-  // have time to send our response sent back before it times out.
-  MonoTime client_deadline =
-      rpc_context->GetClientDeadline() - MonoDelta::FromMilliseconds(10);
+  tablet::MvccSnapshot snap;
+  Tablet* tablet = tablet_peer->tablet();
+  scoped_refptr<consensus::TimeManager> time_manager = tablet_peer->time_manager();
+  tablet::MvccManager* mvcc_manager = tablet->mvcc_manager();
+
+  // Reduce the client's deadline by a few msecs to allow for overhead.
+  MonoTime client_deadline = rpc_context->GetClientDeadline() - MonoDelta::FromMilliseconds(10);
+
+  // It's not good for the tablet server or for the client if we hang here forever. The tablet
+  // server will have one less available thread, and the client might be stuck spending all
+  // of the allotted time for the scan on a partitioned server that will never have a consistent
+  // snapshot at 'snap_timestamp'.
+  // Because of this, we clamp the client's deadline to the configured maximum. If the client
+  // sets a long timeout, it can use the remaining time to retry on other servers.
+  bool was_clamped = false;
+  MonoTime final_deadline = ClampScanDeadlineForWait(client_deadline, &was_clamped);
+
+  // Wait for the tablet to know that 'snap_timestamp' is safe, i.e. that all operations
+  // that came before it have at least started. This, together with waiting for the MVCC
+  // snapshot to be clean below, allows us to always return the same data when scanning at
+  // the same timestamp (repeatable reads).
+  TRACE("Waiting safe time to advance");
+  MonoTime before = MonoTime::Now();
+  Status s = time_manager->WaitUntilSafe(tmp_snap_timestamp, final_deadline);
 
-  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
-  if (client_deadline < deadline) {
-    deadline = client_deadline;
+  if (s.ok()) {
+    // Wait for the in-flights in the snapshot to be finished.
+    TRACE("Waiting for operations to commit");
+    s = mvcc_manager->WaitForSnapshotWithAllCommitted(tmp_snap_timestamp, &snap, client_deadline);
   }
 
-  TRACE("Waiting for operations in snapshot to commit");
-  MonoTime before = MonoTime::Now();
-  RETURN_NOT_OK_PREPEND(
-      tablet->mvcc_manager()->WaitForCleanSnapshotAtTimestamp(
-          tmp_snap_timestamp, &snap, deadline),
-      "could not wait for desired snapshot timestamp to be consistent");
+  // If we got a TimedOut but had clamped the deadline, return ServiceUnavailable instead
+  // so that the client retries.
+  if (s.IsTimedOut() && was_clamped) {
+    return Status::ServiceUnavailable(s.CloneAndPrepend(
+        "could not wait for desired snapshot timestamp to be consistent").ToString());
+  }
+  RETURN_NOT_OK(s);
 
   uint64_t duration_usec = (MonoTime::Now() - before).ToMicroseconds();
   tablet->metrics()->snapshot_read_inflight_wait_duration->Increment(duration_usec);

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d8fe6cf/src/kudu/tserver/tablet_service.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.h b/src/kudu/tserver/tablet_service.h
index 430fb13..3cdc95e 100644
--- a/src/kudu/tserver/tablet_service.h
+++ b/src/kudu/tserver/tablet_service.h
@@ -93,7 +93,7 @@ class TabletServiceImpl : public TabletServerServiceIf {
   Status HandleScanAtSnapshot(const NewScanRequestPB& scan_pb,
                               const rpc::RpcContext* rpc_context,
                               const Schema& projection,
-                              const std::shared_ptr<tablet::Tablet>& tablet,
+                              tablet::TabletPeer* tablet_peer,
                               gscoped_ptr<RowwiseIterator>* iter,
                               Timestamp* snap_timestamp);