You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/03/02 22:15:09 UTC

[06/11] incubator-kudu git commit: KUDU-1345. Fix case in which hybrid clock can run backwards

KUDU-1345. Fix case in which hybrid clock can run backwards

This fixes a bug with the following sequence of updates:

- the clock is at time (phys=1000, logical=0)
- we get an update from the future: (phys=1100, logical=0)
- we generate clock reads locally thousands of times, which causes the
  logical value to wrap into the physical value (phys=1105, logical=0)
- our physical clock advances to (phys=1101, logical=0)

Previously, we would incorrectly move the clock backwards to (1101, 0)
because the physical value was higher than the last external update.
This is obviously incorrect.

The fix simplifies the logic in the hybrid clock to just maintain a single
timestamp, rather than separately tracking the last logical and physical
time.

Cherry-picked from 8514b9e0fed13ec48b9fe8ca3feb298b07133e42

Change-Id: I9b4a04cb8b7b5eb879d017375714b3183be0c601
Reviewed-on: http://gerrit.cloudera.org:8080/2266
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: David Ribeiro Alves <da...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/2393
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/c2d37631
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/c2d37631
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/c2d37631

Branch: refs/heads/branch-0.7.x
Commit: c2d376317b0b51a80154ca7e1be7789d627f69d4
Parents: d0ade2f
Author: Todd Lipcon <to...@apache.org>
Authored: Fri Feb 19 19:01:11 2016 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Mar 2 21:08:40 2016 +0000

----------------------------------------------------------------------
 src/kudu/server/hybrid_clock-test.cc | 49 +++++++++++++++++++++++++++++++
 src/kudu/server/hybrid_clock.cc      | 43 ++++++++++++---------------
 src/kudu/server/hybrid_clock.h       |  7 ++---
 3 files changed, 70 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c2d37631/src/kudu/server/hybrid_clock-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/hybrid_clock-test.cc b/src/kudu/server/hybrid_clock-test.cc
index e8212df..2444360 100644
--- a/src/kudu/server/hybrid_clock-test.cc
+++ b/src/kudu/server/hybrid_clock-test.cc
@@ -73,6 +73,55 @@ TEST(MockHybridClockTest, TestMockedSystemClock) {
             HybridClock::TimestampFromMicrosecondsAndLogicalValue(time, 1).ToUint64());
 }
 
+// Test that, if the rate at which the clock is read is greater than the maximum
+// resolution of the logical counter (12 bits in our implementation), it properly
+// "overflows" into the physical portion of the clock, and maintains all ordering
+// guarantees even as the physical clock continues to increase.
+//
+// This is a regression test for KUDU-1345.
+TEST(MockHybridClockTest, TestClockDealsWithWrapping) {
+  google::FlagSaver saver;
+  FLAGS_use_mock_wall_clock = true;
+  scoped_refptr<HybridClock> clock(new HybridClock());
+  clock->SetMockClockWallTimeForTests(1000);
+  clock->Init();
+
+  Timestamp prev = clock->Now();
+
+  // Update the clock from 10us in the future
+  clock->Update(HybridClock::TimestampFromMicroseconds(1010));
+
+  // Now read the clock value enough times so that the logical value wraps
+  // over, and should increment the _physical_ portion of the clock.
+  for (int i = 0; i < 10000; i++) {
+    Timestamp now = clock->Now();
+    ASSERT_GT(now.value(), prev.value());
+    prev = now;
+  }
+  ASSERT_EQ(1012, HybridClock::GetPhysicalValueMicros(prev));
+
+  // Advance the time microsecond by microsecond, and ensure the clock never
+  // goes backwards.
+  for (int time = 1001; time < 1020; time++) {
+    clock->SetMockClockWallTimeForTests(time);
+    Timestamp now = clock->Now();
+
+    // Clock should run strictly forwards.
+    ASSERT_GT(now.value(), prev.value());
+
+    // Additionally, once the physical time surpasses the logical time, we should
+    // be running on the physical clock. Otherwise, we should stick with the physical
+    // time we had rolled forward to above.
+    if (time > 1012) {
+      ASSERT_EQ(time, HybridClock::GetPhysicalValueMicros(now));
+    } else {
+      ASSERT_EQ(1012, HybridClock::GetPhysicalValueMicros(now));
+    }
+
+    prev = now;
+  }
+}
+
 // Test that two subsequent time reads are monotonically increasing.
 TEST_F(HybridClockTest, TestNow_ValuesIncreaseMonotonically) {
   const Timestamp now1 = clock_->Now();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c2d37631/src/kudu/server/hybrid_clock.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/hybrid_clock.cc b/src/kudu/server/hybrid_clock.cc
index 929bdcb..937964f 100644
--- a/src/kudu/server/hybrid_clock.cc
+++ b/src/kudu/server/hybrid_clock.cc
@@ -140,8 +140,7 @@ HybridClock::HybridClock()
       divisor_(1),
 #endif
       tolerance_adjustment_(1),
-      last_usec_(0),
-      next_logical_(0),
+      next_timestamp_(0),
       state_(kNotInitialized) {
 }
 
@@ -230,11 +229,12 @@ void HybridClock::NowWithError(Timestamp* timestamp, uint64_t* max_error_usec) {
         "Status: $0", s.ToString());
   }
 
