Posted to commits@kudu.apache.org by al...@apache.org on 2023/04/03 16:34:57 UTC

[kudu] branch master updated: [clock] add sanity check to detect wall clock jumps

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 555854178 [clock] add sanity check to detect wall clock jumps
555854178 is described below

commit 555854178b9b498701619f4bb0dbbbbeab8e69e7
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Wed Feb 1 08:33:51 2023 -0800

    [clock] add sanity check to detect wall clock jumps
    
    There was a case when a timestamp read from system/local clock using
    the ntp_adjtime() call jumped 40+ years ahead when running kudu-tserver
    on an Azure VM, while ntp_adjtime() didn't report an error on clock
    being unsynchronized or any other error at all. That came along with
    a huge number of kernel messages, and other software (such as the SASL
    library used by SSSD) detected the strange jump in the local clock
    as well.  My multiple attempts to reproduce the issue on a real
    hardware node, in a Dockerized environment running on a real hardware
    server in a datacenter, and on GCE & EC2 VMs were not successful.
    
    This patch adds a sanity check to detect such strange jumps in wall
    clock readings.  The idea is to rely on the readings from the
    CLOCK_MONOTONIC_RAW clock captured along with the wall clock readings.
    A jump should manifest itself in a big difference between the wall clock
    delta and the corresponding CLOCK_MONOTONIC_RAW delta.  If such a
    condition is detected, then HybridClock::NowWithErrorUnlocked() dumps
    diagnostic information about the clock's NTP synchronisation status and
    returns Status::ServiceUnavailable() with an appropriate error message.
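
The delta comparison described above can be sketched in isolation as follows. This is a minimal illustration, not the HybridClock code from this patch: the class name WallClockJumpDetector and its Accept() method are hypothetical, and the caller is assumed to supply wall and monotonic readings (in microseconds) captured at almost the same moment.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdlib>

// Hypothetical stand-alone detector; the name and interface are made up
// for illustration and are not part of Kudu.
class WallClockJumpDetector {
 public:
  explicit WallClockJumpDetector(int64_t threshold_usec)
      : threshold_usec_(threshold_usec) {
    assert(threshold_usec > 0);
  }

  // Takes a pair of readings captured at (almost) the same moment.
  // Returns false if the wall clock delta diverged from the monotonic
  // clock delta by more than the threshold. A rejected pair is not
  // recorded, so the next pair is compared against the last accepted one.
  bool Accept(int64_t wall_usec, int64_t mono_usec) {
    if (has_prev_) {
      const int64_t wall_delta = wall_usec - prev_wall_usec_;
      const int64_t mono_delta = mono_usec - prev_mono_usec_;
      if (std::llabs(wall_delta - mono_delta) > threshold_usec_) {
        return false;
      }
    }
    has_prev_ = true;
    prev_wall_usec_ = wall_usec;
    prev_mono_usec_ = mono_usec;
    return true;
  }

 private:
  const int64_t threshold_usec_;
  bool has_prev_ = false;
  int64_t prev_wall_usec_ = 0;
  int64_t prev_mono_usec_ = 0;
};
```

The first pair of readings is always accepted because there is nothing to compare it with yet. A backward jump of the wall clock is caught the same way as a forward one, since only the absolute difference of the deltas is compared against the threshold.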
    
    This patch also adds a unit test for the newly added functionality.
    
    As a part of this changelist, the following new flags are introduced:
      * --wall_clock_jump_detection
          This is to control the newly introduced sanity check for readings
          of the wall clock.  Acceptable values are 'auto', 'enabled',
          and 'disabled'.  It is set to 'auto' by default, meaning the
          sanity check for timestamps is enabled if the process detects
          that it's running on a VM in Azure cloud.
      * --wall_clock_jump_threshold_sec
          This is to control the threshold (in seconds) for the difference
          in deltas of the wall clock's and CLOCK_MONOTONIC_RAW clock's
          readings.  It is set to 900 (15 minutes) by default.
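
How the two flags combine into a single threshold can be sketched as below. This is an assumption-laden illustration rather than the code in server_base.cc: JumpDetection, ParseJumpDetection() and EffectiveThresholdUsec() are hypothetical names, and the Azure detection result is passed in as a plain boolean. It relies only on what is stated here: 'auto' enables the check on Azure VMs, and a threshold of 0 microseconds means the check is off.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <cstdint>
#include <string>

// Hypothetical enum mirroring the three acceptable flag values.
enum class JumpDetection { kAuto, kEnabled, kDisabled };

// Case-insensitive parse of "auto" / "enabled" / "disabled".
// Returns false for any other value and leaves *mode untouched.
bool ParseJumpDetection(std::string value, JumpDetection* mode) {
  assert(mode != nullptr);
  std::transform(value.begin(), value.end(), value.begin(),
                 [](unsigned char c) {
                   return static_cast<char>(std::tolower(c));
                 });
  if (value == "auto") {
    *mode = JumpDetection::kAuto;
    return true;
  }
  if (value == "enabled") {
    *mode = JumpDetection::kEnabled;
    return true;
  }
  if (value == "disabled") {
    *mode = JumpDetection::kDisabled;
    return true;
  }
  return false;
}

// Returns the threshold in microseconds, or 0 if the check is off.
// 'running_on_azure' stands in for the result of cloud instance detection.
uint64_t EffectiveThresholdUsec(JumpDetection mode,
                                bool running_on_azure,
                                uint32_t threshold_sec) {
  const bool enabled =
      mode == JumpDetection::kEnabled ||
      (mode == JumpDetection::kAuto && running_on_azure);
  return enabled ? static_cast<uint64_t>(threshold_sec) * 1000 * 1000 : 0;
}
```

With the default of 900 seconds, an enabled check works with a threshold of 900000000 microseconds.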
    
    The reasoning behind having --wall_clock_jump_detection=auto by
    default is to skip the extra check on the majority of nodes out there,
    since an NTP-synchronized system clock isn't supposed to jump that much
    at all.  However, since the issue has been observed only on VMs in Azure
    cloud, the check is enabled automatically on Azure nodes.
    If the clock jump went unnoticed, the timestamp would be persisted with
    an operation in the WAL and propagated to other replicas.  That could
    lead to crashes during tablet bootstrapping and would require manual
    intervention to remove the orphaned operations with out-of-whack
    timestamps from the WALs of affected tablet replicas.
    
    To assess the performance hit induced by the clock jump detection,
    I also added a parameterized HybridClockJumpProtectionTest.BasicPerf
    scenario to compare how fast HybridClock::NowWithError() runs
    with and without the newly introduced sanity check.
    
    I found that with the sanity check enabled the method runs about 1.12
    times slower in RELEASE builds (about 1.15 times slower in DEBUG builds)
    than without the check.  The details are below.
    
    To collect stats on the performance of the HybridClock::NowWithError()
    method, I used the command below, running it 10 times for RELEASE and
    DEBUG builds to run the new test scenario.  I recorded the timings
    output by the test scenarios and computed the average time taken to
    invoke HybridClock::NowWithError() 1000000 times.
    
      KUDU_ALLOW_SLOW_TESTS=1 ./bin/hybrid_clock-test --gtest_filter='*Perf*'
    
      -------------------------------------------------------------------
    
      RELEASE build:
    
      without clock jump detection:
        1000000 iterations in 158.553514 ms
        1000000 iterations in 159.200062 ms
        1000000 iterations in 157.840572 ms
        1000000 iterations in 159.079716 ms
        1000000 iterations in 161.235647 ms
    
        average: 159.181902 ms
    
      with clock jump detection:
        1000000 iterations in 178.415508 ms
        1000000 iterations in 178.102677 ms
        1000000 iterations in 177.465617 ms
        1000000 iterations in 180.552167 ms
        1000000 iterations in 179.644688 ms
    
        average: 178.836131 ms
    
      -------------------------------------------------------------------
    
      DEBUG build:
    
      without clock jump detection:
        1000000 iterations in 592.051281 ms
        1000000 iterations in 591.087500 ms
        1000000 iterations in 597.142767 ms
        1000000 iterations in 591.066335 ms
        1000000 iterations in 590.717687 ms
    
        average: 592.413114 ms
    
      with clock jump detection:
        1000000 iterations in 682.691815 ms
        1000000 iterations in 682.087873 ms
        1000000 iterations in 685.602277 ms
        1000000 iterations in 683.057099 ms
        1000000 iterations in 682.095996 ms
    
        average: 683.107012 ms
    
      -------------------------------------------------------------------
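
The slowdown factor can be recomputed from the timings listed above. The snippet below is only a worked check of the arithmetic: the helper names Mean() and Slowdown() and the constant names are made up for illustration, while the numbers are copied from the test output above.

```cpp
#include <cassert>
#include <numeric>
#include <vector>

// Arithmetic mean of a non-empty series of timings.
double Mean(const std::vector<double>& v) {
  assert(!v.empty());
  return std::accumulate(v.begin(), v.end(), 0.0) / v.size();
}

// Ratio of the average time with the check to the average time without it.
double Slowdown(const std::vector<double>& with_check,
                const std::vector<double>& without_check) {
  return Mean(with_check) / Mean(without_check);
}

// Timings in milliseconds per 1000000 iterations, copied from above.
const std::vector<double> kReleaseWithout = {
    158.553514, 159.200062, 157.840572, 159.079716, 161.235647};
const std::vector<double> kReleaseWith = {
    178.415508, 178.102677, 177.465617, 180.552167, 179.644688};
const std::vector<double> kDebugWithout = {
    592.051281, 591.087500, 597.142767, 591.066335, 590.717687};
const std::vector<double> kDebugWith = {
    682.691815, 682.087873, 685.602277, 683.057099, 682.095996};
```

Slowdown(kReleaseWith, kReleaseWithout) comes out at roughly 1.12 and Slowdown(kDebugWith, kDebugWithout) at roughly 1.15, that is, about 20 ns of extra time per call in the RELEASE build.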
    
    RELEASE, running under perf stat without clock jump detection:
    
     Performance counter stats for './bin/hybrid_clock-test --gtest_filter=Perf/HybridClockJumpProtectionTest.BasicPerf/0' (10 runs):
    
            171.499485 task-clock                #    0.999 CPUs utilized            ( +-  0.18% )
                     3 context-switches          #    0.016 K/sec                    ( +-  4.76% )
                     0 cpu-migrations            #    0.000 K/sec
                 2,829 page-faults               #    0.016 M/sec                    ( +-  1.92% )
           562,222,108 cycles                    #    3.278 GHz                      ( +-  0.18% )
           698,278,773 instructions              #    1.24  insns per cycle          ( +-  0.09% )
           122,793,006 branches                  #  715.996 M/sec                    ( +-  0.11% )
               319,525 branch-misses             #    0.26% of all branches          ( +-  0.16% )
    
           0.171751822 seconds time elapsed                                          ( +-  0.18% )
    
      -------------------------------------------------------------------
    
    RELEASE, running under perf stat with clock jump detection:
    
     Performance counter stats for './bin/hybrid_clock-test --gtest_filter=Perf/HybridClockJumpProtectionTest.BasicPerf/1' (10 runs):
    
            193.189743 task-clock                #    0.999 CPUs utilized            ( +-  0.84% )
                     3 context-switches          #    0.015 K/sec                    ( +-  8.05% )
                     0 cpu-migrations            #    0.000 K/sec
                 2,741 page-faults               #    0.014 M/sec                    ( +-  2.35% )
           627,522,941 cycles                    #    3.248 GHz                      ( +-  0.22% )
           797,617,361 instructions              #    1.27  insns per cycle          ( +-  0.13% )
           138,636,629 branches                  #  717.619 M/sec                    ( +-  0.13% )
               339,705 branch-misses             #    0.25% of all branches          ( +-  0.31% )
    
           0.193459251 seconds time elapsed                                          ( +-  0.84% )
    
      -------------------------------------------------------------------
    
    Change-Id: I630783653717d975a9b2ad668e8bd47b7796d275
    Reviewed-on: http://gerrit.cloudera.org:8080/19473
    Tested-by: Alexey Serbin <al...@apache.org>
    Reviewed-by: Yingchun Lai <la...@apache.org>
---
 src/kudu/clock/hybrid_clock-test.cc | 104 ++++++++++++++++++++++-
 src/kudu/clock/hybrid_clock.cc      |  76 +++++++++++++----
 src/kudu/clock/hybrid_clock.h       |  43 ++++++++--
 src/kudu/clock/system_ntp.h         |  15 ++--
 src/kudu/server/server_base.cc      | 165 +++++++++++++++++++++++++++++++++---
 src/kudu/server/server_base.h       |  14 +++
 6 files changed, 377 insertions(+), 40 deletions(-)

diff --git a/src/kudu/clock/hybrid_clock-test.cc b/src/kudu/clock/hybrid_clock-test.cc
index 4c9ec5ee2..f711c56c2 100644
--- a/src/kudu/clock/hybrid_clock-test.cc
+++ b/src/kudu/clock/hybrid_clock-test.cc
@@ -35,6 +35,7 @@
 #include "kudu/clock/time_service.h"
 #include "kudu/common/timestamp.h"
 #include "kudu/gutil/casts.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -52,6 +53,7 @@
 #include "kudu/util/random_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
@@ -69,6 +71,7 @@ using kudu::cluster::ExternalMiniClusterOptions;
 using std::string;
 using std::thread;
 using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 namespace clock {
@@ -394,8 +397,14 @@ TEST_F(HybridClockTest, TestRideOverNtpInterruption) {
 TEST_F(HybridClockTest, SlowClockInitialisation) {
   SKIP_IF_SLOW_NOT_ALLOWED();
 
-  const vector<string> kExtraFlags =
-      { "--hybrid_clock_inject_init_delay_ms=100" };
+  const vector<string> kExtraFlags = {
+    // Inject delay to emulate slow initialization of the hybrid clock.
+    "--hybrid_clock_inject_init_delay_ms=100",
+
+    // Switching from the default "auto" to "enabled" for faster startup times:
+    // this test scenario is sensitive to start up timing.
+    "--wall_clock_jump_detection=enabled",
+  };
 
   ExternalMiniClusterOptions opts;
   opts.num_masters = 3;
@@ -455,7 +464,7 @@ TEST_F(HybridClockTest, SlowClockInitialisation) {
   EasyCurl c;
   for (const auto& addr : addresses) {
     faststring buf;
-    ASSERT_OK(c.FetchURL(strings::Substitute("http://$0/metrics", addr), &buf));
+    ASSERT_OK(c.FetchURL(Substitute("http://$0/metrics", addr), &buf));
     const auto str = buf.ToString();
     ASSERT_STR_CONTAINS(str, "builtin_ntp_error");
     ASSERT_STR_CONTAINS(str, "builtin_ntp_local_clock_delta");
@@ -557,6 +566,57 @@ TEST_F(HybridClockTest, AutoTimeSourceNoDedicatedNtpServer) {
   }
 }
 
+// Imitate the clock jumping well over the threshold.
+TEST_F(HybridClockTest, ClockJumpDetection) {
+  static constexpr uint64_t kThresholdSeconds = 100;
+  FLAGS_time_source = "mock";
+  HybridClock clock(metric_entity_, kThresholdSeconds * 1000 * 1000);
+
+  ASSERT_OK(clock.Init());
+
+  const auto time_setter = [&](MonoDelta delta) {
+    uint64_t now = HybridClock::GetPhysicalValueMicros(clock.Now());
+    uint64_t to_update = now + delta.ToMicroseconds();
+    auto* hybrid_clock = down_cast<HybridClock*>(&clock);
+    auto* mock_ntp = down_cast<clock::MockNtp*>(hybrid_clock->time_service());
+    mock_ntp->SetMockClockWallTimeForTests(to_update);
+  };
+
+  // Move the 'now' timestamp of the mock clock to make sure the proper 'if'
+  // branch is taken in HybridClock::NowWithErrorUnlocked().
+  time_setter(MonoDelta::FromMicroseconds(1));
+  {
+    Timestamp ts;
+    uint64_t max_error_usec;
+    ASSERT_OK(clock.NowWithError(&ts, &max_error_usec));
+    ASSERT_EQ(1, HybridClock::GetPhysicalValueMicros(ts));
+  }
+
+  // Get closer to the threshold. It should still work since the threshold
+  // isn't crossed yet.
+  time_setter(MonoDelta::FromSeconds(kThresholdSeconds));
+  {
+    Timestamp ts;
+    uint64_t max_error_usec;
+    ASSERT_OK(clock.NowWithError(&ts, &max_error_usec));
+    ASSERT_EQ(kThresholdSeconds * 1000 * 1000 + 1,
+              HybridClock::GetPhysicalValueMicros(ts));
+  }
+
+  // Now make the clock jump well over the threshold: with the previous jump
+  // it should be about two times over the threshold.
+  time_setter(MonoDelta::FromSeconds(2 * kThresholdSeconds));
+  {
+    Timestamp ts;
+    uint64_t max_error_usec;
+    const auto s = clock.NowWithError(&ts, &max_error_usec);
+    ASSERT_TRUE(s.IsServiceUnavailable()) << s.ToString();
+    ASSERT_STR_MATCHES(s.ToString(),
+        "wall \\(200000000 us\\) and monotonic \\([0-9]+ us\\) "
+        "clock deltas diverged too much \\(threshold 100000000 us\\)");
+  }
+}
+
 #if defined(KUDU_HAS_SYSTEM_TIME_SOURCE)
 TEST_F(HybridClockTest, TestNtpDiagnostics) {
   FLAGS_time_source = "system";
@@ -593,5 +653,43 @@ TEST_F(HybridClockTest, TestNtpDiagnostics) {
 }
 #endif // #if defined(KUDU_HAS_SYSTEM_TIME_SOURCE) ...
 
+// The boolean parameter is to specify whether the wall clock protection is
+// enabled or not ('true' -- enabled, 'false' -- disabled).
+class HybridClockJumpProtectionTest : public ClockTest,
+                            public ::testing::WithParamInterface<bool> {
+};
+INSTANTIATE_TEST_SUITE_P(Perf, HybridClockJumpProtectionTest, ::testing::Bool());
+
+// This is a scenario for basic performance assessment of the
+// HybridClock::NowWithError() method with clock jump check enabled/disabled.
+TEST_P(HybridClockJumpProtectionTest, BasicPerf) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  constexpr const int kNumIterations = 1000000;
+  FLAGS_time_source = "system_unsync";
+  const auto enable_protection = GetParam();
+  HybridClock clock(metric_entity_, enable_protection ? 10000000UL : 0UL);
+  ASSERT_OK(clock.Init());
+
+  Stopwatch sw;
+  sw.start();
+  Status res_status;
+  for (auto i = 0; i < kNumIterations; ++i) {
+    Timestamp timestamp;
+    uint64_t max_error_usec;
+    auto s = clock.NowWithError(&timestamp, &max_error_usec);
+    if (PREDICT_FALSE(!s.ok())) {
+      res_status = s;
+      break;
+    }
+  }
+  sw.stop();
+
+  ASSERT_OK(res_status);
+
+  LOG(INFO) << Substitute("$0 iterations in $1 ms",
+                          kNumIterations, sw.elapsed().wall_millis());
+}
+
 }  // namespace clock
 }  // namespace kudu
diff --git a/src/kudu/clock/hybrid_clock.cc b/src/kudu/clock/hybrid_clock.cc
index 8c64c23be..eab378c55 100644
--- a/src/kudu/clock/hybrid_clock.cc
+++ b/src/kudu/clock/hybrid_clock.cc
@@ -18,6 +18,7 @@
 #include "kudu/clock/hybrid_clock.h"
 
 #include <algorithm>
+#include <cstdlib>
 #include <functional>
 #include <limits>
 #include <memory>
@@ -36,6 +37,7 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
 #include "kudu/util/cloud/instance_detector.h"
 #include "kudu/util/cloud/instance_metadata.h"
 #include "kudu/util/debug/trace_event.h"
@@ -151,6 +153,8 @@ TAG_FLAG(ntp_initial_sync_wait_secs, evolving);
 
 DECLARE_bool(unlock_unsafe_flags);
 
+namespace {
+
 // This group flag validator is a guardrail to help using proper time source
 // in production.
 //
@@ -171,6 +175,8 @@ bool ValidateTimeSource() {
 }
 GROUP_FLAG_VALIDATOR(time_source_guardrail, ValidateTimeSource);
 
+} // anonymous namespace
+
 METRIC_DEFINE_gauge_bool(server, hybrid_clock_extrapolating,
                          "Hybrid Clock Is Being Extrapolated",
                          kudu::MetricUnit::kState,
@@ -197,7 +203,7 @@ METRIC_DEFINE_histogram(server, hybrid_clock_max_errors,
                         kudu::MetricUnit::kMicroseconds,
                         "The statistics on the maximum error of the underlying "
                         "clock",
-                         kudu::MetricLevel::kDebug,
+                        kudu::MetricLevel::kDebug,
                         10000000, 1);
 
 METRIC_DEFINE_histogram(server, hybrid_clock_extrapolation_intervals,
@@ -230,8 +236,15 @@ Status CheckDeadlineNotWithinMicros(const MonoTime& deadline, int64_t wait_for_u
 
 }  // anonymous namespace
 
-HybridClock::HybridClock(const scoped_refptr<MetricEntity>& metric_entity)
-    : next_timestamp_(0),
+HybridClock::HybridClock(const scoped_refptr<MetricEntity>& metric_entity,
+                         uint64_t wall_clock_jump_threshold_usec,
+                         std::unique_ptr<InstanceMetadata> im)
+    : instance_metadata_(std::move(im)),
+      is_wall_clock_jump_check_enabled_(wall_clock_jump_threshold_usec > 0),
+      wall_clock_jump_threshold_usec_(wall_clock_jump_threshold_usec),
+      next_timestamp_(0),
+      prev_mono_time_usec_(0),
+      prev_wall_time_usec_(0),
       state_(kNotInitialized),
       metric_entity_(metric_entity) {
   DCHECK(metric_entity);
@@ -245,7 +258,8 @@ HybridClock::HybridClock(const scoped_refptr<MetricEntity>& metric_entity)
 
 Status HybridClock::Init() {
   TimeSource time_source = TimeSource::UNKNOWN;
-  RETURN_NOT_OK(SelectTimeSource(FLAGS_time_source, &time_source));
+  RETURN_NOT_OK(SelectTimeSource(
+      FLAGS_time_source, &time_source, instance_metadata_.get()));
   LOG(INFO) << Substitute("auto-selected time source: $0",
                           TimeSourceToString(time_source));
   return InitWithTimeSource(time_source);
@@ -260,7 +274,7 @@ Timestamp HybridClock::Now() {
 
 Timestamp HybridClock::NowLatest() {
   Timestamp now;
-  uint64_t error;
+  uint64_t error = 0;
   NowWithErrorOrDie(&now, &error);
 
   uint64_t now_latest = GetPhysicalValueMicros(now) + error;
@@ -323,7 +337,7 @@ Status HybridClock::WaitUntilAfter(const Timestamp& then,
                                    const MonoTime& deadline) {
   TRACE_EVENT0("clock", "HybridClock::WaitUntilAfter");
   Timestamp now;
-  uint64_t error;
+  uint64_t error = 0;
   RETURN_NOT_OK(NowWithError(&now, &error));
 
   // "unshift" the timestamps so that we can measure actual time
@@ -394,8 +408,8 @@ bool HybridClock::IsAfter(Timestamp t) {
   return t.value() < now.value();
 }
 
-Status HybridClock::NowWithErrorUnlocked(Timestamp *timestamp,
-                                         uint64_t *max_error_usec) {
+Status HybridClock::NowWithErrorUnlocked(Timestamp* timestamp,
+                                         uint64_t* max_error_usec) {
   DCHECK(lock_.is_locked());
   DCHECK_EQ(state_, kInitialized) << "Clock not initialized. Must call Init() first.";
 
@@ -405,8 +419,33 @@ Status HybridClock::NowWithErrorUnlocked(Timestamp *timestamp,
 
   // If the physical time from the system clock is higher than our last-returned
   // time, we should use the physical timestamp.
-  uint64_t candidate_phys_timestamp = now_usec << kBitsToShift;
+  const uint64_t candidate_phys_timestamp = now_usec << kBitsToShift;
   if (PREDICT_TRUE(candidate_phys_timestamp > next_timestamp_)) {
+    // If enabled, perform an extra sanity check to make sure wall clock time
+    // hasn't jumped too far compared with monotonic clock time.
+    if (is_wall_clock_jump_check_enabled_) {
+      const int64_t now_mono_time_usec = GetMonoTimeMicrosRaw();
+      if (PREDICT_TRUE(prev_mono_time_usec_ != 0)) {
+        DCHECK_GE(now_mono_time_usec, prev_mono_time_usec_);
+        const int64_t mono_delta_usec = now_mono_time_usec - prev_mono_time_usec_;
+        const int64_t wall_delta_usec =
+            static_cast<int64_t>(now_usec) - prev_wall_time_usec_;
+        // Check if the wall clock timestamp has jumped too far compared
+        // with CLOCK_MONOTONIC_RAW timestamp read almost at the same moment.
+        if (PREDICT_FALSE(abs(wall_delta_usec - mono_delta_usec) >
+                          wall_clock_jump_threshold_usec_)) {
+          const auto msg = Substitute(
+              "wall ($0 us) and monotonic ($1 us) clock deltas diverged too much "
+              "(threshold $2 us)",
+              wall_delta_usec, mono_delta_usec, wall_clock_jump_threshold_usec_);
+          time_service_->DumpDiagnostics(/*log=*/nullptr);
+          return Status::ServiceUnavailable(msg);
+        }
+      }
+      prev_mono_time_usec_ = now_mono_time_usec;
+      prev_wall_time_usec_ = static_cast<int64_t>(now_usec);
+    }
+
     next_timestamp_ = candidate_phys_timestamp;
     *timestamp = Timestamp(next_timestamp_++);
     *max_error_usec = error_usec;
@@ -454,14 +493,20 @@ void HybridClock::NowWithErrorOrDie(Timestamp* timestamp,
 }
 
 Status HybridClock::SelectTimeSource(const string& time_source_str,
-                                     TimeSource* time_source) {
+                                     TimeSource* time_source,
+                                     const InstanceMetadata* instance_metadata) {
   constexpr const char* const BUILTIN_NTP_SERVERS = "builtin_ntp_servers";
 
   TimeSource result_time_source = TimeSource::UNKNOWN;
   if (iequals(time_source_str, TIME_SOURCE_AUTO)) {
-    InstanceDetector detector;
-    unique_ptr<InstanceMetadata> md;
-    const auto s = detector.Detect(&md);
+    auto s = Status::OK();
+    const auto* md = instance_metadata;
+    unique_ptr<InstanceMetadata> im;
+    if (!md) {
+      InstanceDetector detector;
+      s = detector.Detect(&im);
+      md = im.get();
+    }
     string ntp_server;
     if (s.ok() && md->GetNtpServer(&ntp_server).ok()) {
       // Select the built-in NTP client. If the auto-configuration of the
@@ -719,9 +764,8 @@ uint64_t HybridClock::NowForMetrics() {
 // Used to get the current error, for metrics.
 uint64_t HybridClock::ErrorForMetrics() {
   Timestamp now_unused;
-  uint64_t error;
-  auto s = NowWithError(&now_unused, &error);
-  if (PREDICT_FALSE(!s.ok())) {
+  uint64_t error = 0;
+  if (auto s = NowWithError(&now_unused, &error); PREDICT_FALSE(!s.ok())) {
     return std::numeric_limits<uint64_t>::max();
   }
   return error;
diff --git a/src/kudu/clock/hybrid_clock.h b/src/kudu/clock/hybrid_clock.h
index 42c6100a4..fc7650974 100644
--- a/src/kudu/clock/hybrid_clock.h
+++ b/src/kudu/clock/hybrid_clock.h
@@ -28,6 +28,7 @@
 #include "kudu/common/timestamp.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/util/cloud/instance_metadata.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
@@ -43,8 +44,15 @@ namespace clock {
 class HybridClock : public Clock {
  public:
   // Create an instance, registering HybridClock's metrics with the specified
-  // metric entity.
-  explicit HybridClock(const scoped_refptr<MetricEntity>& metric_entity);
+  // metric entity. If 'wall_clock_jump_threshold_usec' is greater than 0,
+  // enable the logic to detect sudden jumps of the wall clock with the
+  // corresponding threshold in microseconds. When InstanceMetadata is already
+  // available from a prior run of InstanceDetector, pass it with the 'im'
+  // parameter for deducing the effective time source during Init() instead of
+  // running instance detection one more time there.
+  explicit HybridClock(const scoped_refptr<MetricEntity>& metric_entity,
+                       uint64_t wall_clock_jump_threshold_usec = 0,
+                       std::unique_ptr<cloud::InstanceMetadata> im = {});
 
   // Should be called only once.
   Status Init() override;
@@ -203,8 +211,12 @@ class HybridClock : public Clock {
   // determines particular time service to use. If the 'builtin' time source is
   // selected, the --builtin_ntp_servers flag's value is used to build the set
   // of reference servers for the built-in NTP client.
-  static Status SelectTimeSource(const std::string& time_source_str,
-                                 TimeSource* time_source);
+  // If non-null, the optional 'instance_metadata' is used for selecting the
+  // effective time source.
+  static Status SelectTimeSource(
+      const std::string& time_source_str,
+      TimeSource* time_source,
+      const cloud::InstanceMetadata* instance_metadata = nullptr);
 
   // Initialize hybrid clock with the specified time source.
   // If 'time_source' is TimeSource::BUILTIN_NTP_SYNC, the set of reference
@@ -230,11 +242,25 @@ class HybridClock : public Clock {
   // Used to get the current error, for metrics.
   uint64_t ErrorForMetrics();
 
+  // If non-null, this instance metadata is used to detect effective time source
+  // during Init().
+  std::unique_ptr<cloud::InstanceMetadata> instance_metadata_;
+
+  // Whether to run a sanity check on the wall clock readings used for the
+  // physical part of the hybrid timestamp.
+  const bool is_wall_clock_jump_check_enabled_;
+
+  // The threshold for the difference of deltas in the wall clock's and the
+  // CLOCK_MONOTONIC_RAW clock's timestamps captured almost at the same time.
+  // The --wall_clock_jump_threshold_sec flag's value is used as the source
+  // for this constant.
+  const uint64_t wall_clock_jump_threshold_usec_;
+
   // Used to fetch the current time and error bound from the system or NTP
   // service.
   std::unique_ptr<clock::TimeService> time_service_;
 
-  // Guards access to 'state_' and 'next_timestamp_'.
+  // Guards access to 'state_', 'next_timestamp_', 'prev_{mono,wall}_time_usec_'.
   simple_spinlock lock_;
 
   // The next timestamp to be generated from this clock, assuming that
@@ -242,6 +268,13 @@ class HybridClock : public Clock {
   // Protected by 'lock_'.
   uint64_t next_timestamp_;
 
+  // Prior value of the CLOCK_MONOTONIC_RAW captured at almost the same moment
+  // when next_timestamp_'s physical part was captured.
+  int64_t prev_mono_time_usec_;
+
+  // Prior value of the physical part of 'next_timestamp_'.
+  int64_t prev_wall_time_usec_;
+
   // The last valid clock reading we got from the time source, along
   // with the monotime that we took that reading. The 'is_extrapolating' field
   // tracks whether extrapolated or real readings of the underlying clock are
diff --git a/src/kudu/clock/system_ntp.h b/src/kudu/clock/system_ntp.h
index b79314d12..a6a2095c9 100644
--- a/src/kudu/clock/system_ntp.h
+++ b/src/kudu/clock/system_ntp.h
@@ -29,12 +29,17 @@
 namespace kudu {
 namespace clock {
 
-// TimeService implementation which uses the 'ntp_adjtime' call (corresponding to the
-// 'adjtimex' syscall) to consult the Linux kernel for the current time
-// and error bound.
+// The SystemNtp is a TimeService implementation which uses the ntp_adjtime()
+// call [1] (similar to ntp_gettime() call [2] from the NTP kernel API, both
+// based on the adjtimex() syscall on Linux) to consult the Linux kernel
+// for the current time and error bound.
 //
-// This implementation relies on the ntpd service running on the local host
-// to keep the kernel's timekeeping up to date and in sync.
+// This implementation relies on the NTP service running on the local host
+// (implemented, for example, by chronyd, ntpd, etc.) to keep the system clock
+// synchronized with reference NTP time servers.
+//
+// [1] https://man7.org/linux/man-pages/man3/ntp_adjtime.3.html
+// [2] https://man7.org/linux/man-pages/man3/ntp_gettime.3.html
 class SystemNtp : public TimeService {
  public:
   explicit SystemNtp(const scoped_refptr<MetricEntity>& metric_entity);
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 1fac55e71..a20cbfb7e 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -69,6 +69,8 @@
 #include "kudu/server/tracing_path_handlers.h"
 #include "kudu/server/webserver.h"
 #include "kudu/util/atomic.h"
+#include "kudu/util/cloud/instance_detector.h"
+#include "kudu/util/cloud/instance_metadata.h"
 #include "kudu/util/env.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/file_cache.h"
@@ -278,6 +280,34 @@ DEFINE_string(jwks_discovery_endpoint_base, "",
               "are taken from received JWTs to get the appropriate Discovery Endpoint.");
 TAG_FLAG(jwks_discovery_endpoint_base, experimental);
 
+// The targeted use-case for the wall clock jump detection is spotting sudden
+// swings of the local clock while it is still reported to be synchronized with
+// reference NTP clock.
+DEFINE_string(wall_clock_jump_detection, "auto",
+              "Whether to run a sanity check on wall clock timestamps using "
+              "the readings of the CLOCK_MONOTONIC_RAW clock as the reference. "
+              "Acceptable values for this flag are \"auto\", \"enabled\", and "
+              "\"disabled\". \"auto\" enables the sanity check in environments "
+              "known to be susceptible to such clock jumps (e.g., Azure VMs); "
+              "\"enabled\" unconditionally enables the check; \"disabled\" "
+              "unconditionally disables the check. The threshold for the "
+              "difference between deltas of consecutive timestamps read from "
+              "wall and CLOCK_MONOTONIC_RAW clocks is controlled by the "
+              "--wall_clock_jump_threshold_sec flag.");
+TAG_FLAG(wall_clock_jump_detection, experimental);
+
+// The idea behind having 900 seconds as the default threshold is to have the
+// bar set quite high, but still under 1000 seconds which is the default time
+// delta threshold for ntpd unless '-g' option is added or 'tinker panic 0'
+// or similar directive is present in ntp.conf (ntpd would exit without trying
+// to adjust time if it detects the difference between the reference time and
+// the local clock time to be greater than 1000 seconds, see 'man ntpd').
+DEFINE_uint32(wall_clock_jump_threshold_sec, 15 * 60,
+              "Maximum allowed divergence between the wall and monotonic "
+              "clocks; effective only when the clock jump protection "
+              "is enabled");
+TAG_FLAG(wall_clock_jump_threshold_sec, experimental);
+
 DECLARE_bool(use_hybrid_clock);
 DECLARE_int32(dns_resolver_max_threads_num);
 DECLARE_uint32(dns_resolver_cache_capacity_mb);
@@ -289,6 +319,7 @@ DECLARE_int64(fs_data_dirs_reserved_bytes);
 DECLARE_string(log_filename);
 DECLARE_string(keytab_file);
 DECLARE_string(principal);
+DECLARE_string(time_source);
 
 METRIC_DECLARE_gauge_size(merged_entities_count_of_server);
 METRIC_DEFINE_gauge_int64(server, uptime,
@@ -317,6 +348,9 @@ METRIC_DEFINE_gauge_int64(server, memory_usage,
                           kudu::MetricLevel::kInfo);
 #endif // #ifdef TCMALLOC_ENABLED
 
+using kudu::cloud::CloudType;
+using kudu::cloud::InstanceDetector;
+using kudu::cloud::InstanceMetadata;
 using kudu::security::RpcAuthentication;
 using kudu::security::RpcEncryption;
 using std::ostringstream;
@@ -340,6 +374,58 @@ namespace server {
 
 namespace {
 
+enum class TriState {
+  AUTO,
+  ENABLED,
+  DISABLED,
+};
+
+// This is a helper function to parse a flag that has three possible values:
+// "auto", "enabled", "disabled".  That directly maps into the TriState enum.
+Status ParseTriStateFlag(const string& name,
+                         const string& value,
+                         TriState* result = nullptr) {
+  if (iequals(value, "auto")) {
+    if (result) {
+      *result = TriState::AUTO;
+    }
+    return Status::OK();
+  }
+  if (iequals(value, "enabled")) {
+    if (result) {
+      *result = TriState::ENABLED;
+    }
+    return Status::OK();
+  }
+  if (iequals(value, "disabled")) {
+    if (result) {
+      *result = TriState::DISABLED;
+    }
+    return Status::OK();
+  }
+  return Status::InvalidArgument(
+      Substitute("$0: invalid value for flag --$1", value, name));
+}
+
+bool ValidateWallClockJumpDetection(const char* name, const string& value) {
+  const auto s = ParseTriStateFlag(name, value);
+  if (s.ok()) {
+    return true;
+  }
+  LOG(ERROR) << s.ToString();
+  return false;
+}
+DEFINE_validator(wall_clock_jump_detection, &ValidateWallClockJumpDetection);
+
+bool ValidateWallClockJumpThreshold(const char* name, uint32_t value) {
+  if (value == 0) {
+    LOG(ERROR) << Substitute("--$0 must be greater than 0", name);
+    return false;
+  }
+  return true;
+}
+DEFINE_validator(wall_clock_jump_threshold_sec, &ValidateWallClockJumpThreshold);
+
 bool ValidateTlsProtocol(const char* /*flagname*/, const string& value) {
   return IsValidTlsProtocolStr(value);
 }
@@ -584,13 +670,6 @@ ServerBase::ServerBase(string name, const ServerBaseOptions& options,
   fs_opts.file_cache = file_cache_.get();
   fs_manager_.reset(new FsManager(options.env, std::move(fs_opts)));
 
-  if (FLAGS_use_hybrid_clock) {
-    clock_.reset(new clock::HybridClock(metric_entity_));
-  } else {
-    clock_.reset(new clock::LogicalClock(Timestamp::kInitialTimestamp,
-                                         metric_entity_));
-  }
-
   if (FLAGS_webserver_enabled) {
     web_server_.reset(new Webserver(options.webserver_opts));
   }
@@ -634,6 +713,25 @@ void ServerBase::GenerateInstanceID() {
 }
 
 Status ServerBase::Init() {
+  if (!FLAGS_use_hybrid_clock) {
+    clock_.reset(new clock::LogicalClock(Timestamp::kInitialTimestamp,
+                                         metric_entity_));
+  } else {
+    uint64_t threshold_usec = 0;
+    unique_ptr<InstanceMetadata> im;
+    RETURN_NOT_OK(WallClockJumpDetectionNeeded(&threshold_usec, &im));
+    if (threshold_usec > 0) {
+      LOG(INFO) << "enabling wall clock jump detection";
+    }
+    clock_.reset(new clock::HybridClock(
+        metric_entity_, threshold_usec, std::move(im)));
+  }
+
+  // Initialize the clock immediately. This checks that the clock is synchronized
+  // so we're less likely to get into a partially initialized state on disk during startup
+  // if we're having clock problems.
+  RETURN_NOT_OK_PREPEND(clock_->Init(), "Cannot initialize clock");
+
   Timer* init = startup_path_handler_->init_progress();
   Timer* read_filesystem = startup_path_handler_->read_filesystem_progress();
   init->Start();
@@ -643,10 +741,6 @@ Status ServerBase::Init() {
 
   InitSpinLockContentionProfiling();
 
-  // Initialize the clock immediately. This checks that the clock is synchronized
-  // so we're less likely to get into a partially initialized state on disk during startup
-  // if we're having clock problems.
-  RETURN_NOT_OK_PREPEND(clock_->Init(), "Cannot initialize clock");
   RETURN_NOT_OK(security::InitKerberosForServer(FLAGS_principal, FLAGS_keytab_file));
   RETURN_NOT_OK(file_cache_->Init());
 
@@ -768,6 +862,55 @@ Status ServerBase::Init() {
   return rpc_server_->Bind();
 }
 
+Status ServerBase::WallClockJumpDetectionNeeded(
+    uint64_t* threshold_usec, unique_ptr<InstanceMetadata>* im) {
+  DCHECK(threshold_usec);
+  DCHECK(im);
+
+  TriState st = TriState::AUTO;
+  RETURN_NOT_OK(ParseTriStateFlag(
+      "wall_clock_jump_detection", FLAGS_wall_clock_jump_detection, &st));
+  switch (st) {
+    case TriState::DISABLED:
+      *threshold_usec = 0;
+      return Status::OK();
+    case TriState::ENABLED:
+      *threshold_usec = FLAGS_wall_clock_jump_threshold_sec * 1000UL * 1000UL;
+      return Status::OK();
+    default:
+      break;
+  }
+  DCHECK(st == TriState::AUTO);
+
+  InstanceDetector detector;
+  unique_ptr<InstanceMetadata> metadata;
+  if (const auto s = detector.Detect(&metadata); !s.ok()) {
+    LOG(INFO) << Substitute("$0: unable to detect cloud type of this node, "
+                            "probably running in non-cloud environment", s.ToString());
+    *threshold_usec = 0;
+    return Status::OK();
+  }
+  LOG(INFO) << Substitute("running on $0 node",
+                          cloud::TypeToString(metadata->type()));
+
+  // Enable an extra check to detect clock jumps on Azure VMs: those seem
+  // to be prone to clock jumps that aren't reflected in NTP clock
+  // synchronization status reported by ntp_adjtime()/ntp_gettime().
+  // Perhaps that's due to the VMICTimeSync service interfering with the
+  // kernel NTP discipline after a 'memory preserving' maintenance: when a
+  // VM is 'unfrozen' after the update, the VMICTimeSync service updates the
+  // VM's clock to compensate for the pause (see [1]).
+  // [1] https://learn.microsoft.com/en-us/azure/virtual-machines/linux/time-sync
+  if (metadata->type() == CloudType::AZURE) {
+    *threshold_usec = FLAGS_wall_clock_jump_threshold_sec * 1000UL * 1000UL;
+  } else {
+    *threshold_usec = 0;
+  }
+  *im = std::move(metadata);
+
+  return Status::OK();
+}
+
 Status ServerBase::InitAcls() {
   string service_user;
   std::optional<string> keytab_user = security::GetLoggedInUsernameFromKeytab();
diff --git a/src/kudu/server/server_base.h b/src/kudu/server/server_base.h
index 3802af1ac..8b9cc4089 100644
--- a/src/kudu/server/server_base.h
+++ b/src/kudu/server/server_base.h
@@ -52,6 +52,10 @@ namespace clock {
 class Clock;
 } // namespace clock
 
+namespace cloud {
+class InstanceMetadata;
+} // namespace cloud
+
 namespace rpc {
 class ResultTracker;
 class RpcContext;
@@ -226,6 +230,16 @@ class ServerBase {
   CountDownLatch stop_background_threads_latch_;
 
  private:
+  // Determine whether to enable the sanity check for wall clock jumps, which
+  // targets environments prone to such jumps. The logic is controlled by the
+  // --wall_clock_jump_detection and --wall_clock_jump_threshold_sec flags.
+  // On Status::OK(), *threshold_usec > 0 means the clock jump detection
+  // should be enabled with the specified threshold, while *threshold_usec == 0
+  // means clock jump detection isn't needed.
+  static Status WallClockJumpDetectionNeeded(
+      uint64_t* threshold_usec,
+      std::unique_ptr<cloud::InstanceMetadata>* im);
+
   Status InitAcls();
   void GenerateInstanceID();
   Status DumpServerInfo(const std::string& path,