You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/02 22:15:03 UTC

hbase git commit: HBASE-18498 Design improvements in Clock.java.

Repository: hbase
Updated Branches:
  refs/heads/HBASE-14070.HLC 386b1df1d -> d9a990490


HBASE-18498 Design improvements in Clock.java.

- Delete PhysicalClock interface, which seems useless given we have the "System" implementation of Clock, which is basically the same.
- Embed systemMonotonic clock into HLC to get rid of redundancy in management of the physical component, especially the logic around ensuring monotonicity.
- Make max_skew in clocks configurable
- Remove isMonotonicallyIncreasing() which is not used.
- Update logical overflow test to use the production code path instead of test hooks.
- Added lots of comments.

Change-Id: Ie775d28f15e864e7885b39e503c75670acbf4391


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d9a99049
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d9a99049
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d9a99049

Branch: refs/heads/HBASE-14070.HLC
Commit: d9a9904907ef0cf450b76933750d045237713403
Parents: 386b1df
Author: Apekshit Sharma <ap...@apache.org>
Authored: Fri Jul 21 20:21:22 2017 -0700
Committer: Apekshit Sharma <ap...@apache.org>
Committed: Wed Aug 2 15:14:38 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/Clock.java     | 274 ++++++++-----------
 .../apache/hadoop/hbase/util/AtomicUtils.java   |   1 +
 .../java/org/apache/hadoop/hbase/TestClock.java |  91 +++---
 .../hbase/regionserver/HRegionServer.java       |   6 +-
 .../hadoop/hbase/TestClockWithCluster.java      |  79 +++---
 5 files changed, 210 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d9a99049/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java
