You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/11/06 05:46:38 UTC

[2/4] kudu git commit: KUDU-1578. Hybrid clock should ride over brief interruptions in NTP service

KUDU-1578. Hybrid clock should ride over brief interruptions in NTP service

This changes HybridClock so that, if the clock loses synchronization for
a brief period of time, it will continue to use the most recent clock
reading, adjusting it forward based on the local monotonic clock and a
conservative estimate for max error.

If the clock stays out of sync for a long time, the estimated max error
will grow beyond the 10-second default limit (max_clock_sync_error_usec),
at which point the server will still crash as before. However, if NTP is
properly restored within a few minutes, the server should remain
operational.

A simple test is included which injects a fake error into the NTP
source. I also ran a kudu-master while briefly disabling NTP and ensured
that the log messages showed up as expected and then went away when I
resumed NTP synchronization.

Change-Id: I0d03a77033070e6bcdfdefc4341981f28a1477a0
Reviewed-on: http://gerrit.cloudera.org:8080/8451
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7f510458
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7f510458
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7f510458

Branch: refs/heads/master
Commit: 7f5104586fa381347c582260df539b8cbb02f08b
Parents: 10ed76c
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Nov 1 19:13:05 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Nov 6 05:26:38 2017 +0000

----------------------------------------------------------------------
 src/kudu/clock/hybrid_clock-test.cc | 38 +++++++++++++++++
 src/kudu/clock/hybrid_clock.cc      | 71 ++++++++++++++++++++++++++++----
 src/kudu/clock/hybrid_clock.h       | 12 ++++--
 src/kudu/clock/system_ntp.cc        | 10 +++++
 4 files changed, 119 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7f510458/src/kudu/clock/hybrid_clock-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/clock/hybrid_clock-test.cc b/src/kudu/clock/hybrid_clock-test.cc
index e3d2afd..0524a84 100644
--- a/src/kudu/clock/hybrid_clock-test.cc
+++ b/src/kudu/clock/hybrid_clock-test.cc
@@ -41,6 +41,7 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/thread.h"
 
+DECLARE_bool(inject_adjtimex_errors);
 DECLARE_string(time_source);
 
 namespace kudu {
@@ -309,5 +310,42 @@ TEST_F(HybridClockTest, TestGetPhysicalComponentDifference) {
   ASSERT_EQ(-100, negative_delta.ToMicroseconds());
 }
 
+
+TEST_F(HybridClockTest, TestRideOverNtpInterruption) {
+  Timestamp timestamps[3];
+  uint64_t max_error_usec[3];
+
+  // Get the clock once, with a working NTP.
+  clock_->NowWithError(&timestamps[0], &max_error_usec[0]);
+
+  // Try to read the clock again a second later, but with an error
+  // injected. It should extrapolate from the first read.
+  SleepFor(MonoDelta::FromSeconds(1));
+  FLAGS_inject_adjtimex_errors = true;
+  clock_->NowWithError(&timestamps[1], &max_error_usec[1]);
+
+  // The new clock reading should be a second or longer from the
+  // first one, since SleepFor guarantees sleeping at least as long
+  // as specified.
+  MonoDelta phys_diff = clock_->GetPhysicalComponentDifference(
+      timestamps[1], timestamps[0]);
+  ASSERT_GE(phys_diff.ToSeconds(), 1);
+
+  // The new clock reading should have higher error than the first.
+  // The error should have increased based on the clock skew.
+  int64_t error_diff = max_error_usec[1] - max_error_usec[0];
+  ASSERT_NEAR(error_diff, clock_->time_service()->skew_ppm() * phys_diff.ToSeconds(),
+              10);
+
+  // Now restore the ability to read the system clock, and
+  // read it again.
+  FLAGS_inject_adjtimex_errors = false;
+  clock_->NowWithError(&timestamps[2], &max_error_usec[2]);
+
+  ASSERT_LT(timestamps[0].ToUint64(), timestamps[1].ToUint64());
+  ASSERT_LT(timestamps[1].ToUint64(), timestamps[2].ToUint64());
+}
+
+
 }  // namespace clock
 }  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f510458/src/kudu/clock/hybrid_clock.cc
