You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/11/06 05:46:38 UTC
[2/4] kudu git commit: KUDU-1578. Hybrid clock should ride over brief
interruptions in NTP service
KUDU-1578. Hybrid clock should ride over brief interruptions in NTP service
This changes HybridClock so that, if the clock loses synchronization for
a brief period of time, it will continue to use the most recent clock
reading, adjusting it forward based on the local monotonic clock and a
conservative estimate for max error.
In the case that the clock is out of sync for a significantly long time,
the max error will grow large enough to eclipse the 10-second default,
at which point it will still crash as before. But, if NTP is properly
restored within a few minutes, the server should remain operational.
A simple test is included which injects a fake error into the NTP
source. I also ran a kudu-master while briefly disabling NTP and ensured
that the log messages showed up as expected and then went away when I
resumed NTP synchronization.
Change-Id: I0d03a77033070e6bcdfdefc4341981f28a1477a0
Reviewed-on: http://gerrit.cloudera.org:8080/8451
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7f510458
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7f510458
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7f510458
Branch: refs/heads/master
Commit: 7f5104586fa381347c582260df539b8cbb02f08b
Parents: 10ed76c
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Nov 1 19:13:05 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Nov 6 05:26:38 2017 +0000
----------------------------------------------------------------------
src/kudu/clock/hybrid_clock-test.cc | 38 +++++++++++++++++
src/kudu/clock/hybrid_clock.cc | 71 ++++++++++++++++++++++++++++----
src/kudu/clock/hybrid_clock.h | 12 ++++--
src/kudu/clock/system_ntp.cc | 10 +++++
4 files changed, 119 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f510458/src/kudu/clock/hybrid_clock-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/clock/hybrid_clock-test.cc b/src/kudu/clock/hybrid_clock-test.cc
index e3d2afd..0524a84 100644
--- a/src/kudu/clock/hybrid_clock-test.cc
+++ b/src/kudu/clock/hybrid_clock-test.cc
@@ -41,6 +41,7 @@
#include "kudu/util/test_util.h"
#include "kudu/util/thread.h"
+DECLARE_bool(inject_adjtimex_errors);
DECLARE_string(time_source);
namespace kudu {
@@ -309,5 +310,42 @@ TEST_F(HybridClockTest, TestGetPhysicalComponentDifference) {
ASSERT_EQ(-100, negative_delta.ToMicroseconds());
}
+
+TEST_F(HybridClockTest, TestRideOverNtpInterruption) {
+ Timestamp timestamps[3];
+ uint64_t max_error_usec[3];
+
+ // Get the clock once, with a working NTP.
+ clock_->NowWithError(&timestamps[0], &max_error_usec[0]);
+
+ // Try to read the clock again a second later, but with an error
+ // injected. It should extrapolate from the first read.
+ SleepFor(MonoDelta::FromSeconds(1));
+ FLAGS_inject_adjtimex_errors = true;
+ clock_->NowWithError(&timestamps[1], &max_error_usec[1]);
+
+ // The new clock reading should be a second or longer from the
+ // first one, since SleepFor guarantees sleeping at least as long
+ // as specified.
+ MonoDelta phys_diff = clock_->GetPhysicalComponentDifference(
+ timestamps[1], timestamps[0]);
+ ASSERT_GE(phys_diff.ToSeconds(), 1);
+
+ // The new clock reading should have higher error than the first.
+ // The error should have increased based on the clock skew.
+ int64_t error_diff = max_error_usec[1] - max_error_usec[0];
+ ASSERT_NEAR(error_diff, clock_->time_service()->skew_ppm() * phys_diff.ToSeconds(),
+ 10);
+
+ // Now restore the ability to read the system clock, and
+ // read it again.
+ FLAGS_inject_adjtimex_errors = false;
+ clock_->NowWithError(&timestamps[2], &max_error_usec[2]);
+
+ ASSERT_LT(timestamps[0].ToUint64(), timestamps[1].ToUint64());
+ ASSERT_LT(timestamps[1].ToUint64(), timestamps[2].ToUint64());
+}
+
+
} // namespace clock
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f510458/src/kudu/clock/hybrid_clock.cc
----------------------------------------------------------------------
diff --git a/src/kudu/clock/hybrid_clock.cc b/src/kudu/clock/hybrid_clock.cc
index 6ee7129..0518afe 100644
--- a/src/kudu/clock/hybrid_clock.cc
+++ b/src/kudu/clock/hybrid_clock.cc
@@ -34,6 +34,7 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
@@ -170,11 +171,8 @@ void HybridClock::NowWithError(Timestamp* timestamp, uint64_t* max_error_usec) {
uint64_t now_usec;
uint64_t error_usec;
- Status s = WalltimeWithError(&now_usec, &error_usec);
- if (PREDICT_FALSE(!s.ok())) {
- LOG(FATAL) << Substitute("Couldn't get the current time: Clock unsynchronized. "
- "Status: $0", s.ToString());
- }
+ CHECK_OK_PREPEND(WalltimeWithError(&now_usec, &error_usec),
+ "Couldn't get the current time");
// If the physical time from the system clock is higher than our last-returned
// time, we should use the physical timestamp.
@@ -336,12 +334,69 @@ bool HybridClock::IsAfter(Timestamp t) {
}
kudu::Status HybridClock::WalltimeWithError(uint64_t* now_usec, uint64_t* error_usec) {
- RETURN_NOT_OK(time_service_->WalltimeWithError(now_usec, error_usec));
+ bool is_extrapolated = false;
+ auto read_time_before = MonoTime::Now();
+ Status s = time_service_->WalltimeWithError(now_usec, error_usec);
+ auto read_time_after = MonoTime::Now();
+
+ if (PREDICT_TRUE(s.ok())) {
+ // We got a good clock read. Remember this in case the clock later becomes
+ // unsynchronized and we need to extrapolate from here.
+ //
+ // Note that the actual act of reading the clock could have taken some time
+ // (eg if we context-switched out) so we need to account for that by adding
+ // some extra error.
+ //
+ // A B C
+ // |---------|----------|
+ //
+ // A = read_time_before (monotime)
+ // B = now_usec (walltime reading)
+ // C = read_time_after (monotime)
+ //
+ // We don't know whether 'B' was halfway in between 'A' and 'C' or elsewhere.
+ // The max likelihood estimate is that 'B' corresponds to the average of 'A'
+ // and 'C'. Then we need to add in this uncertainty (half of C - A) into any
+ // future clock readings that we extrapolate from this estimate.
+ int64_t read_duration_us = (read_time_after - read_time_before).ToMicroseconds();
+ int64_t read_time_error_us = read_duration_us / 2;
+ MonoTime read_time_max_likelihood = read_time_before +
+ MonoDelta::FromMicroseconds(read_time_error_us);
+
+ std::unique_lock<simple_spinlock> l(last_clock_read_lock_);
+ if (!last_clock_read_time_.Initialized() ||
+ last_clock_read_time_ < read_time_max_likelihood) {
+ last_clock_read_time_ = read_time_max_likelihood;
+ last_clock_read_physical_ = *now_usec;
+ last_clock_read_error_ = *error_usec + read_time_error_us;
+ }
+ } else {
+ // We failed to read the clock. Extrapolate the new time based on our
+ // last successful read.
+ std::unique_lock<simple_spinlock> l(last_clock_read_lock_);
+ if (!last_clock_read_time_.Initialized()) {
+ RETURN_NOT_OK_PREPEND(s, "could not read system time source");
+ }
+ MonoDelta time_since_last_read = read_time_after - last_clock_read_time_;
+ int64_t micros_since_last_read = time_since_last_read.ToMicroseconds();
+ int64_t accum_error_us = (micros_since_last_read * time_service_->skew_ppm()) / 1e6;
+ *now_usec = last_clock_read_physical_ + micros_since_last_read;
+ *error_usec = last_clock_read_error_ + accum_error_us;
+ is_extrapolated = true;
+ l.unlock();
+ // Log after unlocking to minimize the lock hold time.
+ KLOG_EVERY_N_SECS(ERROR, 1) << "Unable to read clock for last "
+ << time_since_last_read.ToString() << ": " << s.ToString();
+
+ }
+
// If the clock is synchronized but has max_error beyond max_clock_sync_error_usec
// we also return a non-ok status.
if (*error_usec > FLAGS_max_clock_sync_error_usec) {
- return Status::ServiceUnavailable(Substitute("Error: Clock synchronized but error was"
- "too high ($0 us).", *error_usec));
+ return Status::ServiceUnavailable(Substitute(
+ "clock error estimate ($0us) too high (clock considered $1 by the kernel)",
+ *error_usec,
+ is_extrapolated ? "unsynchronized" : "synchronized"));
}
return kudu::Status::OK();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f510458/src/kudu/clock/hybrid_clock.h
----------------------------------------------------------------------
diff --git a/src/kudu/clock/hybrid_clock.h b/src/kudu/clock/hybrid_clock.h
index 1e0f83f..2fd41f7 100644
--- a/src/kudu/clock/hybrid_clock.h
+++ b/src/kudu/clock/hybrid_clock.h
@@ -28,13 +28,10 @@
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
namespace kudu {
-
-class MonoDelta;
-class MonoTime;
-
namespace clock {
// The HybridTime clock.
@@ -185,6 +182,13 @@ class HybridClock : public Clock {
// the physical clock hasn't advanced beyond the value stored here.
uint64_t next_timestamp_;
+ // The last valid clock reading we got from the time source, along
+ // with the monotime that we took that reading.
+ mutable simple_spinlock last_clock_read_lock_;
+ MonoTime last_clock_read_time_;
+ uint64_t last_clock_read_physical_;
+ uint64_t last_clock_read_error_;
+
// How many bits to left shift a microseconds clock read. The remainder
// of the timestamp will be reserved for logical values.
static const int kBitsToShift;
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f510458/src/kudu/clock/system_ntp.cc
----------------------------------------------------------------------
diff --git a/src/kudu/clock/system_ntp.cc b/src/kudu/clock/system_ntp.cc
index d1e8d56..ff2c2a6 100644
--- a/src/kudu/clock/system_ntp.cc
+++ b/src/kudu/clock/system_ntp.cc
@@ -23,12 +23,19 @@
#include <cerrno>
#include <ostream>
+#include <gflags/gflags.h>
#include <glog/logging.h>
+#include "kudu/gutil/port.h"
#include "kudu/util/errno.h"
+#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/status.h"
+DEFINE_bool(inject_adjtimex_errors, false,
+ "If true, will return a fake 'unsynchronized' status from NTP.");
+TAG_FLAG(inject_adjtimex_errors, unsafe);
+
namespace kudu {
namespace clock {
@@ -42,6 +49,9 @@ Status CallAdjTime(timex* tx) {
// Set mode to 0 to query the current time.
tx->modes = 0;
int rc = ntp_adjtime(tx);
+ if (PREDICT_FALSE(FLAGS_inject_adjtimex_errors)) {
+ rc = TIME_ERROR;
+ }
switch (rc) {
case TIME_OK:
return Status::OK();