index 0e2320f..6a0374e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java
@@ -81,11 +81,6 @@ public interface Clock {
   boolean isMonotonic();
 
   /**
-   * @return true if the clock implementation gives monotonically increasing timestamps else false.
-   */
-  boolean isMonotonicallyIncreasing();
-
-  /**
    * @return {@link org.apache.hadoop.hbase.TimestampType}
    */
   TimestampType getTimestampType();
@@ -95,6 +90,10 @@ public interface Clock {
    */
   ClockType getClockType();
 
+  /**
+   * @return {@link TimeUnit} of the physical time used by the clock.
+   */
+  TimeUnit getTimeUnit();
 
   /**
    * Indicates that Physical Time or Logical Time component has overflowed. This extends
@@ -107,41 +106,7 @@ public interface Clock {
     }
   }
 
-  //////////////////////////////////////////////////////////////////
-  // Physical Clock
-  //////////////////////////////////////////////////////////////////
-
-  interface PhysicalClock {
-    /**
-     * This is a method to get the current time.
-     *
-     * @return Timestamp of current time in 64 bit representation corresponding to the particular
-     * clock
-     */
-    long now() throws RuntimeException;
-
-    /**
-     * This is a method to get the unit of the physical time used by the clock
-     *
-     * @return A {@link TimeUnit}
-     */
-    TimeUnit getTimeUnit();
-  }
-
-  class JavaMillisPhysicalClock implements PhysicalClock {
-    @Override
-    public long now() {
-      return EnvironmentEdgeManager.currentTime();
-    }
-
-    @Override
-    public TimeUnit getTimeUnit() {
-      return TimeUnit.MILLISECONDS;
-    }
-  }
-
-  JavaMillisPhysicalClock DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK =
-      new JavaMillisPhysicalClock();
+  Clock SYSTEM_CLOCK = new System();
 
   //////////////////////////////////////////////////////////////////
   // Implementation of clocks
@@ -149,20 +114,18 @@ public interface Clock {
 
   /**
    * System clock is an implementation of clock which doesn't give any monotonic guarantees.
+   * Since it directly represents system's actual clock which cannot be changed, update() function
+   * is no-op.
    */
-  class System implements Clock, PhysicalClock {
-    private final PhysicalClock physicalClock = DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK;
-    private final ClockType clockType = ClockType.SYSTEM;
-    private final TimestampType timestampType = TimestampType.PHYSICAL;
-
+  class System implements Clock {
     @Override
     public long now() {
-      return physicalClock.now();
+      return EnvironmentEdgeManager.currentTime();
     }
 
     @Override
     public long update(long timestamp) {
-      return physicalClock.now();
+      return EnvironmentEdgeManager.currentTime();
     }
 
     @Override
@@ -171,52 +134,62 @@ public interface Clock {
     }
 
     @Override
-    public boolean isMonotonicallyIncreasing() {
-      return false;
-    }
-
     public TimeUnit getTimeUnit() {
-      return physicalClock.getTimeUnit();
+      return TimeUnit.MILLISECONDS;
     }
 
     @Override
-    public TimestampType getTimestampType() { return timestampType; }
+    public TimestampType getTimestampType() {
+      return TimestampType.PHYSICAL;
+    }
 
     @Override
-    public ClockType getClockType() { return clockType; }
+    public ClockType getClockType() {
+      return ClockType.SYSTEM;
+    }
   }
 
   /**
    * System clock is an implementation of clock which guarantees monotonically non-decreasing
    * timestamps.
    */
-  class SystemMonotonic implements Clock, PhysicalClock {
+  class SystemMonotonic implements Clock {
     private final long maxClockSkewInMs;
-    private final PhysicalClock physicalClock;
+    private final Clock systemClock;
     private final AtomicLong physicalTime = new AtomicLong();
-    private final ClockType clockType = ClockType.SYSTEM_MONOTONIC;
-    private final TimestampType timestampType = TimestampType.PHYSICAL;
 
-    public SystemMonotonic(PhysicalClock physicalClock, long maxClockSkewInMs) {
-      this.physicalClock = physicalClock;
-      this.maxClockSkewInMs = maxClockSkewInMs > 0 ?
-          maxClockSkewInMs : DEFAULT_MAX_CLOCK_SKEW_IN_MS;
+    public SystemMonotonic(long maxClockSkewInMs) {
+      this(SYSTEM_CLOCK, maxClockSkewInMs);
     }
 
+    @VisibleForTesting
     public SystemMonotonic() {
-      this.physicalClock = DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK;
-      this.maxClockSkewInMs = DEFAULT_MAX_CLOCK_SKEW_IN_MS;
+      this(DEFAULT_MAX_CLOCK_SKEW_IN_MS);
+    }
+
+    @VisibleForTesting
+    public SystemMonotonic(Clock systemClock) {
+      this(systemClock, DEFAULT_MAX_CLOCK_SKEW_IN_MS);
+    }
+
+    @VisibleForTesting
+    public SystemMonotonic(Clock systemClock, long maxClockSkewInMs) {
+      this.systemClock = systemClock;
+      this.maxClockSkewInMs = maxClockSkewInMs > 0 ?
+          maxClockSkewInMs : DEFAULT_MAX_CLOCK_SKEW_IN_MS;
     }
 
     @Override
     public long now() {
-      long systemTime = physicalClock.now();
-      updateMax(physicalTime, systemTime);
+      updateMax(physicalTime, systemClock.now());
       return physicalTime.get();
     }
 
+    /**
+     * @throws ClockException If timestamp exceeds max clock skew allowed.
+     */
     public long update(long targetTimestamp) throws ClockException {
-      final long systemTime = physicalClock.now();
+      final long systemTime = systemClock.now();
       if (maxClockSkewInMs > 0 && (targetTimestamp - systemTime) > maxClockSkewInMs) {
         throw new ClockException(
             "Received event with timestamp:" + getTimestampType().toString(targetTimestamp)
@@ -233,107 +206,96 @@ public interface Clock {
     }
 
     @Override
-    public boolean isMonotonicallyIncreasing() {
-      return false;
-    }
-
     public TimeUnit getTimeUnit() {
-      return physicalClock.getTimeUnit();
-    }
-
-    @VisibleForTesting
-    void setPhysicalTime(long time) {
-      physicalTime.set(time);
+      return systemClock.getTimeUnit();
     }
 
     @Override
-    public TimestampType getTimestampType() { return timestampType; }
+    public TimestampType getTimestampType() {
+      return TimestampType.PHYSICAL;
+    }
 
     @Override
-    public ClockType getClockType() { return clockType; }
+    public ClockType getClockType() {
+      return ClockType.SYSTEM_MONOTONIC;
+    }
   }
 
-  class HLC implements Clock, PhysicalClock {
-    private final PhysicalClock physicalClock;
-    private final long maxClockSkew;
-    private final long maxPhysicalTime;
-    private final long maxLogicalTime;
-    private long physicalTime;
-    private long logicalTime;
-    private final ClockType clockType = ClockType.HLC;
-    private final TimestampType timestampType = TimestampType.HYBRID;
-
-    public HLC(PhysicalClock physicalClock, long maxClockSkew) {
-      this.physicalClock = physicalClock;
-      this.maxClockSkew = maxClockSkew > 0 ? maxClockSkew : DEFAULT_MAX_CLOCK_SKEW_IN_MS;
-      this.maxPhysicalTime = timestampType.getMaxPhysicalTime();
-      this.maxLogicalTime = timestampType.getMaxLogicalTime();
-      this.physicalTime = 0;
-      this.logicalTime = 0;
+  /**
+   * HLC clock implementation.
+   * Monotonicity guarantee of physical component of time comes from {@link SystemMonotonic} clock.
+   */
+  class HLC implements Clock {
+    private static final TimestampType TIMESTAMP_TYPE = TimestampType.HYBRID;
+    private static final long MAX_PHYSICAL_TIME = TIMESTAMP_TYPE.getMaxPhysicalTime();
+    private static final long MAX_LOGICAL_TIME = TIMESTAMP_TYPE.getMaxLogicalTime();
+    private final Clock systemMonotonicClock;
+    private long currentPhysicalTime = 0;
+    private long currentLogicalTime = 0;
+
+    public HLC(long maxClockSkewInMs) {
+      this(new SystemMonotonic(maxClockSkewInMs));
     }
 
+    @VisibleForTesting
     public HLC() {
-      this.physicalClock = DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK;
-      this.maxClockSkew = DEFAULT_MAX_CLOCK_SKEW_IN_MS;
-      this.maxPhysicalTime = timestampType.getMaxPhysicalTime();
-      this.maxLogicalTime = timestampType.getMaxLogicalTime();
-      this.physicalTime = 0;
-      this.logicalTime = 0;
+      this(DEFAULT_MAX_CLOCK_SKEW_IN_MS);
+    }
+
+    /**
+     * @param systemMonotonicClock Clock to get physical component of time. Should be monotonic
+     *                             clock.
+     */
+    @VisibleForTesting
+    public HLC(Clock systemMonotonicClock) {
+      assert(systemMonotonicClock.isMonotonic());
+      this.systemMonotonicClock = systemMonotonicClock;
     }
 
     @Override
     public synchronized long now() throws ClockException {
-      final long systemTime = physicalClock.now();
-
-      checkPhysicalTimeOverflow(systemTime, maxPhysicalTime);
-      checkLogicalTimeOverflow(logicalTime, maxLogicalTime);
-
-      if (systemTime <= physicalTime) {
-        logicalTime++;
-      } else if (systemTime > physicalTime) {
-        logicalTime = 0;
-        physicalTime = systemTime;
+      final long newSystemTime = systemMonotonicClock.now();
+      if (newSystemTime <= currentPhysicalTime) {
+        currentLogicalTime++;
+      } else if (newSystemTime > currentPhysicalTime) {
+        currentLogicalTime = 0;
+        currentPhysicalTime = newSystemTime;
       }
+      checkPhysicalTimeOverflow(newSystemTime, MAX_PHYSICAL_TIME);
+      checkLogicalTimeOverflow(currentLogicalTime, MAX_LOGICAL_TIME);
 
       return toTimestamp();
     }
 
     /**
-     * Updates {@link HLC} with the given timestamp received from elsewhere (possibly
-     * some other node). Returned timestamp is strict greater than msgTimestamp and local
-     * timestamp.
+     * Updates {@link HLC} with the given time received from elsewhere (possibly some other node).
      *
-     * @param timestamp timestamp from the external message.
-     * @return a hybrid timestamp of HLC that is strictly greater than local timestamp and
-     * msgTimestamp
-     * @throws ClockException
+     * @param targetTime time from the external message.
+     * @return a hybrid timestamp of HLC that is strict greater than given {@code targetTime} and
+     * previously returned times.
+     * @throws ClockException If timestamp exceeds max clock skew allowed.
      */
     @Override
-    public synchronized long update(long timestamp)
+    public synchronized long update(long targetTime)
         throws ClockException {
-      final long targetPhysicalTime = timestampType.getPhysicalTime(timestamp);
-      final long targetLogicalTime = timestampType.getLogicalTime(timestamp);
-      final long oldPhysicalTime = physicalTime;
-      final long systemTime = physicalClock.now();
-
-      physicalTime = Math.max(Math.max(oldPhysicalTime, targetPhysicalTime), systemTime);
-      checkPhysicalTimeOverflow(systemTime, maxPhysicalTime);
-
-      if (targetPhysicalTime - systemTime > maxClockSkew) {
-        throw new ClockException("Received event with timestamp:" +
-            timestampType.toString(timestamp) + " which is greater than allowed clock skew");
-      }
-      if (physicalTime == oldPhysicalTime && oldPhysicalTime == targetPhysicalTime) {
-        logicalTime = Math.max(logicalTime, targetLogicalTime) + 1;
-      } else if (physicalTime == targetPhysicalTime) {
-        logicalTime = targetLogicalTime + 1;
-      } else if (physicalTime == oldPhysicalTime) {
-        logicalTime++;
+      final long targetPhysicalTime = TIMESTAMP_TYPE.getPhysicalTime(targetTime);
+      final long targetLogicalTime = TIMESTAMP_TYPE.getLogicalTime(targetTime);
+      final long oldPhysicalTime = currentPhysicalTime;
+      currentPhysicalTime = systemMonotonicClock.update(targetPhysicalTime);
+
+      checkPhysicalTimeOverflow(currentPhysicalTime, MAX_PHYSICAL_TIME);
+
+      if (currentPhysicalTime == targetPhysicalTime && currentPhysicalTime == oldPhysicalTime) {
+        currentLogicalTime = Math.max(currentLogicalTime, targetLogicalTime) + 1;
+      } else if (currentPhysicalTime == targetPhysicalTime) {
+        currentLogicalTime = targetLogicalTime + 1;
+      } else if (currentPhysicalTime == oldPhysicalTime) {
+        currentLogicalTime++;
       } else {
-        logicalTime = 0;
+        currentLogicalTime = 0;
       }
 
-      checkLogicalTimeOverflow(logicalTime, maxLogicalTime);
+      checkLogicalTimeOverflow(currentLogicalTime, MAX_LOGICAL_TIME);
       return toTimestamp();
     }
 
@@ -342,40 +304,30 @@ public interface Clock {
       return true;
     }
 
-    @Override
-    public boolean isMonotonicallyIncreasing() {
-      return true;
-    }
-
     public TimeUnit getTimeUnit() {
-      return physicalClock.getTimeUnit();
+      return systemMonotonicClock.getTimeUnit();
     }
 
     private long toTimestamp() {
-      return timestampType.toTimestamp(getTimeUnit(), physicalTime, logicalTime);
-    }
-
-    @VisibleForTesting
-    synchronized void setLogicalTime(long logicalTime) {
-      this.logicalTime = logicalTime;
+      return TIMESTAMP_TYPE.toTimestamp(TimeUnit.MILLISECONDS, currentPhysicalTime,
+          currentLogicalTime);
     }
 
     @VisibleForTesting
-    synchronized void setPhysicalTime(long physicalTime) {
-      this.physicalTime = physicalTime;
-    }
-
-    @VisibleForTesting
-    synchronized long getLogicalTime() { return logicalTime; }
+    synchronized long getLogicalTime() { return currentLogicalTime; }
 
     @VisibleForTesting
-    synchronized long getPhysicalTime() { return physicalTime; }
+    synchronized long getPhysicalTime() { return currentPhysicalTime; }
 
     @Override
-    public TimestampType getTimestampType() { return timestampType; }
+    public TimestampType getTimestampType() {
+      return TIMESTAMP_TYPE;
+    }
 
     @Override
-    public ClockType getClockType() { return clockType; }
+    public ClockType getClockType() {
+      return ClockType.HLC;
+    }
   }
 
   //////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hbase/blob/d9a99049/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AtomicUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AtomicUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AtomicUtils.java
index 35391ee..cc32263 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AtomicUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AtomicUtils.java
@@ -46,6 +46,7 @@ public class AtomicUtils {
   /**
    * Updates a AtomicLong which is supposed to maintain the maximum values. This method is not
    * synchronized but is thread-safe.
+   * @return true if {@code max} was updated with {@code value}, else false.
    */
   public static void updateMax(AtomicLong max, long value) {
     while (true) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/d9a99049/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java
index 2476c51..f9ad8e4 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java
@@ -34,7 +34,7 @@ import static org.mockito.Mockito.*;
 @Category(SmallTests.class)
 public class TestClock {
 
-  static final Clock.PhysicalClock PHYSICAL_CLOCK = mock(Clock.PhysicalClock.class);
+  static final Clock MOCK_CLOCK = mock(Clock.class);
   static final long MAX_CLOCK_SKEW_IN_MS = 1000;
 
   @Rule
@@ -84,7 +84,7 @@ public class TestClock {
 
   @Before
   public void setUp() {
-    when(PHYSICAL_CLOCK.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
+    when(MOCK_CLOCK.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
   }
 
   // All Clocks Tests
@@ -92,40 +92,47 @@ public class TestClock {
   @Test
   public void testSystemMonotonicNow() {
     MonotonicityCheckerClock systemMonotonic =
-        new MonotonicityCheckerClock(new Clock.SystemMonotonic(PHYSICAL_CLOCK, 30000), false);
+        new MonotonicityCheckerClock(new Clock.SystemMonotonic(MOCK_CLOCK), false);
 
     // case 1: Set time and assert
-    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+    when(MOCK_CLOCK.now()).thenReturn(100L);
     assertEquals(100, systemMonotonic.now());
 
     // case 2: Go back in time and check monotonic property.
-    when(PHYSICAL_CLOCK.now()).thenReturn(99L);
+    when(MOCK_CLOCK.now()).thenReturn(99L);
     assertEquals(100, systemMonotonic.now());
 
     // case 3: system time goes ahead compared to previous timestamp.
-    when(PHYSICAL_CLOCK.now()).thenReturn(101L);
+    when(MOCK_CLOCK.now()).thenReturn(101L);
     assertEquals(101, systemMonotonic.now());
 
     systemMonotonic.assertMonotonic();
   }
 
+  /**
+   * Tests that
+   * - Progressing mock clock progresses SystemMonotonic clock.
+   * - Skews in the clock are correctly updated/not changed on call to update(), depending on
+   * target time and clock's own time ( = system time + current skew).
+   */
   @Test
   public void testSystemMonotonicUpdate() {
+    Clock.SystemMonotonic systemMonotonicClock = new Clock.SystemMonotonic(MOCK_CLOCK);
     MonotonicityCheckerClock systemMonotonic =
-        new MonotonicityCheckerClock(new Clock.SystemMonotonic(PHYSICAL_CLOCK, 30000), false);
+        new MonotonicityCheckerClock(systemMonotonicClock, false);
 
     // Sets internal time
-    when(PHYSICAL_CLOCK.now()).thenReturn(99L);
+    when(MOCK_CLOCK.now()).thenReturn(99L);
     assertEquals(99, systemMonotonic.now());
 
     // case 1: Message timestamp is greater than current System Monotonic Time,
     // physical time at 100 still.
-    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+    when(MOCK_CLOCK.now()).thenReturn(100L);
     assertEquals(102, systemMonotonic.update(102));
 
     // case 2: Message timestamp is greater than current System Monotonic Time,
     // physical time at 100 still.
-    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+    when(MOCK_CLOCK.now()).thenReturn(100L);
     assertEquals(103, systemMonotonic.update(103));
 
     // case 3: Message timestamp is less than current System Monotonic Time, greater than current
@@ -137,11 +144,11 @@ public class TestClock {
     assertEquals(103, systemMonotonic.update(99));
 
     // case 5: Message timestamp<System monotonic time and both less than current Physical Time
-    when(PHYSICAL_CLOCK.now()).thenReturn(106L);
+    when(MOCK_CLOCK.now()).thenReturn(106L);
     assertEquals(106, systemMonotonic.update(102));
 
     // case 6: Message timestamp>System monotonic time and both less than current Physical Time
-    when(PHYSICAL_CLOCK.now()).thenReturn(109L);
+    when(MOCK_CLOCK.now()).thenReturn(109L);
     assertEquals(109, systemMonotonic.update(108));
 
     systemMonotonic.assertMonotonic();
@@ -151,10 +158,10 @@ public class TestClock {
   public void testSystemMonotonicUpdateMaxClockSkew() throws Clock.ClockException {
     final long time = 100L;
     Clock.SystemMonotonic systemMonotonic =
-        new Clock.SystemMonotonic(PHYSICAL_CLOCK, MAX_CLOCK_SKEW_IN_MS);
+        new Clock.SystemMonotonic(MOCK_CLOCK, MAX_CLOCK_SKEW_IN_MS);
 
     // Set Current Time.
-    when(PHYSICAL_CLOCK.now()).thenReturn(time);
+    when(MOCK_CLOCK.now()).thenReturn(time);
     systemMonotonic.now();
 
     // Shouldn't throw ClockException
@@ -173,28 +180,29 @@ public class TestClock {
 
   @Test
   public void testHLCNow() throws Clock.ClockException {
-    MonotonicityCheckerClock hybridLogicalClock =
-        new MonotonicityCheckerClock(new Clock.HLC(PHYSICAL_CLOCK, 30000), true);
+    MonotonicityCheckerClock hybridLogicalClock = new MonotonicityCheckerClock(
+        new Clock.HLC(new Clock.SystemMonotonic(MOCK_CLOCK)),
+        true);  // true for strict monotonicity
 
     // case 1: Test if it returns correct time based on current physical time.
     //         Remember, initially logical time = 0
-    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+    when(MOCK_CLOCK.now()).thenReturn(100L);
     assertHLCTime(hybridLogicalClock.now(), 100, 0);
 
     // case 2: physical time doesn't change, logical time should increment.
-    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+    when(MOCK_CLOCK.now()).thenReturn(100L);
     assertHLCTime(hybridLogicalClock.now(), 100, 1);
 
     // case 3: physical time doesn't change still, logical time should increment again
-    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+    when(MOCK_CLOCK.now()).thenReturn(100L);
     assertHLCTime(hybridLogicalClock.now(), 100, 2);
 
     // case 4: physical time moves forward, logical time should reset to 0.
-    when(PHYSICAL_CLOCK.now()).thenReturn(101L);
+    when(MOCK_CLOCK.now()).thenReturn(101L);
     assertHLCTime(hybridLogicalClock.now(), 101, 0);
 
     // case 5: Monotonic increasing check, physical time goes back.
-    when(PHYSICAL_CLOCK.now()).thenReturn(99L);
+    when(MOCK_CLOCK.now()).thenReturn(99L);
     assertHLCTime(hybridLogicalClock.now(), 101, 1);
 
     hybridLogicalClock.assertMonotonic();
@@ -202,55 +210,57 @@ public class TestClock {
 
   @Test
   public void testHLCNowLogicalTimeOverFlow() {
-    Clock.HLC hybridLogicalClock = new Clock.HLC(PHYSICAL_CLOCK, 100);
-
-    // Set Current Time.
-    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
-    hybridLogicalClock.setPhysicalTime(100);
-    hybridLogicalClock.setLogicalTime(TimestampType.HYBRID.getMaxLogicalTime());
+    Clock.HLC hybridLogicalClock = new Clock.HLC(new Clock.SystemMonotonic(MOCK_CLOCK));
 
+    when(MOCK_CLOCK.now()).thenReturn(100L);
+    hybridLogicalClock.now();  // current time (100, 0)
+    for (int i = 0; i < TimestampType.HYBRID.getMaxLogicalTime() - 1; i++) {
+      hybridLogicalClock.now();
+    }
     exception.expect(Clock.ClockException.class);
     hybridLogicalClock.now();
   }
 
+  // No need to check skews in this test, since they are member of SystemMonotonic and not HLC.
   @Test
   public void testHLCUpdate() throws Clock.ClockException {
     long messageTimestamp;
-    MonotonicityCheckerClock hybridLogicalClock =
-        new MonotonicityCheckerClock(new Clock.HLC(PHYSICAL_CLOCK, 100), true);
+    MonotonicityCheckerClock hybridLogicalClock = new MonotonicityCheckerClock(
+        new Clock.HLC(new Clock.SystemMonotonic(MOCK_CLOCK)),
+        true);  // true for strictly increasing check
 
     // Set Current Time.
-    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+    when(MOCK_CLOCK.now()).thenReturn(100L);
     hybridLogicalClock.now();
 
     // case 1: Message physical timestamp is lower than current physical time.
     messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 99, 1);
-    when(PHYSICAL_CLOCK.now()).thenReturn(101L);
+    when(MOCK_CLOCK.now()).thenReturn(101L);
     assertHLCTime(hybridLogicalClock.update(messageTimestamp), 101, 0);
 
     // case 2: Message physical timestamp is greater than HLC physical time.
     messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 3);
-    when(PHYSICAL_CLOCK.now()).thenReturn(102L);
+    when(MOCK_CLOCK.now()).thenReturn(102L);
     assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 4);
 
     // case 3: Message timestamp is less than HLC timestamp
     messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 104 , 4);
-    when(PHYSICAL_CLOCK.now()).thenReturn(103L);
+    when(MOCK_CLOCK.now()).thenReturn(103L);
     assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 5);
 
     //case 4: Message timestamp with same physical time as HLC, but lower logical time
     messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 2);
-    when(PHYSICAL_CLOCK.now()).thenReturn(101L);
+    when(MOCK_CLOCK.now()).thenReturn(101L);
     assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 6);
 
     //case 5: Message timestamp with same physical time as HLC, but higher logical time
     messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 8);
-    when(PHYSICAL_CLOCK.now()).thenReturn(102L);
+    when(MOCK_CLOCK.now()).thenReturn(102L);
     assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 9);
 
     //case 6: Actual Physical Time greater than message physical timestamp and HLC physical time.
     messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 10);
-    when(PHYSICAL_CLOCK.now()).thenReturn(110L);
+    when(MOCK_CLOCK.now()).thenReturn(110L);
     assertHLCTime(hybridLogicalClock.update(messageTimestamp), 110, 0);
 
     hybridLogicalClock.assertMonotonic();
@@ -259,10 +269,10 @@ public class TestClock {
   @Test
   public void testHLCUpdateLogicalTimeOverFlow() throws Clock.ClockException {
     long messageTimestamp;
-    Clock.HLC hybridLogicalClock = new Clock.HLC(PHYSICAL_CLOCK, 100);
+    Clock.HLC hybridLogicalClock = new Clock.HLC(new Clock.SystemMonotonic(MOCK_CLOCK));
 
     // Set Current Time.
-    when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+    when(MOCK_CLOCK.now()).thenReturn(100L);
     hybridLogicalClock.now();
 
     messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 100,
@@ -275,10 +285,11 @@ public class TestClock {
   public void testHLCUpdateMaxClockSkew() throws Clock.ClockException {
     final long time = 100;
     long messageTimestamp;
-    Clock.HLC hybridLogicalClock = new Clock.HLC(PHYSICAL_CLOCK, MAX_CLOCK_SKEW_IN_MS);
+    Clock.HLC hybridLogicalClock = new Clock.HLC(
+        new Clock.SystemMonotonic(MOCK_CLOCK, MAX_CLOCK_SKEW_IN_MS));
 
     // Set Current Time.
-    when(PHYSICAL_CLOCK.now()).thenReturn(time);
+    when(MOCK_CLOCK.now()).thenReturn(time);
     hybridLogicalClock.now();
 
     messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS,

http://git-wip-us.apache.org/repos/asf/hbase/blob/d9a99049/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index d8d87f8..7239a29 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -598,8 +598,10 @@ public class HRegionServer extends HasThread implements
     this.abortRequested = false;
     this.stopped = false;
 
-    this.hybridLogicalClock = new Clock.HLC();
-    this.systemMonotonicClock = new Clock.SystemMonotonic();
+    final long maxClockSkew =
+        conf.getLong("hbase.max.clock.skew.in.ms", Clock.DEFAULT_MAX_CLOCK_SKEW_IN_MS);
+    this.hybridLogicalClock = new Clock.HLC(maxClockSkew);
+    this.systemMonotonicClock = new Clock.SystemMonotonic(maxClockSkew);
     this.systemClock = new Clock.System();
 
     rpcServices = createRpcServices();

http://git-wip-us.apache.org/repos/asf/hbase/blob/d9a99049/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java
index bc95f46..ea46090 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase;
 
 import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -198,18 +199,19 @@ public class TestClockWithCluster {
     assertNotNull(regionMeta);
 
     // Inject physical clock that always returns same physical time into hybrid logical clock
-    long systemTime = Clock.DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK.now();
-    Clock.PhysicalClock physicalClock = mock(Clock.PhysicalClock.class);
-    when(physicalClock.now()).thenReturn(systemTime);
-    when(physicalClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
-    Clock.HLC clock = new Clock.HLC(physicalClock, Clock.DEFAULT_MAX_CLOCK_SKEW_IN_MS);
+    long systemTime = Clock.SYSTEM_CLOCK.now();
+    Clock mockSystemClock = mock(Clock.class);
+    when(mockSystemClock.now()).thenReturn(systemTime);
+    when(mockSystemClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
+    when(mockSystemClock.isMonotonic()).thenReturn(true);
+    Clock.HLC masterHLC = new Clock.HLC(new Clock.SystemMonotonic(mockSystemClock));
 
     // The region clock is used for setting timestamps for table mutations and the region server
     // clock is used for updating the clock on region assign/unassign events.
 
     // Set meta region clock so that region state transitions are timestamped with mocked clock
-    regionMeta.setClock(clock);
-    master.setClock(clock);
+    regionMeta.setClock(masterHLC);
+    master.setClock(masterHLC);
 
     HRegion userRegion = null;
     for (Region region : regions) {
@@ -221,13 +223,13 @@ public class TestClockWithCluster {
 
     // Only mock the region server clock because the region clock does not get used during
     // unassignment and assignment
-    rs.setClock(clock);
+    rs.setClock(masterHLC);
 
     // Repeatedly unassign and assign region while tracking the timestamps of the region state
     // transitions from the meta table
     List<Long> timestamps = new ArrayList<>();
     // Set expected logical time to 0 as initial clock.now() sets clock's logical time to 0
-    long expectedLogicalTime = TimestampType.HYBRID.getLogicalTime(clock.now());
+    long expectedLogicalTime = TimestampType.HYBRID.getLogicalTime(masterHLC.now());
     for (int i = 0; i < 10; i++) {
       admin.unassign(hriOnline.getRegionName(), false);
       assertEquals(RegionState.State.CLOSED, regionStates.getRegionState(hriOnline).getState());
@@ -243,8 +245,8 @@ public class TestClockWithCluster {
       // 8,9 [now]    Update hbase:meta
       expectedLogicalTime += 10;
 
-      assertEquals(expectedLogicalTime, clock.getLogicalTime());
-      timestamps.add(clock.getLogicalTime());
+      assertEquals(expectedLogicalTime, masterHLC.getLogicalTime());
+      timestamps.add(masterHLC.getLogicalTime());
 
       admin.assign(hriOnline.getRegionName());
       // clock.now() is called 7 times and clock.update() is called 2 times, each call increments
@@ -260,8 +262,8 @@ public class TestClockWithCluster {
       // gets the region info from assignment manager rather than meta table accessor
       expectedLogicalTime += 9;
       assertEquals(RegionState.State.OPEN, regionStates.getRegionState(hriOnline).getState());
-      assertEquals(expectedLogicalTime, clock.getLogicalTime());
-      timestamps.add(clock.getLogicalTime());
+      assertEquals(expectedLogicalTime, masterHLC.getLogicalTime());
+      timestamps.add(masterHLC.getLogicalTime());
     }
 
     // Ensure that the hybrid timestamps are strictly increasing
@@ -305,62 +307,63 @@ public class TestClockWithCluster {
     assertNotNull(regionMeta);
 
     // Instantiate two hybrid logical clocks with mocked physical clocks
-    long expectedPhysicalTime = Clock.DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK.now();
-    Clock.PhysicalClock masterPhysicalClock = mock(Clock.PhysicalClock.class);
-    when(masterPhysicalClock.now()).thenReturn(expectedPhysicalTime);
-    when(masterPhysicalClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
-    Clock.HLC masterClock = new Clock.HLC(masterPhysicalClock, Clock.DEFAULT_MAX_CLOCK_SKEW_IN_MS);
-    master.setClock(masterClock);
-    regionMeta.setClock(masterClock);
-
-    Clock.PhysicalClock rsPhysicalClock = mock(Clock.PhysicalClock.class);
-    when(rsPhysicalClock.now()).thenReturn(expectedPhysicalTime);
-    when(rsPhysicalClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
-    Clock.HLC rsClock = new Clock.HLC(rsPhysicalClock, Clock.DEFAULT_MAX_CLOCK_SKEW_IN_MS);
+    long expectedPhysicalTime = Clock.SYSTEM_CLOCK.now();
+    Clock masterMockSystemClock = mock(Clock.class);
+    when(masterMockSystemClock.now()).thenReturn(expectedPhysicalTime);
+    when(masterMockSystemClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
+    Clock.HLC masterHLC= new Clock.HLC(new Clock.SystemMonotonic(masterMockSystemClock));
+    master.setClock(masterHLC);
+    regionMeta.setClock(masterHLC);
+
+    Clock rsMockSystemClock = mock(Clock.class);
+    when(rsMockSystemClock.now()).thenReturn(expectedPhysicalTime);
+    when(rsMockSystemClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
+    when(rsMockSystemClock.isMonotonic()).thenReturn(true);
+    Clock.HLC rsHLC= new Clock.HLC(new Clock.SystemMonotonic(rsMockSystemClock));
     // We only mock the region server clock here because the region clock does not get used
     // during unassignment and assignment
-    rs.setClock(rsClock);
+    rs.setClock(rsHLC);
 
     // Increment master physical clock time
     expectedPhysicalTime += 1000;
-    when(masterPhysicalClock.now()).thenReturn(expectedPhysicalTime);
+    when(masterMockSystemClock.now()).thenReturn(expectedPhysicalTime);
 
     // Unassign region, region server should advance its clock upon receiving close region request
     admin.unassign(hriOnline.getRegionName(), false);
     assertEquals(RegionState.State.CLOSED, regionStates.getRegionState(hriOnline).getState());
     // Verify that region server clock time increased
     // Previous test has explanation for each event that increases logical time
-    assertHLCTime(masterClock, expectedPhysicalTime, 9);
-    assertHLCTime(rsClock, expectedPhysicalTime, 6);
+    assertHLCTime(masterHLC, expectedPhysicalTime, 9);
+    assertHLCTime(rsHLC, expectedPhysicalTime, 6);
 
     // Increase region server physical clock time
     expectedPhysicalTime += 1000;
-    when(rsPhysicalClock.now()).thenReturn(expectedPhysicalTime);
+    when(rsMockSystemClock.now()).thenReturn(expectedPhysicalTime);
     // Assign region, master server should advance its clock upon receiving close region response
     admin.assign(hriOnline.getRegionName());
     assertEquals(RegionState.State.OPEN, regionStates.getRegionState(hriOnline).getState());
     // Verify that master clock time increased
-    assertHLCTime(masterClock, expectedPhysicalTime, 4);
-    assertHLCTime(rsClock, expectedPhysicalTime, 1);
+    assertHLCTime(masterHLC, expectedPhysicalTime, 4);
+    assertHLCTime(rsHLC, expectedPhysicalTime, 1);
 
     // Increment region server physical clock time
     expectedPhysicalTime += 1000;
-    when(rsPhysicalClock.now()).thenReturn(expectedPhysicalTime);
+    when(rsMockSystemClock.now()).thenReturn(expectedPhysicalTime);
     // Unassign region, region server should advance its clock upon receiving close region request
     admin.unassign(hriOnline.getRegionName(), false);
     assertEquals(RegionState.State.CLOSED, regionStates.getRegionState(hriOnline).getState());
     // Verify that master server clock time increased
-    assertHLCTime(masterClock, expectedPhysicalTime, 4);
-    assertHLCTime(rsClock, expectedPhysicalTime, 1);
+    assertHLCTime(masterHLC, expectedPhysicalTime, 4);
+    assertHLCTime(rsHLC, expectedPhysicalTime, 1);
 
     // Increase master server physical clock time
     expectedPhysicalTime += 1000;
-    when(masterPhysicalClock.now()).thenReturn(expectedPhysicalTime);
+    when(masterMockSystemClock.now()).thenReturn(expectedPhysicalTime);
     // Assign region, master server should advance its clock upon receiving close region response
     admin.assign(hriOnline.getRegionName());
     assertEquals(RegionState.State.OPEN, regionStates.getRegionState(hriOnline).getState());
     // Verify that region server clock time increased
-    assertHLCTime(masterClock, expectedPhysicalTime, 8);
-    assertHLCTime(rsClock, expectedPhysicalTime, 5);
+    assertHLCTime(masterHLC, expectedPhysicalTime, 8);
+    assertHLCTime(rsHLC, expectedPhysicalTime, 5);
   }
 }
\ No newline at end of file