-  // If the current time surpasses the last update just return it
-  if (PREDICT_TRUE(now_usec > last_usec_)) {
-    last_usec_ = now_usec;
-    next_logical_ = 1;
-    *timestamp = TimestampFromMicroseconds(last_usec_);
+  // If the physical time from the system clock is higher than our last-returned
+  // time, we should use the physical timestamp.
+  uint64_t candidate_phys_timestamp = now_usec << kBitsToShift;
+  if (PREDICT_TRUE(candidate_phys_timestamp > next_timestamp_)) {
+    next_timestamp_ = candidate_phys_timestamp;
+    *timestamp = Timestamp(next_timestamp_++);
     *max_error_usec = error_usec;
     if (PREDICT_FALSE(VLOG_IS_ON(2))) {
       VLOG(2) << "Current clock is higher than the last one. Resetting logical values."
@@ -262,14 +262,12 @@ void HybridClock::NowWithError(Timestamp* timestamp, uint64_t* max_error_usec) {
   // This broadens the error interval for both cases but always returns
   // a correct error interval.
 
-  *max_error_usec = last_usec_ - (now_usec - error_usec);
-  *timestamp = TimestampFromMicrosecondsAndLogicalValue(last_usec_, next_logical_);
+  *max_error_usec = (next_timestamp_ >> kBitsToShift) - (now_usec - error_usec);
+  *timestamp = Timestamp(next_timestamp_++);
   if (PREDICT_FALSE(VLOG_IS_ON(2))) {
     VLOG(2) << "Current clock is lower than the last one. Returning last read and incrementing"
-        " logical values. Physical Value: " << now_usec << " usec Logical Value: "
-        << next_logical_ << " Error: " << *max_error_usec;
+        " logical values. Clock: " + Stringify(*timestamp) << " Error: " << *max_error_usec;
   }
-  next_logical_++;
 }
 
 Status HybridClock::Update(const Timestamp& to_update) {
@@ -278,10 +276,11 @@ Status HybridClock::Update(const Timestamp& to_update) {
   uint64_t error_ignored;
   NowWithError(&now, &error_ignored);
 
+  // If the incoming message is in the past relative to our current
+  // physical clock, there's nothing to do.
   if (PREDICT_TRUE(now.CompareTo(to_update) > 0)) return Status::OK();
 
   uint64_t to_update_physical = GetPhysicalValueMicros(to_update);
-  uint64_t to_update_logical = GetLogicalValue(to_update);
   uint64_t now_physical = GetPhysicalValueMicros(now);
 
   // we won't update our clock if to_update is more than 'max_clock_sync_error_usec'
@@ -291,8 +290,9 @@ Status HybridClock::Update(const Timestamp& to_update) {
     return Status::InvalidArgument("Tried to update clock beyond the max. error.");
   }
 
-  last_usec_ = to_update_physical;
-  next_logical_ = to_update_logical + 1;
+  // Our next timestamp must be higher than the one that we are updating
+  // from.
+  next_timestamp_ = to_update.value() + 1;
   return Status::OK();
 }
 
@@ -370,18 +370,11 @@ bool HybridClock::IsAfter(Timestamp t) {
   uint64_t error_usec;
   CHECK_OK(WalltimeWithError(&now_usec, &error_usec));
 
-  boost::lock_guard<simple_spinlock> lock(lock_);
-  now_usec = std::max(now_usec, last_usec_);
-
   Timestamp now;
-  if (now_usec > last_usec_) {
-    now = TimestampFromMicroseconds(now_usec);
-  } else {
-    // last_usec_ may be in the future if we were updated from a remote
-    // node.
-    now = TimestampFromMicrosecondsAndLogicalValue(last_usec_, next_logical_);
+  {
+    boost::lock_guard<simple_spinlock> lock(lock_);
+    now = Timestamp(std::max(next_timestamp_, now_usec << kBitsToShift));
   }
-
   return t.value() < now.value();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c2d37631/src/kudu/server/hybrid_clock.h
----------------------------------------------------------------------
diff --git a/src/kudu/server/hybrid_clock.h b/src/kudu/server/hybrid_clock.h
index 933c00c..bbf2fcb 100644
--- a/src/kudu/server/hybrid_clock.h
+++ b/src/kudu/server/hybrid_clock.h
@@ -188,10 +188,9 @@ class HybridClock : public Clock {
 
   mutable simple_spinlock lock_;
 
-  // the last clock read/update, in microseconds.
-  uint64_t last_usec_;
-  // the next logical value to be assigned to a timestamp
-  uint64_t next_logical_;
+  // The next timestamp to be generated from this clock, assuming that
+  // the physical clock hasn't advanced beyond the value stored here.
+  uint64_t next_timestamp_;
 
   // How many bits to left shift a microseconds clock read. The remainder
   // of the timestamp will be reserved for logical values.