You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2016/12/06 14:40:02 UTC

[1/2] kudu git commit: KUDU-798 (part 4) Add a TimeManager to manage safe time advancement

Repository: kudu
Updated Branches:
  refs/heads/master 35d1b0142 -> 881cc8e98


KUDU-798 (part 4) Add a TimeManager to manage safe time advancement

This patch adds an entity to manage safe time (the timestamp before
which all transactions are committed/aborted or in-flight) across
a consensus configuration. This allows scans to wait until the scan
timestamp is safe, thus ensuring that the scan is repeatable.

This adds a small unit test. Follow-up patches will use this
in integration tests.

Change-Id: I0bb2b1d2590ed7ead6f1980f9572be10444bb81b
Reviewed-on: http://gerrit.cloudera.org:8080/5300
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bacd9467
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bacd9467
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bacd9467

Branch: refs/heads/master
Commit: bacd946731f07a5fc390c26b3285e7560b8ef497
Parents: 35d1b01
Author: David Alves <dr...@apache.org>
Authored: Wed Nov 30 18:45:15 2016 -0800
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Tue Dec 6 07:45:39 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/CMakeLists.txt       |   2 +
 src/kudu/consensus/time_manager-test.cc | 200 ++++++++++++++++++++++
 src/kudu/consensus/time_manager.cc      | 244 +++++++++++++++++++++++++++
 src/kudu/consensus/time_manager.h       | 188 +++++++++++++++++++++
 4 files changed, 634 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/bacd9467/src/kudu/consensus/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt
index a2d08a6..091c100 100644
--- a/src/kudu/consensus/CMakeLists.txt
+++ b/src/kudu/consensus/CMakeLists.txt
@@ -105,6 +105,7 @@ set(CONSENSUS_SRCS
   quorum_util.cc
   raft_consensus.cc
   raft_consensus_state.cc
+  time_manager.cc
 )
 
 add_library(consensus ${CONSENSUS_SRCS})
@@ -135,6 +136,7 @@ ADD_KUDU_TEST(log_index-test)
 ADD_KUDU_TEST(mt-log-test)
 ADD_KUDU_TEST(quorum_util-test)
 ADD_KUDU_TEST(raft_consensus_quorum-test)
+ADD_KUDU_TEST(time_manager-test)
 
 # Our current version of gmock overrides virtual functions without adding
 # the 'override' keyword which, since our move to c++11, make the compiler

