Posted to commits@kudu.apache.org by ab...@apache.org on 2018/10/16 08:44:59 UTC

[1/3] kudu git commit: KUDU-2463 pt 2: bump MVCC safe time on Raft no-op

Repository: kudu
Updated Branches:
  refs/heads/branch-1.8.x ac9e22e33 -> cbbf7b580


KUDU-2463 pt 2: bump MVCC safe time on Raft no-op

Based on the same rationale as Part 1 of this patch series, this patch
updates MVCC's safe and clean time using the no-op timestamp provided by
the leader following a successful Raft election.

There isn't an obvious reference to the tablet (through which to reach the
MVCC module) in Raft consensus, but there is a ReplicaTransactionFactory
that the TabletReplica implements. I've extended this into a more general
ConsensusRoundHandler that can be used to start follower transactions or
to finish consensus-only rounds (e.g. no-ops) as needed.
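To make the new interface's two entry points concrete, here is a minimal,
self-contained C++ sketch. OpType, Round, RoundHandler, Dispatch and
ToyReplica are hypothetical, simplified stand-ins modeled on the names in
this patch, not Kudu's API. In particular, the patch routes the no-op's
safe-time adjustment through the prepare thread, which this toy skips.

```cpp
#include <cassert>
#include <cstdint>

// Simplified, hypothetical stand-ins for Kudu's types; not the real API.
enum class OpType { WRITE_OP, NO_OP, CHANGE_CONFIG_OP };

struct Round {
  OpType op_type;
  uint64_t timestamp;  // timestamp chosen by the leader
};

// Toy analogue of the ConsensusRoundHandler interface added in this patch.
class RoundHandler {
 public:
  virtual ~RoundHandler() = default;
  // Rounds that carry a transaction (e.g. writes) start one here.
  virtual void StartFollowerTransaction(const Round& round) = 0;
  // Consensus-only rounds (no-ops, config changes) report completion here.
  virtual void FinishConsensusOnlyRound(const Round& round) = 0;
};

// Same classification as IsConsensusOnlyOperation() in raft_consensus.cc.
bool IsConsensusOnlyOperation(OpType t) {
  return t == OpType::NO_OP || t == OpType::CHANGE_CONFIG_OP;
}

// Hypothetical dispatch. The toy collapses "received" and "replicated" into
// one step; real consensus only finishes a round after it is replicated.
void Dispatch(RoundHandler* handler, const Round& round) {
  if (IsConsensusOnlyOperation(round.op_type)) {
    handler->FinishConsensusOnlyRound(round);
  } else {
    handler->StartFollowerTransaction(round);
  }
}

// Toy replica: a finished no-op bumps its safe time, loosely mirroring how
// this patch has the tablet replica bump MVCC safe and clean time.
class ToyReplica : public RoundHandler {
 public:
  void StartFollowerTransaction(const Round& /*round*/) override {
    ++txns_started_;
  }
  void FinishConsensusOnlyRound(const Round& round) override {
    if (round.op_type == OpType::NO_OP && round.timestamp > safe_time_) {
      safe_time_ = round.timestamp;
    }
  }
  uint64_t safe_time() const { return safe_time_; }
  int txns_started() const { return txns_started_; }

 private:
  uint64_t safe_time_ = 0;
  int txns_started_ = 0;
};
```

For example, dispatching a NO_OP round at timestamp 100 and then a WRITE_OP
round leaves the toy replica with a safe time of 100 and one started
transaction.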

An invariant we are trying to uphold is that once MVCC's safe time is
adjusted, all further transactions registered with MVCC will have higher
timestamps than the safe time. With this in mind, it is critical that
the adjustment of safe time be serialized with respect to transactions.
This is the case today because safe time is only advanced by writes on
the prepare thread, which is also where transactions are started.
Accordingly, Raft no-ops will also adjust the safe time on the prepare
thread.
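The ordering argument above can be illustrated with a small sketch,
assuming a single FIFO worker thread stands in for the prepare thread.
SerialExecutor and ToyMvcc are hypothetical names, not Kudu classes, and
ToyMvcc counts a violation where Kudu's MvccManager would CHECK-fail.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical single-worker executor standing in for a prepare thread.
// Tasks run one at a time, in submission order.
class SerialExecutor {
 public:
  SerialExecutor() : worker_([this] { Run(); }) {}

  // Lets the worker drain any queued tasks, then joins it.
  ~SerialExecutor() {
    {
      std::lock_guard<std::mutex> l(mu_);
      shutdown_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }

  void Submit(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> l(mu_);
      tasks_.push_back(std::move(task));
    }
    cv_.notify_one();
  }

 private:
  void Run() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> l(mu_);
        cv_.wait(l, [this] { return shutdown_ || !tasks_.empty(); });
        if (tasks_.empty()) return;  // shutdown requested and queue drained
        task = std::move(tasks_.front());
        tasks_.pop_front();
      }
      task();
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> tasks_;
  bool shutdown_ = false;
  std::thread worker_;  // declared last so the members above exist before Run()
};

// Toy MVCC: counts transactions that start at or below the current safe time,
// which is the situation MvccManager::StartTransaction() CHECKs against.
class ToyMvcc {
 public:
  bool StartTransaction(uint64_t ts) {
    std::lock_guard<std::mutex> l(mu_);
    if (ts <= safe_time_) {
      ++violations_;
      return false;
    }
    return true;
  }
  // Safe time only moves forward.
  void AdjustSafeTime(uint64_t ts) {
    std::lock_guard<std::mutex> l(mu_);
    if (ts > safe_time_) safe_time_ = ts;
  }
  uint64_t safe_time() const {
    std::lock_guard<std::mutex> l(mu_);
    return safe_time_;
  }
  int violations() const {
    std::lock_guard<std::mutex> l(mu_);
    return violations_;
  }

 private:
  mutable std::mutex mu_;
  uint64_t safe_time_ = 0;
  int violations_ = 0;
};
```

Because both StartTransaction() and AdjustSafeTime() are submitted to the
same single-threaded executor, they cannot interleave: each safe-time bump
either fully precedes or fully follows each transaction start. Bumping safe
time directly from some other thread would give up that guarantee.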

The following test changes are included:
- to ensure nothing terrible happens when there is a lot of election
  churn (and hence, a lot of new timestamp advancement), I've tweaked
  exactly_once_writes-itest to more explicitly churn elections.
  Previously it attempted this with just a low Raft heartbeat interval.
  I injected latency into consensus notifications to make it churn a bit
  harder and looped the test 1000 times in both TSAN and debug mode.
- since MvccManager::StartTransaction() will hit a CHECK failure if it
  starts a transaction at a timestamp that was previously marked safe, I
  added a configurable sleep at the beginning of the function to widen
  the window during which safe time can be advanced, making the CHECK
  failure more likely to surface. I configured this in
  raft_consensus_election-itest and looped it 1000 times in TSAN and
  debug mode. If no-ops _didn't_ use the prepare thread to advance safe
  time, the added delay would lead to CHECK failures.
- added a test ensuring that a tablet bumps its MVCC timestamps on its
  own, driven solely by its elections
- tweaked raft_consensus-itest to use more realistic timestamps, now
  that MVCC's clean and safe times get updated by the leadership no-op
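The timestamp change in the last item can be checked with a little
arithmetic. The itest helper builds each op's timestamp as
base_ts + index * 10000 + term, where base_ts now comes from the server's
hybrid clock; presumably this keeps test writes above any safe time a
leadership no-op has already set. The function name below is hypothetical;
only the expression mirrors the diff.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper mirroring the expression in AddOpWithTypeAndKey():
//   base_ts + index * 10000 + term
// Assuming 0 <= term < 10000, a higher index always yields a higher
// timestamp, and every op with index >= 1 lands strictly above base_ts.
int64_t TestOpTimestamp(int64_t base_ts, int64_t term, int64_t index) {
  return base_ts + index * 10000 + term;
}
```

For instance, with base_ts = 1000, op 2.1 gets 11002, op 2.2 gets 21002
and op 3.4 gets 41003.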

This patch alone doesn't fix KUDU-2463. Rather, a later patch will
prevent scans from occurring if the MVCC safe time hasn't been advanced,
at which point this patch will reduce the window of scan unavailability.

Change-Id: Icbf812e2cbeeee7c322fd980245cfe40c886a15a
Reviewed-on: http://gerrit.cloudera.org:8080/11427
Tested-by: Andrew Wong <aw...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/11689
Reviewed-by: Grant Henke <gr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b3bb51ec
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b3bb51ec
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b3bb51ec

Branch: refs/heads/branch-1.8.x
Commit: b3bb51ec89c7f1856aa25c1fb015ce305b28aba3
Parents: ac9e22e
Author: Andrew Wong <aw...@cloudera.com>
Authored: Tue Sep 11 12:15:02 2018 -0700
Committer: Attila Bukor <ab...@apache.org>
Committed: Tue Oct 16 08:40:16 2018 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus-test-util.h        |  6 +-
 src/kudu/consensus/raft_consensus.cc            | 13 ++--
 src/kudu/consensus/raft_consensus.h             | 50 +++++++-----
 .../exactly_once_writes-itest.cc                | 16 +++-
 .../integration-tests/raft_consensus-itest.cc   | 82 ++++++++++++--------
 .../raft_consensus_election-itest.cc            |  2 +
 .../timestamp_advancement-itest.cc              | 27 +++++++
 src/kudu/tablet/mvcc.cc                         | 24 ++++--
 src/kudu/tablet/mvcc.h                          |  3 +-
 src/kudu/tablet/tablet_replica.cc               | 47 ++++++++++-
 src/kudu/tablet/tablet_replica.h                | 15 ++--
 11 files changed, 204 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b3bb51ec/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index 8b6a71e..d6dc3f0 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -668,7 +668,7 @@ class TestDriver {
 };
 
 // A transaction factory for tests, usually this is implemented by TabletReplica.
-class TestTransactionFactory : public ReplicaTransactionFactory {
+class TestTransactionFactory : public ConsensusRoundHandler {
  public:
   explicit TestTransactionFactory(log::Log* log)
       : consensus_(nullptr),
@@ -681,7 +681,7 @@ class TestTransactionFactory : public ReplicaTransactionFactory {
     consensus_ = consensus;
   }
 
-  Status StartReplicaTransaction(const scoped_refptr<ConsensusRound>& round) OVERRIDE {
+  Status StartFollowerTransaction(const scoped_refptr<ConsensusRound>& round) override {
     auto txn = new TestDriver(pool_.get(), log_, round);
     txn->round_->SetConsensusReplicatedCallback(std::bind(
         &TestDriver::ReplicationFinished,
@@ -690,6 +690,8 @@ class TestTransactionFactory : public ReplicaTransactionFactory {
     return Status::OK();
   }
 
+  void FinishConsensusOnlyRound(ConsensusRound* /*round*/) override {}
+
   void ReplicateAsync(ConsensusRound* round) {
     CHECK_OK(consensus_->Replicate(round));
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/b3bb51ec/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index cd58896..f4ba768 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -227,7 +227,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
                             gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
                             scoped_refptr<log::Log> log,
                             scoped_refptr<TimeManager> time_manager,
-                            ReplicaTransactionFactory* txn_factory,
+                            ConsensusRoundHandler* round_handler,
                             const scoped_refptr<MetricEntity>& metric_entity,
                             Callback<void(const std::string& reason)> mark_dirty_clbk) {
   DCHECK(metric_entity);
@@ -235,7 +235,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
   peer_proxy_factory_ = DCHECK_NOTNULL(std::move(peer_proxy_factory));
   log_ = DCHECK_NOTNULL(std::move(log));
   time_manager_ = DCHECK_NOTNULL(std::move(time_manager));
-  txn_factory_ = DCHECK_NOTNULL(txn_factory);
+  round_handler_ = DCHECK_NOTNULL(round_handler);
   mark_dirty_clbk_ = std::move(mark_dirty_clbk);
 
   term_metric_ = metric_entity->FindOrCreateGauge(&METRIC_raft_term, CurrentTerm());
@@ -328,7 +328,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
                                    << SecureShortDebugString(cmeta_->ActiveConfig());
     for (ReplicateMsg* replicate : info.orphaned_replicates) {
       ReplicateRefPtr replicate_ptr = make_scoped_refptr_replicate(new ReplicateMsg(*replicate));
-      RETURN_NOT_OK(StartReplicaTransactionUnlocked(replicate_ptr));
+      RETURN_NOT_OK(StartFollowerTransactionUnlocked(replicate_ptr));
     }
 
     // Set the initial committed opid for the PendingRounds only after
@@ -927,7 +927,7 @@ static bool IsConsensusOnlyOperation(OperationType op_type) {
   return op_type == NO_OP || op_type == CHANGE_CONFIG_OP;
 }
 
-Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg) {
+Status RaftConsensus::StartFollowerTransactionUnlocked(const ReplicateRefPtr& msg) {
   DCHECK(lock_.is_locked());
 
   if (IsConsensusOnlyOperation(msg->get()->op_type())) {
@@ -943,7 +943,7 @@ Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg
                                << SecureShortDebugString(msg->get()->id());
   scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg));
   ConsensusRound* round_ptr = round.get();
-  RETURN_NOT_OK(txn_factory_->StartReplicaTransaction(round));
+  RETURN_NOT_OK(round_handler_->StartFollowerTransaction(round));
   return AddPendingOperationUnlocked(round_ptr);
 }
 
@@ -1386,7 +1386,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     }
 
     while (iter != deduped_req.messages.end()) {
-      prepare_status = StartReplicaTransactionUnlocked(*iter);
+      prepare_status = StartFollowerTransactionUnlocked(*iter);
       if (PREDICT_FALSE(!prepare_status.ok())) {
         break;
       }
@@ -2636,6 +2636,7 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
   }
   VLOG_WITH_PREFIX_UNLOCKED(1) << "Committing " << op_type_str << " with op id "
                                << round->id();
+  round_handler_->FinishConsensusOnlyRound(round);
   gscoped_ptr<CommitMsg> commit_msg(new CommitMsg);
   commit_msg->set_op_type(round->replicate_msg()->op_type());
   *commit_msg->mutable_commited_op_id() = round->id();

http://git-wip-us.apache.org/repos/asf/kudu/blob/b3bb51ec/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index f84a71e..f472103 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -72,10 +72,10 @@ namespace consensus {
 
 class ConsensusMetadataManager;
 class ConsensusRound;
+class ConsensusRoundHandler;
 class PeerManager;
 class PeerProxyFactory;
 class PendingRounds;
-class ReplicaTransactionFactory;
 struct ConsensusBootstrapInfo;
 struct ElectionResult;
 
@@ -148,7 +148,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
                gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
                scoped_refptr<log::Log> log,
                scoped_refptr<TimeManager> time_manager,
-               ReplicaTransactionFactory* txn_factory,
+               ConsensusRoundHandler* round_handler,
                const scoped_refptr<MetricEntity>& metric_entity,
                Callback<void(const std::string& reason)> mark_dirty_clbk);
 
@@ -518,7 +518,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
 
   // Begin a replica transaction. If the type of message in 'msg' is not a type
   // that uses transactions, delegates to StartConsensusOnlyRoundUnlocked().
-  Status StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg);
+  Status StartFollowerTransactionUnlocked(const ReplicateRefPtr& msg);
 
   // Returns true if this node is the only voter in the Raft configuration.
   bool IsSingleVoterConfig() const;
@@ -803,9 +803,10 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   scoped_refptr<TimeManager> time_manager_;
   gscoped_ptr<PeerProxyFactory> peer_proxy_factory_;
 
-  // When we receive a message from a remote peer telling us to start a transaction, we use
-  // this factory to start it.
-  ReplicaTransactionFactory* txn_factory_;
+  // When we receive a message from a remote peer telling us to start a
+  // transaction, or finish a round, we use this handler to handle it.
+  // This may update replica state (e.g. the tablet replica).
+  ConsensusRoundHandler* round_handler_;
 
   std::unique_ptr<PeerManager> peer_manager_;
 
@@ -896,32 +897,39 @@ struct ConsensusBootstrapInfo {
   DISALLOW_COPY_AND_ASSIGN(ConsensusBootstrapInfo);
 };
 
-// Factory for replica transactions.
-// An implementation of this factory must be registered prior to consensus
-// start, and is used to create transactions when the consensus implementation receives
-// messages from the leader.
+// Handler for consensus rounds.
+// An implementation of this handler must be registered prior to consensus
+// start, and is used to:
+// - Create transactions when the consensus implementation receives messages
+//   from the leader.
+// - Handle when the consensus implementation finishes a non-transaction round
 //
-// Replica transactions execute the following way:
+// Follower transactions execute the following way:
 //
 // - When a ReplicateMsg is first received from the leader, the RaftConsensus
-//   instance creates the ConsensusRound and calls StartReplicaTransaction().
-//   This will trigger the Prepare(). At the same time replica consensus
-//   instance immediately stores the ReplicateMsg in the Log. Once the replicate
+//   instance creates the ConsensusRound and calls StartFollowerTransaction().
+//   This will trigger the Prepare(). At the same time, the follower's consensus
+//   instance immediately stores the ReplicateMsg in the Log. Once the
 //   message is stored in stable storage an ACK is sent to the leader (i.e. the
 //   replica RaftConsensus instance does not wait for Prepare() to finish).
 //
-// - When the CommitMsg for a replicate is first received from the leader
-//   the replica waits for the corresponding Prepare() to finish (if it has
-//   not completed yet) and then proceeds to trigger the Apply().
+// - When the CommitMsg for a replicate is first received from the leader, the
+//   follower waits for the corresponding Prepare() to finish (if it has not
+//   completed yet) and then proceeds to trigger the Apply().
 //
-// - Once Apply() completes the ReplicaTransactionFactory is responsible for logging
+// - Once Apply() completes the ConsensusRoundHandler is responsible for logging
 //   a CommitMsg to the log to ensure that the operation can be properly restored
 //   on a restart.
-class ReplicaTransactionFactory {
+class ConsensusRoundHandler {
  public:
-  virtual Status StartReplicaTransaction(const scoped_refptr<ConsensusRound>& context) = 0;
+  virtual ~ConsensusRoundHandler() {}
 
-  virtual ~ReplicaTransactionFactory() {}
+  virtual Status StartFollowerTransaction(const scoped_refptr<ConsensusRound>& context) = 0;
+
+  // Consensus-only rounds complete when non-transaction ops finish
+  // replication. This can be used to trigger callbacks, akin to an Apply() for
+  // transaction ops.
+  virtual void FinishConsensusOnlyRound(ConsensusRound* round) = 0;
 };
 
 // Context for a consensus round on the LEADER side, typically created as an

http://git-wip-us.apache.org/repos/asf/kudu/blob/b3bb51ec/src/kudu/integration-tests/exactly_once_writes-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/exactly_once_writes-itest.cc b/src/kudu/integration-tests/exactly_once_writes-itest.cc
index b291bc9..c1fa863 100644
--- a/src/kudu/integration-tests/exactly_once_writes-itest.cc
+++ b/src/kudu/integration-tests/exactly_once_writes-itest.cc
@@ -63,6 +63,9 @@ using std::unique_ptr;
 using std::vector;
 
 namespace kudu {
+
+using strings::Substitute;
+
 namespace tserver {
 
 static const int kConsensusRpcTimeoutForTests = 50;
@@ -314,13 +317,22 @@ TEST_F(ExactlyOnceSemanticsITest, TestWritesWithExactlyOnceSemanticsWithCrashyNo
 TEST_F(ExactlyOnceSemanticsITest, TestWritesWithExactlyOnceSemanticsWithChurnyElections) {
   vector<string> ts_flags, master_flags;
 
+  int raft_heartbeat_interval;
 #if defined(THREAD_SANITIZER) || defined(ADDRESS_SANITIZER)
   // On TSAN/ASAN builds, we need to be a little bit less churny in order to make
   // any progress at all.
-  ts_flags.push_back("--raft_heartbeat_interval_ms=5");
+  raft_heartbeat_interval = 100;
 #else
-  ts_flags.emplace_back("--raft_heartbeat_interval_ms=2");
+  raft_heartbeat_interval = 50;
 #endif
+  // Inject random latency of up to the Raft heartbeat interval to ensure there
+  // will be missed heartbeats, triggering actual elections.
+  ts_flags = {
+    Substitute("--raft_heartbeat_interval_ms=$0", raft_heartbeat_interval),
+    Substitute("--consensus_inject_latency_ms_in_notifications=$0", raft_heartbeat_interval),
+    "--raft_enable_pre_election=false",
+    "--leader_failure_max_missed_heartbeat_periods=1",
+  };
 
   int num_batches = 200;
   if (AllowSlowTests()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/b3bb51ec/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 20c3c7b..1706340 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -93,10 +93,12 @@ DECLARE_int32(num_replicas);
 DECLARE_int32(num_tablet_servers);
 DECLARE_int32(rpc_timeout);
 
+METRIC_DECLARE_entity(server);
 METRIC_DECLARE_entity(tablet);
 METRIC_DECLARE_counter(transaction_memory_pressure_rejections);
 METRIC_DECLARE_gauge_int64(time_since_last_leader_heartbeat);
 METRIC_DECLARE_gauge_int64(failed_elections_since_stable_leader);
+METRIC_DECLARE_gauge_uint64(hybrid_clock_timestamp);
 
 using kudu::client::KuduInsert;
 using kudu::client::KuduSession;
@@ -161,6 +163,9 @@ class RaftConsensusITest : public RaftConsensusITestBase {
   RaftConsensusITest() {
   }
 
+  // Gets the current timestamp on the given server.
+  int64_t GetTimestampOnServer(TServerDetails* tserver) const;
+
   // Scan the given replica in a loop until the number of rows
   // is 'expected_count'. If it takes more than 10 seconds, then
   // fails the test.
@@ -169,9 +174,10 @@ class RaftConsensusITest : public RaftConsensusITestBase {
                        vector<string>* results);
 
   // Add an Insert operation to the given consensus request.
-  // The row to be inserted is generated based on the OpId.
-  void AddOp(const OpId& id, ConsensusRequestPB* req);
-  void AddOpWithTypeAndKey(const OpId& id,
+  // The row to be inserted is generated based on the OpId with a timestamp
+  // that is greater than 'base_ts'.
+  void AddOp(const OpId& id, int64_t base_ts, ConsensusRequestPB* req);
+  void AddOpWithTypeAndKey(const OpId& id, int64_t base_ts,
                            RowOperationsPB::Type op_type,
                            int32_t key,
                            ConsensusRequestPB* req);
@@ -223,6 +229,14 @@ class RaftConsensusITest : public RaftConsensusITestBase {
   vector<scoped_refptr<kudu::Thread> > threads_;
 };
 
+int64_t RaftConsensusITest::GetTimestampOnServer(TServerDetails* tserver) const {
+  int64_t ret;
+  ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(tserver->uuid());
+  CHECK_OK(GetInt64Metric(ets->bound_http_hostport(), &METRIC_ENTITY_server,
+                          nullptr, &METRIC_hybrid_clock_timestamp, "value", &ret));
+  return ret;
+}
+
 void RaftConsensusITest::WaitForRowCount(TabletServerServiceProxy* replica_proxy,
                                          int expected_count,
                                          vector<string>* results) {
@@ -245,12 +259,12 @@ void RaftConsensusITest::WaitForRowCount(TabletServerServiceProxy* replica_proxy
          << ": rows: " << *results;
 }
 
-void RaftConsensusITest::AddOp(const OpId& id, ConsensusRequestPB* req) {
-  AddOpWithTypeAndKey(id, RowOperationsPB::INSERT,
+void RaftConsensusITest::AddOp(const OpId& id, int64_t base_ts, ConsensusRequestPB* req) {
+  AddOpWithTypeAndKey(id, base_ts, RowOperationsPB::INSERT,
                       id.index() * 10000 + id.term(), req);
 }
 
-void RaftConsensusITest::AddOpWithTypeAndKey(const OpId& id,
+void RaftConsensusITest::AddOpWithTypeAndKey(const OpId& id, int64_t base_ts,
                                              RowOperationsPB::Type op_type,
                                              int32_t key,
                                              ConsensusRequestPB* req) {
@@ -260,7 +274,7 @@ void RaftConsensusITest::AddOpWithTypeAndKey(const OpId& id,
   // increasing per op and starts off higher than 1. This is required, as some
   // test cases test the scenario where the WAL is replayed and no-ops and
   // writes are expected to have monotonically increasing timestamps.
-  msg->set_timestamp(id.index() * 10000 + id.term());
+  msg->set_timestamp(base_ts + id.index() * 10000 + id.term());
   msg->set_op_type(consensus::WRITE_OP);
   WriteRequestPB* write_req = msg->mutable_write_request();
   CHECK_OK(SchemaToPB(schema_, write_req->mutable_schema()));
@@ -595,8 +609,6 @@ void RaftConsensusITest::DoTestCrashyNodes(TestWorkload* workload, int max_rows_
 
 void RaftConsensusITest::SetupSingleReplicaTest(TServerDetails** replica_ts) {
   const vector<string> kTsFlags = {
-    // Don't use the hybrid clock as we set logical timestamps on ops.
-    "--use_hybrid_clock=false",
     "--enable_leader_failure_detection=false",
   };
   const vector<string> kMasterFlags = {
@@ -1060,10 +1072,12 @@ TEST_F(RaftConsensusITest, TestLMPMismatchOnRestartedReplica) {
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
   ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
 
+  int64_t base_ts = GetTimestampOnServer(replica_ts);
+
   // Send operations 2.1 through 2.3, committing through 2.2.
-  AddOp(MakeOpId(2, 1), &req);
-  AddOp(MakeOpId(2, 2), &req);
-  AddOp(MakeOpId(2, 3), &req);
+  AddOp(MakeOpId(2, 1), base_ts, &req);
+  AddOp(MakeOpId(2, 2), base_ts, &req);
+  AddOp(MakeOpId(2, 3), base_ts, &req);
   req.set_committed_index(2);
   rpc.Reset();
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
@@ -1090,7 +1104,7 @@ TEST_F(RaftConsensusITest, TestLMPMismatchOnRestartedReplica) {
   req.set_caller_term(3);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(3, 3));
   req.clear_ops();
-  AddOp(MakeOpId(3, 4), &req);
+  AddOp(MakeOpId(3, 4), base_ts, &req);
   ASSERT_EVENTUALLY([&]() {
       rpc.Reset();
       ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
@@ -1130,9 +1144,10 @@ TEST_F(RaftConsensusITest, TestReplaceOperationStuckInPrepareQueue) {
   req.set_caller_term(2);
   req.set_all_replicated_index(0);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1));
-  AddOpWithTypeAndKey(MakeOpId(2, 2), RowOperationsPB::UPSERT, 1, &req);
-  AddOpWithTypeAndKey(MakeOpId(2, 3), RowOperationsPB::UPSERT, 1, &req);
-  AddOpWithTypeAndKey(MakeOpId(2, 4), RowOperationsPB::UPSERT, 1, &req);
+  int64_t base_ts = GetTimestampOnServer(replica_ts);
+  AddOpWithTypeAndKey(MakeOpId(2, 2), base_ts, RowOperationsPB::UPSERT, 1, &req);
+  AddOpWithTypeAndKey(MakeOpId(2, 3), base_ts, RowOperationsPB::UPSERT, 1, &req);
+  AddOpWithTypeAndKey(MakeOpId(2, 4), base_ts, RowOperationsPB::UPSERT, 1, &req);
   req.set_committed_index(2);
   rpc.Reset();
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
@@ -1142,8 +1157,8 @@ TEST_F(RaftConsensusITest, TestReplaceOperationStuckInPrepareQueue) {
   req.set_caller_term(3);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 3));
   req.clear_ops();
-  AddOpWithTypeAndKey(MakeOpId(3, 4), RowOperationsPB::UPSERT, 1, &req);
-  AddOpWithTypeAndKey(MakeOpId(3, 5), RowOperationsPB::UPSERT, 2, &req);
+  AddOpWithTypeAndKey(MakeOpId(3, 4), base_ts, RowOperationsPB::UPSERT, 1, &req);
+  AddOpWithTypeAndKey(MakeOpId(3, 5), base_ts, RowOperationsPB::UPSERT, 2, &req);
   rpc.Reset();
   rpc.set_timeout(MonoDelta::FromSeconds(5));
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
@@ -1205,9 +1220,10 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
 
   // Send some operations, but don't advance the commit index.
   // They should not commit.
-  AddOp(MakeOpId(2, 2), &req);
-  AddOp(MakeOpId(2, 3), &req);
-  AddOp(MakeOpId(2, 4), &req);
+  int64_t base_ts = GetTimestampOnServer(replica_ts);
+  AddOp(MakeOpId(2, 2), base_ts, &req);
+  AddOp(MakeOpId(2, 3), base_ts, &req);
+  AddOp(MakeOpId(2, 4), base_ts, &req);
   rpc.Reset();
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
   ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
@@ -1223,7 +1239,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
   // request, and the replica should reject it.
   req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
   req.clear_ops();
-  AddOp(MakeOpId(2, 6), &req);
+  AddOp(MakeOpId(2, 6), base_ts, &req);
   rpc.Reset();
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
   ASSERT_TRUE(resp.has_error()) << SecureDebugString(resp);
@@ -1236,8 +1252,8 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
   // Send ops 3.5 and 2.6, then commit up to index 6, the replica
   // should fail because of the out-of-order terms.
   req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
-  AddOp(MakeOpId(3, 5), &req);
-  AddOp(MakeOpId(2, 6), &req);
+  AddOp(MakeOpId(3, 5), base_ts, &req);
+  AddOp(MakeOpId(2, 6), base_ts, &req);
   rpc.Reset();
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
   ASSERT_TRUE(resp.has_error()) << SecureDebugString(resp);
@@ -1255,7 +1271,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
   resp.Clear();
   req.clear_ops();
   req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 2));
-  AddOp(MakeOpId(2, 3), &req);
+  AddOp(MakeOpId(2, 3), base_ts, &req);
   req.set_committed_index(4);
   rpc.Reset();
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
@@ -1273,8 +1289,8 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
   // Now send some more ops, and commit the earlier ones.
   req.set_committed_index(4);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
-  AddOp(MakeOpId(2, 5), &req);
-  AddOp(MakeOpId(2, 6), &req);
+  AddOp(MakeOpId(2, 5), base_ts, &req);
+  AddOp(MakeOpId(2, 6), base_ts, &req);
   rpc.Reset();
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
   ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
@@ -1324,8 +1340,8 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
     req.set_caller_uuid("new_leader");
     req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
     req.clear_ops();
-    AddOp(MakeOpId(leader_term, 5), &req);
-    AddOp(MakeOpId(leader_term, 6), &req);
+    AddOp(MakeOpId(leader_term, 5), base_ts, &req);
+    AddOp(MakeOpId(leader_term, 6), base_ts, &req);
     rpc.Reset();
     ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
     ASSERT_FALSE(resp.has_error()) << "Req: " << SecureShortDebugString(req)
@@ -1850,8 +1866,9 @@ TEST_F(RaftConsensusITest, TestEarlyCommitDespiteMemoryPressure) {
   req.set_committed_index(1);
   req.set_all_replicated_index(0);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1));
+  int64_t base_ts = GetTimestampOnServer(replica_ts);
   for (int i = 0; i < kNumOps; i++) {
-    AddOp(MakeOpId(1, 2 + i), &req);
+    AddOp(MakeOpId(1, 2 + i), base_ts, &req);
   }
   OpId last_opid = MakeOpId(1, 2 + kNumOps - 1);
   ASSERT_OK(replica_ts->consensus_proxy->UpdateConsensus(req, &resp, &rpc));
@@ -1874,7 +1891,7 @@ TEST_F(RaftConsensusITest, TestEarlyCommitDespiteMemoryPressure) {
   req.mutable_preceding_id()->CopyFrom(last_opid);
   req.set_committed_index(last_opid.index());
   req.mutable_ops()->Clear();
-  AddOp(MakeOpId(1, last_opid.index() + 1), &req);
+  AddOp(MakeOpId(1, last_opid.index() + 1), base_ts, &req);
   rpc.Reset();
   Status s = replica_ts->consensus_proxy->UpdateConsensus(req, &resp, &rpc);
 
@@ -2579,8 +2596,9 @@ TEST_F(RaftConsensusITest, TestUpdateConsensusErrorNonePrepared) {
   req.set_committed_index(0);
   req.set_all_replicated_index(0);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(0, 0));
+  int64_t base_ts = GetTimestampOnServer(replica_ts);
   for (int i = 0; i < kNumOps; i++) {
-    AddOp(MakeOpId(0, 1 + i), &req);
+    AddOp(MakeOpId(0, 1 + i), base_ts, &req);
   }
 
   ASSERT_OK(replica_ts->consensus_proxy->UpdateConsensus(req, &resp, &rpc));

http://git-wip-us.apache.org/repos/asf/kudu/blob/b3bb51ec/src/kudu/integration-tests/raft_consensus_election-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus_election-itest.cc b/src/kudu/integration-tests/raft_consensus_election-itest.cc
index 3debfb5..9e12bd2 100644
--- a/src/kudu/integration-tests/raft_consensus_election-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus_election-itest.cc
@@ -100,8 +100,10 @@ void RaftConsensusElectionITest::CreateClusterForChurnyElectionsTests(
   // On TSAN builds, we need to be a little bit less churny in order to make
   // any progress at all.
   ts_flags.push_back("--raft_heartbeat_interval_ms=5");
+  ts_flags.emplace_back("--inject_latency_ms_before_starting_txn=100");
 #else
   ts_flags.emplace_back("--raft_heartbeat_interval_ms=1");
+  ts_flags.emplace_back("--inject_latency_ms_before_starting_txn=1000");
 #endif
 
   ts_flags.insert(ts_flags.end(), extra_ts_flags.cbegin(), extra_ts_flags.cend());

http://git-wip-us.apache.org/repos/asf/kudu/blob/b3bb51ec/src/kudu/integration-tests/timestamp_advancement-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/timestamp_advancement-itest.cc b/src/kudu/integration-tests/timestamp_advancement-itest.cc
index 88bfff5..be30bdf 100644
--- a/src/kudu/integration-tests/timestamp_advancement-itest.cc
+++ b/src/kudu/integration-tests/timestamp_advancement-itest.cc
@@ -38,6 +38,7 @@
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/internal_mini_cluster-itest-base.h"
 #include "kudu/integration-tests/test_workload.h"
@@ -232,5 +233,31 @@ TEST_F(TimestampAdvancementITest, TestNoOpAdvancesMvccSafeTimeOnBootstrap) {
   ASSERT_NE(cleantime, Timestamp::kInitialTimestamp);
 }
 
+// Test to ensure that MVCC's current snapshot gets updated via Raft no-ops, in
+// both the "normal" case and the single-replica case.
+TEST_F(TimestampAdvancementITest, TestTimestampsAdvancedFromRaftNoOp) {
+  const int kTserver = 0;
+  for (int num_replicas : { 1, 3 }) {
+    LOG(INFO) << strings::Substitute("Running with $0 replicas", num_replicas);
+    NO_FATALS(StartCluster(num_replicas));
+
+    // Create an empty tablet with a single replica.
+    TestWorkload create_tablet(cluster_.get());
+    create_tablet.set_num_replicas(num_replicas);
+    create_tablet.Setup();
+    scoped_refptr<TabletReplica> replica = tablet_replica_on_ts(kTserver);
+
+    // Despite there not being any writes, the replica will eventually bump its
+    // MVCC clean time on its own when a leader gets elected and replicates a
+    // no-op message.
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_NE(replica->tablet()->mvcc_manager()->GetCleanTimestamp(),
+                Timestamp::kInitialTimestamp);
+    });
+    replica.reset();
+    StopCluster();
+  }
+}
+
 }  // namespace itest
 }  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/b3bb51ec/src/kudu/tablet/mvcc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc.cc b/src/kudu/tablet/mvcc.cc
index 62d3d43..21e97e0 100644
--- a/src/kudu/tablet/mvcc.cc
+++ b/src/kudu/tablet/mvcc.cc
@@ -22,6 +22,7 @@
 #include <ostream>
 #include <utility>
 
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 
 #include "kudu/gutil/map-util.h"
@@ -30,8 +31,17 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/debug/trace_event.h"
+#include "kudu/util/fault_injection.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
 
+DEFINE_int32(inject_latency_ms_before_starting_txn, 0,
+             "Amount of latency in ms to inject before registering "
+             "a transaction with MVCC.");
+TAG_FLAG(inject_latency_ms_before_starting_txn, advanced);
+TAG_FLAG(inject_latency_ms_before_starting_txn, hidden);
+
 namespace kudu {
 namespace tablet {
 
@@ -46,6 +56,7 @@ MvccManager::MvccManager()
 }
 
 void MvccManager::StartTransaction(Timestamp timestamp) {
+  MAYBE_INJECT_RANDOM_LATENCY(FLAGS_inject_latency_ms_before_starting_txn);
   std::lock_guard<LockType> l(lock_);
   CHECK(!cur_snap_.IsCommitted(timestamp)) << "Trying to start a new txn at an already-committed"
                                            << " timestamp: " << timestamp.ToString()
@@ -53,7 +64,7 @@ void MvccManager::StartTransaction(Timestamp timestamp) {
   CHECK(InitTransactionUnlocked(timestamp)) << "There is already a transaction with timestamp: "
                                             << timestamp.value() << " in flight or this timestamp "
                                             << "is before than or equal to \"safe\" time."
-                                            << "Current Snapshot: " << cur_snap_.ToString()
+                                            << " Current Snapshot: " << cur_snap_.ToString()
                                             << " Current safe time: " << safe_time_;
 }
 
@@ -176,12 +187,11 @@ void MvccManager::AdjustSafeTime(Timestamp safe_time) {
     DVLOG(4) << "Adjusting safe time to: " << safe_time;
     safe_time_ = safe_time;
   } else {
-    // TODO(dralves) This shouldn't happen, the safe time passed to MvccManager should be
-    // monotically increasing. If if does though, the impact is on scan snapshot correctness,
-    // not on corruption of state and some test-only code sets this back (LocalTabletWriter).
-    // Note that we will still crash if a transaction comes in with a timestamp that is lower
-    // than 'cur_snap_.all_committed_before_'.
-    LOG_EVERY_N(ERROR, 10) << Substitute("Tried to move safe_time back from $0 to $1. "
+    // Note: Getting here means that we are about to apply a transaction out of
+    // order. This out-of-order applying is only safe because concurrent
+    // transactions are guaranteed to not affect the same state based on locks
+    // taken before starting the transaction (e.g. row locks, schema locks).
+    KLOG_EVERY_N(INFO, 10) << Substitute("Tried to move safe_time back from $0 to $1. "
                                          "Current Snapshot: $2", safe_time_.ToString(),
                                          safe_time.ToString(), cur_snap_.ToString());
     return;

http://git-wip-us.apache.org/repos/asf/kudu/blob/b3bb51ec/src/kudu/tablet/mvcc.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc.h b/src/kudu/tablet/mvcc.h
index 24af2b3..5ce1f95 100644
--- a/src/kudu/tablet/mvcc.h
+++ b/src/kudu/tablet/mvcc.h
@@ -219,7 +219,8 @@ class MvccManager {
   // StartApplyingTransaction(), or else this logs a FATAL error.
   void CommitTransaction(Timestamp timestamp);
 
-  // Adjusts the safe time so that the MvccManager can trim state.
+  // Adjusts the safe time so that the MvccManager can trim state, provided
+  // 'safe_time' is higher than the current safe time.
   //
   // This must only be called when there is a guarantee that there won't be
   // any more transactions with timestamps equal to or lower than 'safe_time'.

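The updated comment says AdjustSafeTime() only takes effect when the new value is higher than the current safe time, and StartTransaction() refuses timestamps at or below safe time. The following is a minimal, hypothetical model of those two rules. The class name SafeTimeTracker and the use of 0 as the starting value are assumptions of this sketch, not Kudu code; it also treats an equal value as a no-op, which may differ from the real implementation.

```cpp
#include <cstdint>
#include <set>

// Hypothetical, simplified model of the safe-time rules described above.
// Not Kudu's MvccManager: no snapshots, no commit/abort, no locking.
class SafeTimeTracker {
 public:
  // Advances safe time only if 'safe_time' is strictly higher. Returns
  // whether it advanced; Kudu instead logs and ignores backward moves.
  bool AdjustSafeTime(uint64_t safe_time) {
    if (safe_time <= safe_time_) {
      return false;
    }
    safe_time_ = safe_time;
    return true;
  }

  // Registers a transaction. Timestamps at or below safe time, or already
  // in flight, are rejected; Kudu CHECK-fails there, this sketch returns false.
  bool StartTransaction(uint64_t timestamp) {
    if (timestamp <= safe_time_) {
      return false;
    }
    return in_flight_.insert(timestamp).second;
  }

  uint64_t safe_time() const { return safe_time_; }

 private:
  uint64_t safe_time_ = 0;  // This sketch's stand-in for the initial timestamp.
  std::set<uint64_t> in_flight_;
};
```

Together the two rules give the invariant from the commit message: once safe time reaches T, no transaction at or below T can be registered.
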
http://git-wip-us.apache.org/repos/asf/kudu/blob/b3bb51ec/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index 821cd29..751f3aa 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -30,6 +30,7 @@
 
 #include "kudu/clock/clock.h"
 #include "kudu/common/partition.h"
+#include "kudu/common/timestamp.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/consensus_peers.h"
@@ -39,6 +40,7 @@
 #include "kudu/consensus/raft_consensus.h"
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/bind_helpers.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/result_tracker.h"
@@ -212,9 +214,9 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
     }
 
     // We cannot hold 'lock_' while we call RaftConsensus::Start() because it
-    // may invoke TabletReplica::StartReplicaTransaction() during startup, causing
-    // a self-deadlock. We take a ref to members protected by 'lock_' before
-    // unlocking.
+    // may invoke TabletReplica::StartFollowerTransaction() during startup,
+    // causing a self-deadlock. We take a ref to members protected by 'lock_'
+    // before unlocking.
     RETURN_NOT_OK(consensus_->Start(
         bootstrap_info,
         std::move(peer_proxy_factory),
@@ -579,7 +581,7 @@ Status TabletReplica::GetGCableDataSize(int64_t* retention_size) const {
   return Status::OK();
 }
 
-Status TabletReplica::StartReplicaTransaction(const scoped_refptr<ConsensusRound>& round) {
+Status TabletReplica::StartFollowerTransaction(const scoped_refptr<ConsensusRound>& round) {
   {
     std::lock_guard<simple_spinlock> lock(lock_);
     if (state_ != RUNNING && state_ != BOOTSTRAPPING) {
@@ -637,6 +639,43 @@ Status TabletReplica::StartReplicaTransaction(const scoped_refptr<ConsensusRound
   return Status::OK();
 }
 
+void TabletReplica::FinishConsensusOnlyRound(ConsensusRound* round) {
+  consensus::ReplicateMsg* replicate_msg = round->replicate_msg();
+  consensus::OperationType op_type = replicate_msg->op_type();
+
+  // The timestamp of a Raft no-op used to assert term leadership is guaranteed
+  // to be lower than the timestamps of writes in the same term and those
+  // thereafter. As such, we are able to bump the MVCC safe time with the
+  // timestamps of such no-ops, as further transaction timestamps are
+  // guaranteed to be higher than them.
+  //
+  // It is important for MVCC safe time updates to be serialized with respect
+  // to transactions. To ensure that we only advance the safe time with the
+  // no-op of term N after all transactions of term N-1 have been prepared, we
+  // run the adjustment function on the prepare thread, which is the same
+  // mechanism we use to serialize transactions.
+  //
+  // If the 'timestamp_in_opid_order' flag is unset, the no-op is assumed to be
+  // the Raft leadership no-op from a version of Kudu that only supported creating
+  // a no-op to assert a new leadership term, in which case it would be in order.
+  if (op_type == consensus::NO_OP &&
+      (!replicate_msg->noop_request().has_timestamp_in_opid_order() ||
+       replicate_msg->noop_request().timestamp_in_opid_order())) {
+    DCHECK(replicate_msg->has_noop_request());
+    int64_t ts = replicate_msg->timestamp();
+    // We are guaranteed that the prepare pool token is running now because
+    // TabletReplica::Stop() stops RaftConsensus before it stops the prepare
+    // pool token and this callback is invoked while the RaftConsensus lock is
+    // held.
+    CHECK_OK(prepare_pool_token_->SubmitFunc([this, ts] {
+      std::lock_guard<simple_spinlock> l(lock_);
+      if (state_ == RUNNING || state_ == BOOTSTRAPPING) {
+        tablet_->mvcc_manager()->AdjustSafeTime(Timestamp(ts));
+      }
+    }));
+  }
+}
+
 Status TabletReplica::NewLeaderTransactionDriver(gscoped_ptr<Transaction> transaction,
                                                  scoped_refptr<TransactionDriver>* driver) {
   scoped_refptr<TransactionDriver> tx_driver = new TransactionDriver(

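FinishConsensusOnlyRound() relies on the prepare pool token running tasks one at a time, in submission order, so that a safe-time bump for term N's no-op cannot overtake transactions from term N-1 that were submitted earlier. The sketch below illustrates that ordering guarantee with a hypothetical SerialExecutor standing in for Kudu's thread pool token, whose real API is not reproduced here.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical single-threaded executor standing in for the prepare pool
// token: tasks run one at a time, in the order they were submitted.
class SerialExecutor {
 public:
  SerialExecutor() : worker_([this] { Run(); }) {}

  // Drains any queued tasks, then stops and joins the worker thread.
  ~SerialExecutor() {
    {
      std::lock_guard<std::mutex> l(mu_);
      done_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }

  void Submit(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> l(mu_);
      tasks_.push_back(std::move(task));
    }
    cv_.notify_one();
  }

 private:
  void Run() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> l(mu_);
        cv_.wait(l, [this] { return done_ || !tasks_.empty(); });
        if (tasks_.empty()) return;  // Shutting down and fully drained.
        task = std::move(tasks_.front());
        tasks_.pop_front();
      }
      task();  // Run outside the lock so tasks may call Submit().
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> tasks_;
  bool done_ = false;
  std::thread worker_;  // Declared last so the members above exist first.
};
```

Because the no-op's adjustment is queued behind every earlier prepare, a transaction already submitted at a lower timestamp starts before safe time moves past it.
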
http://git-wip-us.apache.org/repos/asf/kudu/blob/b3bb51ec/src/kudu/tablet/tablet_replica.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 253e9c0..bf9d63d 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -27,15 +27,14 @@
 
 #include <gtest/gtest_prod.h>
 
-#include "kudu/consensus/raft_consensus.h"
-#include "kudu/consensus/time_manager.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/raft_consensus.h"
+#include "kudu/consensus/time_manager.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/callback.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet.h"
@@ -86,7 +85,7 @@ class WriteTransactionState;
 // peers see the same updates in the same order. In addition to this, this
 // class also splits the work and coordinates multi-threaded execution.
 class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
-                      public consensus::ReplicaTransactionFactory {
+                      public consensus::ConsensusRoundHandler {
  public:
   TabletReplica(scoped_refptr<TabletMetadata> meta,
                 scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager,
@@ -153,8 +152,12 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   void GetTabletStatusPB(TabletStatusPB* status_pb_out) const;
 
   // Used by consensus to create and start a new ReplicaTransaction.
-  virtual Status StartReplicaTransaction(
-      const scoped_refptr<consensus::ConsensusRound>& round) OVERRIDE;
+  virtual Status StartFollowerTransaction(
+      const scoped_refptr<consensus::ConsensusRound>& round) override;
+
+  // Used by consensus to notify the tablet replica that a consensus-only round
+  // has finished, advancing MVCC safe time as appropriate.
+  virtual void FinishConsensusOnlyRound(consensus::ConsensusRound* round) override;
 
   consensus::RaftConsensus* consensus() {
     std::lock_guard<simple_spinlock> lock(lock_);


[2/3] kudu git commit: KUDU-2463 pt 3: don't scan if MVCC hasn't moved

Posted by ab...@apache.org.
KUDU-2463 pt 3: don't scan if MVCC hasn't moved

In cases when a tablet bootstrap yields an MvccManager whose safe time
has not been advanced (e.g. if there are no write ops in the WAL), and
the tablet has otherwise not bumped its MVCC timestamps (e.g. if it has
not yet elected a leader), a scan (whose correctness relies on the
MvccManager to determine which transactions have been applied) will
return incorrect results.

In the same way that we prevent compactions from occurring if MVCC's
timestamps have not been moved, this patch prevents incorrect results
from being returned from a scan by returning an error that can be
retried elsewhere.
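
Sketched in isolation, the gate looks roughly like the following. The helper CheckScanAllowed(), the ScanCode enum, and the use of 0 as the initial timestamp are all hypothetical; the real check presumably lives in the tablet_service.cc and mvcc.cc changes listed in the diffstat, whose code is not reproduced here. The code name and message only echo what Kudu2463Test expects.

```cpp
#include <cstdint>
#include <string>

// Hypothetical result codes; names echo the test's expectations but this
// is not Kudu's TabletServerErrorPB.
enum class ScanCode { kOk, kTabletNotRunning };

struct ScanResult {
  ScanCode code;
  std::string message;
};

// This sketch's stand-in for Timestamp::kInitialTimestamp.
constexpr uint64_t kInitialTimestamp = 0;

// Refuses scans until MVCC's safe time has moved past its initial value,
// returning a retriable error so the client can try another replica.
ScanResult CheckScanAllowed(uint64_t safe_time) {
  if (safe_time == kInitialTimestamp) {
    return {ScanCode::kTabletNotRunning,
            "safe time has not yet been initialized"};
  }
  return {ScanCode::kOk, ""};
}
```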

New tests are added to attempt to scan in this state, verifying that we
get an error. A couple of tests that use the mock clock are also updated
so the initial timestamp assigned to a no-op is a more organic, non-zero
timestamp.

Change-Id: Idc0f77673e1f04a34ab1f5c1930bbaa2498b39bf
Reviewed-on: http://gerrit.cloudera.org:8080/11428
Reviewed-by: Mike Percy <mp...@apache.org>
Tested-by: Kudu Jenkins
Reviewed-on: http://gerrit.cloudera.org:8080/11690
Reviewed-by: Grant Henke <gr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fae09bdd
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fae09bdd
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fae09bdd

Branch: refs/heads/branch-1.8.x
Commit: fae09bdd659fad59c4659d0860af6449ce40a990
Parents: b3bb51e
Author: Andrew Wong <aw...@cloudera.com>
Authored: Tue Sep 4 00:55:58 2018 -0700
Committer: Attila Bukor <ab...@apache.org>
Committed: Tue Oct 16 08:40:22 2018 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/consistency-itest.cc |   9 +
 .../tablet_history_gc-itest.cc                  |  27 +--
 .../timestamp_advancement-itest.cc              | 176 ++++++++++++++-----
 src/kudu/mini-cluster/external_mini_cluster.cc  |   6 +
 src/kudu/mini-cluster/external_mini_cluster.h   |   5 +
 src/kudu/mini-cluster/internal_mini_cluster.cc  |   7 +
 src/kudu/mini-cluster/internal_mini_cluster.h   |  10 +-
 src/kudu/mini-cluster/mini_cluster.h            |   8 +
 src/kudu/tablet/mvcc.cc                         |   9 +
 src/kudu/tablet/mvcc.h                          |   5 +
 src/kudu/tools/kudu-ts-cli-test.cc              |  20 ++-
 src/kudu/tserver/tablet_service.cc              |  11 ++
 12 files changed, 231 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/integration-tests/consistency-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/consistency-itest.cc b/src/kudu/integration-tests/consistency-itest.cc
index a0a2dc9..81231cd 100644
--- a/src/kudu/integration-tests/consistency-itest.cc
+++ b/src/kudu/integration-tests/consistency-itest.cc
@@ -113,6 +113,15 @@ class ConsistencyITest : public MiniClusterITestBase {
   virtual void SetUp() override {
     MiniClusterITestBase::SetUp();
     StartCluster(num_tablet_servers_);
+
+    // Since we're using mock NTP rather than the hybrid clock, it's possible
+    // that the first timestamp assigned to a tablet message is the initial
+    // timestamp (0). For correctness of scans, it is illegal to scan in this
+    // state. As such, we bump the clock up front so when we create tablets,
+    // they start out with more natural, non-0 values.
+    for (int i = 0; i < num_tablet_servers_; i++) {
+      cluster_->mini_tablet_server(i)->server()->clock()->Now();
+    }
   }
 
   void ScannerThread(KuduClient::ReplicaSelection selection,

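The SetUp() change above reads each tablet server's clock once so that later timestamps are no longer the initial value. The toy illustration below shows why one up-front read helps. It assumes a hypothetical counter clock whose first reading is 0. Kudu's HybridClock with mock NTP is more involved, so this only models the first-reading hazard.

```cpp
#include <cstdint>

// Hypothetical counter clock: each Now() returns the current value and then
// increments it, so an untouched clock's first reading equals 0.
class MockCounterClock {
 public:
  uint64_t Now() { return next_++; }

 private:
  uint64_t next_ = 0;
};
```

Calling Now() once before any tablet is created consumes the 0 reading, so the first timestamp handed to a tablet message is non-zero.
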
http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/integration-tests/tablet_history_gc-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_history_gc-itest.cc b/src/kudu/integration-tests/tablet_history_gc-itest.cc
index e2ec09b..40b8ad5 100644
--- a/src/kudu/integration-tests/tablet_history_gc-itest.cc
+++ b/src/kudu/integration-tests/tablet_history_gc-itest.cc
@@ -523,6 +523,23 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) {
   FLAGS_scanner_ttl_ms = 1000 * 60 * 60 * 24;
 
   StartCluster(1); // Start InternalMiniCluster with a single tablet server.
+  // Since we're using mock NTP rather than the hybrid clock, it's possible
+  // that if we created a tablet now, the first timestamp assigned to a tablet
+  // message would be the initial timestamp (0). For correctness of scans, it
+  // is illegal to scan in this state. As such, we bump the clock up front so
+  // when we create tablets, they start out with more natural, non-0 values.
+  MiniTabletServer* mts = cluster_->mini_tablet_server(0);
+
+  // Directly access the tserver so we can control compaction and the clock.
+  TabletServer* ts = mts->server();
+  clock_ = down_cast<HybridClock*>(ts->clock());
+
+  // Set initial clock time to 1000 seconds past 0, which is enough so that the
+  // AHM will not be negative.
+  const uint64_t kInitialMicroTime = 1L * 1000 * 1000 * 1000;
+  auto* ntp = down_cast<clock::MockNtp*>(clock_->time_service());
+  ntp->SetMockClockWallTimeForTests(kInitialMicroTime);
+
   TestWorkload workload(cluster_.get());
   workload.set_num_replicas(1);
   workload.Setup(); // Convenient way to create a table.
@@ -530,21 +547,11 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) {
   client::sp::shared_ptr<KuduTable> table;
   ASSERT_OK(client_->OpenTable(workload.table_name(), &table));
 
-  // Directly access the Tablet so we can control compaction and the clock.
-  MiniTabletServer* mts = cluster_->mini_tablet_server(0);
-  TabletServer* ts = mts->server();
-  clock_ = down_cast<HybridClock*>(ts->clock());
   std::vector<scoped_refptr<TabletReplica>> replicas;
   ts->tablet_manager()->GetTabletReplicas(&replicas);
   ASSERT_EQ(1, replicas.size());
   Tablet* tablet = replicas[0]->tablet();
 
-  // Set initial clock time to 1000 seconds past 0, which is enough so that the
-  // AHM will not be negative.
-  const uint64_t kInitialMicroTime = 1L * 1000 * 1000 * 1000;
-  auto* ntp = down_cast<clock::MockNtp*>(clock_->time_service());
-  ntp->SetMockClockWallTimeForTests(kInitialMicroTime);
-
   LOG(INFO) << "Seeding random number generator";
   Random random(SeedRandom());
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/integration-tests/timestamp_advancement-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/timestamp_advancement-itest.cc b/src/kudu/integration-tests/timestamp_advancement-itest.cc
index be30bdf..b0b9d51 100644
--- a/src/kudu/integration-tests/timestamp_advancement-itest.cc
+++ b/src/kudu/integration-tests/timestamp_advancement-itest.cc
@@ -27,13 +27,19 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/common/common.pb.h"
+#include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
+#include "kudu/common/wire_protocol-test-util.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log.pb.h"
 #include "kudu/consensus/log_index.h"
 #include "kudu/consensus/log_reader.h"
 #include "kudu/consensus/log_util.h"
+#include "kudu/consensus/metadata.pb.h"
 #include "kudu/consensus/raft_consensus.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/map-util.h"
@@ -44,14 +50,18 @@
 #include "kudu/integration-tests/test_workload.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/mini-cluster/mini_cluster.h"
+#include "kudu/rpc/rpc_controller.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -62,27 +72,49 @@ DECLARE_bool(log_async_preallocate_segments);
 DECLARE_bool(raft_enable_pre_election);
 DECLARE_double(leader_failure_max_missed_heartbeat_periods);
 DECLARE_int32(consensus_inject_latency_ms_in_notifications);
+DECLARE_int32(heartbeat_interval_ms);
 DECLARE_int32(raft_heartbeat_interval_ms);
 
 namespace kudu {
 
-using cluster::InternalMiniCluster;
-using cluster::InternalMiniClusterOptions;
+using consensus::RaftPeerPB;
 using log::LogReader;
+using pb_util::SecureShortDebugString;
+using rpc::RpcController;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::vector;
 using tablet::TabletReplica;
 using tserver::MiniTabletServer;
+using tserver::NewScanRequestPB;
+using tserver::ScanResponsePB;
+using tserver::ScanRequestPB;
+using tserver::TabletServerErrorPB;
+using tserver::TabletServerServiceProxy;
 
 namespace itest {
 
 class TimestampAdvancementITest : public MiniClusterITestBase {
  public:
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  // Many tests will operate on a single tserver. The first one is chosen
+  // arbitrarily.
+  const int kTserver = 0;
+
   // Sets up a cluster and returns the tablet replica on 'ts' that has written
   // to its WAL. 'replica' will write further messages to a new WAL segment.
   void SetupClusterWithWritesInWAL(int ts, scoped_refptr<TabletReplica>* replica) {
+    // We're going to manually trigger maintenance ops, so disable maintenance op
+    // scheduling.
+    FLAGS_enable_maintenance_manager = false;
+
+    // Prevent preallocation of WAL segments in order to prevent races between
+    // the WAL allocation thread and our manual rolling over of the WAL.
+    FLAGS_log_preallocate_segments = false;
+    FLAGS_log_async_preallocate_segments = false;
+
     NO_FATALS(StartCluster(3));
 
     // Write some rows to the cluster.
@@ -128,6 +160,21 @@ class TimestampAdvancementITest : public MiniClusterITestBase {
     return replicas[0];
   }
 
+  // Returns a scan response from the tablet on the given tablet server.
+  ScanResponsePB ScanResponseForTablet(int ts, const string& tablet_id) const {
+    ScanResponsePB resp;
+    RpcController rpc;
+    ScanRequestPB req;
+    NewScanRequestPB* scan = req.mutable_new_scan_request();
+    scan->set_tablet_id(tablet_id);
+    scan->set_read_mode(READ_LATEST);
+    const Schema schema = GetSimpleTestSchema();
+    CHECK_OK(SchemaToColumnPBs(schema, scan->mutable_projected_columns()));
+    shared_ptr<TabletServerServiceProxy> tserver_proxy = cluster_->tserver_proxy(ts);
+    CHECK_OK(tserver_proxy->Scan(req, &resp, &rpc));
+    return resp;
+  }
+
   // Returns true if there are any write replicate messages in the WALs of
   // 'tablet_id' on 'ts'.
   Status CheckForWriteReplicatesInLog(MiniTabletServer* ts, const string& tablet_id,
@@ -156,27 +203,52 @@ class TimestampAdvancementITest : public MiniClusterITestBase {
     *has_write_replicates = false;
     return Status::OK();
   }
+
+  // Repeatedly GCs the replica's WALs until there are no more write replicates
+  // in the WAL.
+  void GCUntilNoWritesInWAL(MiniTabletServer* tserver,
+                            scoped_refptr<TabletReplica> replica) {
+    ASSERT_EVENTUALLY([&] {
+      LOG(INFO) << "GCing logs...";
+      int64_t gcable_size;
+      ASSERT_OK(replica->GetGCableDataSize(&gcable_size));
+      ASSERT_GT(gcable_size, 0);
+      ASSERT_OK(replica->RunLogGC());
+
+      // Ensure that we have no writes in our WALs.
+      bool has_write_replicates;
+      ASSERT_OK(CheckForWriteReplicatesInLog(tserver, replica->tablet_id(),
+                                             &has_write_replicates));
+      ASSERT_FALSE(has_write_replicates);
+    });
+  }
+
+  // Shuts down all the nodes in a cluster and restarts the given tserver.
+  // Waits for the given replica on the tserver to start.
+  Status ShutdownAllNodesAndRestartTserver(MiniTabletServer* tserver,
+                                          const string& tablet_id) {
+    LOG(INFO) << "Shutting down cluster...";
+    cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY);
+    // Note: We shut down tservers individually rather than using
+    // ClusterNodes::TS, since the latter would invalidate our reference to
+    // 'tserver'.
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      cluster_->mini_tablet_server(i)->Shutdown();
+    }
+    LOG(INFO) << "Restarting single tablet server...";
+    RETURN_NOT_OK(tserver->Restart());
+    TServerDetails* ts_details = FindOrDie(ts_map_, tserver->uuid());
+    return WaitUntilTabletRunning(ts_details, tablet_id, kTimeout);
+  }
 };
 
 // Test that bootstrapping a Raft no-op from the WAL will advance the replica's
 // MVCC safe time timestamps.
 TEST_F(TimestampAdvancementITest, TestNoOpAdvancesMvccSafeTimeOnBootstrap) {
-  // Set a low Raft heartbeat and encourage frequent elections so that we can
-  // fill up the WAL with no-op entries naturally.
+  // Set a low Raft heartbeat interval so we can induce election churn.
   FLAGS_raft_heartbeat_interval_ms = 100;
-  FLAGS_raft_enable_pre_election = false;
-
-  // Prevent preallocation of WAL segments in order to prevent races between
-  // the WAL allocation thread and our manual rolling over of the WAL.
-  FLAGS_log_preallocate_segments = false;
-  FLAGS_log_async_preallocate_segments = false;
-
-  // We're going to manually trigger maintenance ops, so disable maintenance op
-  // scheduling.
-  FLAGS_enable_maintenance_manager = false;
 
   // Setup a cluster with some writes and a new WAL segment.
-  const int kTserver = 0;
   scoped_refptr<TabletReplica> replica;
   NO_FATALS(SetupClusterWithWritesInWAL(kTserver, &replica));
   MiniTabletServer* ts = tserver(kTserver);
@@ -184,6 +256,7 @@ TEST_F(TimestampAdvancementITest, TestNoOpAdvancesMvccSafeTimeOnBootstrap) {
 
   // Now that we're on a new WAL segment, inject latency to consensus so we
   // trigger elections, and wait for some terms to pass.
+  FLAGS_raft_enable_pre_election = false;
   FLAGS_leader_failure_max_missed_heartbeat_periods = 1.0;
   FLAGS_consensus_inject_latency_ms_in_notifications = 100;
   const int64_t kNumExtraTerms = 10;
@@ -198,39 +271,60 @@ TEST_F(TimestampAdvancementITest, TestNoOpAdvancesMvccSafeTimeOnBootstrap) {
   // where we can GC our logs. Note: we won't GC if there are replicas that
   // need to be caught up.
   FLAGS_consensus_inject_latency_ms_in_notifications = 0;
-  ASSERT_EVENTUALLY([&] {
-    LOG(INFO) << "GCing logs...";
-    int64_t gcable_size;
-    ASSERT_OK(replica->GetGCableDataSize(&gcable_size));
-    ASSERT_GT(gcable_size, 0);
-    ASSERT_OK(replica->RunLogGC());
-
-    // Ensure that we have no writes in our WALs.
-    bool has_write_replicates;
-    ASSERT_OK(CheckForWriteReplicatesInLog(ts, tablet_id, &has_write_replicates));
-    ASSERT_FALSE(has_write_replicates);
-  });
-
-  // Note: We shut down tservers individually rather than using
-  // ClusterNodes::TS, since the latter would invalidate our reference to 'ts'.
+  NO_FATALS(GCUntilNoWritesInWAL(ts, replica));
   replica.reset();
-  LOG(INFO) << "Shutting down the cluster...";
-  cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY);
-  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-    cluster_->mini_tablet_server(i)->Shutdown();
-  }
-
-  // Now prevent elections to reduce consensus logging on the server.
-  LOG(INFO) << "Restarting single tablet server...";
-  ASSERT_OK(ts->Restart());
-  TServerDetails* ts_details = FindOrDie(ts_map_, ts->uuid());
+  ASSERT_OK(ShutdownAllNodesAndRestartTserver(ts, tablet_id));
 
   // Despite there being no writes, there are no-ops, with which we can advance
   // MVCC's timestamps.
-  ASSERT_OK(WaitUntilTabletRunning(ts_details, tablet_id, MonoDelta::FromSeconds(30)));
   replica = tablet_replica_on_ts(kTserver);
   Timestamp cleantime = replica->tablet()->mvcc_manager()->GetCleanTimestamp();
   ASSERT_NE(cleantime, Timestamp::kInitialTimestamp);
+
+  // Verify that we can scan the replica with its MVCC timestamp raised.
+  ScanResponsePB resp = ScanResponseForTablet(kTserver, replica->tablet_id());
+  ASSERT_FALSE(resp.has_error()) << SecureShortDebugString(resp);
+}
+
+// Regression test for KUDU-2463, wherein scans would return incorrect results
+// if a tablet's MVCC snapshot hasn't advanced. Currently, the only way to
+// achieve this is if the cluster is restarted, the WAL only has change
+// configs, and the tablet cannot join a quorum.
+TEST_F(TimestampAdvancementITest, Kudu2463Test) {
+  scoped_refptr<TabletReplica> replica;
+  NO_FATALS(SetupClusterWithWritesInWAL(kTserver, &replica));
+  MiniTabletServer* ts = tserver(kTserver);
+
+  const string tablet_id = replica->tablet_id();
+
+  // Update one of the followers repeatedly to generate a bunch of config
+  // changes in all the replicas' WALs.
+  TServerDetails* leader;
+  ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader));
+  vector<TServerDetails*> followers;
+  ASSERT_OK(FindTabletFollowers(ts_map_, tablet_id, kTimeout, &followers));
+  ASSERT_FALSE(followers.empty());
+  for (int i = 0; i < 20; i++) {
+    RaftPeerPB::MemberType type = i % 2 == 0 ? RaftPeerPB::NON_VOTER : RaftPeerPB::VOTER;
+    WARN_NOT_OK(ChangeReplicaType(leader, tablet_id, followers[0], type, kTimeout),
+                "Couldn't send a change config!");
+  }
+  NO_FATALS(GCUntilNoWritesInWAL(ts, replica));
+
+  // Note: we need to reset the replica reference before restarting the server.
+  replica.reset();
+  ASSERT_OK(ShutdownAllNodesAndRestartTserver(ts, tablet_id));
+
+  // Now open a scanner for the server.
+  ScanResponsePB resp = ScanResponseForTablet(kTserver, tablet_id);
+
+  // Scanning the tablet should yield an error.
+  LOG(INFO) << "Got response: " << SecureShortDebugString(resp);
+  ASSERT_TRUE(resp.has_error());
+  const TabletServerErrorPB& error = resp.error();
+  ASSERT_EQ(error.code(), TabletServerErrorPB::TABLET_NOT_RUNNING);
+  ASSERT_STR_CONTAINS(resp.error().status().message(), "safe time has not yet been initialized");
+  ASSERT_EQ(error.status().code(), AppStatusPB::UNINITIALIZED);
 }
 
 // Test to ensure that MVCC's current snapshot gets updated via Raft no-ops, in

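GCUntilNoWritesInWAL() and TestTimestampsAdvancedFromRaftNoOp() both rely on ASSERT_EVENTUALLY to retry a block until its assertions pass. The same retry-until-deadline pattern can be written as a plain function. The name RetryUntil() and its backoff parameter are hypothetical, and this is not how Kudu's macro is implemented.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Re-runs 'attempt' until it returns true or 'timeout' elapses, sleeping
// 'backoff' between tries. Returns whether an attempt ever succeeded.
bool RetryUntil(const std::function<bool()>& attempt,
                std::chrono::milliseconds timeout,
                std::chrono::milliseconds backoff = std::chrono::milliseconds(10)) {
  const auto deadline = std::chrono::steady_clock::now() + timeout;
  for (;;) {
    if (attempt()) return true;
    if (std::chrono::steady_clock::now() >= deadline) return false;
    std::this_thread::sleep_for(backoff);
  }
}
```

For example, an attempt could run one round of log GC and return whether the WAL is free of write replicates.
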
http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/mini-cluster/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc
index f86294c..5bf98df 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -685,6 +685,12 @@ std::shared_ptr<MasterServiceProxy> ExternalMiniCluster::master_proxy(int idx) c
   return std::make_shared<MasterServiceProxy>(messenger_, addr, addr.host());
 }
 
+std::shared_ptr<TabletServerServiceProxy> ExternalMiniCluster::tserver_proxy(int idx) const {
+  CHECK_LT(idx, tablet_servers_.size());
+  const auto& addr = CHECK_NOTNULL(tablet_server(idx))->bound_rpc_addr();
+  return std::make_shared<TabletServerServiceProxy>(messenger_, addr, addr.host());
+}
+
 Status ExternalMiniCluster::CreateClient(client::KuduClientBuilder* builder,
                                          client::sp::shared_ptr<client::KuduClient>* client) const {
   client::KuduClientBuilder defaults;

http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/mini-cluster/external_mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h
index 6244887..9dc2f87 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -70,6 +70,10 @@ namespace server {
 class ServerStatusPB;
 } // namespace server
 
+namespace tserver {
+class TabletServerServiceProxy;
+} // namespace tserver
+
 namespace cluster {
 
 class ExternalDaemon;
@@ -296,6 +300,7 @@ class ExternalMiniCluster : public MiniCluster {
   std::shared_ptr<rpc::Messenger> messenger() const override;
   std::shared_ptr<master::MasterServiceProxy> master_proxy() const override;
   std::shared_ptr<master::MasterServiceProxy> master_proxy(int idx) const override;
+  std::shared_ptr<tserver::TabletServerServiceProxy> tserver_proxy(int idx) const override;
 
   std::string block_manager_type() const {
     return opts_.block_manager_type;

http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/mini-cluster/internal_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/internal_mini_cluster.cc b/src/kudu/mini-cluster/internal_mini_cluster.cc
index e272102..7e18346 100644
--- a/src/kudu/mini-cluster/internal_mini_cluster.cc
+++ b/src/kudu/mini-cluster/internal_mini_cluster.cc
@@ -38,6 +38,7 @@
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/tablet_server_options.h"
+#include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/util/env.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
@@ -65,6 +66,7 @@ using master::TSDescriptor;
 using std::shared_ptr;
 using tserver::MiniTabletServer;
 using tserver::TabletServer;
+using tserver::TabletServerServiceProxy;
 
 InternalMiniClusterOptions::InternalMiniClusterOptions()
   : num_masters(1),
@@ -395,6 +397,11 @@ std::shared_ptr<MasterServiceProxy> InternalMiniCluster::master_proxy(int idx) c
   return std::make_shared<MasterServiceProxy>(messenger_, addr, addr.host());
 }
 
+std::shared_ptr<TabletServerServiceProxy> InternalMiniCluster::tserver_proxy(int idx) const {
+  const auto& addr = CHECK_NOTNULL(mini_tablet_server(idx))->bound_rpc_addr();
+  return std::make_shared<TabletServerServiceProxy>(messenger_, addr, addr.host());
+}
+
 string InternalMiniCluster::WalRootForTS(int ts_idx) const {
   return mini_tablet_server(ts_idx)->options()->fs_opts.wal_root;
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/mini-cluster/internal_mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/internal_mini_cluster.h b/src/kudu/mini-cluster/internal_mini_cluster.h
index 64b4e64..d69de98 100644
--- a/src/kudu/mini-cluster/internal_mini_cluster.h
+++ b/src/kudu/mini-cluster/internal_mini_cluster.h
@@ -37,21 +37,22 @@ class Status;
 namespace client {
 class KuduClient;
 class KuduClientBuilder;
-}
+} // namespace client
 
 namespace master {
 class MasterServiceProxy;
 class MiniMaster;
 class TSDescriptor;
-}
+} // namespace master
 
 namespace rpc {
 class Messenger;
-}
+} // namespace rpc
 
 namespace tserver {
 class MiniTabletServer;
-}
+class TabletServerServiceProxy;
+} // namespace tserver
 
 namespace cluster {
 
@@ -197,6 +198,7 @@ class InternalMiniCluster : public MiniCluster {
   std::shared_ptr<rpc::Messenger> messenger() const override;
   std::shared_ptr<master::MasterServiceProxy> master_proxy() const override;
   std::shared_ptr<master::MasterServiceProxy> master_proxy(int idx) const override;
+  std::shared_ptr<tserver::TabletServerServiceProxy> tserver_proxy(int idx) const override;
 
  private:
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/mini-cluster/mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/mini_cluster.h b/src/kudu/mini-cluster/mini_cluster.h
index 304fd81..627a683 100644
--- a/src/kudu/mini-cluster/mini_cluster.h
+++ b/src/kudu/mini-cluster/mini_cluster.h
@@ -42,6 +42,10 @@ namespace rpc {
 class Messenger;
 } // namespace rpc
 
+namespace tserver {
+class TabletServerServiceProxy;
+} // namespace tserver
+
 namespace cluster {
 
 // Mode to which node types a certain action (like Shutdown()) should apply.
@@ -153,6 +157,10 @@ class MiniCluster {
   // master at 'idx' is running.
   virtual std::shared_ptr<master::MasterServiceProxy> master_proxy(int idx) const = 0;
 
+  // Returns an RPC proxy to the tserver at 'idx'. Requires that the tserver at
+  // 'idx' is running.
+  virtual std::shared_ptr<tserver::TabletServerServiceProxy> tserver_proxy(int idx) const = 0;
+
   // Returns the UUID for the tablet server 'ts_idx'
   virtual std::string UuidForTS(int ts_idx) const = 0;
 

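The new tserver_proxy(int idx) accessor is declared pure virtual on MiniCluster and overridden in both InternalMiniCluster and ExternalMiniCluster. As a result, test code can obtain a tablet server proxy without knowing which cluster flavor it runs against. The minimal standalone sketch below illustrates that pattern. FakeTServerProxy, FakeMiniCluster, FakeInternalCluster and AddrOfFirstTServer are hypothetical stand-ins, not Kudu types. In InternalMiniCluster, the real proxy is built from the cluster's Messenger and the tserver's bound RPC address.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for tserver::TabletServerServiceProxy: it only
// records the address it would send RPCs to.
struct FakeTServerProxy {
  explicit FakeTServerProxy(std::string a) : addr(std::move(a)) {}
  std::string addr;
};

// Mirrors the shape of MiniCluster::tserver_proxy(int idx): a pure virtual
// accessor that every cluster implementation must provide.
class FakeMiniCluster {
 public:
  virtual ~FakeMiniCluster() = default;
  virtual std::shared_ptr<FakeTServerProxy> tserver_proxy(int idx) const = 0;
};

// Hypothetical in-process cluster that knows each tserver's bound address.
class FakeInternalCluster : public FakeMiniCluster {
 public:
  explicit FakeInternalCluster(std::vector<std::string> addrs)
      : addrs_(std::move(addrs)) {}

  std::shared_ptr<FakeTServerProxy> tserver_proxy(int idx) const override {
    // The real code uses CHECK_NOTNULL on the tserver; here vector::at()
    // throws std::out_of_range for a nonexistent index instead.
    return std::make_shared<FakeTServerProxy>(addrs_.at(idx));
  }

 private:
  std::vector<std::string> addrs_;
};

// Test helpers can be written once against the base class.
std::string AddrOfFirstTServer(const FakeMiniCluster& cluster) {
  return cluster.tserver_proxy(0)->addr;
}
```

Because the accessor lives on the abstract base, a helper like AddrOfFirstTServer works unchanged for any concrete cluster.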
http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/tablet/mvcc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc.cc b/src/kudu/tablet/mvcc.cc
index 21e97e0..ce65068 100644
--- a/src/kudu/tablet/mvcc.cc
+++ b/src/kudu/tablet/mvcc.cc
@@ -55,6 +55,15 @@ MvccManager::MvccManager()
   cur_snap_.none_committed_at_or_after_ = Timestamp::kInitialTimestamp;
 }
 
+Status MvccManager::CheckIsSafeTimeInitialized() const {
+  // We initialize the MVCC safe time and clean time at the same time, so if
+  // clean time has not been updated, neither has safe time.
+  if (GetCleanTimestamp() == Timestamp::kInitialTimestamp) {
+    return Status::Uninitialized("safe time has not yet been initialized");
+  }
+  return Status::OK();
+}
+
 void MvccManager::StartTransaction(Timestamp timestamp) {
   MAYBE_INJECT_RANDOM_LATENCY(FLAGS_inject_latency_ms_before_starting_txn);
   std::lock_guard<LockType> l(lock_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/tablet/mvcc.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc.h b/src/kudu/tablet/mvcc.h
index 5ce1f95..347376e 100644
--- a/src/kudu/tablet/mvcc.h
+++ b/src/kudu/tablet/mvcc.h
@@ -185,6 +185,11 @@ class MvccManager {
  public:
   MvccManager();
 
+  // Returns an error if the current snapshot has not been adjusted past its
+  // initial state. While in this state, it is unsafe for the MvccManager to
+  // serve information about already-applied transactions.
+  Status CheckIsSafeTimeInitialized() const;
+
   // Begins a new transaction, which is assigned the provided timestamp.
   //
   // Requires that 'timestamp' is not committed.

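MvccManager::CheckIsSafeTimeInitialized() relies on safe time and clean time being initialized together. Checking clean time against its initial value is therefore enough to tell whether safe time has been set. The sketch below models that logic under simplifying assumptions. MiniMvcc, SimpleStatus and the bare Timestamp struct are hypothetical stand-ins for Kudu's MvccManager, Status and Timestamp, and the sentinel value chosen for kInitialTimestamp is arbitrary here.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical minimal timestamp; the sentinel value below is arbitrary.
struct Timestamp {
  uint64_t value;
  static const Timestamp kInitialTimestamp;
  bool operator==(const Timestamp& o) const { return value == o.value; }
};
const Timestamp Timestamp::kInitialTimestamp{1};

// Hypothetical stand-in for kudu::Status.
struct SimpleStatus {
  bool ok;
  std::string msg;
};

class MiniMvcc {
 public:
  // Mirrors the patch: safe and clean time are bumped together, so an
  // untouched clean time implies an untouched safe time.
  SimpleStatus CheckIsSafeTimeInitialized() const {
    if (clean_time_ == Timestamp::kInitialTimestamp) {
      return {false, "safe time has not yet been initialized"};
    }
    return {true, ""};
  }

  // Models the effect of a write or a Raft no-op advancing both times.
  void AdjustSafeAndCleanTime(Timestamp ts) {
    safe_time_ = ts;
    clean_time_ = ts;
  }

  Timestamp safe_time() const { return safe_time_; }

 private:
  Timestamp safe_time_ = Timestamp::kInitialTimestamp;
  Timestamp clean_time_ = Timestamp::kInitialTimestamp;
};
```

Until something advances the timestamps, the check reports an error. After the first adjustment it succeeds.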
http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/tools/kudu-ts-cli-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-ts-cli-test.cc b/src/kudu/tools/kudu-ts-cli-test.cc
index 736afc8..f1d9e7a 100644
--- a/src/kudu/tools/kudu-ts-cli-test.cc
+++ b/src/kudu/tools/kudu-ts-cli-test.cc
@@ -37,6 +37,7 @@
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
 
 using kudu::itest::TabletServerMap;
 using kudu::itest::TServerDetails;
@@ -111,13 +112,18 @@ TEST_F(KuduTsCliTest, TestDumpTablet) {
 
   string out;
   // Test for dump_tablet when there is no data in tablet.
-  ASSERT_OK(RunKuduTool({
-    "remote_replica",
-    "dump",
-    cluster_->tablet_server(0)->bound_rpc_addr().ToString(),
-    tablet_id
-  }, &out));
-  ASSERT_EQ("", out);
+  // Because it's unsafe to scan a tablet replica that hasn't advanced its
+  // clean time, i.e. one that hasn't had any writes or elections, we assert
+  // that we are able to eventually scan.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(RunKuduTool({
+      "remote_replica",
+      "dump",
+      cluster_->tablet_server(0)->bound_rpc_addr().ToString(),
+      tablet_id
+    }, &out));
+    ASSERT_EQ("", out);
+  });
 
   // Insert very little data and dump_tablet again.
   workload.Start();

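ASSERT_EVENTUALLY retries the enclosed assertions until they pass or a timeout expires. This lets the test tolerate a replica that has not yet advanced its clean time when the first dump is attempted. The retry idea can be sketched without gtest as shown below. RetryUntilTrue and its timeout and interval parameters are hypothetical; they do not reproduce the macro's actual timeout or backoff policy.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: invokes 'attempt' until it returns true or 'timeout'
// elapses, sleeping 'interval' between tries. Returns whether it succeeded.
bool RetryUntilTrue(const std::function<bool()>& attempt,
                    std::chrono::milliseconds timeout,
                    std::chrono::milliseconds interval) {
  const auto deadline = std::chrono::steady_clock::now() + timeout;
  while (true) {
    if (attempt()) {
      return true;
    }
    if (std::chrono::steady_clock::now() >= deadline) {
      return false;
    }
    std::this_thread::sleep_for(interval);
  }
}
```

In the test, 'attempt' corresponds to running `kudu remote_replica dump` and checking for empty output. Early attempts may be rejected while the replica's safe time is uninitialized.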
http://git-wip-us.apache.org/repos/asf/kudu/blob/fae09bdd/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index d470cf3..8957ce5 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -1868,6 +1868,17 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   // tablet replica's shutdown is run concurrently with the code below.
   shared_ptr<Tablet> tablet;
   RETURN_NOT_OK(GetTabletRef(replica, &tablet, error_code));
+
+  // Ensure the tablet has a valid clean time.
+  s = tablet->mvcc_manager()->CheckIsSafeTimeInitialized();
+  if (!s.ok()) {
+    LOG(WARNING) << Substitute("Rejecting scan request for tablet $0: $1",
+                               tablet->tablet_id(), s.ToString());
+    // Return TABLET_NOT_RUNNING so the scan can be handled appropriately (fail
+    // over to another tablet server if fault-tolerant).
+    *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
+    return s;
+  }
   {
     TRACE("Creating iterator");
     TRACE_EVENT0("tserver", "Create iterator");

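With this change, a new scan against a replica whose MVCC safe time is still uninitialized is rejected with TABLET_NOT_RUNNING. According to the comment in the patch, this lets a fault-tolerant scan fail over to another tablet server. The toy model below illustrates that interaction under stated assumptions. Replica, ScanError, HandleNewScan and ScanWithFailover are hypothetical, and the real Kudu client's retry and replica-selection logic is considerably more involved.

```cpp
#include <cassert>
#include <string>
#include <vector>

enum class ScanError { kNone, kTabletNotRunning };

// Hypothetical view of a replica: its UUID and whether MVCC safe time has
// been initialized (by a write or a Raft no-op after an election).
struct Replica {
  std::string uuid;
  bool safe_time_initialized;
};

// Server side, modeled on the patch: refuse new scans until safe time is
// initialized, reporting TABLET_NOT_RUNNING.
ScanError HandleNewScan(const Replica& r) {
  return r.safe_time_initialized ? ScanError::kNone
                                 : ScanError::kTabletNotRunning;
}

// Client side (hypothetical): treat TABLET_NOT_RUNNING as retriable and try
// the next replica. Returns the UUID of the replica that accepted the scan,
// or an empty string if none did.
std::string ScanWithFailover(const std::vector<Replica>& replicas) {
  for (const auto& r : replicas) {
    if (HandleNewScan(r) == ScanError::kNone) {
      return r.uuid;
    }
  }
  return "";
}
```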

[3/3] kudu git commit: docs: update release note for KUDU-2463

Posted by ab...@apache.org.
docs: update release note for KUDU-2463

Change-Id: Id8dce61da14f67e39f6573fa42ec54809f3ceb19
Reviewed-on: http://gerrit.cloudera.org:8080/11691
Reviewed-by: Grant Henke <gr...@apache.org>
Reviewed-by: Mike Percy <mp...@apache.org>
Tested-by: Andrew Wong <aw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cbbf7b58
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cbbf7b58
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cbbf7b58

Branch: refs/heads/branch-1.8.x
Commit: cbbf7b580c4ab4fdf6621e4ee5ab1ddc5f03cb4e
Parents: fae09bd
Author: Andrew Wong <aw...@cloudera.com>
Authored: Mon Oct 15 14:53:13 2018 -0700
Committer: Attila Bukor <ab...@apache.org>
Committed: Tue Oct 16 08:40:28 2018 +0000

----------------------------------------------------------------------
 docs/release_notes.adoc | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/cbbf7b58/docs/release_notes.adoc
----------------------------------------------------------------------
diff --git a/docs/release_notes.adoc b/docs/release_notes.adoc
index 0bfbec5..f02233f 100644
--- a/docs/release_notes.adoc
+++ b/docs/release_notes.adoc
@@ -230,8 +230,8 @@
   crash the tablet server (see
   link:https://issues.apache.org/jira/browse/KUDU-2293[KUDU-2293]).
 
-- Reduced the likelihood of seeing a bug in which incorrect results would be returned in
-  scans following a server restart (see
+- Fixed a bug in which incorrect results would be returned in scans following a
+  server restart (see
   link:https://issues.apache.org/jira/browse/KUDU-2463[KUDU-2463]).
 
 - Fixed a bug causing a tablet server crash when a write batch request from a client