You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2016/12/06 14:40:02 UTC
[1/2] kudu git commit: KUDU-798 (part 4) Add a TimeManager to manage
safe time advancement
Repository: kudu
Updated Branches:
refs/heads/master 35d1b0142 -> 881cc8e98
KUDU-798 (part 4) Add a TimeManager to manage safe time advancement
This patch adds an entity to manage safe time (the timestamp before
which all transactions are committed/aborted or in-flight) across
a consensus configuration. This allows scans to wait until the scan
timestamp is safe, thus making sure the scan is repeatable.
This adds a small unit test. Follow up patches will use this
in integration tests.
Change-Id: I0bb2b1d2590ed7ead6f1980f9572be10444bb81b
Reviewed-on: http://gerrit.cloudera.org:8080/5300
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bacd9467
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bacd9467
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bacd9467
Branch: refs/heads/master
Commit: bacd946731f07a5fc390c26b3285e7560b8ef497
Parents: 35d1b01
Author: David Alves <dr...@apache.org>
Authored: Wed Nov 30 18:45:15 2016 -0800
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Tue Dec 6 07:45:39 2016 +0000
----------------------------------------------------------------------
src/kudu/consensus/CMakeLists.txt | 2 +
src/kudu/consensus/time_manager-test.cc | 200 ++++++++++++++++++++++
src/kudu/consensus/time_manager.cc | 244 +++++++++++++++++++++++++++
src/kudu/consensus/time_manager.h | 188 +++++++++++++++++++++
4 files changed, 634 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/bacd9467/src/kudu/consensus/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt
index a2d08a6..091c100 100644
--- a/src/kudu/consensus/CMakeLists.txt
+++ b/src/kudu/consensus/CMakeLists.txt
@@ -105,6 +105,7 @@ set(CONSENSUS_SRCS
quorum_util.cc
raft_consensus.cc
raft_consensus_state.cc
+ time_manager.cc
)
add_library(consensus ${CONSENSUS_SRCS})
@@ -135,6 +136,7 @@ ADD_KUDU_TEST(log_index-test)
ADD_KUDU_TEST(mt-log-test)
ADD_KUDU_TEST(quorum_util-test)
ADD_KUDU_TEST(raft_consensus_quorum-test)
+ADD_KUDU_TEST(time_manager-test)
# Our current version of gmock overrides virtual functions without adding
# the 'override' keyword which, since our move to c++11, make the compiler
http://git-wip-us.apache.org/repos/asf/kudu/blob/bacd9467/src/kudu/consensus/time_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/time_manager-test.cc b/src/kudu/consensus/time_manager-test.cc
new file mode 100644
index 0000000..a97c547
--- /dev/null
+++ b/src/kudu/consensus/time_manager-test.cc
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <thread>
+#include <gtest/gtest.h>
+
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/time_manager.h"
+#include "kudu/server/clock.h"
+#include "kudu/server/hybrid_clock.h"
+#include "kudu/server/logical_clock.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+namespace kudu {
+namespace consensus {
+
+using std::unique_ptr;
+
+class TimeManagerTest : public KuduTest {
+ public:
+ TimeManagerTest() : clock_(new server::HybridClock()) {}
+
+ void SetUp() override {  // NOTE(review): does not chain to KuduTest::SetUp() - confirm intended.
+ CHECK_OK(clock_->Init());
+ }
+
+ void TearDown() override {
+ for (auto& thread : threads_) {  // Join every waiter thread started by WaitForSafeTimeAsync().
+ thread.join();
+ }
+ }
+
+ protected:
+ void InitTimeManager(Timestamp initial_safe_time = Timestamp::kMin) {
+ time_manager_.reset(new TimeManager(clock_, initial_safe_time));
+ }
+
+ // Spawns a thread that blocks until 'safe_time' is safe; returns a latch it counts down when done.
+ CountDownLatch* WaitForSafeTimeAsync(Timestamp safe_time) {
+ latches_.emplace_back(new CountDownLatch(1));  // 'latches_' owns the latch.
+ CountDownLatch* latch = latches_.back().get();
+ threads_.emplace_back([=]() {  // Captures 'this', 'safe_time' and 'latch' by copy.
+ CHECK_OK(time_manager_->WaitUntilSafe(safe_time, MonoTime::Max()));
+ // When the waiter unblocks, safe time should be higher than or equal to 'safe_time'.
+ CHECK_GE(time_manager_->GetSafeTime(), safe_time);
+ latch->CountDown();
+ });
+ return latch;
+ }
+
+ scoped_refptr<server::HybridClock> clock_;
+ scoped_refptr<TimeManager> time_manager_;
+ vector<unique_ptr<CountDownLatch>> latches_;
+ vector<std::thread> threads_;
+};
+
+// Tests TimeManager's functionality in non-leader mode and the transition to leader mode.
+TEST_F(TimeManagerTest, TestTimeManagerNonLeaderMode) {
+ // TimeManager should start in non-leader mode and consider the initial timestamp safe.
+ Timestamp before = clock_->Now();
+ Timestamp init(before.value() + 1);
+ Timestamp after(init.value() + 1);
+ InitTimeManager(init);
+ ASSERT_EQ(time_manager_->mode_, TimeManager::NON_LEADER);  // Private state, via FRIEND_TEST.
+ ASSERT_EQ(time_manager_->last_serial_ts_assigned_, init);
+ ASSERT_EQ(time_manager_->GetSafeTime(), init);
+
+ // Check that 'before' is safe, as is 'init'. 'after' shouldn't be safe.
+ ASSERT_TRUE(time_manager_->IsTimestampSafeUnlocked(before));
+ ASSERT_TRUE(time_manager_->IsTimestampSafeUnlocked(init));
+ ASSERT_FALSE(time_manager_->IsTimestampSafeUnlocked(after));
+
+ // Shouldn't be able to assign timestamps.
+ ReplicateMsg message;
+ ASSERT_TRUE(time_manager_->AssignTimestamp(&message).IsIllegalState());
+
+ message.set_timestamp(after.value());
+ // Should accept messages from the leader.
+ ASSERT_OK(time_manager_->MessageReceivedFromLeader(message));
+ ASSERT_EQ(time_manager_->last_serial_ts_assigned_, after);
+ // ... but shouldn't advance safe time (until we have leader leases).
+ ASSERT_EQ(time_manager_->GetSafeTime(), init);
+
+ // Waiting for safe time at this point should time out since we're not moving it.
+ MonoTime after_small = MonoTime::Now() + MonoDelta::FromMilliseconds(100);
+ ASSERT_TRUE(time_manager_->WaitUntilSafe(after, after_small).IsTimedOut());
+
+ // Create a latch to wait on 'after' to be safe.
+ CountDownLatch* after_latch = WaitForSafeTimeAsync(after);
+
+ // Accepting messages from the leader shouldn't advance safe time.
+ ASSERT_EQ(time_manager_->GetSafeTime(), init);
+ ASSERT_EQ(after_latch->count(), 1);  // The background waiter must still be blocked.
+
+ // Advancing safe time with a message should unblock the waiter and advance safe time.
+ message.set_timestamp(after.value());
+ time_manager_->AdvanceSafeTimeWithMessage(message);
+ after_latch->Wait();
+ ASSERT_EQ(time_manager_->GetSafeTime(), after);
+
+ // Committing an old message shouldn't move safe time back.
+ message.set_timestamp(before.value());
+ time_manager_->AdvanceSafeTimeWithMessage(message);
+ ASSERT_EQ(time_manager_->GetSafeTime(), after);
+
+ // Advance 'after' again and test advancing safe time with an explicit timestamp like
+ // the leader sends on (empty) heartbeat messages.
+ after = clock_->Now();
+ after_latch = WaitForSafeTimeAsync(after);
+ time_manager_->AdvanceSafeTime(after);
+ after_latch->Wait();
+ ASSERT_EQ(time_manager_->GetSafeTime(), after);
+
+ // Changing to leader mode should advance safe time.
+ after = clock_->Now();
+ after_latch = WaitForSafeTimeAsync(after);
+ time_manager_->SetLeaderMode();
+ after_latch->Wait();
+ ASSERT_GE(time_manager_->GetSafeTime(), after);
+}
+
+// Tests the TimeManager's functionality in leader mode and the transition to non-leader mode.
+TEST_F(TimeManagerTest, TestTimeManagerLeaderMode) {
+ Timestamp init = clock_->Now();
+ InitTimeManager(init);
+ time_manager_->SetLeaderMode();
+ Timestamp safe_before = time_manager_->GetSafeTime();
+
+ ReplicateMsg message;
+ // In leader mode we should be able to assign timestamps and the timestamp should be higher
+ // than 'safe_before' (which is itself taken after 'init').
+ ASSERT_OK(time_manager_->AssignTimestamp(&message));
+ ASSERT_TRUE(message.has_timestamp());
+ Timestamp message_ts(message.timestamp());
+ ASSERT_GT(message_ts, safe_before);
+
+ // In leader mode calling MessageReceivedFromLeader() should cause a CHECK failure.
+ EXPECT_DEATH({
+ time_manager_->MessageReceivedFromLeader(message);  // Returned Status is ignored; the CHECK fires.
+ }, "Cannot receive messages from a leader in leader mode.");
+
+ // ... as should AdvanceSafeTime().
+ EXPECT_DEATH({
+ time_manager_->AdvanceSafeTime(clock_->Now());
+ }, "Cannot advance safe time by timestamp in leader mode.");
+
+ // Since we haven't appended the message to the queue, safe time should be 'pinned' to
+ // 'safe_before'.
+ ASSERT_EQ(time_manager_->GetSafeTime(), safe_before);
+
+ // When we append the message to the queue safe time should advance again.
+ time_manager_->AdvanceSafeTimeWithMessage(message);
+ ASSERT_GT(time_manager_->GetSafeTime(), message_ts);
+
+ // 'Now' should be safe.
+ Timestamp now = clock_->Now();
+ ASSERT_TRUE(time_manager_->IsTimestampSafeUnlocked(now));
+ ASSERT_GT(time_manager_->GetSafeTime(), now);
+
+ // When changing to non-leader mode a timestamp after the last safe time shouldn't be
+ // safe anymore (even if that time came before the actual change).
+ now = clock_->Now();
+ time_manager_->SetNonLeaderMode();
+ Timestamp safe_after = time_manager_->GetSafeTime();
+ ASSERT_LE(safe_after, now);
+
+ // In leader mode GetSafeTime() usually moves it, but since we changed to non-leader mode
+ // safe time shouldn't move anymore ...
+ ASSERT_EQ(time_manager_->GetSafeTime(), safe_after);
+ now = clock_->Now();
+ MonoTime after_small = MonoTime::Now();
+ after_small.AddDelta(MonoDelta::FromMilliseconds(100));  // Same 100ms budget as the non-leader test.
+ ASSERT_TRUE(time_manager_->WaitUntilSafe(now, after_small).IsTimedOut());
+
+ // ... unless we get a message from the leader.
+ now = clock_->Now();
+ CountDownLatch* after_latch = WaitForSafeTimeAsync(now);
+ message.set_timestamp(now.value());
+ ASSERT_OK(time_manager_->MessageReceivedFromLeader(message));
+ time_manager_->AdvanceSafeTimeWithMessage(message);
+ after_latch->Wait();
+}
+
+} // namespace consensus
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/bacd9467/src/kudu/consensus/time_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/time_manager.cc b/src/kudu/consensus/time_manager.cc
new file mode 100644
index 0000000..eac6ebb
--- /dev/null
+++ b/src/kudu/consensus/time_manager.cc
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+#include <gflags/gflags.h>
+
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/time_manager.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/flag_tags.h"
+
+DEFINE_bool(safe_time_advancement_without_writes, true,  // Enabled by default.
+ "Whether to enable the advancement of \"safe\" time in "
+ "the absence of write operations");
+TAG_FLAG(safe_time_advancement_without_writes, advanced);
+
+namespace kudu {
+namespace consensus {
+
+using server::Clock;
+using strings::Substitute;
+
+typedef std::lock_guard<simple_spinlock> Lock;
+
+ExternalConsistencyMode TimeManager::GetMessageConsistencyMode(const ReplicateMsg& message) {
+ // TODO(dralves): We should have no-ops (?) and config changes be COMMIT_WAIT
+ // transactions. See KUDU-798.
+ // TODO(dralves): Move external consistency mode to ReplicateMsg. This will be useful
+ // for consistent alter table ops.
+ if (PREDICT_TRUE(message.has_write_request())) {  // The mode is read from the write request.
+ return message.write_request().external_consistency_mode();
+ }
+ return CLIENT_PROPAGATED;  // Every message without a write request is CLIENT_PROPAGATED.
+}
+
+TimeManager::TimeManager(scoped_refptr<Clock> clock, Timestamp initial_safe_time)
+ : last_serial_ts_assigned_(initial_safe_time),  // Both timestamps start at the initial safe time.
+ last_safe_ts_(initial_safe_time),
+ mode_(NON_LEADER),  // Always constructed in non-leader mode.
+ clock_(std::move(clock)) {}
+
+void TimeManager::SetLeaderMode() {
+ Lock l(lock_);
+ mode_ = LEADER;
+ AdvanceSafeTimeAndWakeUpWaitersUnlocked(clock_->Now());  // Move safe time to 'now', wake waiters.
+}
+
+void TimeManager::SetNonLeaderMode() {
+ Lock l(lock_);
+ mode_ = NON_LEADER;  // Only the mode changes; last_safe_ts_ and the waiters are left as they are.
+}
+
+Status TimeManager::AssignTimestamp(ReplicateMsg* message) {
+ Lock l(lock_);
+ if (PREDICT_FALSE(mode_ == NON_LEADER)) {  // Only a leader may assign timestamps.
+ return Status::IllegalState("Cannot assign timestamp. TimeManager is not in Leader mode.");
+ }
+ Timestamp t;
+ switch (GetMessageConsistencyMode(*message)) {
+ case COMMIT_WAIT: t = GetSerialTimestampPlusMaxError(); break;  // Not recorded as last assigned.
+ case CLIENT_PROPAGATED: t = GetSerialTimestamp(); break;  // Updates last_serial_ts_assigned_.
+ default: return Status::NotSupported("Unsupported external consistency mode.");
+ }
+ message->set_timestamp(t.value());  // 'message' is only modified on the success path.
+ return Status::OK();
+}
+
+Status TimeManager::MessageReceivedFromLeader(const ReplicateMsg& message) {
+ // NOTE: Currently this method just updates the clock and stores the message's timestamp.
+ // It always returns Status::OK() if the clock returns an OK status on Update().
+ //
+ // When we have leader leases we can trust that the timestamps of messages sent by
+ // any valid leader are safe and we could increase safe time here. However, since
+ // this is not yet the case we will only increase safe time later, when the message is
+ // committed, at the cost of additional delay in moving safe time.
+ //
+ // This greatly reduces the opportunity for non-repeatable reads. On a busy cluster
+ // with a lot of writes (i.e. no empty, "heartbeat" messages) safe time moves only
+ // with committed message timestamps, which are necessarily 'safe'.
+ //
+ // The only opportunity for non-repeatable reads in this setup is if an old leader sends
+ // an (accepted) empty heartbeat message to a follower that immediately afterwards
+ // receives a non-empty message from another higher term leader but with a lower timestamp
+ // than the empty heartbeat.
+ DCHECK(message.has_timestamp());
+ Timestamp t(message.timestamp());
+ RETURN_NOT_OK(clock_->Update(t));  // Runs before lock_ is taken; a failure returns early.
+ {
+ Lock l(lock_);
+ CHECK_EQ(mode_, NON_LEADER) << "Cannot receive messages from a leader in leader mode.";
+ if (GetMessageConsistencyMode(message) == CLIENT_PROPAGATED) {
+ last_serial_ts_assigned_ = t;  // last_safe_ts_ is deliberately not touched here.
+ }
+ }
+ return Status::OK();
+}
+
+void TimeManager::AdvanceSafeTimeWithMessage(const ReplicateMsg& message) {
+ Lock l(lock_);
+ if (GetMessageConsistencyMode(message) == CLIENT_PROPAGATED) {  // Other modes never move safe time.
+ AdvanceSafeTimeAndWakeUpWaitersUnlocked(Timestamp(message.timestamp()));
+ }
+}
+
+void TimeManager::AdvanceSafeTime(Timestamp safe_time) {  // Non-leader only; see the CHECK below.
+ Lock l(lock_);
+ CHECK_EQ(mode_, NON_LEADER) << "Cannot advance safe time by timestamp in leader mode.";
+ AdvanceSafeTimeAndWakeUpWaitersUnlocked(safe_time);  // No-op unless 'safe_time' is newer.
+}
+
+Status TimeManager::WaitUntilSafe(Timestamp timestamp, const MonoTime& deadline) {
+ // First wait for the clock to be past 'timestamp'.
+ RETURN_NOT_OK(clock_->WaitUntilAfterLocally(timestamp, deadline));
+
+ if (PREDICT_FALSE(MonoTime::Now() > deadline)) {
+ return Status::TimedOut("Timed out waiting for the local clock.");
+ }
+
+ CountDownLatch latch(1);  // Counted down by AdvanceSafeTimeAndWakeUpWaitersUnlocked().
+ WaitingState waiter;  // Lives on this stack frame; must not stay in waiters_ after we return.
+ waiter.timestamp = timestamp;
+ waiter.latch = &latch;
+
+ // Register a waiter in waiters_, unless 'timestamp' is already safe.
+ {
+ Lock l(lock_);
+ if (IsTimestampSafeUnlocked(timestamp)) return Status::OK();
+ waiters_.push_back(&waiter);
+ }
+
+ // Wait until we get notified or 'deadline' elapses.
+ if (waiter.latch->WaitUntil(deadline)) return Status::OK();  // Notifier already unregistered us.
+
+ // Timed out, clean up.
+ {
+ Lock l(lock_);
+ // Address the case where we were notified after the timeout.
+ if (waiter.latch->count() == 0) return Status::OK();
+
+ // Not notified, so '&waiter' is still registered and std::find() cannot return end().
+ waiters_.erase(std::find(waiters_.begin(), waiters_.end(), &waiter));
+ return Status::TimedOut(Substitute(
+ "Timed out waiting for ts: $0 to be safe (mode: $1). "
+ "Current safe time: $2 Physical time difference: $3",
+ clock_->Stringify(waiter.timestamp),
+ (mode_ == LEADER ? "LEADER" : "NON-LEADER"),
+ clock_->Stringify(last_safe_ts_),
+ (clock_->HasPhysicalComponent() ?
+ clock_->GetPhysicalComponentDifference(timestamp, last_safe_ts_).ToString() :
+ "None (Logical clock)")));
+ }
+}
+
+void TimeManager::AdvanceSafeTimeAndWakeUpWaitersUnlocked(Timestamp safe_time) {
+ if (safe_time <= last_safe_ts_) {  // Safe time never moves backwards; nothing to wake up either.
+ return;
+ }
+ last_safe_ts_ = safe_time;
+
+ if (PREDICT_FALSE(!waiters_.empty())) {
+ auto iter = waiters_.begin();
+ while (iter != waiters_.end()) {
+ WaitingState* waiter = *iter;
+ if (IsTimestampSafeUnlocked(waiter->timestamp)) {
+ iter = waiters_.erase(iter);  // Unregister first: erase() already yields the next element.
+ waiter->latch->CountDown();  // After this the waiting thread may return from WaitUntilSafe().
+ continue;
+ }
+ iter++;
+ }
+ }
+}
+
+bool TimeManager::IsTimestampSafeUnlocked(Timestamp timestamp) {
+ return timestamp <= GetSafeTimeUnlocked();  // Inclusive; in leader mode the call may move safe time.
+}
+
+Timestamp TimeManager::GetSafeTime() {
+ Lock l(lock_);  // Locked wrapper around GetSafeTimeUnlocked().
+ return GetSafeTimeUnlocked();
+}
+
+Timestamp TimeManager::GetSafeTimeUnlocked() {  // The callers in this file hold lock_.
+ switch (mode_) {
+ case LEADER: {
+ // In ASCII form, where 'S' represents a safe timestamp, 'A' represents the last assigned
+ // timestamp, and 'N' represents the current clock value, the internal state can look like
+ // the following diagrams (time moves from left to right):
+ //
+ // a)
+ // SSSSSSSSSSSSSSSSSS N
+ // | \- last_safe_ts_
+ // |
+ // \- last_serial_ts_assigned_
+ // or like:
+ // b)
+ // SSSSSSSSSSSSSSSSSS A N
+ // | \- last_serial_ts_assigned_
+ // |
+ // \- last_safe_ts_
+ //
+ // If the current internal state is a), then we can advance safe time to 'N'. We know the
+ // leader will never assign a new timestamp lower than it.
+ if (PREDICT_TRUE(last_serial_ts_assigned_ <= last_safe_ts_)) {
+ last_safe_ts_ = clock_->Now();  // NOTE: moves safe time without waking any waiters.
+ return last_safe_ts_;
+ }
+ // If the current state is b), then there might be a transaction with a timestamp that is lower
+ // than 'N' in between assignment and being appended to the queue. We can't consider 'N'
+ // safe and thus have to return the last known safe timestamp.
+ // Note that there can be at most one single transaction in this state, because prepare
+ // is single threaded.
+ return last_safe_ts_;
+ }
+ case NON_LEADER: break;  // Handled below, so that no path flows off the end of the function.
+ }
+ return last_safe_ts_;  // NON_LEADER (or an out-of-range mode_): the last recorded safe time.
+}
+
+Timestamp TimeManager::GetSerialTimestamp() {
+ last_serial_ts_assigned_ = clock_->Now();  // Remembered so GetSafeTimeUnlocked() can pin safe time.
+ return last_serial_ts_assigned_;
+}
+
+Timestamp TimeManager::GetSerialTimestampPlusMaxError() {
+ return clock_->NowLatest();  // Unlike GetSerialTimestamp(), does not update last_serial_ts_assigned_.
+}
+
+
+} // namespace consensus
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/bacd9467/src/kudu/consensus/time_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/time_manager.h b/src/kudu/consensus/time_manager.h
new file mode 100644
index 0000000..c8568d5
--- /dev/null
+++ b/src/kudu/consensus/time_manager.h
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "kudu/common/timestamp.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/server/clock.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace consensus {
+class ReplicateMsg;
+
+// Manages timestamp assignment to consensus rounds and safe time advancement.
+//
+// Safe time corresponds to a timestamp before which all transactions have been applied to the
+// tablet or are in-flight and is a monotonically increasing timestamp (see note at the end
+// of this class comment).
+//
+// Snapshot scans can use WaitUntilSafe() to wait for a timestamp to be safe. After this method
+// returns an OK status, all the transactions whose timestamps fall before the scan's timestamp
+// will be either committed or in-flight. If the scanner additionally uses the MvccManager to wait
+// until the given timestamp is clean, then the read will be repeatable.
+//
+// In leader mode the TimeManager is responsible for assigning timestamps to transactions
+// and for moving the leader's safe time, which in turn may be sent to replicas on heartbeats
+// moving their safe time. The leader's safe time moves with the clock unless there has been a
+// transaction that was assigned a timestamp that is not yet known by the queue
+// (i.e. AdvanceSafeTimeWithMessage() hasn't been called on the corresponding message).
+// In this case the TimeManager returns the last known safe time.
+//
+// In non-leader mode this class tracks the safe time sent by the leader and updates waiters
+// when it advances.
+//
+// This class's leadership status is meant to be in tune with the queue's as the queue
+// is responsible for broadcasting safe time from a leader (and will eventually be responsible
+// for calculating that leader's lease).
+//
+// See: docs/design-docs/repeatable-reads.md
+//
+// NOTE: Until leader leases are implemented the cluster's safe time can occasionally move back.
+// This does not mean, however, that the timestamp returned by GetSafeTime() can move back.
+// GetSafeTime will still return monotonically increasing timestamps, it's just
+// that, in certain corner cases, the timestamp returned by GetSafeTime() can't be trusted
+// to mean that all future messages will be assigned future timestamps.
+// This anomaly can cause non-repeatable reads in certain conditions.
+//
+// This class is thread safe.
+class TimeManager : public RefCountedThreadSafe<TimeManager> {
+ public:
+
+ // Constructs a TimeManager in non-leader mode.
+ TimeManager(scoped_refptr<server::Clock> clock, Timestamp initial_safe_time);
+
+ // Sets this TimeManager to leader mode.
+ void SetLeaderMode();
+
+ // Sets this TimeManager to non-leader mode.
+ void SetNonLeaderMode();
+
+ // Assigns a timestamp to 'message' according to the message's ExternalConsistencyMode and/or
+ // message type.
+ //
+ // Note that the timestamp in 'message' is not considered safe until the message has
+ // been appended to the queue. Until then safe time is pinned to the last known value.
+ // When the message is appended later on, AdvanceSafeTimeWithMessage() is called and safe time
+ // is advanced.
+ //
+ // Requires Leader mode (non-OK status otherwise).
+ Status AssignTimestamp(ReplicateMsg* message);
+
+ // Updates the internal state based on 'message' received from a leader replica.
+ // Replicas are expected to call this for every message received from a valid leader.
+ //
+ // Returns Status::OK if the message/leader is valid and the clock was correctly updated.
+ //
+ // Requires non-leader mode (CHECK failure if it isn't).
+ Status MessageReceivedFromLeader(const ReplicateMsg& message);
+
+ // Advances safe time based on the timestamp and type of 'message'.
+ //
+ // This only moves safe time if 'message's timestamp is higher than the currently known one.
+ //
+ // Allowed in both leader and non-leader modes.
+ void AdvanceSafeTimeWithMessage(const ReplicateMsg& message);
+
+ // Same as above but for a specific timestamp.
+ //
+ // This only moves safe time if 'safe_time' is higher than the currently known one.
+ //
+ // Requires non-leader mode (CHECK failure if it isn't).
+ void AdvanceSafeTime(Timestamp safe_time);
+
+ // Waits until 'timestamp' is less than or equal to safe time or until 'deadline' has elapsed.
+ //
+ // Returns Status::OK() if safe time reached or passed 'timestamp' before 'deadline'.
+ // Returns Status::TimedOut() if 'deadline' elapsed without safe time moving enough.
+ //
+ // TODO(KUDU-1127) make this return another status if safe time is too far back in the past
+ // or hasn't moved in a long time.
+ Status WaitUntilSafe(Timestamp timestamp, const MonoTime& deadline);
+
+ // Returns the current safe time.
+ //
+ // In leader mode returns clock_->Now() or some value close to it.
+ //
+ // In non-leader mode returns the last safe time received from a leader.
+ Timestamp GetSafeTime();
+ private:
+ FRIEND_TEST(TimeManagerTest, TestTimeManagerNonLeaderMode);
+ FRIEND_TEST(TimeManagerTest, TestTimeManagerLeaderMode);
+
+ // Helper to return the external consistency mode of 'message'.
+ static ExternalConsistencyMode GetMessageConsistencyMode(const ReplicateMsg& message);
+
+ // The mode of this TimeManager.
+ enum Mode {
+ LEADER,
+ NON_LEADER
+ };
+
+ // State for waiters.
+ struct WaitingState {
+ // The timestamp the waiter requires be safe.
+ Timestamp timestamp;
+ // Latch that will be counted down once 'timestamp' is safe, unblocking the waiter.
+ CountDownLatch* latch;
+ };
+
+ // Returns whether 'timestamp' is safe.
+ // Requires that we've waited for the local clock to move past 'timestamp'.
+ bool IsTimestampSafeUnlocked(Timestamp timestamp);
+
+ // Advances safe time and wakes up any waiters.
+ void AdvanceSafeTimeAndWakeUpWaitersUnlocked(Timestamp safe_time);
+
+ // Returns a timestamp that is guaranteed to be higher than all other timestamps
+ // that have been assigned by calls to GetSerialTimestamp() (in this or another
+ // replica).
+ Timestamp GetSerialTimestamp();
+
+ // Like the above, but returns a serial timestamp plus the maximum error.
+ // NOTE: GetSerialTimestamp() might still return timestamps that are smaller.
+ Timestamp GetSerialTimestampPlusMaxError();
+
+ // Internal, unlocked implementation of GetSafeTime().
+ Timestamp GetSafeTimeUnlocked();
+
+ // Lock to protect the non-const fields below.
+ mutable simple_spinlock lock_;
+
+ // Vector of waiters to be notified when the safe time advances.
+ mutable std::vector<WaitingState*> waiters_;
+
+ // The last serial timestamp that was assigned.
+ Timestamp last_serial_ts_assigned_;
+
+ // On replicas this is the latest safe time received from the leader, on the leader this is
+ // the last serial timestamp appended to the queue.
+ Timestamp last_safe_ts_;
+
+ // The current mode of the TimeManager.
+ Mode mode_;
+
+ const scoped_refptr<server::Clock> clock_;
+ const std::string local_peer_uuid_;  // NOTE(review): never set or read in time_manager.cc - unused?
+};
+
+} // namespace consensus
+} // namespace kudu
[2/2] kudu git commit: Output more info when a snapshot scan fails
due to history gc
Posted by mp...@apache.org.
Output more info when a snapshot scan fails due to history gc
This just adds some niceties to the error message, like
both timestamps and the physical time difference between
them.
Change-Id: Id488fd0366a8e8c5c2a21289a835207ea795ad20
Reviewed-on: http://gerrit.cloudera.org:8080/5374
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-by: Mike Percy <mp...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/881cc8e9
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/881cc8e9
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/881cc8e9
Branch: refs/heads/master
Commit: 881cc8e981dbe51ba21fffdb575abd3cf8fe3e5e
Parents: bacd946
Author: David Alves <dr...@apache.org>
Authored: Mon Dec 5 22:50:04 2016 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Dec 6 14:39:40 2016 +0000
----------------------------------------------------------------------
src/kudu/tserver/tablet_service.cc | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/881cc8e9/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 5177a5a..51b5563 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -1535,7 +1535,6 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletPeer* tablet_peer,
// end up with a valid snapshot in that case. It would be more correct to
// initialize the row iterator and then select the latest timestamp
// represented by those open files in that case.
- Timestamp ancient_history_mark;
tablet::HistoryGcOpts history_gc_opts = tablet->GetHistoryGcOpts();
if (scan_pb.read_mode() == READ_AT_SNAPSHOT &&
history_gc_opts.IsAncientHistory(*snap_timestamp)) {
@@ -1544,9 +1543,15 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletPeer* tablet_peer,
// We have to check after we open the iterator in order to avoid a TOCTOU
// error.
*error_code = TabletServerErrorPB::INVALID_SNAPSHOT;
- return Status::InvalidArgument("Snapshot timestamp is earlier than the ancient history mark",
- "consider increasing the value of the configuration parameter "
- "--tablet_history_max_age_sec");
+ return Status::InvalidArgument(
+ Substitute("Snapshot timestamp is earlier than the ancient history mark. Consider "
+ "increasing the value of the configuration parameter "
+ "--tablet_history_max_age_sec. Snapshot timestamp: $0 "
+ "Ancient History Mark: $1 Physical time difference: $2",
+ server_->clock()->Stringify(*snap_timestamp),
+ server_->clock()->Stringify(history_gc_opts.ancient_history_mark()),
+ server_->clock()->GetPhysicalComponentDifference(
+ *snap_timestamp, history_gc_opts.ancient_history_mark()).ToString()));
}
*has_more_results = iter->HasNext();