http://git-wip-us.apache.org/repos/asf/kudu/blob/bacd9467/src/kudu/consensus/time_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/time_manager-test.cc b/src/kudu/consensus/time_manager-test.cc
new file mode 100644
index 0000000..a97c547
--- /dev/null
+++ b/src/kudu/consensus/time_manager-test.cc
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <thread>
+#include <gtest/gtest.h>
+
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/time_manager.h"
+#include "kudu/server/clock.h"
+#include "kudu/server/hybrid_clock.h"
+#include "kudu/server/logical_clock.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+namespace kudu {
+namespace consensus {
+
+using std::unique_ptr;
+
+class TimeManagerTest : public KuduTest {
+ public:
+  TimeManagerTest() : clock_(new server::HybridClock()) {}
+
+  void SetUp() override {
+    CHECK_OK(clock_->Init());
+  }
+
+  // Joins every waiter thread started through WaitForSafeTimeAsync().
+  void TearDown() override {
+    for (std::thread& waiter_thread : threads_) {
+      waiter_thread.join();
+    }
+  }
+
+ protected:
+  // (Re)creates the TimeManager under test, which starts out in non-leader mode.
+  void InitTimeManager(Timestamp initial_safe_time = Timestamp::kMin) {
+    time_manager_.reset(new TimeManager(clock_, initial_safe_time));
+  }
+
+  // Spawns a thread that blocks in WaitUntilSafe('safe_time') with no deadline, then checks
+  // that safe time has reached 'safe_time'. Returns a latch (owned by the fixture) that is
+  // counted down once that thread unblocks; the thread itself is joined in TearDown().
+  CountDownLatch* WaitForSafeTimeAsync(Timestamp safe_time) {
+    latches_.emplace_back(new CountDownLatch(1));
+    CountDownLatch* const done = latches_.back().get();
+    threads_.emplace_back([this, safe_time, done]() {
+      CHECK_OK(time_manager_->WaitUntilSafe(safe_time, MonoTime::Max()));
+      // Once unblocked, safe time must be at least 'safe_time'.
+      CHECK_GE(time_manager_->GetSafeTime(), safe_time);
+      done->CountDown();
+    });
+    return done;
+  }
+
+  scoped_refptr<server::HybridClock> clock_;
+  scoped_refptr<TimeManager> time_manager_;
+  vector<unique_ptr<CountDownLatch>> latches_;
+  vector<std::thread> threads_;
+};
+
+// Tests TimeManager's functionality in non-leader mode and the transition to leader mode.
+TEST_F(TimeManagerTest, TestTimeManagerNonLeaderMode) {
+  // TimeManager should start in non-leader mode and consider the initial timestamp safe.
+  // 'before', 'init' and 'after' are consecutive timestamps, one tick apart.
+  Timestamp before = clock_->Now();
+  Timestamp init(before.value() + 1);
+  Timestamp after(init.value() + 1);
+  InitTimeManager(init);
+  ASSERT_EQ(time_manager_->mode_, TimeManager::NON_LEADER);
+  ASSERT_EQ(time_manager_->last_serial_ts_assigned_, init);
+  ASSERT_EQ(time_manager_->GetSafeTime(), init);
+
+  // Check that 'before' is safe, as is 'init'. 'after' shouldn't be safe.
+  // (Safe time is inclusive: a timestamp equal to safe time counts as safe.)
+  ASSERT_TRUE(time_manager_->IsTimestampSafeUnlocked(before));
+  ASSERT_TRUE(time_manager_->IsTimestampSafeUnlocked(init));
+  ASSERT_FALSE(time_manager_->IsTimestampSafeUnlocked(after));
+
+  // Shouldn't be able to assign timestamps.
+  ReplicateMsg message;
+  ASSERT_TRUE(time_manager_->AssignTimestamp(&message).IsIllegalState());
+
+  message.set_timestamp(after.value());
+  // Should accept messages from the leader.
+  ASSERT_OK(time_manager_->MessageReceivedFromLeader(message));
+  // 'message' carries no write request, so it is treated as CLIENT_PROPAGATED and its
+  // timestamp is recorded as the last serial timestamp.
+  ASSERT_EQ(time_manager_->last_serial_ts_assigned_, after);
+  // .. but shouldn't advance safe time (until we have leader leases).
+  ASSERT_EQ(time_manager_->GetSafeTime(), init);
+
+  // Waiting for safe time at this point should time out since we're not moving it.
+  MonoTime after_small = MonoTime::Now() + MonoDelta::FromMilliseconds(100);
+  ASSERT_TRUE(time_manager_->WaitUntilSafe(after, after_small).IsTimedOut());
+
+  // Create a latch to wait on 'after' to be safe.
+  CountDownLatch* after_latch = WaitForSafeTimeAsync(after);
+
+  // Accepting messages from the leader shouldn't advance safe time.
+  ASSERT_EQ(time_manager_->GetSafeTime(), init);
+  ASSERT_EQ(after_latch->count(), 1);
+
+  // Advancing safe time with a message should unblock the waiter and advance safe time.
+  message.set_timestamp(after.value());
+  time_manager_->AdvanceSafeTimeWithMessage(message);
+  after_latch->Wait();
+  ASSERT_EQ(time_manager_->GetSafeTime(), after);
+
+  // Committing an old message shouldn't move safe time back.
+  message.set_timestamp(before.value());
+  time_manager_->AdvanceSafeTimeWithMessage(message);
+  ASSERT_EQ(time_manager_->GetSafeTime(), after);
+
+  // Advance 'after' again and test advancing safe time with an explicit timestamp like
+  // the leader sends on (empty) heartbeat messages.
+  after = clock_->Now();
+  after_latch = WaitForSafeTimeAsync(after);
+  time_manager_->AdvanceSafeTime(after);
+  after_latch->Wait();
+  ASSERT_EQ(time_manager_->GetSafeTime(), after);
+
+  // Changing to leader mode should advance safe time.
+  // SetLeaderMode() advances safe time to the clock's current value, which may already be
+  // past 'after', hence the GE check below.
+  after = clock_->Now();
+  after_latch = WaitForSafeTimeAsync(after);
+  time_manager_->SetLeaderMode();
+  after_latch->Wait();
+  ASSERT_GE(time_manager_->GetSafeTime(), after);
+}
+
+// Tests the TimeManager's functionality in leader mode and the transition to non-leader mode.
+TEST_F(TimeManagerTest, TestTimeManagerLeaderMode) {
+  Timestamp init = clock_->Now();
+  InitTimeManager(init);
+  time_manager_->SetLeaderMode();
+  Timestamp safe_before = time_manager_->GetSafeTime();
+
+  ReplicateMsg message;
+  // In leader mode we should be able to assign timestamps and the timestamp should be higher
+  // than 'init'.
+  ASSERT_OK(time_manager_->AssignTimestamp(&message));
+  ASSERT_TRUE(message.has_timestamp());
+  Timestamp message_ts(message.timestamp());
+  ASSERT_GT(message_ts, safe_before);
+
+  // In leader mode calling MessageReceivedFromLeader() should cause a CHECK failure.
+  EXPECT_DEATH({
+    time_manager_->MessageReceivedFromLeader(message);
+  }, "Cannot receive messages from a leader in leader mode.");
+
+  // .. as should AdvanceSafeTime()
+  EXPECT_DEATH({
+    time_manager_->AdvanceSafeTime(clock_->Now());
+  }, "Cannot advance safe time by timestamp in leader mode.");
+
+  // Since we haven't appended the message to the queue, safe time should be 'pinned' to
+  // 'safe_before'.
+  ASSERT_EQ(time_manager_->GetSafeTime(), safe_before);
+
+  // When we append the message to the queue safe time should advance again.
+  time_manager_->AdvanceSafeTimeWithMessage(message);
+  ASSERT_GT(time_manager_->GetSafeTime(), message_ts);
+
+  // 'Now' should be safe.
+  Timestamp now = clock_->Now();
+  ASSERT_TRUE(time_manager_->IsTimestampSafeUnlocked(now));
+  ASSERT_GT(time_manager_->GetSafeTime(), now);
+
+  // When changing to non-leader mode a timestamp after the last safe time shouldn't be
+  // safe anymore (even if that time came before the actual change).
+  now = clock_->Now();
+  time_manager_->SetNonLeaderMode();
+  Timestamp safe_after = time_manager_->GetSafeTime();
+  ASSERT_LE(safe_after, now);
+
+  // In leader mode GetSafeTime() usually moves it, but since we changed to non-leader mode
+  // safe time shouldn't move anymore ...
+  ASSERT_EQ(time_manager_->GetSafeTime(), safe_after);
+  now = clock_->Now();
+  // Same deadline construction as in TestTimeManagerNonLeaderMode.
+  MonoTime after_small = MonoTime::Now() + MonoDelta::FromMilliseconds(100);
+  ASSERT_TRUE(time_manager_->WaitUntilSafe(now, after_small).IsTimedOut());
+
+  // ... unless we get a message from the leader.
+  now = clock_->Now();
+  CountDownLatch* after_latch = WaitForSafeTimeAsync(now);
+  message.set_timestamp(now.value());
+  ASSERT_OK(time_manager_->MessageReceivedFromLeader(message));
+  time_manager_->AdvanceSafeTimeWithMessage(message);
+  after_latch->Wait();
+  // 'message' has no write request (CLIENT_PROPAGATED), so safe time is now exactly its
+  // timestamp.
+  ASSERT_EQ(time_manager_->GetSafeTime(), now);
+}
+
+} // namespace consensus
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/bacd9467/src/kudu/consensus/time_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/time_manager.cc b/src/kudu/consensus/time_manager.cc
new file mode 100644
index 0000000..eac6ebb
--- /dev/null
+++ b/src/kudu/consensus/time_manager.cc
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <mutex>
+#include <gflags/gflags.h>
+
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/time_manager.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/flag_tags.h"
+
+// Whether safe time may advance when no write operations are happening.
+// NOTE(review): not read anywhere in this file; presumably consumed by a follow-up patch.
+DEFINE_bool(safe_time_advancement_without_writes, true,
+            "Whether to enable the advancement of \"safe\" time in "
+            "the absence of write operations");
+TAG_FLAG(safe_time_advancement_without_writes, advanced);
+
+namespace kudu {
+namespace consensus {
+
+using server::Clock;
+using strings::Substitute;
+
+// Scoped guard used throughout this file to hold 'lock_'.
+using Lock = std::lock_guard<simple_spinlock>;
+
+ExternalConsistencyMode TimeManager::GetMessageConsistencyMode(const ReplicateMsg& message) {
+  // Only write requests carry an explicit external consistency mode; any other message
+  // type is treated as CLIENT_PROPAGATED.
+  //
+  // TODO(dralves): We should have no-ops (?) and config changes be COMMIT_WAIT
+  // transactions. See KUDU-798.
+  // TODO(dralves) Move external consistency mode to ReplicateMsg. This will be useful
+  // for consistent alter table ops.
+  return PREDICT_TRUE(message.has_write_request())
+      ? message.write_request().external_consistency_mode()
+      : CLIENT_PROPAGATED;
+}
+
+// Starts in NON_LEADER mode with both the last serial timestamp and the safe time set to
+// 'initial_safe_time'. Since IsTimestampSafeUnlocked() compares with '<=', the initial
+// timestamp itself is immediately considered safe.
+TimeManager::TimeManager(scoped_refptr<Clock> clock, Timestamp initial_safe_time)
+  : last_serial_ts_assigned_(initial_safe_time),
+    last_safe_ts_(initial_safe_time),
+    mode_(NON_LEADER),
+    clock_(std::move(clock)) {}
+
+// Switches to leader mode and advances safe time to the clock's current reading, waking up
+// any waiters that this satisfies.
+void TimeManager::SetLeaderMode() {
+  Lock l(lock_);
+  mode_ = LEADER;
+  const Timestamp clock_now = clock_->Now();
+  AdvanceSafeTimeAndWakeUpWaitersUnlocked(clock_now);
+}
+
+// Switches to non-leader mode. Safe time keeps its current value; from here on it only moves
+// through AdvanceSafeTime() / AdvanceSafeTimeWithMessage().
+void TimeManager::SetNonLeaderMode() {
+  Lock l(lock_);
+  mode_ = NON_LEADER;
+}
+
+// Leader-only: picks a timestamp for 'message' based on its consistency mode and stores it
+// in the message. COMMIT_WAIT gets the clock's latest possible time; CLIENT_PROPAGATED gets a
+// serial timestamp, which pins leader safe time until the message is appended.
+Status TimeManager::AssignTimestamp(ReplicateMsg* message) {
+  Lock l(lock_);
+  if (PREDICT_FALSE(mode_ != LEADER)) {
+    return Status::IllegalState("Cannot assign timestamp. TimeManager is not in Leader mode.");
+  }
+
+  const ExternalConsistencyMode consistency = GetMessageConsistencyMode(*message);
+  Timestamp assigned;
+  if (consistency == COMMIT_WAIT) {
+    assigned = GetSerialTimestampPlusMaxError();
+  } else if (consistency == CLIENT_PROPAGATED) {
+    assigned = GetSerialTimestamp();
+  } else {
+    return Status::NotSupported("Unsupported external consistency mode.");
+  }
+
+  message->set_timestamp(assigned.value());
+  return Status::OK();
+}
+
+Status TimeManager::MessageReceivedFromLeader(const ReplicateMsg& message) {
+  // NOTE: For now this only updates the local clock and records the message's timestamp;
+  //       it returns Status::OK() whenever the clock's Update() succeeds.
+  //
+  //       Once leader leases exist, timestamps from any valid leader can be trusted to be
+  //       safe and safe time could be advanced right here. Until then safe time is only
+  //       advanced later, when the message is committed, at the cost of moving safe time
+  //       with some extra delay.
+  //
+  //       This greatly reduces the opportunity for non-repeatable reads: on a busy cluster
+  //       with many writes (i.e. few empty "heartbeat" messages) safe time only moves with
+  //       committed message timestamps, which are necessarily 'safe'.
+  //
+  //       The remaining window for unrepeatable reads is an old leader sending an (accepted)
+  //       empty heartbeat to a follower that right afterwards receives a non-empty message
+  //       from a higher-term leader carrying a lower timestamp than that heartbeat.
+  DCHECK(message.has_timestamp());
+  const Timestamp leader_ts(message.timestamp());
+
+  // Propagate the leader's timestamp into the local clock before taking the lock.
+  RETURN_NOT_OK(clock_->Update(leader_ts));
+
+  Lock l(lock_);
+  CHECK_EQ(mode_, NON_LEADER) << "Cannot receive messages from a leader in leader mode.";
+  if (GetMessageConsistencyMode(message) == CLIENT_PROPAGATED) {
+    last_serial_ts_assigned_ = leader_ts;
+  }
+  return Status::OK();
+}
+
+// Moves safe time up to 'message's timestamp, but only for CLIENT_PROPAGATED messages; a
+// message in any other consistency mode (e.g. COMMIT_WAIT) leaves safe time untouched.
+void TimeManager::AdvanceSafeTimeWithMessage(const ReplicateMsg& message) {
+  Lock l(lock_);
+  if (GetMessageConsistencyMode(message) != CLIENT_PROPAGATED) {
+    return;
+  }
+  const Timestamp message_ts(message.timestamp());
+  AdvanceSafeTimeAndWakeUpWaitersUnlocked(message_ts);
+}
+
+// Non-leader only: advances safe time to an explicit timestamp (e.g. one carried by a leader
+// heartbeat) and wakes up any satisfied waiters. CHECK-fails in leader mode.
+void TimeManager::AdvanceSafeTime(Timestamp safe_time) {
+  Lock l(lock_);
+  CHECK_EQ(mode_, NON_LEADER) << "Cannot advance safe time by timestamp in leader mode.";
+  AdvanceSafeTimeAndWakeUpWaitersUnlocked(safe_time);
+}
+
+// Blocks until 'timestamp' is safe or 'deadline' passes. The local clock must first pass
+// 'timestamp'; if it still isn't safe, a waiter is registered in 'waiters_' and released by
+// AdvanceSafeTimeAndWakeUpWaitersUnlocked().
+Status TimeManager::WaitUntilSafe(Timestamp timestamp, const MonoTime& deadline) {
+  // First wait for the clock to be past 'timestamp'.
+  RETURN_NOT_OK(clock_->WaitUntilAfterLocally(timestamp, deadline));
+
+  if (PREDICT_FALSE(MonoTime::Now() > deadline)) {
+    return Status::TimedOut("Timed out waiting for the local clock.");
+  }
+
+  // 'latch' and 'waiter' live on this thread's stack, so by the time this function returns
+  // '&waiter' must no longer be in 'waiters_' (removed either by the waker or by us below).
+  CountDownLatch latch(1);
+  WaitingState waiter;
+  waiter.timestamp = timestamp;
+  waiter.latch = &latch;
+
+  // Register a waiter in waiters_, unless 'timestamp' is already safe.
+  {
+    Lock l(lock_);
+    if (IsTimestampSafeUnlocked(timestamp)) return Status::OK();
+    waiters_.push_back(&waiter);
+  }
+
+  // Wait until we get notified or 'deadline' elapses.
+  if (latch.WaitUntil(deadline)) return Status::OK();
+
+  // Timed out, clean up.
+  Lock l(lock_);
+  // Address the case where we were notified after the timeout; the waker already removed
+  // our entry from 'waiters_'.
+  if (latch.count() == 0) return Status::OK();
+
+  // Wakers erase an entry (under 'lock_') before counting its latch down, so we must still
+  // be registered. Guard anyway: erasing end() would be undefined behavior.
+  auto it = std::find(waiters_.begin(), waiters_.end(), &waiter);
+  DCHECK(it != waiters_.end());
+  if (PREDICT_TRUE(it != waiters_.end())) {
+    waiters_.erase(it);
+  }
+  return Status::TimedOut(Substitute(
+      "Timed out waiting for ts: $0 to be safe (mode: $1). "
+      "Current safe time: $2 Physical time difference: $3",
+      clock_->Stringify(waiter.timestamp),
+      (mode_ == LEADER ? "LEADER" : "NON-LEADER"),
+      clock_->Stringify(last_safe_ts_),
+      (clock_->HasPhysicalComponent() ?
+       clock_->GetPhysicalComponentDifference(timestamp, last_safe_ts_).ToString() :
+       "None (Logical clock)")));
+}
+
+void TimeManager::AdvanceSafeTimeAndWakeUpWaitersUnlocked(Timestamp safe_time) {
+  // Safe time never moves backwards: anything at or below the current value is ignored.
+  if (safe_time <= last_safe_ts_) return;
+  last_safe_ts_ = safe_time;
+
+  // Release every waiter whose timestamp is now safe. Each WaitingState and its latch live on
+  // the waiting thread's stack (see WaitUntilSafe()), so the entry is erased first and the
+  // latch is the last thing touched.
+  for (auto pos = waiters_.begin(); pos != waiters_.end();) {
+    WaitingState* const pending = *pos;
+    if (!IsTimestampSafeUnlocked(pending->timestamp)) {
+      ++pos;
+      continue;
+    }
+    pos = waiters_.erase(pos);
+    pending->latch->CountDown();
+  }
+}
+
+// Safe time is inclusive. Note that in leader mode reading safe time may also advance it.
+bool TimeManager::IsTimestampSafeUnlocked(Timestamp timestamp) {
+  const Timestamp current_safe = GetSafeTimeUnlocked();
+  return current_safe >= timestamp;
+}
+
+// Locked wrapper around GetSafeTimeUnlocked().
+Timestamp TimeManager::GetSafeTime() {
+  Lock l(lock_);
+  const Timestamp safe = GetSafeTimeUnlocked();
+  return safe;
+}
+
+Timestamp TimeManager::GetSafeTimeUnlocked() {
+  switch (mode_) {
+    case LEADER: {
+      // In ASCII form, where 'S' represents a safe timestamp, 'A' represents the last assigned
+      // timestamp, and 'N' represents the current clock value, the internal state can look like
+      // the following diagrams (time moves from left to right):
+      //
+      // a)
+      //   SSSSSSSSSSSSSSSSSS N
+      //                  |  \- last_safe_ts_
+      //                  |
+      //                   \- last_serial_ts_assigned_
+      // or like:
+      // b)
+      //   SSSSSSSSSSSSSSSSSS A N
+      //                    |  \- last_serial_ts_assigned_
+      //                    |
+      //                    \- last_safe_ts_
+      //
+      // If the current internal state is a), then we can advance safe time to 'N'. We know the
+      // leader will never assign a new timestamp lower than it.
+      if (PREDICT_TRUE(last_serial_ts_assigned_ <= last_safe_ts_)) {
+        // Only ever advance: safe time may have been set (e.g. by AdvanceSafeTime(), which
+        // does not update the clock) to a value the local clock hasn't reached yet, and the
+        // timestamps returned here must increase monotonically.
+        const Timestamp now = clock_->Now();
+        if (now > last_safe_ts_) {
+          last_safe_ts_ = now;
+        }
+        return last_safe_ts_;
+      }
+      // If the current state is b), then there might be a transaction with a timestamp that is
+      // lower than 'N' in between assignment and being appended to the queue. We can't consider
+      // 'N' safe and thus have to return the last known safe timestamp.
+      // Note that there can be at most one single transaction in this state, because prepare
+      // is single threaded.
+      return last_safe_ts_;
+    }
+    case NON_LEADER:
+      break;
+  }
+  // Non-leader mode: safe time only moves via AdvanceSafeTime()/AdvanceSafeTimeWithMessage().
+  return last_safe_ts_;
+}
+
+// Takes the clock's current value as the next serial timestamp and records it, so that
+// leader-mode safe time stays below it until the corresponding message is passed to
+// AdvanceSafeTimeWithMessage(). The only caller in this file, AssignTimestamp(), holds 'lock_'.
+Timestamp TimeManager::GetSerialTimestamp() {
+  const Timestamp serial = clock_->Now();
+  last_serial_ts_assigned_ = serial;
+  return serial;
+}
+
+// Returns the clock's latest possible current time. Unlike GetSerialTimestamp() this does not
+// record anything in 'last_serial_ts_assigned_', so it does not pin leader-mode safe time.
+Timestamp TimeManager::GetSerialTimestampPlusMaxError() {
+  const Timestamp latest = clock_->NowLatest();
+  return latest;
+}
+
+
+} // namespace consensus
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/bacd9467/src/kudu/consensus/time_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/time_manager.h b/src/kudu/consensus/time_manager.h
new file mode 100644
index 0000000..c8568d5
--- /dev/null
+++ b/src/kudu/consensus/time_manager.h
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "kudu/common/timestamp.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/server/clock.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace consensus {
+class ReplicateMsg;
+
+// Manages timestamp assignment to consensus rounds and safe time advancement.
+//
+// Safe time is a timestamp before which all transactions have either been applied to the
+// tablet or are in-flight. It increases monotonically (see the note at the end of this
+// class comment).
+//
+// Snapshot scans can use WaitUntilSafe() to wait for a timestamp to be safe. After this method
+// returns an OK status, all the transactions whose timestamps fall before the scan's timestamp
+// will be either committed or in-flight. If the scanner additionally uses the MvccManager to wait
+// until the given timestamp is clean, then the read will be repeatable.
+//
+// In leader mode the TimeManager is responsible for assigning timestamps to transactions
+// and for moving the leader's safe time, which in turn may be sent to replicas on heartbeats,
+// moving their safe time. The leader's safe time moves with the clock unless there has been a
+// transaction that was assigned a timestamp that is not yet known by the queue
+// (i.e. AdvanceSafeTimeWithMessage() hasn't been called on the corresponding message).
+// In this case the TimeManager returns the last known safe time.
+//
+// In non-leader mode this class tracks the safe time sent by the leader and wakes up waiters
+// when it advances.
+//
+// This class's leadership status is meant to be kept in sync with the queue's, as the queue
+// is responsible for broadcasting safe time from a leader (and will eventually be responsible
+// for calculating that leader's lease).
+//
+// See: docs/design-docs/repeatable-reads.md
+//
+// NOTE: Until leader leases are implemented the cluster's safe time can occasionally move back.
+//       This does not mean, however, that the timestamp returned by GetSafeTime() can move back:
+//       GetSafeTime() still returns monotonically increasing timestamps. It's just that, in
+//       certain corner cases, the timestamp returned by GetSafeTime() can't be trusted to mean
+//       that all future messages will be assigned future timestamps.
+//       This anomaly can cause non-repeatable reads in certain conditions.
+//
+// This class is thread safe.
+class TimeManager : public RefCountedThreadSafe<TimeManager> {
+ public:
+
+  // Constructs a TimeManager in non-leader mode. 'initial_safe_time' is considered safe
+  // from the start.
+  TimeManager(scoped_refptr<server::Clock> clock, Timestamp initial_safe_time);
+
+  // Sets this TimeManager to leader mode, advancing safe time to the clock's current value.
+  void SetLeaderMode();
+
+  // Sets this TimeManager to non-leader mode. From then on safe time only advances through
+  // AdvanceSafeTime() / AdvanceSafeTimeWithMessage().
+  void SetNonLeaderMode();
+
+  // Assigns a timestamp to 'message' according to the message's ExternalConsistencyMode and/or
+  // message type.
+  //
+  // Note that the timestamp in 'message' is not considered safe until the message has
+  // been appended to the queue. Until then safe time is pinned to the last known value.
+  // When the message is appended later on, AdvanceSafeTimeWithMessage() is called and safe time
+  // is advanced.
+  //
+  // Requires leader mode (returns Status::IllegalState otherwise). Returns
+  // Status::NotSupported if the message's external consistency mode is not supported.
+  Status AssignTimestamp(ReplicateMsg* message);
+
+  // Updates the internal state based on 'message' received from a leader replica.
+  // Replicas are expected to call this for every message received from a valid leader.
+  //
+  // Returns Status::OK if the message/leader is valid and the clock was correctly updated.
+  //
+  // Requires non-leader mode (CHECK failure if it isn't).
+  Status MessageReceivedFromLeader(const ReplicateMsg& message);
+
+  // Advances safe time based on the timestamp and type of 'message'.
+  //
+  // This only moves safe time if 'message' is CLIENT_PROPAGATED and its timestamp is higher
+  // than the currently known one.
+  //
+  // Allowed in both leader and non-leader modes.
+  void AdvanceSafeTimeWithMessage(const ReplicateMsg& message);
+
+  // Same as above but for a specific timestamp.
+  //
+  // This only moves safe time if 'safe_time' is higher than the currently known one.
+  //
+  // Requires non-leader mode (CHECK failure if it isn't).
+  void AdvanceSafeTime(Timestamp safe_time);
+
+  // Waits until 'timestamp' is less than or equal to safe time or until 'deadline' has elapsed.
+  //
+  // Returns Status::OK() if safe time reached 'timestamp' before 'deadline'.
+  // Returns Status::TimedOut() if 'deadline' elapsed without safe time moving enough.
+  // May also return an error from the clock while waiting for it to pass 'timestamp'.
+  //
+  // TODO(KUDU-1127) make this return another status if safe time is too far back in the past
+  // or hasn't moved in a long time.
+  Status WaitUntilSafe(Timestamp timestamp, const MonoTime& deadline);
+
+  // Returns the current safe time.
+  //
+  // In leader mode returns clock_->Now() or some value close to it.
+  //
+  // In non-leader mode returns the last safe time received from a leader.
+  Timestamp GetSafeTime();
+
+ private:
+  FRIEND_TEST(TimeManagerTest, TestTimeManagerNonLeaderMode);
+  FRIEND_TEST(TimeManagerTest, TestTimeManagerLeaderMode);
+
+  // Helper to return the external consistency mode of 'message'.
+  static ExternalConsistencyMode GetMessageConsistencyMode(const ReplicateMsg& message);
+
+  // The mode of this TimeManager.
+  enum Mode {
+    LEADER,
+    NON_LEADER
+  };
+
+  // State for waiters.
+  struct WaitingState {
+    // The timestamp the waiter requires be safe.
+    Timestamp timestamp;
+    // Latch that will be counted down once 'timestamp' is safe, unblocking the waiter.
+    CountDownLatch* latch;
+  };
+
+  // Returns whether 'timestamp' is safe. Requires 'lock_' to be held.
+  // Requires that we've waited for the local clock to move past 'timestamp'.
+  bool IsTimestampSafeUnlocked(Timestamp timestamp);
+
+  // Advances safe time and wakes up any waiters. Requires 'lock_' to be held.
+  void AdvanceSafeTimeAndWakeUpWaitersUnlocked(Timestamp safe_time);
+
+  // Returns a timestamp that is guaranteed to be higher than all other timestamps
+  // that have been assigned by calls to GetSerialTimestamp() (in this or another
+  // replica).
+  Timestamp GetSerialTimestamp();
+
+  // Like the above, but returns a serial timestamp plus the maximum error.
+  // NOTE: GetSerialTimestamp() might still return timestamps that are smaller.
+  Timestamp GetSerialTimestampPlusMaxError();
+
+  // Internal, unlocked implementation of GetSafeTime().
+  Timestamp GetSafeTimeUnlocked();
+
+  // Lock to protect the non-const fields below.
+  mutable simple_spinlock lock_;
+
+  // Waiters to be notified when the safe time advances. Entries point to WaitingState
+  // objects that live on the stacks of threads blocked in WaitUntilSafe().
+  std::vector<WaitingState*> waiters_;
+
+  // The last CLIENT_PROPAGATED timestamp that was assigned (leader mode) or received from
+  // the leader (non-leader mode).
+  Timestamp last_serial_ts_assigned_;
+
+  // The last known safe time. In non-leader mode this is the latest safe time received from
+  // the leader. In leader mode it is advanced by messages appended to the queue
+  // (AdvanceSafeTimeWithMessage()) and, when no assigned timestamp is pending, to the
+  // clock's current value whenever safe time is read.
+  Timestamp last_safe_ts_;
+
+  // The current mode of the TimeManager.
+  Mode mode_;
+
+  const scoped_refptr<server::Clock> clock_;
+};
+
+} // namespace consensus
+} // namespace kudu


