You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/02 22:15:03 UTC
hbase git commit: HBASE-18498 Design improvements in Clock.java.
Repository: hbase
Updated Branches:
refs/heads/HBASE-14070.HLC 386b1df1d -> d9a990490
HBASE-18498 Design improvements in Clock.java.
- Delete PhysicalClock interface which seems useless give we have "System" implementation of Clock which is basically the same.
- Embed systemMonotonic clock into HLC to get rid of redundancy in management of physical component, esp logic around ensuring monotonicity.
- Make max_skew in clocks configurable
- Remove isMonotonicallyIncreasing() which is not used.
- update logical overflow test to not use hooks but prod code path
- Added lots of comments.
Change-Id: Ie775d28f15e864e7885b39e503c75670acbf4391
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d9a99049
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d9a99049
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d9a99049
Branch: refs/heads/HBASE-14070.HLC
Commit: d9a9904907ef0cf450b76933750d045237713403
Parents: 386b1df
Author: Apekshit Sharma <ap...@apache.org>
Authored: Fri Jul 21 20:21:22 2017 -0700
Committer: Apekshit Sharma <ap...@apache.org>
Committed: Wed Aug 2 15:14:38 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/hbase/Clock.java | 274 ++++++++-----------
.../apache/hadoop/hbase/util/AtomicUtils.java | 1 +
.../java/org/apache/hadoop/hbase/TestClock.java | 91 +++---
.../hbase/regionserver/HRegionServer.java | 6 +-
.../hadoop/hbase/TestClockWithCluster.java | 79 +++---
5 files changed, 210 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d9a99049/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java
index 0e2320f..6a0374e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java
@@ -81,11 +81,6 @@ public interface Clock {
boolean isMonotonic();
/**
- * @return true if the clock implementation gives monotonically increasing timestamps else false.
- */
- boolean isMonotonicallyIncreasing();
-
- /**
* @return {@link org.apache.hadoop.hbase.TimestampType}
*/
TimestampType getTimestampType();
@@ -95,6 +90,10 @@ public interface Clock {
*/
ClockType getClockType();
+ /**
+ * @return {@link TimeUnit} of the physical time used by the clock.
+ */
+ TimeUnit getTimeUnit();
/**
* Indicates that Physical Time or Logical Time component has overflowed. This extends
@@ -107,41 +106,7 @@ public interface Clock {
}
}
- //////////////////////////////////////////////////////////////////
- // Physical Clock
- //////////////////////////////////////////////////////////////////
-
- interface PhysicalClock {
- /**
- * This is a method to get the current time.
- *
- * @return Timestamp of current time in 64 bit representation corresponding to the particular
- * clock
- */
- long now() throws RuntimeException;
-
- /**
- * This is a method to get the unit of the physical time used by the clock
- *
- * @return A {@link TimeUnit}
- */
- TimeUnit getTimeUnit();
- }
-
- class JavaMillisPhysicalClock implements PhysicalClock {
- @Override
- public long now() {
- return EnvironmentEdgeManager.currentTime();
- }
-
- @Override
- public TimeUnit getTimeUnit() {
- return TimeUnit.MILLISECONDS;
- }
- }
-
- JavaMillisPhysicalClock DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK =
- new JavaMillisPhysicalClock();
+ Clock SYSTEM_CLOCK = new System();
//////////////////////////////////////////////////////////////////
// Implementation of clocks
@@ -149,20 +114,18 @@ public interface Clock {
/**
* System clock is an implementation of clock which doesn't give any monotonic guarantees.
+ * Since it directly represents system's actual clock which cannot be changed, update() function
+ * is no-op.
*/
- class System implements Clock, PhysicalClock {
- private final PhysicalClock physicalClock = DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK;
- private final ClockType clockType = ClockType.SYSTEM;
- private final TimestampType timestampType = TimestampType.PHYSICAL;
-
+ class System implements Clock {
@Override
public long now() {
- return physicalClock.now();
+ return EnvironmentEdgeManager.currentTime();
}
@Override
public long update(long timestamp) {
- return physicalClock.now();
+ return EnvironmentEdgeManager.currentTime();
}
@Override
@@ -171,52 +134,62 @@ public interface Clock {
}
@Override
- public boolean isMonotonicallyIncreasing() {
- return false;
- }
-
public TimeUnit getTimeUnit() {
- return physicalClock.getTimeUnit();
+ return TimeUnit.MILLISECONDS;
}
@Override
- public TimestampType getTimestampType() { return timestampType; }
+ public TimestampType getTimestampType() {
+ return TimestampType.PHYSICAL;
+ }
@Override
- public ClockType getClockType() { return clockType; }
+ public ClockType getClockType() {
+ return ClockType.SYSTEM;
+ }
}
/**
* System clock is an implementation of clock which guarantees monotonically non-decreasing
* timestamps.
*/
- class SystemMonotonic implements Clock, PhysicalClock {
+ class SystemMonotonic implements Clock {
private final long maxClockSkewInMs;
- private final PhysicalClock physicalClock;
+ private final Clock systemClock;
private final AtomicLong physicalTime = new AtomicLong();
- private final ClockType clockType = ClockType.SYSTEM_MONOTONIC;
- private final TimestampType timestampType = TimestampType.PHYSICAL;
- public SystemMonotonic(PhysicalClock physicalClock, long maxClockSkewInMs) {
- this.physicalClock = physicalClock;
- this.maxClockSkewInMs = maxClockSkewInMs > 0 ?
- maxClockSkewInMs : DEFAULT_MAX_CLOCK_SKEW_IN_MS;
+ public SystemMonotonic(long maxClockSkewInMs) {
+ this(SYSTEM_CLOCK, maxClockSkewInMs);
}
+ @VisibleForTesting
public SystemMonotonic() {
- this.physicalClock = DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK;
- this.maxClockSkewInMs = DEFAULT_MAX_CLOCK_SKEW_IN_MS;
+ this(DEFAULT_MAX_CLOCK_SKEW_IN_MS);
+ }
+
+ @VisibleForTesting
+ public SystemMonotonic(Clock systemClock) {
+ this(systemClock, DEFAULT_MAX_CLOCK_SKEW_IN_MS);
+ }
+
+ @VisibleForTesting
+ public SystemMonotonic(Clock systemClock, long maxClockSkewInMs) {
+ this.systemClock = systemClock;
+ this.maxClockSkewInMs = maxClockSkewInMs > 0 ?
+ maxClockSkewInMs : DEFAULT_MAX_CLOCK_SKEW_IN_MS;
}
@Override
public long now() {
- long systemTime = physicalClock.now();
- updateMax(physicalTime, systemTime);
+ updateMax(physicalTime, systemClock.now());
return physicalTime.get();
}
+ /**
+ * @throws ClockException If timestamp exceeds max clock skew allowed.
+ */
public long update(long targetTimestamp) throws ClockException {
- final long systemTime = physicalClock.now();
+ final long systemTime = systemClock.now();
if (maxClockSkewInMs > 0 && (targetTimestamp - systemTime) > maxClockSkewInMs) {
throw new ClockException(
"Received event with timestamp:" + getTimestampType().toString(targetTimestamp)
@@ -233,107 +206,96 @@ public interface Clock {
}
@Override
- public boolean isMonotonicallyIncreasing() {
- return false;
- }
-
public TimeUnit getTimeUnit() {
- return physicalClock.getTimeUnit();
- }
-
- @VisibleForTesting
- void setPhysicalTime(long time) {
- physicalTime.set(time);
+ return systemClock.getTimeUnit();
}
@Override
- public TimestampType getTimestampType() { return timestampType; }
+ public TimestampType getTimestampType() {
+ return TimestampType.PHYSICAL;
+ }
@Override
- public ClockType getClockType() { return clockType; }
+ public ClockType getClockType() {
+ return ClockType.SYSTEM_MONOTONIC;
+ }
}
- class HLC implements Clock, PhysicalClock {
- private final PhysicalClock physicalClock;
- private final long maxClockSkew;
- private final long maxPhysicalTime;
- private final long maxLogicalTime;
- private long physicalTime;
- private long logicalTime;
- private final ClockType clockType = ClockType.HLC;
- private final TimestampType timestampType = TimestampType.HYBRID;
-
- public HLC(PhysicalClock physicalClock, long maxClockSkew) {
- this.physicalClock = physicalClock;
- this.maxClockSkew = maxClockSkew > 0 ? maxClockSkew : DEFAULT_MAX_CLOCK_SKEW_IN_MS;
- this.maxPhysicalTime = timestampType.getMaxPhysicalTime();
- this.maxLogicalTime = timestampType.getMaxLogicalTime();
- this.physicalTime = 0;
- this.logicalTime = 0;
+ /**
+ * HLC clock implementation.
+ * Monotonicity guarantee of physical component of time comes from {@link SystemMonotonic} clock.
+ */
+ class HLC implements Clock {
+ private static final TimestampType TIMESTAMP_TYPE = TimestampType.HYBRID;
+ private static final long MAX_PHYSICAL_TIME = TIMESTAMP_TYPE.getMaxPhysicalTime();
+ private static final long MAX_LOGICAL_TIME = TIMESTAMP_TYPE.getMaxLogicalTime();
+ private final Clock systemMonotonicClock;
+ private long currentPhysicalTime = 0;
+ private long currentLogicalTime = 0;
+
+ public HLC(long maxClockSkewInMs) {
+ this(new SystemMonotonic(maxClockSkewInMs));
}
+ @VisibleForTesting
public HLC() {
- this.physicalClock = DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK;
- this.maxClockSkew = DEFAULT_MAX_CLOCK_SKEW_IN_MS;
- this.maxPhysicalTime = timestampType.getMaxPhysicalTime();
- this.maxLogicalTime = timestampType.getMaxLogicalTime();
- this.physicalTime = 0;
- this.logicalTime = 0;
+ this(DEFAULT_MAX_CLOCK_SKEW_IN_MS);
+ }
+
+ /**
+ * @param systemMonotonicClock Clock to get physical component of time. Should be monotonic
+ * clock.
+ */
+ @VisibleForTesting
+ public HLC(Clock systemMonotonicClock) {
+ assert(systemMonotonicClock.isMonotonic());
+ this.systemMonotonicClock = systemMonotonicClock;
}
@Override
public synchronized long now() throws ClockException {
- final long systemTime = physicalClock.now();
-
- checkPhysicalTimeOverflow(systemTime, maxPhysicalTime);
- checkLogicalTimeOverflow(logicalTime, maxLogicalTime);
-
- if (systemTime <= physicalTime) {
- logicalTime++;
- } else if (systemTime > physicalTime) {
- logicalTime = 0;
- physicalTime = systemTime;
+ final long newSystemTime = systemMonotonicClock.now();
+ if (newSystemTime <= currentPhysicalTime) {
+ currentLogicalTime++;
+ } else if (newSystemTime > currentPhysicalTime) {
+ currentLogicalTime = 0;
+ currentPhysicalTime = newSystemTime;
}
+ checkPhysicalTimeOverflow(newSystemTime, MAX_PHYSICAL_TIME);
+ checkLogicalTimeOverflow(currentLogicalTime, MAX_LOGICAL_TIME);
return toTimestamp();
}
/**
- * Updates {@link HLC} with the given timestamp received from elsewhere (possibly
- * some other node). Returned timestamp is strict greater than msgTimestamp and local
- * timestamp.
+ * Updates {@link HLC} with the given time received from elsewhere (possibly some other node).
*
- * @param timestamp timestamp from the external message.
- * @return a hybrid timestamp of HLC that is strictly greater than local timestamp and
- * msgTimestamp
- * @throws ClockException
+ * @param targetTime time from the external message.
+ * @return a hybrid timestamp of HLC that is strict greater than given {@code targetTime} and
+ * previously returned times.
+ * @throws ClockException If timestamp exceeds max clock skew allowed.
*/
@Override
- public synchronized long update(long timestamp)
+ public synchronized long update(long targetTime)
throws ClockException {
- final long targetPhysicalTime = timestampType.getPhysicalTime(timestamp);
- final long targetLogicalTime = timestampType.getLogicalTime(timestamp);
- final long oldPhysicalTime = physicalTime;
- final long systemTime = physicalClock.now();
-
- physicalTime = Math.max(Math.max(oldPhysicalTime, targetPhysicalTime), systemTime);
- checkPhysicalTimeOverflow(systemTime, maxPhysicalTime);
-
- if (targetPhysicalTime - systemTime > maxClockSkew) {
- throw new ClockException("Received event with timestamp:" +
- timestampType.toString(timestamp) + " which is greater than allowed clock skew");
- }
- if (physicalTime == oldPhysicalTime && oldPhysicalTime == targetPhysicalTime) {
- logicalTime = Math.max(logicalTime, targetLogicalTime) + 1;
- } else if (physicalTime == targetPhysicalTime) {
- logicalTime = targetLogicalTime + 1;
- } else if (physicalTime == oldPhysicalTime) {
- logicalTime++;
+ final long targetPhysicalTime = TIMESTAMP_TYPE.getPhysicalTime(targetTime);
+ final long targetLogicalTime = TIMESTAMP_TYPE.getLogicalTime(targetTime);
+ final long oldPhysicalTime = currentPhysicalTime;
+ currentPhysicalTime = systemMonotonicClock.update(targetPhysicalTime);
+
+ checkPhysicalTimeOverflow(currentPhysicalTime, MAX_PHYSICAL_TIME);
+
+ if (currentPhysicalTime == targetPhysicalTime && currentPhysicalTime == oldPhysicalTime) {
+ currentLogicalTime = Math.max(currentLogicalTime, targetLogicalTime) + 1;
+ } else if (currentPhysicalTime == targetPhysicalTime) {
+ currentLogicalTime = targetLogicalTime + 1;
+ } else if (currentPhysicalTime == oldPhysicalTime) {
+ currentLogicalTime++;
} else {
- logicalTime = 0;
+ currentLogicalTime = 0;
}
- checkLogicalTimeOverflow(logicalTime, maxLogicalTime);
+ checkLogicalTimeOverflow(currentLogicalTime, MAX_LOGICAL_TIME);
return toTimestamp();
}
@@ -342,40 +304,30 @@ public interface Clock {
return true;
}
- @Override
- public boolean isMonotonicallyIncreasing() {
- return true;
- }
-
public TimeUnit getTimeUnit() {
- return physicalClock.getTimeUnit();
+ return systemMonotonicClock.getTimeUnit();
}
private long toTimestamp() {
- return timestampType.toTimestamp(getTimeUnit(), physicalTime, logicalTime);
- }
-
- @VisibleForTesting
- synchronized void setLogicalTime(long logicalTime) {
- this.logicalTime = logicalTime;
+ return TIMESTAMP_TYPE.toTimestamp(TimeUnit.MILLISECONDS, currentPhysicalTime,
+ currentLogicalTime);
}
@VisibleForTesting
- synchronized void setPhysicalTime(long physicalTime) {
- this.physicalTime = physicalTime;
- }
-
- @VisibleForTesting
- synchronized long getLogicalTime() { return logicalTime; }
+ synchronized long getLogicalTime() { return currentLogicalTime; }
@VisibleForTesting
- synchronized long getPhysicalTime() { return physicalTime; }
+ synchronized long getPhysicalTime() { return currentPhysicalTime; }
@Override
- public TimestampType getTimestampType() { return timestampType; }
+ public TimestampType getTimestampType() {
+ return TIMESTAMP_TYPE;
+ }
@Override
- public ClockType getClockType() { return clockType; }
+ public ClockType getClockType() {
+ return ClockType.HLC;
+ }
}
//////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/hbase/blob/d9a99049/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AtomicUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AtomicUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AtomicUtils.java
index 35391ee..cc32263 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AtomicUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AtomicUtils.java
@@ -46,6 +46,7 @@ public class AtomicUtils {
/**
* Updates a AtomicLong which is supposed to maintain the maximum values. This method is not
* synchronized but is thread-safe.
+ * @return true if {@code max} was updated with {@code value}, else false.
*/
public static void updateMax(AtomicLong max, long value) {
while (true) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/d9a99049/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java
index 2476c51..f9ad8e4 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java
@@ -34,7 +34,7 @@ import static org.mockito.Mockito.*;
@Category(SmallTests.class)
public class TestClock {
- static final Clock.PhysicalClock PHYSICAL_CLOCK = mock(Clock.PhysicalClock.class);
+ static final Clock MOCK_CLOCK = mock(Clock.class);
static final long MAX_CLOCK_SKEW_IN_MS = 1000;
@Rule
@@ -84,7 +84,7 @@ public class TestClock {
@Before
public void setUp() {
- when(PHYSICAL_CLOCK.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
+ when(MOCK_CLOCK.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
}
// All Clocks Tests
@@ -92,40 +92,47 @@ public class TestClock {
@Test
public void testSystemMonotonicNow() {
MonotonicityCheckerClock systemMonotonic =
- new MonotonicityCheckerClock(new Clock.SystemMonotonic(PHYSICAL_CLOCK, 30000), false);
+ new MonotonicityCheckerClock(new Clock.SystemMonotonic(MOCK_CLOCK), false);
// case 1: Set time and assert
- when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+ when(MOCK_CLOCK.now()).thenReturn(100L);
assertEquals(100, systemMonotonic.now());
// case 2: Go back in time and check monotonic property.
- when(PHYSICAL_CLOCK.now()).thenReturn(99L);
+ when(MOCK_CLOCK.now()).thenReturn(99L);
assertEquals(100, systemMonotonic.now());
// case 3: system time goes ahead compared to previous timestamp.
- when(PHYSICAL_CLOCK.now()).thenReturn(101L);
+ when(MOCK_CLOCK.now()).thenReturn(101L);
assertEquals(101, systemMonotonic.now());
systemMonotonic.assertMonotonic();
}
+ /**
+ * Tests that
+ * - Progressing mock clock progresses SystemMonotonic clock.
+ * - Skews in the clock are correctly updated/not changed on call to update(), depending on
+ * target time and clock's own time ( = system time + current skew).
+ */
@Test
public void testSystemMonotonicUpdate() {
+ Clock.SystemMonotonic systemMonotonicClock = new Clock.SystemMonotonic(MOCK_CLOCK);
MonotonicityCheckerClock systemMonotonic =
- new MonotonicityCheckerClock(new Clock.SystemMonotonic(PHYSICAL_CLOCK, 30000), false);
+ new MonotonicityCheckerClock(systemMonotonicClock, false);
// Sets internal time
- when(PHYSICAL_CLOCK.now()).thenReturn(99L);
+ when(MOCK_CLOCK.now()).thenReturn(99L);
assertEquals(99, systemMonotonic.now());
// case 1: Message timestamp is greater than current System Monotonic Time,
// physical time at 100 still.
- when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+ when(MOCK_CLOCK.now()).thenReturn(100L);
assertEquals(102, systemMonotonic.update(102));
// case 2: Message timestamp is greater than current System Monotonic Time,
// physical time at 100 still.
- when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+ when(MOCK_CLOCK.now()).thenReturn(100L);
assertEquals(103, systemMonotonic.update(103));
// case 3: Message timestamp is less than current System Monotonic Time, greater than current
@@ -137,11 +144,11 @@ public class TestClock {
assertEquals(103, systemMonotonic.update(99));
// case 5: Message timestamp<System monotonic time and both less than current Physical Time
- when(PHYSICAL_CLOCK.now()).thenReturn(106L);
+ when(MOCK_CLOCK.now()).thenReturn(106L);
assertEquals(106, systemMonotonic.update(102));
// case 6: Message timestamp>System monotonic time and both less than current Physical Time
- when(PHYSICAL_CLOCK.now()).thenReturn(109L);
+ when(MOCK_CLOCK.now()).thenReturn(109L);
assertEquals(109, systemMonotonic.update(108));
systemMonotonic.assertMonotonic();
@@ -151,10 +158,10 @@ public class TestClock {
public void testSystemMonotonicUpdateMaxClockSkew() throws Clock.ClockException {
final long time = 100L;
Clock.SystemMonotonic systemMonotonic =
- new Clock.SystemMonotonic(PHYSICAL_CLOCK, MAX_CLOCK_SKEW_IN_MS);
+ new Clock.SystemMonotonic(MOCK_CLOCK, MAX_CLOCK_SKEW_IN_MS);
// Set Current Time.
- when(PHYSICAL_CLOCK.now()).thenReturn(time);
+ when(MOCK_CLOCK.now()).thenReturn(time);
systemMonotonic.now();
// Shouldn't throw ClockException
@@ -173,28 +180,29 @@ public class TestClock {
@Test
public void testHLCNow() throws Clock.ClockException {
- MonotonicityCheckerClock hybridLogicalClock =
- new MonotonicityCheckerClock(new Clock.HLC(PHYSICAL_CLOCK, 30000), true);
+ MonotonicityCheckerClock hybridLogicalClock = new MonotonicityCheckerClock(
+ new Clock.HLC(new Clock.SystemMonotonic(MOCK_CLOCK)),
+ true); // true for strict monotonicity
// case 1: Test if it returns correct time based on current physical time.
// Remember, initially logical time = 0
- when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+ when(MOCK_CLOCK.now()).thenReturn(100L);
assertHLCTime(hybridLogicalClock.now(), 100, 0);
// case 2: physical time doesn't change, logical time should increment.
- when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+ when(MOCK_CLOCK.now()).thenReturn(100L);
assertHLCTime(hybridLogicalClock.now(), 100, 1);
// case 3: physical time doesn't change still, logical time should increment again
- when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+ when(MOCK_CLOCK.now()).thenReturn(100L);
assertHLCTime(hybridLogicalClock.now(), 100, 2);
// case 4: physical time moves forward, logical time should reset to 0.
- when(PHYSICAL_CLOCK.now()).thenReturn(101L);
+ when(MOCK_CLOCK.now()).thenReturn(101L);
assertHLCTime(hybridLogicalClock.now(), 101, 0);
// case 5: Monotonic increasing check, physical time goes back.
- when(PHYSICAL_CLOCK.now()).thenReturn(99L);
+ when(MOCK_CLOCK.now()).thenReturn(99L);
assertHLCTime(hybridLogicalClock.now(), 101, 1);
hybridLogicalClock.assertMonotonic();
@@ -202,55 +210,57 @@ public class TestClock {
@Test
public void testHLCNowLogicalTimeOverFlow() {
- Clock.HLC hybridLogicalClock = new Clock.HLC(PHYSICAL_CLOCK, 100);
-
- // Set Current Time.
- when(PHYSICAL_CLOCK.now()).thenReturn(100L);
- hybridLogicalClock.setPhysicalTime(100);
- hybridLogicalClock.setLogicalTime(TimestampType.HYBRID.getMaxLogicalTime());
+ Clock.HLC hybridLogicalClock = new Clock.HLC(new Clock.SystemMonotonic(MOCK_CLOCK));
+ when(MOCK_CLOCK.now()).thenReturn(100L);
+ hybridLogicalClock.now(); // current time (100, 0)
+ for (int i = 0; i < TimestampType.HYBRID.getMaxLogicalTime() - 1; i++) {
+ hybridLogicalClock.now();
+ }
exception.expect(Clock.ClockException.class);
hybridLogicalClock.now();
}
+ // No need to check skews in this test, since they are member of SystemMonotonic and not HLC.
@Test
public void testHLCUpdate() throws Clock.ClockException {
long messageTimestamp;
- MonotonicityCheckerClock hybridLogicalClock =
- new MonotonicityCheckerClock(new Clock.HLC(PHYSICAL_CLOCK, 100), true);
+ MonotonicityCheckerClock hybridLogicalClock = new MonotonicityCheckerClock(
+ new Clock.HLC(new Clock.SystemMonotonic(MOCK_CLOCK)),
+ true); // true for strictly increasing check
// Set Current Time.
- when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+ when(MOCK_CLOCK.now()).thenReturn(100L);
hybridLogicalClock.now();
// case 1: Message physical timestamp is lower than current physical time.
messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 99, 1);
- when(PHYSICAL_CLOCK.now()).thenReturn(101L);
+ when(MOCK_CLOCK.now()).thenReturn(101L);
assertHLCTime(hybridLogicalClock.update(messageTimestamp), 101, 0);
// case 2: Message physical timestamp is greater than HLC physical time.
messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 3);
- when(PHYSICAL_CLOCK.now()).thenReturn(102L);
+ when(MOCK_CLOCK.now()).thenReturn(102L);
assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 4);
// case 3: Message timestamp is less than HLC timestamp
messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 104 , 4);
- when(PHYSICAL_CLOCK.now()).thenReturn(103L);
+ when(MOCK_CLOCK.now()).thenReturn(103L);
assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 5);
//case 4: Message timestamp with same physical time as HLC, but lower logical time
messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 2);
- when(PHYSICAL_CLOCK.now()).thenReturn(101L);
+ when(MOCK_CLOCK.now()).thenReturn(101L);
assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 6);
//case 5: Message timestamp with same physical time as HLC, but higher logical time
messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 8);
- when(PHYSICAL_CLOCK.now()).thenReturn(102L);
+ when(MOCK_CLOCK.now()).thenReturn(102L);
assertHLCTime(hybridLogicalClock.update(messageTimestamp), 105, 9);
//case 6: Actual Physical Time greater than message physical timestamp and HLC physical time.
messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 10);
- when(PHYSICAL_CLOCK.now()).thenReturn(110L);
+ when(MOCK_CLOCK.now()).thenReturn(110L);
assertHLCTime(hybridLogicalClock.update(messageTimestamp), 110, 0);
hybridLogicalClock.assertMonotonic();
@@ -259,10 +269,10 @@ public class TestClock {
@Test
public void testHLCUpdateLogicalTimeOverFlow() throws Clock.ClockException {
long messageTimestamp;
- Clock.HLC hybridLogicalClock = new Clock.HLC(PHYSICAL_CLOCK, 100);
+ Clock.HLC hybridLogicalClock = new Clock.HLC(new Clock.SystemMonotonic(MOCK_CLOCK));
// Set Current Time.
- when(PHYSICAL_CLOCK.now()).thenReturn(100L);
+ when(MOCK_CLOCK.now()).thenReturn(100L);
hybridLogicalClock.now();
messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 100,
@@ -275,10 +285,11 @@ public class TestClock {
public void testHLCUpdateMaxClockSkew() throws Clock.ClockException {
final long time = 100;
long messageTimestamp;
- Clock.HLC hybridLogicalClock = new Clock.HLC(PHYSICAL_CLOCK, MAX_CLOCK_SKEW_IN_MS);
+ Clock.HLC hybridLogicalClock = new Clock.HLC(
+ new Clock.SystemMonotonic(MOCK_CLOCK, MAX_CLOCK_SKEW_IN_MS));
// Set Current Time.
- when(PHYSICAL_CLOCK.now()).thenReturn(time);
+ when(MOCK_CLOCK.now()).thenReturn(time);
hybridLogicalClock.now();
messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS,
http://git-wip-us.apache.org/repos/asf/hbase/blob/d9a99049/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index d8d87f8..7239a29 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -598,8 +598,10 @@ public class HRegionServer extends HasThread implements
this.abortRequested = false;
this.stopped = false;
- this.hybridLogicalClock = new Clock.HLC();
- this.systemMonotonicClock = new Clock.SystemMonotonic();
+ final long maxClockSkew =
+ conf.getLong("hbase.max.clock.skew.in.ms", Clock.DEFAULT_MAX_CLOCK_SKEW_IN_MS);
+ this.hybridLogicalClock = new Clock.HLC(maxClockSkew);
+ this.systemMonotonicClock = new Clock.SystemMonotonic(maxClockSkew);
this.systemClock = new Clock.System();
rpcServices = createRpcServices();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d9a99049/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java
index bc95f46..ea46090 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClockWithCluster.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase;
import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -198,18 +199,19 @@ public class TestClockWithCluster {
assertNotNull(regionMeta);
// Inject physical clock that always returns same physical time into hybrid logical clock
- long systemTime = Clock.DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK.now();
- Clock.PhysicalClock physicalClock = mock(Clock.PhysicalClock.class);
- when(physicalClock.now()).thenReturn(systemTime);
- when(physicalClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
- Clock.HLC clock = new Clock.HLC(physicalClock, Clock.DEFAULT_MAX_CLOCK_SKEW_IN_MS);
+ long systemTime = Clock.SYSTEM_CLOCK.now();
+ Clock mockSystemClock = mock(Clock.class);
+ when(mockSystemClock.now()).thenReturn(systemTime);
+ when(mockSystemClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
+ when(mockSystemClock.isMonotonic()).thenReturn(true);
+ Clock.HLC masterHLC = new Clock.HLC(new Clock.SystemMonotonic(mockSystemClock));
// The region clock is used for setting timestamps for table mutations and the region server
// clock is used for updating the clock on region assign/unassign events.
// Set meta region clock so that region state transitions are timestamped with mocked clock
- regionMeta.setClock(clock);
- master.setClock(clock);
+ regionMeta.setClock(masterHLC);
+ master.setClock(masterHLC);
HRegion userRegion = null;
for (Region region : regions) {
@@ -221,13 +223,13 @@ public class TestClockWithCluster {
// Only mock the region server clock because the region clock does not get used during
// unassignment and assignment
- rs.setClock(clock);
+ rs.setClock(masterHLC);
// Repeatedly unassign and assign region while tracking the timestamps of the region state
// transitions from the meta table
List<Long> timestamps = new ArrayList<>();
// Set expected logical time to 0 as initial clock.now() sets clock's logical time to 0
- long expectedLogicalTime = TimestampType.HYBRID.getLogicalTime(clock.now());
+ long expectedLogicalTime = TimestampType.HYBRID.getLogicalTime(masterHLC.now());
for (int i = 0; i < 10; i++) {
admin.unassign(hriOnline.getRegionName(), false);
assertEquals(RegionState.State.CLOSED, regionStates.getRegionState(hriOnline).getState());
@@ -243,8 +245,8 @@ public class TestClockWithCluster {
// 8,9 [now] Update hbase:meta
expectedLogicalTime += 10;
- assertEquals(expectedLogicalTime, clock.getLogicalTime());
- timestamps.add(clock.getLogicalTime());
+ assertEquals(expectedLogicalTime, masterHLC.getLogicalTime());
+ timestamps.add(masterHLC.getLogicalTime());
admin.assign(hriOnline.getRegionName());
// clock.now() is called 7 times and clock.update() is called 2 times, each call increments
@@ -260,8 +262,8 @@ public class TestClockWithCluster {
// gets the region info from assignment manager rather than meta table accessor
expectedLogicalTime += 9;
assertEquals(RegionState.State.OPEN, regionStates.getRegionState(hriOnline).getState());
- assertEquals(expectedLogicalTime, clock.getLogicalTime());
- timestamps.add(clock.getLogicalTime());
+ assertEquals(expectedLogicalTime, masterHLC.getLogicalTime());
+ timestamps.add(masterHLC.getLogicalTime());
}
// Ensure that the hybrid timestamps are strictly increasing
@@ -305,62 +307,63 @@ public class TestClockWithCluster {
assertNotNull(regionMeta);
// Instantiate two hybrid logical clocks with mocked physical clocks
- long expectedPhysicalTime = Clock.DEFAULT_JAVA_MILLIS_PHYSICAL_CLOCK.now();
- Clock.PhysicalClock masterPhysicalClock = mock(Clock.PhysicalClock.class);
- when(masterPhysicalClock.now()).thenReturn(expectedPhysicalTime);
- when(masterPhysicalClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
- Clock.HLC masterClock = new Clock.HLC(masterPhysicalClock, Clock.DEFAULT_MAX_CLOCK_SKEW_IN_MS);
- master.setClock(masterClock);
- regionMeta.setClock(masterClock);
-
- Clock.PhysicalClock rsPhysicalClock = mock(Clock.PhysicalClock.class);
- when(rsPhysicalClock.now()).thenReturn(expectedPhysicalTime);
- when(rsPhysicalClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
- Clock.HLC rsClock = new Clock.HLC(rsPhysicalClock, Clock.DEFAULT_MAX_CLOCK_SKEW_IN_MS);
+ long expectedPhysicalTime = Clock.SYSTEM_CLOCK.now();
+ Clock masterMockSystemClock = mock(Clock.class);
+ when(masterMockSystemClock.now()).thenReturn(expectedPhysicalTime);
+ when(masterMockSystemClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
+ Clock.HLC masterHLC= new Clock.HLC(new Clock.SystemMonotonic(masterMockSystemClock));
+ master.setClock(masterHLC);
+ regionMeta.setClock(masterHLC);
+
+ Clock rsMockSystemClock = mock(Clock.class);
+ when(rsMockSystemClock.now()).thenReturn(expectedPhysicalTime);
+ when(rsMockSystemClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
+ when(rsMockSystemClock.isMonotonic()).thenReturn(true);
+ Clock.HLC rsHLC= new Clock.HLC(new Clock.SystemMonotonic(rsMockSystemClock));
// We only mock the region server clock here because the region clock does not get used
// during unassignment and assignment
- rs.setClock(rsClock);
+ rs.setClock(rsHLC);
// Increment master physical clock time
expectedPhysicalTime += 1000;
- when(masterPhysicalClock.now()).thenReturn(expectedPhysicalTime);
+ when(masterMockSystemClock.now()).thenReturn(expectedPhysicalTime);
// Unassign region, region server should advance its clock upon receiving close region request
admin.unassign(hriOnline.getRegionName(), false);
assertEquals(RegionState.State.CLOSED, regionStates.getRegionState(hriOnline).getState());
// Verify that region server clock time increased
// Previous test has explanation for each event that increases logical time
- assertHLCTime(masterClock, expectedPhysicalTime, 9);
- assertHLCTime(rsClock, expectedPhysicalTime, 6);
+ assertHLCTime(masterHLC, expectedPhysicalTime, 9);
+ assertHLCTime(rsHLC, expectedPhysicalTime, 6);
// Increase region server physical clock time
expectedPhysicalTime += 1000;
- when(rsPhysicalClock.now()).thenReturn(expectedPhysicalTime);
+ when(rsMockSystemClock.now()).thenReturn(expectedPhysicalTime);
// Assign region, master server should advance its clock upon receiving close region response
admin.assign(hriOnline.getRegionName());
assertEquals(RegionState.State.OPEN, regionStates.getRegionState(hriOnline).getState());
// Verify that master clock time increased
- assertHLCTime(masterClock, expectedPhysicalTime, 4);
- assertHLCTime(rsClock, expectedPhysicalTime, 1);
+ assertHLCTime(masterHLC, expectedPhysicalTime, 4);
+ assertHLCTime(rsHLC, expectedPhysicalTime, 1);
// Increment region server physical clock time
expectedPhysicalTime += 1000;
- when(rsPhysicalClock.now()).thenReturn(expectedPhysicalTime);
+ when(rsMockSystemClock.now()).thenReturn(expectedPhysicalTime);
// Unassign region, region server should advance its clock upon receiving close region request
admin.unassign(hriOnline.getRegionName(), false);
assertEquals(RegionState.State.CLOSED, regionStates.getRegionState(hriOnline).getState());
// Verify that master server clock time increased
- assertHLCTime(masterClock, expectedPhysicalTime, 4);
- assertHLCTime(rsClock, expectedPhysicalTime, 1);
+ assertHLCTime(masterHLC, expectedPhysicalTime, 4);
+ assertHLCTime(rsHLC, expectedPhysicalTime, 1);
// Increase master server physical clock time
expectedPhysicalTime += 1000;
- when(masterPhysicalClock.now()).thenReturn(expectedPhysicalTime);
+ when(masterMockSystemClock.now()).thenReturn(expectedPhysicalTime);
// Assign region, master server should advance its clock upon receiving close region response
admin.assign(hriOnline.getRegionName());
assertEquals(RegionState.State.OPEN, regionStates.getRegionState(hriOnline).getState());
// Verify that region server clock time increased
- assertHLCTime(masterClock, expectedPhysicalTime, 8);
- assertHLCTime(rsClock, expectedPhysicalTime, 5);
+ assertHLCTime(masterHLC, expectedPhysicalTime, 8);
+ assertHLCTime(rsHLC, expectedPhysicalTime, 5);
}
}
\ No newline at end of file