----------------------------------------------------------------------
diff --git a/src/kudu/clock/hybrid_clock.cc b/src/kudu/clock/hybrid_clock.cc
index 6ee7129..0518afe 100644
--- a/src/kudu/clock/hybrid_clock.cc
+++ b/src/kudu/clock/hybrid_clock.cc
@@ -34,6 +34,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
@@ -170,11 +171,8 @@ void HybridClock::NowWithError(Timestamp* timestamp, uint64_t* max_error_usec) {
 
   uint64_t now_usec;
   uint64_t error_usec;
-  Status s = WalltimeWithError(&now_usec, &error_usec);
-  if (PREDICT_FALSE(!s.ok())) {
-    LOG(FATAL) << Substitute("Couldn't get the current time: Clock unsynchronized. "
-        "Status: $0", s.ToString());
-  }
+  CHECK_OK_PREPEND(WalltimeWithError(&now_usec, &error_usec),
+                   "Couldn't get the current time");
 
   // If the physical time from the system clock is higher than our last-returned
   // time, we should use the physical timestamp.
@@ -336,12 +334,69 @@ bool HybridClock::IsAfter(Timestamp t) {
 }
 
 kudu::Status HybridClock::WalltimeWithError(uint64_t* now_usec, uint64_t* error_usec) {
-  RETURN_NOT_OK(time_service_->WalltimeWithError(now_usec, error_usec));
+  bool is_extrapolated = false;
+  auto read_time_before = MonoTime::Now();
+  Status s = time_service_->WalltimeWithError(now_usec, error_usec);
+  auto read_time_after = MonoTime::Now();
+
+  if (PREDICT_TRUE(s.ok())) {
+    // We got a good clock read. Remember this in case the clock later becomes
+    // unsynchronized and we need to extrapolate from here.
+    //
+    // Note that the actual act of reading the clock could have taken some time
+    // (eg if we context-switched out) so we need to account for that by adding
+    // some extra error.
+    //
+    //  A         B          C
+    //  |---------|----------|
+    //
+    //  A = read_time_before (monotime)
+    //  B = now_usec (walltime reading)
+    //  C = read_time_after (monotime)
+    //
+    // We don't know whether 'B' was halfway in between 'A' and 'C' or elsewhere.
+    // The max likelihood estimate is that 'B' corresponds to the average of 'A'
+    // and 'C'. Then we need to add in this uncertainty (half of C - A) into any
+    // future clock readings that we extrapolate from this estimate.
+    int64_t read_duration_us = (read_time_after - read_time_before).ToMicroseconds();
+    int64_t read_time_error_us = read_duration_us / 2;
+    MonoTime read_time_max_likelihood = read_time_before +
+        MonoDelta::FromMicroseconds(read_time_error_us);
+
+    std::unique_lock<simple_spinlock> l(last_clock_read_lock_);
+    if (!last_clock_read_time_.Initialized() ||
+        last_clock_read_time_ < read_time_max_likelihood) {
+      last_clock_read_time_ = read_time_max_likelihood;
+      last_clock_read_physical_ = *now_usec;
+      last_clock_read_error_ = *error_usec + read_time_error_us;
+    }
+  } else {
+    // We failed to read the clock. Extrapolate the new time based on our
+    // last successful read.
+    std::unique_lock<simple_spinlock> l(last_clock_read_lock_);
+    if (!last_clock_read_time_.Initialized()) {
+      RETURN_NOT_OK_PREPEND(s, "could not read system time source");
+    }
+    MonoDelta time_since_last_read = read_time_after - last_clock_read_time_;
+    int64_t micros_since_last_read = time_since_last_read.ToMicroseconds();
+    int64_t accum_error_us = (micros_since_last_read * time_service_->skew_ppm()) / 1e6;
+    *now_usec = last_clock_read_physical_ + micros_since_last_read;
+    *error_usec = last_clock_read_error_ + accum_error_us;
+    is_extrapolated = true;
+    l.unlock();
+    // Log after unlocking to minimize the lock hold time.
+    KLOG_EVERY_N_SECS(ERROR, 1) << "Unable to read clock for last "
+                                << time_since_last_read.ToString() << ": " << s.ToString();
+
+  }
+
   // If the clock is synchronized but has max_error beyond max_clock_sync_error_usec
   // we also return a non-ok status.
   if (*error_usec > FLAGS_max_clock_sync_error_usec) {
-    return Status::ServiceUnavailable(Substitute("Error: Clock synchronized but error was"
-        "too high ($0 us).", *error_usec));
+    return Status::ServiceUnavailable(Substitute(
+        "clock error estimate ($0us) too high (clock considered $1 by the kernel)",
+        *error_usec,
+        is_extrapolated ? "unsynchronized" : "synchronized"));
   }
   return kudu::Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f510458/src/kudu/clock/hybrid_clock.h
----------------------------------------------------------------------
diff --git a/src/kudu/clock/hybrid_clock.h b/src/kudu/clock/hybrid_clock.h
index 1e0f83f..2fd41f7 100644
--- a/src/kudu/clock/hybrid_clock.h
+++ b/src/kudu/clock/hybrid_clock.h
@@ -28,13 +28,10 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
-
-class MonoDelta;
-class MonoTime;
-
 namespace clock {
 
 // The HybridTime clock.
@@ -185,6 +182,13 @@ class HybridClock : public Clock {
   // the physical clock hasn't advanced beyond the value stored here.
   uint64_t next_timestamp_;
 
+  // The last valid clock reading we got from the time source, along
+  // with the monotime that we took that reading.
+  mutable simple_spinlock last_clock_read_lock_;
+  MonoTime last_clock_read_time_;
+  uint64_t last_clock_read_physical_;
+  uint64_t last_clock_read_error_;
+
   // How many bits to left shift a microseconds clock read. The remainder
   // of the timestamp will be reserved for logical values.
   static const int kBitsToShift;

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f510458/src/kudu/clock/system_ntp.cc
----------------------------------------------------------------------
diff --git a/src/kudu/clock/system_ntp.cc b/src/kudu/clock/system_ntp.cc
index d1e8d56..ff2c2a6 100644
--- a/src/kudu/clock/system_ntp.cc
+++ b/src/kudu/clock/system_ntp.cc
@@ -23,12 +23,19 @@
 #include <cerrno>
 #include <ostream>
 
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 
+#include "kudu/gutil/port.h"
 #include "kudu/util/errno.h"
+#include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/status.h"
 
+DEFINE_bool(inject_adjtimex_errors, false,
+            "If true, will return a fake 'unsynchronized' status from NTP.");
+TAG_FLAG(inject_adjtimex_errors, unsafe);
+
 namespace kudu {
 namespace clock {
 
@@ -42,6 +49,9 @@ Status CallAdjTime(timex* tx) {
   // Set mode to 0 to query the current time.
   tx->modes = 0;
   int rc = ntp_adjtime(tx);
+  if (PREDICT_FALSE(FLAGS_inject_adjtimex_errors)) {
+    rc = TIME_ERROR;
+  }
   switch (rc) {
     case TIME_OK:
       return Status::OK();