[2/2] kudu git commit: Output more info when a snapshot scan fails due to history gc

Posted by mp...@apache.org.
Output more info when a snapshot scan fails due to history gc

This just adds some niceties to the error message, like
both timestamps and the physical time difference between
them.

Change-Id: Id488fd0366a8e8c5c2a21289a835207ea795ad20
Reviewed-on: http://gerrit.cloudera.org:8080/5374
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/881cc8e9
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/881cc8e9
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/881cc8e9

Branch: refs/heads/master
Commit: 881cc8e981dbe51ba21fffdb575abd3cf8fe3e5e
Parents: bacd946
Author: David Alves <dr...@apache.org>
Authored: Mon Dec 5 22:50:04 2016 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Dec 6 14:39:40 2016 +0000

----------------------------------------------------------------------
 src/kudu/tserver/tablet_service.cc | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/881cc8e9/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 5177a5a..51b5563 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -1535,7 +1535,6 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletPeer* tablet_peer,
   // end up with a valid snapshot in that case. It would be more correct to
   // initialize the row iterator and then select the latest timestamp
   // represented by those open files in that case.
-  Timestamp ancient_history_mark;
   tablet::HistoryGcOpts history_gc_opts = tablet->GetHistoryGcOpts();
   if (scan_pb.read_mode() == READ_AT_SNAPSHOT &&
       history_gc_opts.IsAncientHistory(*snap_timestamp)) {
@@ -1544,9 +1543,15 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletPeer* tablet_peer,
     // We have to check after we open the iterator in order to avoid a TOCTOU
     // error.
     *error_code = TabletServerErrorPB::INVALID_SNAPSHOT;
-    return Status::InvalidArgument("Snapshot timestamp is earlier than the ancient history mark",
-                                   "consider increasing the value of the configuration parameter "
-                                   "--tablet_history_max_age_sec");
+    return Status::InvalidArgument(
+        Substitute("Snapshot timestamp is earlier than the ancient history mark. Consider "
+                   "increasing the value of the configuration parameter "
+                   "--tablet_history_max_age_sec. Snapshot timestamp: $0 "
+                   "Ancient History Mark: $1 Physical time difference: $2",
+                   server_->clock()->Stringify(*snap_timestamp),
+                   server_->clock()->Stringify(history_gc_opts.ancient_history_mark()),
+                   server_->clock()->GetPhysicalComponentDifference(
+                       *snap_timestamp, history_gc_opts.ancient_history_mark()).ToString()));
   }
 
   *has_more_results = iter->HasNext();