You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2018/10/12 05:38:14 UTC

[2/2] kudu git commit: KUDU-2463 pt 2: bump MVCC safe time on Raft no-op

KUDU-2463 pt 2: bump MVCC safe time on Raft no-op

Based on the same rationale as Part 1 of this patch series, this patch
updates MVCC's safe and clean time using the no-op timestamp provided by
the leader following a successful Raft election.

There isn't an obvious reference to the tablet (to get to the MVCC
module) in Raft consensus, but there is a ReplicaTransactionFactory,
which the TabletReplica implements. I've extended this to be a more
general ConsensusRoundHandler that can be used to create transactions or
finish transactions as needed.

An invariant we are trying to uphold is that once MVCC's safe time is
adjusted, all further transactions registered with MVCC will have higher
timestamps than the safe time. With this in mind, it is critical that
the adjustment of safe time be serialized with respect to transactions.
This is the case today because safe time is only advanced by writes in
the prepare thread, on which transactions are started. To preserve this,
Raft no-ops will also adjust the safe time on the prepare thread.

The following test changes are included:
- to ensure nothing terrible happens when there is a lot of election
  churn (and hence, a lot of new timestamp advancement), I've tweaked
  exactly_once_writes-itest to more explicitly churn elections.
  Previously it attempted this with just a low Raft heartbeat interval.
  I injected some latency to make it churn a bit harder and looped the
  test 1000 times in both TSAN and debug mode.
- since MvccManager::StartTransaction() will hit a CHECK failure if it
  starts a transaction at a timestamp that was previously marked safe, I
  added a configurable sleep at the beginning of the function to widen
  the window during which safe time can be advanced, encouraging the
  CHECK failure. I configured this in raft_consensus_election-itest and
  looped it 1000 times in TSAN and debug mode. If no-ops _didn't_ use
  the prepare thread to advance safe time, the added delay would lead to
  CHECK failures.
- added a test that ensures that a tablet will bump its MVCC timestamps
  on its own, driven solely by its Raft elections (no writes required)
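- tweaked raft_consensus-itest to use more realistic timestamps, now
  that MVCC's clean and safe times get updated by the leadership no-op
  (hard-coded logical timestamps could otherwise fall at or below the
  safe time already established by the no-op)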

This patch alone doesn't fix KUDU-2463. Rather, a later patch will
prevent scans from occurring if the MVCC safe time hasn't been advanced,
at which point this patch will reduce the window of scan unavailability.

Change-Id: Icbf812e2cbeeee7c322fd980245cfe40c886a15a
Reviewed-on: http://gerrit.cloudera.org:8080/11427
Tested-by: Andrew Wong <aw...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bc817a44
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bc817a44
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bc817a44

Branch: refs/heads/master
Commit: bc817a44867c586bf4e0539aa564b282c666a49d
Parents: fdc215c
Author: Andrew Wong <aw...@cloudera.com>
Authored: Tue Sep 11 12:15:02 2018 -0700
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Fri Oct 12 05:35:21 2018 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus-test-util.h        |  6 +-
 src/kudu/consensus/raft_consensus.cc            | 13 ++--
 src/kudu/consensus/raft_consensus.h             | 50 +++++++-----
 .../exactly_once_writes-itest.cc                | 16 +++-
 .../integration-tests/raft_consensus-itest.cc   | 82 ++++++++++++--------
 .../raft_consensus_election-itest.cc            |  2 +
 .../timestamp_advancement-itest.cc              | 27 +++++++
 src/kudu/tablet/mvcc.cc                         | 24 ++++--
 src/kudu/tablet/mvcc.h                          |  3 +-
 src/kudu/tablet/tablet_replica.cc               | 47 ++++++++++-
 src/kudu/tablet/tablet_replica.h                | 15 ++--
 11 files changed, 204 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/bc817a44/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index 8b6a71e..d6dc3f0 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -668,7 +668,7 @@ class TestDriver {
 };
 
 // A transaction factory for tests, usually this is implemented by TabletReplica.
-class TestTransactionFactory : public ReplicaTransactionFactory {
+class TestTransactionFactory : public ConsensusRoundHandler {
  public:
   explicit TestTransactionFactory(log::Log* log)
       : consensus_(nullptr),
@@ -681,7 +681,7 @@ class TestTransactionFactory : public ReplicaTransactionFactory {
     consensus_ = consensus;
   }
 
-  Status StartReplicaTransaction(const scoped_refptr<ConsensusRound>& round) OVERRIDE {
+  Status StartFollowerTransaction(const scoped_refptr<ConsensusRound>& round) override {
     auto txn = new TestDriver(pool_.get(), log_, round);
     txn->round_->SetConsensusReplicatedCallback(std::bind(
         &TestDriver::ReplicationFinished,
@@ -690,6 +690,8 @@ class TestTransactionFactory : public ReplicaTransactionFactory {
     return Status::OK();
   }
 
+  void FinishConsensusOnlyRound(ConsensusRound* /*round*/) override {}
+
   void ReplicateAsync(ConsensusRound* round) {
     CHECK_OK(consensus_->Replicate(round));
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/bc817a44/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index cd58896..f4ba768 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -227,7 +227,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
                             gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
                             scoped_refptr<log::Log> log,
                             scoped_refptr<TimeManager> time_manager,
-                            ReplicaTransactionFactory* txn_factory,
+                            ConsensusRoundHandler* round_handler,
                             const scoped_refptr<MetricEntity>& metric_entity,
                             Callback<void(const std::string& reason)> mark_dirty_clbk) {
   DCHECK(metric_entity);
@@ -235,7 +235,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
   peer_proxy_factory_ = DCHECK_NOTNULL(std::move(peer_proxy_factory));
   log_ = DCHECK_NOTNULL(std::move(log));
   time_manager_ = DCHECK_NOTNULL(std::move(time_manager));
-  txn_factory_ = DCHECK_NOTNULL(txn_factory);
+  round_handler_ = DCHECK_NOTNULL(round_handler);
   mark_dirty_clbk_ = std::move(mark_dirty_clbk);
 
   term_metric_ = metric_entity->FindOrCreateGauge(&METRIC_raft_term, CurrentTerm());
@@ -328,7 +328,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
                                    << SecureShortDebugString(cmeta_->ActiveConfig());
     for (ReplicateMsg* replicate : info.orphaned_replicates) {
       ReplicateRefPtr replicate_ptr = make_scoped_refptr_replicate(new ReplicateMsg(*replicate));
-      RETURN_NOT_OK(StartReplicaTransactionUnlocked(replicate_ptr));
+      RETURN_NOT_OK(StartFollowerTransactionUnlocked(replicate_ptr));
     }
 
     // Set the initial committed opid for the PendingRounds only after
@@ -927,7 +927,7 @@ static bool IsConsensusOnlyOperation(OperationType op_type) {
   return op_type == NO_OP || op_type == CHANGE_CONFIG_OP;
 }
 
-Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg) {
+Status RaftConsensus::StartFollowerTransactionUnlocked(const ReplicateRefPtr& msg) {
   DCHECK(lock_.is_locked());
 
   if (IsConsensusOnlyOperation(msg->get()->op_type())) {
@@ -943,7 +943,7 @@ Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg
                                << SecureShortDebugString(msg->get()->id());
   scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg));
   ConsensusRound* round_ptr = round.get();
-  RETURN_NOT_OK(txn_factory_->StartReplicaTransaction(round));
+  RETURN_NOT_OK(round_handler_->StartFollowerTransaction(round));
   return AddPendingOperationUnlocked(round_ptr);
 }
 
@@ -1386,7 +1386,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     }
 
     while (iter != deduped_req.messages.end()) {
-      prepare_status = StartReplicaTransactionUnlocked(*iter);
+      prepare_status = StartFollowerTransactionUnlocked(*iter);
       if (PREDICT_FALSE(!prepare_status.ok())) {
         break;
       }
@@ -2636,6 +2636,7 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
   }
   VLOG_WITH_PREFIX_UNLOCKED(1) << "Committing " << op_type_str << " with op id "
                                << round->id();
+  round_handler_->FinishConsensusOnlyRound(round);
   gscoped_ptr<CommitMsg> commit_msg(new CommitMsg);
   commit_msg->set_op_type(round->replicate_msg()->op_type());
   *commit_msg->mutable_commited_op_id() = round->id();

http://git-wip-us.apache.org/repos/asf/kudu/blob/bc817a44/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index f84a71e..f472103 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -72,10 +72,10 @@ namespace consensus {
 
 class ConsensusMetadataManager;
 class ConsensusRound;
+class ConsensusRoundHandler;
 class PeerManager;
 class PeerProxyFactory;
 class PendingRounds;
-class ReplicaTransactionFactory;
 struct ConsensusBootstrapInfo;
 struct ElectionResult;
 
@@ -148,7 +148,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
                gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
                scoped_refptr<log::Log> log,
                scoped_refptr<TimeManager> time_manager,
-               ReplicaTransactionFactory* txn_factory,
+               ConsensusRoundHandler* round_handler,
                const scoped_refptr<MetricEntity>& metric_entity,
                Callback<void(const std::string& reason)> mark_dirty_clbk);
 
@@ -518,7 +518,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
 
   // Begin a replica transaction. If the type of message in 'msg' is not a type
   // that uses transactions, delegates to StartConsensusOnlyRoundUnlocked().
-  Status StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg);
+  Status StartFollowerTransactionUnlocked(const ReplicateRefPtr& msg);
 
   // Returns true if this node is the only voter in the Raft configuration.
   bool IsSingleVoterConfig() const;
@@ -803,9 +803,10 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   scoped_refptr<TimeManager> time_manager_;
   gscoped_ptr<PeerProxyFactory> peer_proxy_factory_;
 
-  // When we receive a message from a remote peer telling us to start a transaction, we use
-  // this factory to start it.
-  ReplicaTransactionFactory* txn_factory_;
+  // When we receive a message from a remote peer telling us to start a
+  // transaction, or finish a round, we use this handler to handle it.
+  // This may update replica state (e.g. the tablet replica).
+  ConsensusRoundHandler* round_handler_;
 
   std::unique_ptr<PeerManager> peer_manager_;
 
@@ -896,32 +897,39 @@ struct ConsensusBootstrapInfo {
   DISALLOW_COPY_AND_ASSIGN(ConsensusBootstrapInfo);
 };
 
-// Factory for replica transactions.
-// An implementation of this factory must be registered prior to consensus
-// start, and is used to create transactions when the consensus implementation receives
-// messages from the leader.
+// Handler for consensus rounds.
+// An implementation of this handler must be registered prior to consensus
+// start, and is used to:
+// - Create transactions when the consensus implementation receives messages
+//   from the leader.
+// - Handle when the consensus implementation finishes a non-transaction round
 //
-// Replica transactions execute the following way:
+// Follower transactions execute the following way:
 //
 // - When a ReplicateMsg is first received from the leader, the RaftConsensus
-//   instance creates the ConsensusRound and calls StartReplicaTransaction().
-//   This will trigger the Prepare(). At the same time replica consensus
-//   instance immediately stores the ReplicateMsg in the Log. Once the replicate
+//   instance creates the ConsensusRound and calls StartFollowerTransaction().
+//   This will trigger the Prepare(). At the same time, the follower's consensus
+//   instance immediately stores the ReplicateMsg in the Log. Once the
 //   message is stored in stable storage an ACK is sent to the leader (i.e. the
 //   replica RaftConsensus instance does not wait for Prepare() to finish).
 //
-// - When the CommitMsg for a replicate is first received from the leader
-//   the replica waits for the corresponding Prepare() to finish (if it has
-//   not completed yet) and then proceeds to trigger the Apply().
+// - When the CommitMsg for a replicate is first received from the leader, the
+//   follower waits for the corresponding Prepare() to finish (if it has not
+//   completed yet) and then proceeds to trigger the Apply().
 //
-// - Once Apply() completes the ReplicaTransactionFactory is responsible for logging
+// - Once Apply() completes the ConsensusRoundHandler is responsible for logging
 //   a CommitMsg to the log to ensure that the operation can be properly restored
 //   on a restart.
-class ReplicaTransactionFactory {
+class ConsensusRoundHandler {
  public:
-  virtual Status StartReplicaTransaction(const scoped_refptr<ConsensusRound>& context) = 0;
+  virtual ~ConsensusRoundHandler() {}
 
-  virtual ~ReplicaTransactionFactory() {}
+  virtual Status StartFollowerTransaction(const scoped_refptr<ConsensusRound>& context) = 0;
+
+  // Consensus-only rounds complete when non-transaction ops finish
+  // replication. This can be used to trigger callbacks, akin to an Apply() for
+  // transaction ops.
+  virtual void FinishConsensusOnlyRound(ConsensusRound* round) = 0;
 };
 
 // Context for a consensus round on the LEADER side, typically created as an

http://git-wip-us.apache.org/repos/asf/kudu/blob/bc817a44/src/kudu/integration-tests/exactly_once_writes-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/exactly_once_writes-itest.cc b/src/kudu/integration-tests/exactly_once_writes-itest.cc
index b291bc9..c1fa863 100644
--- a/src/kudu/integration-tests/exactly_once_writes-itest.cc
+++ b/src/kudu/integration-tests/exactly_once_writes-itest.cc
@@ -63,6 +63,9 @@ using std::unique_ptr;
 using std::vector;
 
 namespace kudu {
+
+using strings::Substitute;
+
 namespace tserver {
 
 static const int kConsensusRpcTimeoutForTests = 50;
@@ -314,13 +317,22 @@ TEST_F(ExactlyOnceSemanticsITest, TestWritesWithExactlyOnceSemanticsWithCrashyNo
 TEST_F(ExactlyOnceSemanticsITest, TestWritesWithExactlyOnceSemanticsWithChurnyElections) {
   vector<string> ts_flags, master_flags;
 
+  int raft_heartbeat_interval;
 #if defined(THREAD_SANITIZER) || defined(ADDRESS_SANITIZER)
   // On TSAN/ASAN builds, we need to be a little bit less churny in order to make
   // any progress at all.
-  ts_flags.push_back("--raft_heartbeat_interval_ms=5");
+  raft_heartbeat_interval = 100;
 #else
-  ts_flags.emplace_back("--raft_heartbeat_interval_ms=2");
+  raft_heartbeat_interval = 50;
 #endif
+  // Inject random latency of up to the Raft heartbeat interval to ensure there
+  // will be missed heartbeats, triggering actual elections.
+  ts_flags = {
+    Substitute("--raft_heartbeat_interval_ms=$0", raft_heartbeat_interval),
+    Substitute("--consensus_inject_latency_ms_in_notifications=$0", raft_heartbeat_interval),
+    "--raft_enable_pre_election=false",
+    "--leader_failure_max_missed_heartbeat_periods=1",
+  };
 
   int num_batches = 200;
   if (AllowSlowTests()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/bc817a44/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index e18ef75..5ee5bde 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -93,10 +93,12 @@ DECLARE_int32(num_replicas);
 DECLARE_int32(num_tablet_servers);
 DECLARE_int32(rpc_timeout);
 
+METRIC_DECLARE_entity(server);
 METRIC_DECLARE_entity(tablet);
 METRIC_DECLARE_counter(transaction_memory_pressure_rejections);
 METRIC_DECLARE_gauge_int64(time_since_last_leader_heartbeat);
 METRIC_DECLARE_gauge_int64(failed_elections_since_stable_leader);
+METRIC_DECLARE_gauge_uint64(hybrid_clock_timestamp);
 
 using kudu::client::KuduInsert;
 using kudu::client::KuduSession;
@@ -161,6 +163,9 @@ class RaftConsensusITest : public RaftConsensusITestBase {
   RaftConsensusITest() {
   }
 
+  // Gets the current timestamp on the given server.
+  int64_t GetTimestampOnServer(TServerDetails* tserver) const;
+
   // Scan the given replica in a loop until the number of rows
   // is 'expected_count'. If it takes more than 10 seconds, then
   // fails the test.
@@ -169,9 +174,10 @@ class RaftConsensusITest : public RaftConsensusITestBase {
                        vector<string>* results);
 
   // Add an Insert operation to the given consensus request.
-  // The row to be inserted is generated based on the OpId.
-  void AddOp(const OpId& id, ConsensusRequestPB* req);
-  void AddOpWithTypeAndKey(const OpId& id,
+  // The row to be inserted is generated based on the OpId with a timestamp
+  // that is greater than 'base_ts'.
+  void AddOp(const OpId& id, int64_t base_ts, ConsensusRequestPB* req);
+  void AddOpWithTypeAndKey(const OpId& id, int64_t base_ts,
                            RowOperationsPB::Type op_type,
                            int32_t key,
                            ConsensusRequestPB* req);
@@ -223,6 +229,14 @@ class RaftConsensusITest : public RaftConsensusITestBase {
   vector<scoped_refptr<kudu::Thread> > threads_;
 };
 
+int64_t RaftConsensusITest::GetTimestampOnServer(TServerDetails* tserver) const {
+  int64_t ret;
+  ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(tserver->uuid());
+  CHECK_OK(GetInt64Metric(ets->bound_http_hostport(), &METRIC_ENTITY_server,
+                          nullptr, &METRIC_hybrid_clock_timestamp, "value", &ret));
+  return ret;
+}
+
 void RaftConsensusITest::WaitForRowCount(TabletServerServiceProxy* replica_proxy,
                                          int expected_count,
                                          vector<string>* results) {
@@ -245,12 +259,12 @@ void RaftConsensusITest::WaitForRowCount(TabletServerServiceProxy* replica_proxy
          << ": rows: " << *results;
 }
 
-void RaftConsensusITest::AddOp(const OpId& id, ConsensusRequestPB* req) {
-  AddOpWithTypeAndKey(id, RowOperationsPB::INSERT,
+void RaftConsensusITest::AddOp(const OpId& id, int64_t base_ts, ConsensusRequestPB* req) {
+  AddOpWithTypeAndKey(id, base_ts, RowOperationsPB::INSERT,
                       id.index() * 10000 + id.term(), req);
 }
 
-void RaftConsensusITest::AddOpWithTypeAndKey(const OpId& id,
+void RaftConsensusITest::AddOpWithTypeAndKey(const OpId& id, int64_t base_ts,
                                              RowOperationsPB::Type op_type,
                                              int32_t key,
                                              ConsensusRequestPB* req) {
@@ -260,7 +274,7 @@ void RaftConsensusITest::AddOpWithTypeAndKey(const OpId& id,
   // increasing per op and starts off higher than 1. This is required, as some
   // test cases test the scenario where the WAL is replayed and no-ops and
   // writes are expected to have monotonically increasing timestamps.
-  msg->set_timestamp(id.index() * 10000 + id.term());
+  msg->set_timestamp(base_ts + id.index() * 10000 + id.term());
   msg->set_op_type(consensus::WRITE_OP);
   WriteRequestPB* write_req = msg->mutable_write_request();
   CHECK_OK(SchemaToPB(schema_, write_req->mutable_schema()));
@@ -596,8 +610,6 @@ void RaftConsensusITest::DoTestCrashyNodes(TestWorkload* workload, int max_rows_
 
 void RaftConsensusITest::SetupSingleReplicaTest(TServerDetails** replica_ts) {
   const vector<string> kTsFlags = {
-    // Don't use the hybrid clock as we set logical timestamps on ops.
-    "--use_hybrid_clock=false",
     "--enable_leader_failure_detection=false",
   };
   const vector<string> kMasterFlags = {
@@ -1061,10 +1073,12 @@ TEST_F(RaftConsensusITest, TestLMPMismatchOnRestartedReplica) {
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
   ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
 
+  int64_t base_ts = GetTimestampOnServer(replica_ts);
+
   // Send operations 2.1 through 2.3, committing through 2.2.
-  AddOp(MakeOpId(2, 1), &req);
-  AddOp(MakeOpId(2, 2), &req);
-  AddOp(MakeOpId(2, 3), &req);
+  AddOp(MakeOpId(2, 1), base_ts, &req);
+  AddOp(MakeOpId(2, 2), base_ts, &req);
+  AddOp(MakeOpId(2, 3), base_ts, &req);
   req.set_committed_index(2);
   rpc.Reset();
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
@@ -1091,7 +1105,7 @@ TEST_F(RaftConsensusITest, TestLMPMismatchOnRestartedReplica) {
   req.set_caller_term(3);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(3, 3));
   req.clear_ops();
-  AddOp(MakeOpId(3, 4), &req);
+  AddOp(MakeOpId(3, 4), base_ts, &req);
   ASSERT_EVENTUALLY([&]() {
       rpc.Reset();
       ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
@@ -1131,9 +1145,10 @@ TEST_F(RaftConsensusITest, TestReplaceOperationStuckInPrepareQueue) {
   req.set_caller_term(2);
   req.set_all_replicated_index(0);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1));
-  AddOpWithTypeAndKey(MakeOpId(2, 2), RowOperationsPB::UPSERT, 1, &req);
-  AddOpWithTypeAndKey(MakeOpId(2, 3), RowOperationsPB::UPSERT, 1, &req);
-  AddOpWithTypeAndKey(MakeOpId(2, 4), RowOperationsPB::UPSERT, 1, &req);
+  int64_t base_ts = GetTimestampOnServer(replica_ts);
+  AddOpWithTypeAndKey(MakeOpId(2, 2), base_ts, RowOperationsPB::UPSERT, 1, &req);
+  AddOpWithTypeAndKey(MakeOpId(2, 3), base_ts, RowOperationsPB::UPSERT, 1, &req);
+  AddOpWithTypeAndKey(MakeOpId(2, 4), base_ts, RowOperationsPB::UPSERT, 1, &req);
   req.set_committed_index(2);
   rpc.Reset();
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
@@ -1143,8 +1158,8 @@ TEST_F(RaftConsensusITest, TestReplaceOperationStuckInPrepareQueue) {
   req.set_caller_term(3);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 3));
   req.clear_ops();
-  AddOpWithTypeAndKey(MakeOpId(3, 4), RowOperationsPB::UPSERT, 1, &req);
-  AddOpWithTypeAndKey(MakeOpId(3, 5), RowOperationsPB::UPSERT, 2, &req);
+  AddOpWithTypeAndKey(MakeOpId(3, 4), base_ts, RowOperationsPB::UPSERT, 1, &req);
+  AddOpWithTypeAndKey(MakeOpId(3, 5), base_ts, RowOperationsPB::UPSERT, 2, &req);
   rpc.Reset();
   rpc.set_timeout(MonoDelta::FromSeconds(5));
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
@@ -1206,9 +1221,10 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
 
   // Send some operations, but don't advance the commit index.
   // They should not commit.
-  AddOp(MakeOpId(2, 2), &req);
-  AddOp(MakeOpId(2, 3), &req);
-  AddOp(MakeOpId(2, 4), &req);
+  int64_t base_ts = GetTimestampOnServer(replica_ts);
+  AddOp(MakeOpId(2, 2), base_ts, &req);
+  AddOp(MakeOpId(2, 3), base_ts, &req);
+  AddOp(MakeOpId(2, 4), base_ts, &req);
   rpc.Reset();
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
   ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
@@ -1224,7 +1240,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
   // request, and the replica should reject it.
   req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
   req.clear_ops();
-  AddOp(MakeOpId(2, 6), &req);
+  AddOp(MakeOpId(2, 6), base_ts, &req);
   rpc.Reset();
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
   ASSERT_TRUE(resp.has_error()) << SecureDebugString(resp);
@@ -1237,8 +1253,8 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
   // Send ops 3.5 and 2.6, then commit up to index 6, the replica
   // should fail because of the out-of-order terms.
   req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
-  AddOp(MakeOpId(3, 5), &req);
-  AddOp(MakeOpId(2, 6), &req);
+  AddOp(MakeOpId(3, 5), base_ts, &req);
+  AddOp(MakeOpId(2, 6), base_ts, &req);
   rpc.Reset();
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
   ASSERT_TRUE(resp.has_error()) << SecureDebugString(resp);
@@ -1256,7 +1272,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
   resp.Clear();
   req.clear_ops();
   req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 2));
-  AddOp(MakeOpId(2, 3), &req);
+  AddOp(MakeOpId(2, 3), base_ts, &req);
   req.set_committed_index(4);
   rpc.Reset();
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
@@ -1274,8 +1290,8 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
   // Now send some more ops, and commit the earlier ones.
   req.set_committed_index(4);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
-  AddOp(MakeOpId(2, 5), &req);
-  AddOp(MakeOpId(2, 6), &req);
+  AddOp(MakeOpId(2, 5), base_ts, &req);
+  AddOp(MakeOpId(2, 6), base_ts, &req);
   rpc.Reset();
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
   ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
@@ -1325,8 +1341,8 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
     req.set_caller_uuid("new_leader");
     req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
     req.clear_ops();
-    AddOp(MakeOpId(leader_term, 5), &req);
-    AddOp(MakeOpId(leader_term, 6), &req);
+    AddOp(MakeOpId(leader_term, 5), base_ts, &req);
+    AddOp(MakeOpId(leader_term, 6), base_ts, &req);
     rpc.Reset();
     ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
     ASSERT_FALSE(resp.has_error()) << "Req: " << SecureShortDebugString(req)
@@ -1851,8 +1867,9 @@ TEST_F(RaftConsensusITest, TestEarlyCommitDespiteMemoryPressure) {
   req.set_committed_index(1);
   req.set_all_replicated_index(0);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1));
+  int64_t base_ts = GetTimestampOnServer(replica_ts);
   for (int i = 0; i < kNumOps; i++) {
-    AddOp(MakeOpId(1, 2 + i), &req);
+    AddOp(MakeOpId(1, 2 + i), base_ts, &req);
   }
   OpId last_opid = MakeOpId(1, 2 + kNumOps - 1);
   ASSERT_OK(replica_ts->consensus_proxy->UpdateConsensus(req, &resp, &rpc));
@@ -1875,7 +1892,7 @@ TEST_F(RaftConsensusITest, TestEarlyCommitDespiteMemoryPressure) {
   req.mutable_preceding_id()->CopyFrom(last_opid);
   req.set_committed_index(last_opid.index());
   req.mutable_ops()->Clear();
-  AddOp(MakeOpId(1, last_opid.index() + 1), &req);
+  AddOp(MakeOpId(1, last_opid.index() + 1), base_ts, &req);
   rpc.Reset();
   Status s = replica_ts->consensus_proxy->UpdateConsensus(req, &resp, &rpc);
 
@@ -2580,8 +2597,9 @@ TEST_F(RaftConsensusITest, TestUpdateConsensusErrorNonePrepared) {
   req.set_committed_index(0);
   req.set_all_replicated_index(0);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(0, 0));
+  int64_t base_ts = GetTimestampOnServer(replica_ts);
   for (int i = 0; i < kNumOps; i++) {
-    AddOp(MakeOpId(0, 1 + i), &req);
+    AddOp(MakeOpId(0, 1 + i), base_ts, &req);
   }
 
   ASSERT_OK(replica_ts->consensus_proxy->UpdateConsensus(req, &resp, &rpc));

http://git-wip-us.apache.org/repos/asf/kudu/blob/bc817a44/src/kudu/integration-tests/raft_consensus_election-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus_election-itest.cc b/src/kudu/integration-tests/raft_consensus_election-itest.cc
index 86b7763..63c2ecf 100644
--- a/src/kudu/integration-tests/raft_consensus_election-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus_election-itest.cc
@@ -101,8 +101,10 @@ void RaftConsensusElectionITest::CreateClusterForChurnyElectionsTests(
   // On TSAN builds, we need to be a little bit less churny in order to make
   // any progress at all.
   ts_flags.push_back("--raft_heartbeat_interval_ms=5");
+  ts_flags.emplace_back("--inject_latency_ms_before_starting_txn=100");
 #else
   ts_flags.emplace_back("--raft_heartbeat_interval_ms=1");
+  ts_flags.emplace_back("--inject_latency_ms_before_starting_txn=1000");
 #endif
 
   ts_flags.insert(ts_flags.end(), extra_ts_flags.cbegin(), extra_ts_flags.cend());

http://git-wip-us.apache.org/repos/asf/kudu/blob/bc817a44/src/kudu/integration-tests/timestamp_advancement-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/timestamp_advancement-itest.cc b/src/kudu/integration-tests/timestamp_advancement-itest.cc
index 88bfff5..be30bdf 100644
--- a/src/kudu/integration-tests/timestamp_advancement-itest.cc
+++ b/src/kudu/integration-tests/timestamp_advancement-itest.cc
@@ -38,6 +38,7 @@
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/internal_mini_cluster-itest-base.h"
 #include "kudu/integration-tests/test_workload.h"
@@ -232,5 +233,31 @@ TEST_F(TimestampAdvancementITest, TestNoOpAdvancesMvccSafeTimeOnBootstrap) {
   ASSERT_NE(cleantime, Timestamp::kInitialTimestamp);
 }
 
+// Test to ensure that MVCC's current snapshot gets updated via Raft no-ops, in
+// both the "normal" case and the single-replica case.
+TEST_F(TimestampAdvancementITest, TestTimestampsAdvancedFromRaftNoOp) {
+  const int kTserver = 0;
+  for (int num_replicas : { 1, 3 }) {
+    LOG(INFO) << strings::Substitute("Running with $0 replicas", num_replicas);
+    NO_FATALS(StartCluster(num_replicas));
+
+    // Create an empty tablet with 'num_replicas' replicas.
+    TestWorkload create_tablet(cluster_.get());
+    create_tablet.set_num_replicas(num_replicas);
+    create_tablet.Setup();
+    scoped_refptr<TabletReplica> replica = tablet_replica_on_ts(kTserver);
+
+    // Despite there not being any writes, the replica will eventually bump its
+    // MVCC clean time on its own when a leader gets elected and replicates a
+    // no-op message.
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_NE(replica->tablet()->mvcc_manager()->GetCleanTimestamp(),
+                Timestamp::kInitialTimestamp);
+    });
+    replica.reset();
+    StopCluster();
+  }
+}
+
 }  // namespace itest
 }  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/bc817a44/src/kudu/tablet/mvcc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc.cc b/src/kudu/tablet/mvcc.cc
index 62d3d43..21e97e0 100644
--- a/src/kudu/tablet/mvcc.cc
+++ b/src/kudu/tablet/mvcc.cc
@@ -22,6 +22,7 @@
 #include <ostream>
 #include <utility>
 
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 
 #include "kudu/gutil/map-util.h"
@@ -30,8 +31,17 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/debug/trace_event.h"
+#include "kudu/util/fault_injection.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
 
+DEFINE_int32(inject_latency_ms_before_starting_txn, 0,
+             "Amount of latency in ms to inject before registering "
+             "a transaction with MVCC.");
+TAG_FLAG(inject_latency_ms_before_starting_txn, advanced);
+TAG_FLAG(inject_latency_ms_before_starting_txn, hidden);
+
 namespace kudu {
 namespace tablet {
 
@@ -46,6 +56,7 @@ MvccManager::MvccManager()
 }
 
 void MvccManager::StartTransaction(Timestamp timestamp) {
+  MAYBE_INJECT_RANDOM_LATENCY(FLAGS_inject_latency_ms_before_starting_txn);
   std::lock_guard<LockType> l(lock_);
   CHECK(!cur_snap_.IsCommitted(timestamp)) << "Trying to start a new txn at an already-committed"
                                            << " timestamp: " << timestamp.ToString()
@@ -53,7 +64,7 @@ void MvccManager::StartTransaction(Timestamp timestamp) {
   CHECK(InitTransactionUnlocked(timestamp)) << "There is already a transaction with timestamp: "
                                             << timestamp.value() << " in flight or this timestamp "
                                             << "is before than or equal to \"safe\" time."
-                                            << "Current Snapshot: " << cur_snap_.ToString()
+                                            << " Current Snapshot: " << cur_snap_.ToString()
                                             << " Current safe time: " << safe_time_;
 }
 
@@ -176,12 +187,11 @@ void MvccManager::AdjustSafeTime(Timestamp safe_time) {
     DVLOG(4) << "Adjusting safe time to: " << safe_time;
     safe_time_ = safe_time;
   } else {
-    // TODO(dralves) This shouldn't happen, the safe time passed to MvccManager should be
-    // monotically increasing. If if does though, the impact is on scan snapshot correctness,
-    // not on corruption of state and some test-only code sets this back (LocalTabletWriter).
-    // Note that we will still crash if a transaction comes in with a timestamp that is lower
-    // than 'cur_snap_.all_committed_before_'.
-    LOG_EVERY_N(ERROR, 10) << Substitute("Tried to move safe_time back from $0 to $1. "
+    // Note: Getting here means that we are about to apply a transaction out of
+    // order. This out-of-order applying is only safe because concurrent
+    // transactions are guaranteed to not affect the same state based on locks
+    // taken before starting the transaction (e.g. row locks, schema locks).
+    KLOG_EVERY_N(INFO, 10) << Substitute("Tried to move safe_time back from $0 to $1. "
                                          "Current Snapshot: $2", safe_time_.ToString(),
                                          safe_time.ToString(), cur_snap_.ToString());
     return;

http://git-wip-us.apache.org/repos/asf/kudu/blob/bc817a44/src/kudu/tablet/mvcc.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc.h b/src/kudu/tablet/mvcc.h
index 24af2b3..5ce1f95 100644
--- a/src/kudu/tablet/mvcc.h
+++ b/src/kudu/tablet/mvcc.h
@@ -219,7 +219,8 @@ class MvccManager {
   // StartApplyingTransaction(), or else this logs a FATAL error.
   void CommitTransaction(Timestamp timestamp);
 
-  // Adjusts the safe time so that the MvccManager can trim state.
+  // Adjusts the safe time so that the MvccManager can trim state, provided
+  // 'safe_time' is higher than the current safe time.
   //
   // This must only be called when there is a guarantee that there won't be
   // any more transactions with timestamps equal to or lower than 'safe_time'.

http://git-wip-us.apache.org/repos/asf/kudu/blob/bc817a44/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index 821cd29..751f3aa 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -30,6 +30,7 @@
 
 #include "kudu/clock/clock.h"
 #include "kudu/common/partition.h"
+#include "kudu/common/timestamp.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/consensus_peers.h"
@@ -39,6 +40,7 @@
 #include "kudu/consensus/raft_consensus.h"
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/bind_helpers.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/result_tracker.h"
@@ -212,9 +214,9 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
     }
 
     // We cannot hold 'lock_' while we call RaftConsensus::Start() because it
-    // may invoke TabletReplica::StartReplicaTransaction() during startup, causing
-    // a self-deadlock. We take a ref to members protected by 'lock_' before
-    // unlocking.
+    // may invoke TabletReplica::StartFollowerTransaction() during startup,
+    // causing a self-deadlock. We take a ref to members protected by 'lock_'
+    // before unlocking.
     RETURN_NOT_OK(consensus_->Start(
         bootstrap_info,
         std::move(peer_proxy_factory),
@@ -579,7 +581,7 @@ Status TabletReplica::GetGCableDataSize(int64_t* retention_size) const {
   return Status::OK();
 }
 
-Status TabletReplica::StartReplicaTransaction(const scoped_refptr<ConsensusRound>& round) {
+Status TabletReplica::StartFollowerTransaction(const scoped_refptr<ConsensusRound>& round) {
   {
     std::lock_guard<simple_spinlock> lock(lock_);
     if (state_ != RUNNING && state_ != BOOTSTRAPPING) {
@@ -637,6 +639,43 @@ Status TabletReplica::StartReplicaTransaction(const scoped_refptr<ConsensusRound
   return Status::OK();
 }
 
+void TabletReplica::FinishConsensusOnlyRound(ConsensusRound* round) {
+  const consensus::ReplicateMsg* replicate_msg = round->replicate_msg();
+  const consensus::OperationType op_type = replicate_msg->op_type();
+
+  // The timestamp of a Raft no-op used to assert term leadership is guaranteed
+  // to be lower than the timestamps of writes in the same terms and those
+  // thereafter. As such, we are able to bump the MVCC safe time with the
+  // timestamps of such no-ops, as further transaction timestamps are
+  // guaranteed to be higher than them.
+  //
+  // It is important for MVCC safe time updates to be serialized with respect
+  // to transactions. To ensure that we only advance the safe time with the
+  // no-op of term N after all transactions of term N-1 have been prepared, we
+  // run the adjustment function on the prepare thread, which is the same
+  // mechanism we use to serialize transactions.
+  //
+  // If the 'timestamp_in_opid_order' flag is unset, the no-op is assumed to be
+  // the Raft leadership no-op from a version of Kudu that only supported creating
+  // a no-op to assert a new leadership term, in which case it would be in order.
+  // Other rounds, and no-ops flagged as not in order, leave MVCC untouched.
+  if (op_type != consensus::NO_OP) return;
+  DCHECK(replicate_msg->has_noop_request());
+  const auto& noop = replicate_msg->noop_request();
+  if (noop.has_timestamp_in_opid_order() && !noop.timestamp_in_opid_order()) return;
+  const Timestamp ts(replicate_msg->timestamp());
+  // We are guaranteed that the prepare pool token is running now because
+  // TabletReplica::Stop() stops RaftConsensus before it stops the prepare
+  // pool token and this callback is invoked while the RaftConsensus lock is
+  // held.
+  CHECK_OK(prepare_pool_token_->SubmitFunc([this, ts] {
+    std::lock_guard<simple_spinlock> l(lock_);
+    if (state_ == RUNNING || state_ == BOOTSTRAPPING) {
+      tablet_->mvcc_manager()->AdjustSafeTime(ts);
+    }
+  }));
+}
+
 Status TabletReplica::NewLeaderTransactionDriver(gscoped_ptr<Transaction> transaction,
                                                  scoped_refptr<TransactionDriver>* driver) {
   scoped_refptr<TransactionDriver> tx_driver = new TransactionDriver(

http://git-wip-us.apache.org/repos/asf/kudu/blob/bc817a44/src/kudu/tablet/tablet_replica.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 253e9c0..bf9d63d 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -27,15 +27,14 @@
 
 #include <gtest/gtest_prod.h>
 
-#include "kudu/consensus/raft_consensus.h"
-#include "kudu/consensus/time_manager.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/raft_consensus.h"
+#include "kudu/consensus/time_manager.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/callback.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet.h"
@@ -86,7 +85,7 @@ class WriteTransactionState;
 // peers see the same updates in the same order. In addition to this, this
 // class also splits the work and coordinates multi-threaded execution.
 class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
-                      public consensus::ReplicaTransactionFactory {
+                      public consensus::ConsensusRoundHandler {
  public:
   TabletReplica(scoped_refptr<TabletMetadata> meta,
                 scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager,
@@ -153,8 +152,12 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   void GetTabletStatusPB(TabletStatusPB* status_pb_out) const;
 
   // Used by consensus to create and start a new ReplicaTransaction.
-  virtual Status StartReplicaTransaction(
-      const scoped_refptr<consensus::ConsensusRound>& round) OVERRIDE;
+  virtual Status StartFollowerTransaction(
+      const scoped_refptr<consensus::ConsensusRound>& round) override;
+
+  // Used by consensus to notify the tablet replica that a consensus-only round
+  // has finished, advancing MVCC safe time as appropriate.
+  virtual void FinishConsensusOnlyRound(consensus::ConsensusRound* round) override;
 
   consensus::RaftConsensus* consensus() {
     std::lock_guard<simple_spinlock> lock(lock_);