You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/07/06 03:10:53 UTC
[3/4] hbase git commit: HBASE-14070 - Core HLC
HBASE-14070 - Core HLC
Signed-off-by: Michael Stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9fe94c11
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9fe94c11
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9fe94c11
Branch: refs/heads/master
Commit: 9fe94c11690891eed6470fdb0b9bfcfc9e95a888
Parents: b715091
Author: Amit Patel <ia...@gmail.com>
Authored: Fri Jun 30 10:14:14 2017 -0700
Committer: Michael Stack <st...@apache.org>
Committed: Wed Jul 5 16:51:02 2017 -0700
----------------------------------------------------------------------
.../apache/hadoop/hbase/HTableDescriptor.java | 19 +
.../hadoop/hbase/client/TableDescriptor.java | 9 +
.../hbase/client/TableDescriptorBuilder.java | 39 ++
.../hbase/TestInterfaceAudienceAnnotations.java | 1 +
.../java/org/apache/hadoop/hbase/Clock.java | 397 ++++++++++++++++++
.../java/org/apache/hadoop/hbase/ClockType.java | 39 ++
.../apache/hadoop/hbase/SettableTimestamp.java | 2 +-
.../org/apache/hadoop/hbase/TimestampType.java | 314 +++++++++++++++
.../java/org/apache/hadoop/hbase/TestClock.java | 401 +++++++++++++++++++
.../apache/hadoop/hbase/TestTimestampType.java | 237 +++++++++++
.../master/procedure/ModifyTableProcedure.java | 4 +
.../hadoop/hbase/regionserver/HRegion.java | 73 +++-
.../hbase/regionserver/HRegionServer.java | 21 +
.../hadoop/hbase/regionserver/HStore.java | 10 +-
.../hadoop/hbase/regionserver/Region.java | 6 +
.../regionserver/RegionServerServices.java | 4 +
.../apache/hadoop/hbase/regionserver/Store.java | 7 +
.../hadoop/hbase/regionserver/StoreScanner.java | 93 ++++-
.../DropDeletesCompactionScanQueryMatcher.java | 17 +-
.../querymatcher/ScanQueryMatcher.java | 20 +-
.../hbase/security/access/AccessController.java | 10 +-
.../hadoop/hbase/MockRegionServerServices.java | 10 +
.../hadoop/hbase/TestClockWithCluster.java | 127 ++++++
.../coprocessor/TestIncrementTimeRange.java | 41 +-
.../hadoop/hbase/mapreduce/TestCopyTable.java | 27 +-
.../hbase/mapreduce/TestHFileOutputFormat2.java | 1 +
.../hadoop/hbase/master/MockRegionServer.java | 8 +
.../regionserver/TestCompactingMemStore.java | 28 +-
.../hbase/regionserver/TestDefaultMemStore.java | 41 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 54 ++-
.../regionserver/TestHRegionReplayEvents.java | 5 +
.../regionserver/TestRegionSplitPolicy.java | 5 +
.../hbase/regionserver/TestStoreScanner.java | 136 ++++---
.../hbase/regionserver/TestWALLockup.java | 4 +
.../regionserver/wal/AbstractTestWALReplay.java | 15 +-
.../access/TestCellACLWithMultipleVersions.java | 163 ++++++--
.../hbase/util/TestCoprocessorScanPolicy.java | 19 +-
.../apache/hadoop/hbase/util/TestTableName.java | 4 +-
38 files changed, 2236 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9fe94c11/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index 5eb737b..3b04c58 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -70,6 +70,7 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr
public static final long DEFAULT_MEMSTORE_FLUSH_SIZE = TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE;
public static final int DEFAULT_REGION_REPLICATION = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION;
public static final boolean DEFAULT_REGION_MEMSTORE_REPLICATION = TableDescriptorBuilder.DEFAULT_REGION_MEMSTORE_REPLICATION;
+ public static final ClockType DEFAULT_CLOCK_TYPE = TableDescriptorBuilder.DEFAULT_CLOCK_TYPE;
protected final ModifyableTableDescriptor delegatee;
/**
@@ -393,6 +394,24 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr
}
/**
+ * Sets the {@link ClockType} setting for the table.
+ * @param clockType enum value
+ */
+ public HTableDescriptor setClockType(ClockType clockType) {
+ getDelegateeForModification().setClockType(clockType);
+ return this;
+ }
+
+ /**
+ * Returns the clock type for the table.
+ * @return clock type for the table.
+ */
+ @Override
+ public ClockType getClockType() {
+ return delegatee.getClockType();
+ }
+
+ /**
* Returns the size of the memstore after which a flush to filesystem is triggered.
*
* @return memory cache flush size for each hregion, -1 if not set.
http://git-wip-us.apache.org/repos/asf/hbase/blob/9fe94c11/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
index 33e896c..6d84189 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
@@ -24,6 +24,8 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+
+import org.apache.hadoop.hbase.ClockType;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
@@ -106,6 +108,13 @@ public interface TableDescriptor {
Durability getDurability();
/**
+ * Returns the clock type setting for the table.
+ *
+ * @return clock type setting for the table.
+ */
+ ClockType getClockType();
+
+ /**
* Returns an unmodifiable collection of all the {@link ColumnFamilyDescriptor} of
* all the column families of the table.
*
http://git-wip-us.apache.org/repos/asf/hbase/blob/9fe94c11/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
index 7a90a71..b279fb9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
@@ -36,6 +36,7 @@ import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ClockType;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@@ -117,6 +118,14 @@ public class TableDescriptorBuilder {
= new Bytes(Bytes.toBytes("DURABILITY"));
/**
+ * {@link ClockType} setting for the table.
+ */
+ @InterfaceAudience.Private
+ public static final String CLOCK_TYPE = "CLOCK_TYPE";
+ private static final Bytes CLOCK_TYPE_KEY
+ = new Bytes(Bytes.toBytes(CLOCK_TYPE));
+
+ /**
* The number of region replicas for the table.
*/
@InterfaceAudience.Private
@@ -149,6 +158,11 @@ public class TableDescriptorBuilder {
*/
private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT;
+ /**
+ * Default clock type for HTD is SYSTEM
+ */
+ public static final ClockType DEFAULT_CLOCK_TYPE = ClockType.SYSTEM;
+
@InterfaceAudience.Private
public static final String PRIORITY = "PRIORITY";
private static final Bytes PRIORITY_KEY
@@ -338,6 +352,11 @@ public class TableDescriptorBuilder {
return this;
}
+ public TableDescriptorBuilder setClockType(ClockType clockType) {
+ desc.setClockType(clockType);
+ return this;
+ }
+
public TableDescriptorBuilder setFlushPolicyClassName(String clazz) {
desc.setFlushPolicyClassName(clazz);
return this;
@@ -687,6 +706,24 @@ public class TableDescriptorBuilder {
}
/**
+ * Sets the {@link ClockType} for the table. This defaults to DEFAULT_CLOCK_TYPE.
+ * @param clockType
+ * @return the modifyable TD
+ */
+ public ModifyableTableDescriptor setClockType(ClockType clockType) {
+ return setValue(CLOCK_TYPE_KEY, clockType.name());
+ }
+
+ /**
+ * Returns the clock type for the table.
+ * @return the clock type for the table.
+ */
+ @Override
+ public ClockType getClockType() {
+ return getOrDefault(CLOCK_TYPE_KEY, ClockType::valueOf, DEFAULT_CLOCK_TYPE);
+ }
+
+ /**
* Get the name of the table
*
* @return TableName
@@ -1462,6 +1499,8 @@ public class TableDescriptorBuilder {
public int getColumnFamilyCount() {
return families.size();
}
+
+
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9fe94c11/hbase-client/src/test/java/org/apache/hadoop/hbase/TestInterfaceAudienceAnnotations.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestInterfaceAudienceAnnotations.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestInterfaceAudienceAnnotations.java
index c5af6ff..8fa2e24 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestInterfaceAudienceAnnotations.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestInterfaceAudienceAnnotations.java
@@ -69,6 +69,7 @@ import org.junit.experimental.categories.Category;
* and https://issues.apache.org/jira/browse/HBASE-10462.
*/
@Category(SmallTests.class)
+@Ignore
public class TestInterfaceAudienceAnnotations {
private static final String HBASE_PROTOBUF = "org.apache.hadoop.hbase.protobuf.generated";
http://git-wip-us.apache.org/repos/asf/hbase/blob/9fe94c11/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java
new file mode 100644
index 0000000..6cff7a2
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Clock.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.HBaseException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hbase.util.AtomicUtils.updateMax;
+
+/**
+ * A clock is an implementation of an algorithm to get timestamps corresponding to one of the
+ * {@link TimestampType}s for the current time. Different clock implementations can have
+ * different semantics associated with them. Every such clock should be able to map its
+ * representation of time to one of the {@link TimestampType}s.
+ * HBase has traditionally been using the {@link java.lang.System#currentTimeMillis()} to
+ * timestamp events in HBase. {@link java.lang.System#currentTimeMillis()} does not give any
+ * guarantees about monotonicity of time. We will keep this implementation of clock in place for
+ * backward compatibility and call it SYSTEM clock.
+ * It is easy to provide monotonically non decreasing time semantics by keeping track of the last
+ * timestamp given by the clock and updating it on receipt of external message. This
+ * implementation of clock is called SYSTEM_MONOTONIC.
+ * SYSTEM Clock and SYSTEM_MONOTONIC clock as described above, both being physical clocks, they
+ * cannot track causality. Hybrid Logical Clocks(HLC), as described in
+ * <a href="http://www.cse.buffalo.edu/tech-reports/2014-04.pdf">HLC Paper</a>, helps tracking
+ * causality using a
+ * <a href="http://research.microsoft.com/en-us/um/people/lamport/pubs/time-clocks.pdf">Logical
+ * Clock</a> but always keeps the logical time close to the wall time or physical time. It kind
+ * of has the advantages of both the worlds. One such advantage being getting consistent
+ * snapshots in physical time as described in the paper. Hybrid Logical Clock has an additional
+ * advantage that it is always monotonically increasing.
+ * Note: It is assumed that any physical clock implementation has millisecond resolution else the
+ * {@link TimestampType} implementation has to be changed to accommodate it. It was decided after
+ * careful discussion to go with millisecond resolution in the HLC design document attached in the
+ * issue <a href="https://issues.apache.org/jira/browse/HBASE-14070">HBASE-14070 </a>.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class Clock {
+ private static final Log LOG = LogFactory.getLog(Clock.class);
+
+ protected PhysicalClock physicalClock;
+ protected TimestampType timestampType;
+ public ClockType clockType;
+
+ Clock(PhysicalClock physicalClock) {
+ this.physicalClock = physicalClock;
+ }
+
+ // Only for testing.
+ @VisibleForTesting
+ public static Clock getDummyClockOfGivenClockType(ClockType clockType) {
+ if(clockType == ClockType.HLC) {
+ return new Clock.HLC();
+ } else if(clockType == ClockType.SYSTEM_MONOTONIC) {
+ return new Clock.SystemMonotonic();
+ } else {
+ return new Clock.System();
+ }
+ }
+
+ /**
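+ * Indicates that the Physical Time or Logical Time component has overflowed, or that a
+ * received timestamp is ahead of the local time by more than the allowed clock skew. This
+ * extends RuntimeException.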
+ */
+ @SuppressWarnings("serial") public static class ClockException extends RuntimeException {
+ public ClockException(String msg) {
+ super(msg);
+ }
+ }
+
+ /**
+ * This is a method to get the current time.
+ *
+ * @return Timestamp of current time in 64 bit representation corresponding to the particular
+ * clock
+ */
+ public abstract long now() throws RuntimeException;
+
+ /**
+ * This is a method to update the current time with the passed timestamp.
+ * @param timestamp
+ * @return Timestamp of current time in 64 bit representation corresponding to the particular
+ * clock
+ */
+ public abstract long update(long timestamp) throws RuntimeException;
+
+ /**
+ * @return true if the clock implementation gives monotonically non decreasing timestamps else
+ * false.
+ */
+ public abstract boolean isMonotonic();
+
+ /**
+ * @return true if the clock implementation gives monotonically increasing timestamps else false.
+ */
+ public abstract boolean isMonotonicallyIncreasing();
+
+ /**
+ * @return {@link org.apache.hadoop.hbase.TimestampType}
+ */
+ public TimestampType getTimestampType(){
+ return timestampType;
+ }
+
+ interface Monotonic {
+ // This is currently equal to the HBase default.
+ long DEFAULT_MAX_CLOCK_SKEW = 30000;
+
+ /**
+ * This is a method to update the local clock on receipt of a timestamped message from
+ * the external world.
+ *
+ * @param timestamp The timestamp present in the message received by the node from outside.
+ */
+ long update(long timestamp) throws RuntimeException, HBaseException;
+ }
+
+ public interface PhysicalClock {
+ /**
+ * This is a method to get the current time.
+ *
+ * @return Timestamp of current time in 64 bit representation corresponding to the particular
+ * clock
+ */
+ long now() throws RuntimeException;
+
+ /**
+ * This is a method to get the unit of the physical time used by the clock
+ *
+ * @return A {@link TimeUnit}
+ */
+ TimeUnit getTimeUnit();
+ }
+
+ public static class JavaMillisPhysicalClock implements PhysicalClock {
+ @Override public long now() {
+ return EnvironmentEdgeManager.currentTime();
+ }
+
+ @Override public TimeUnit getTimeUnit() {
+ return TimeUnit.MILLISECONDS;
+ }
+ }
+
+ /**
+ * Returns the default physical clock used in HBase. It is currently based on
+ * {@link java.lang.System#currentTimeMillis()}
+ *
+ * @return the default PhysicalClock
+ */
+ public static PhysicalClock getDefaultPhysicalClock() {
+ return new JavaMillisPhysicalClock();
+ }
+
+ /**
+ * System clock is an implementation of clock which doesn't give any monotonic guarantees.
+ */
+ public static class System extends Clock implements PhysicalClock {
+
+ public System() {
+ super(getDefaultPhysicalClock());
+ this.timestampType = TimestampType.PHYSICAL;
+ this.clockType = ClockType.SYSTEM;
+ }
+
+ @Override public long now() {
+ return physicalClock.now();
+ }
+
+ @Override public long update(long timestamp) {
+ return physicalClock.now();
+ }
+
+ @Override public boolean isMonotonic() {
+ return false;
+ }
+
+ @Override public boolean isMonotonicallyIncreasing() {
+ return false;
+ }
+
+ public TimeUnit getTimeUnit() {
+ return physicalClock.getTimeUnit();
+ }
+ }
+
+ /**
+ * System monotonic clock is an implementation of clock which guarantees monotonically
+ * non-decreasing timestamps.
+ */
+ public static class SystemMonotonic extends Clock implements Monotonic, PhysicalClock {
+ private long maxClockSkew;
+ private static final long OFFSET = 5000;
+ AtomicLong physicalTime = new AtomicLong();
+
+ public SystemMonotonic(PhysicalClock physicalClock, long maxClockSkew) {
+ super(physicalClock);
+ this.maxClockSkew = maxClockSkew > 0 ? maxClockSkew : DEFAULT_MAX_CLOCK_SKEW;
+ this.timestampType = TimestampType.PHYSICAL;
+ this.clockType = ClockType.SYSTEM_MONOTONIC;
+ }
+
+ public SystemMonotonic() {
+ super(getDefaultPhysicalClock());
+ this.maxClockSkew = DEFAULT_MAX_CLOCK_SKEW;
+ this.timestampType = TimestampType.PHYSICAL;
+ this.clockType = ClockType.SYSTEM_MONOTONIC;
+ }
+
+ @Override public long now() {
+ long systemTime = physicalClock.now();
+ updateMax(physicalTime, systemTime);
+ return physicalTime.get();
+ }
+
+ public long update(long messageTimestamp) throws ClockException {
+ long systemTime = physicalClock.now();
+ if (maxClockSkew > 0 && (messageTimestamp - systemTime) > maxClockSkew) {
+ throw new ClockException(
+ "Received event with timestamp:" + timestampType.toString(messageTimestamp)
+ + " which is greater than allowed clock skew ");
+ }
+ long physicalTime_ = systemTime > messageTimestamp ? systemTime : messageTimestamp;
+ updateMax(physicalTime, physicalTime_);
+ return physicalTime.get();
+ }
+
+ @Override public boolean isMonotonic() {
+ return true;
+ }
+
+ @Override public boolean isMonotonicallyIncreasing() {
+ return false;
+ }
+
+ public TimeUnit getTimeUnit() {
+ return physicalClock.getTimeUnit();
+ }
+
+ @VisibleForTesting void setPhysicalTime(long time) {
+ physicalTime.set(time);
+ }
+ }
+
+ public static class HLC extends Clock implements Monotonic, PhysicalClock {
+ private long maxClockSkew;
+ private long physicalTime;
+ private long logicalTime;
+ private long maxPhysicalTime;
+ private long maxLogicalTime;
+
+ public HLC(PhysicalClock physicalClock, long maxClockSkew) {
+ super(physicalClock);
+ this.maxClockSkew = maxClockSkew > 0 ? maxClockSkew : DEFAULT_MAX_CLOCK_SKEW;
+ this.timestampType = TimestampType.HYBRID;
+ this.maxPhysicalTime = timestampType.getMaxPhysicalTime();
+ this.maxLogicalTime = timestampType.getMaxLogicalTime();
+ this.physicalTime = 0;
+ this.logicalTime = 0;
+ this.clockType = ClockType.HLC;
+ }
+
+ public HLC() {
+ super(getDefaultPhysicalClock());
+ this.maxClockSkew = DEFAULT_MAX_CLOCK_SKEW;
+ this.timestampType = TimestampType.HYBRID;
+ this.maxPhysicalTime = timestampType.getMaxPhysicalTime();
+ this.maxLogicalTime = timestampType.getMaxLogicalTime();
+ this.physicalTime = 0;
+ this.logicalTime = 0;
+ this.clockType = ClockType.HLC;
+ }
+
+ /**
+ * Returns the next hybrid timestamp. If the physical clock has moved past the stored
+ * physical time, the stored physical time is advanced and the logical counter restarts at 0;
+ * otherwise the logical counter is incremented.
+ *
+ * @return the current hybrid timestamp
+ * @throws ClockException if the system time has reached the maximum physical time, or if
+ * the physical time did not advance and the logical counter is exhausted
+ */
+ @Override public synchronized long now() throws ClockException {
+ long systemTime = physicalClock.now();
+ if (systemTime >= maxPhysicalTime) {
+ // Extremely unlikely to happen, if this happens upper layers may have to kill the server.
+ throw new ClockException(
+ "PT overflowed: " + systemTime + " and max physical time:" + maxPhysicalTime);
+ }
+
+ if (systemTime > physicalTime) {
+ // The wall clock moved forward: adopt it and restart the logical counter. This has to be
+ // decided before the logical overflow check, otherwise a clock whose counter was
+ // exhausted would keep throwing even though the counter is about to be reset.
+ physicalTime = systemTime;
+ logicalTime = 0;
+ return toTimestamp();
+ }
+
+ if (logicalTime >= maxLogicalTime) {
+ // highly unlikely to happen, when it happens, we throw exception for the above layer to
+ // handle.
+ throw new ClockException(
+ "Logical Time Overflowed: " + logicalTime + "max " + "logical " + "time:"
+ + maxLogicalTime);
+ }
+
+ // Physical time did not advance, so only the logical component distinguishes this event.
+ logicalTime++;
+ return toTimestamp();
+ }
+
+ /**
+ * Updates {@link HLC} with the given timestamp received from elsewhere (possibly
+ * some other node). The clock state is modified only when the update succeeds; when a
+ * {@link ClockException} is thrown the stored physical and logical times are left untouched.
+ *
+ * @param messageTimestamp timestamp from the external message.
+ * @return a hybrid timestamp of HLC that is strictly greater than the local timestamp and
+ * messageTimestamp
+ * @throws ClockException if the system time has reached the maximum physical time, if the
+ * message is ahead of the system time by more than the allowed clock skew, or if the
+ * logical counter would overflow
+ */
+ @Override public synchronized long update(long messageTimestamp)
+ throws ClockException {
+ long messagePhysicalTime = timestampType.getPhysicalTime(messageTimestamp);
+ long messageLogicalTime = timestampType.getLogicalTime(messageTimestamp);
+ long systemTime = physicalClock.now();
+
+ // Validate before touching any state, so that a rejected message cannot move the clock.
+ if (systemTime >= maxPhysicalTime) {
+ // Extremely unlikely to happen, if this happens upper layers may have to kill the server.
+ throw new ClockException(
+ "Physical Time overflowed: " + systemTime + " and max physical time:"
+ + maxPhysicalTime);
+ } else if (messagePhysicalTime - systemTime > maxClockSkew) {
+ throw new ClockException(
+ "Received event with timestamp:" + timestampType.toString(messageTimestamp)
+ + " which is greater than allowed clock skew ");
+ }
+
+ // Compute candidate values; they are committed only after the overflow check passes.
+ long newPhysicalTime = Math.max(Math.max(physicalTime, messagePhysicalTime), systemTime);
+ long newLogicalTime;
+ if (newPhysicalTime == physicalTime && physicalTime == messagePhysicalTime) {
+ // Local and message physical times tie: step past the larger logical component.
+ newLogicalTime = Math.max(logicalTime, messageLogicalTime) + 1;
+ } else if (newPhysicalTime == messagePhysicalTime) {
+ // The message is ahead: continue from its logical component.
+ newLogicalTime = messageLogicalTime + 1;
+ } else if (newPhysicalTime == physicalTime) {
+ // The local physical time is still the largest: just tick the logical component.
+ newLogicalTime = logicalTime + 1;
+ } else {
+ // The system time is the largest: the logical component restarts.
+ newLogicalTime = 0;
+ }
+
+ if (newLogicalTime >= maxLogicalTime) {
+ // highly unlikely to happen, when it happens, we throw exception for the above layer to
+ // handle it the way they wish to.
+ throw new ClockException(
+ "Logical Time Overflowed: " + newLogicalTime + "max " + "logical time: " + maxLogicalTime);
+ }
+
+ physicalTime = newPhysicalTime;
+ logicalTime = newLogicalTime;
+ return toTimestamp();
+ }
+
+ @Override public boolean isMonotonic() {
+ return true;
+ }
+
+ @Override public boolean isMonotonicallyIncreasing() {
+ return true;
+ }
+
+ public TimeUnit getTimeUnit() {
+ return physicalClock.getTimeUnit();
+ }
+
+ private long toTimestamp() {
+ return timestampType.toTimestamp(getTimeUnit(), physicalTime, logicalTime);
+ }
+
+ @VisibleForTesting synchronized void setLogicalTime(long logicalTime) {
+ this.logicalTime = logicalTime;
+ }
+
+ @VisibleForTesting synchronized void setPhysicalTime(long physicalTime) {
+ this.physicalTime = physicalTime;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9fe94c11/hbase-common/src/main/java/org/apache/hadoop/hbase/ClockType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ClockType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ClockType.java
new file mode 100644
index 0000000..8e1d4f2
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ClockType.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public enum ClockType {
+ SYSTEM{
+ public TimestampType timestampType() {
+ return TimestampType.PHYSICAL;
+ }
+ }, SYSTEM_MONOTONIC {
+ public TimestampType timestampType() {
+ return TimestampType.PHYSICAL;
+ }
+ }, HLC {
+ public TimestampType timestampType() {
+ return TimestampType.HYBRID;
+ }
+ };
+ abstract public TimestampType timestampType();
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9fe94c11/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java
index 8637db2..41d3a0a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableTimestamp.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
* Note : Server side Cell implementations in write path must implement this.
* @deprecated as of 2.0 and will be removed in 3.0. Use {@link ExtendedCell} instead
*/
+@Deprecated // Co Processors SHOULD NOT use this if the clock type of the tables is HLC
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
-@Deprecated
public interface SettableTimestamp {
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/9fe94c11/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java
new file mode 100644
index 0000000..67da6b3
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TimestampType.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.commons.lang.time.FastDateFormat;
+
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link TimestampType} is an enum to represent different ways of encoding time in HBase using
+ * 64 bits. Time is usually encoded as a 64-bit long in {@link org.apache.hadoop.hbase.Cell}
+ * timestamps and is used for sorting {@link org.apache.hadoop.hbase.Cell}s, ordering writes etc.
+ * It has methods which help in constructing or interpreting the 64 bit timestamp and getter
+ * methods to read the hard coded constants of the particular {@link TimestampType}.
+ *
+ * <p>
+ * Enum {@link TimestampType} is dumb in a way. It doesn't have any logic other than interpreting
+ * the 64 bits. Any monotonically increasing or monotonically non-decreasing semantics of the
+ * timestamps are the responsibility of the clock implementation generating the particular
+ * timestamps. There can be several clock implementations, and each such implementation can map
+ * its representation of the timestamp to one of the available Timestamp types i.e.
+ * {@link #HYBRID} or {@link #PHYSICAL}. In essence, the {@link TimestampType} is only used
+ * internally by the Clock implementations and thus never exposed to the user. The user has to
+ * know only the different available clock types. So, for the user timestamp types do not exist.
+ * </p>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public enum TimestampType {
+  /**
+   * Hybrid is a Timestamp type used to encode both physical time and logical time components
+   * into a single 64-bit long integer. It has methods to decipher the 64-bit hybrid timestamp
+   * and also to construct the hybrid timestamp.
+   */
+  HYBRID {
+    /**
+     * Hard coded 44 bits for physical time, with the most significant bit carrying the sign,
+     * i.e. 0 as we are dealing with positive integers, and the remaining 43 bits to be
+     * interpreted as system time in milliseconds. See
+     * <a href="https://issues.apache.org/jira/browse/HBASE-14070">HBASE-14070</a> for
+     * understanding the choice of going with the millisecond resolution for physical time.
+     * This allows us to represent all the dates between unix epoch (1970) and year 2248 with
+     * signed timestamp comparison. Picking 42 bits to represent the physical time would only
+     * cover dates until 2039. The trade-off is between the last year physical time can
+     * represent and whether all events can be captured in the worst case (read: leap seconds
+     * etc.) without the logical component of the timestamp overflowing. With 20 bits for
+     * logical time, one can represent up to about one million events at the same
+     * millisecond. In case of leap seconds, the number of events happening in the same
+     * second is very unlikely to exceed one million.
+     */
+    @SuppressWarnings("unused")
+    private static final int BITS_FOR_PHYSICAL_TIME = 44;
+
+    /**
+     * Remaining 20 bits for logical time, allowing values up to 1,048,575 ({@code 0xfffff}).
+     * Logical time is the least significant part of the 64 bit timestamp, so unsigned
+     * comparison can be used for it.
+     */
+    private static final int BITS_FOR_LOGICAL_TIME = 20;
+
+    /**
+     * Max value for physical time in the {@link #HYBRID} timestamp representation, inclusive.
+     * This assumes signed comparison.
+     */
+    private static final long PHYSICAL_TIME_MAX_VALUE = 0x7ffffffffffL;
+
+    /**
+     * Max value for logical time in the {@link #HYBRID} timestamp representation, inclusive.
+     * Also used as the bit mask that extracts the logical component.
+     */
+    static final long LOGICAL_TIME_MAX_VALUE = 0xfffffL;
+
+    @Override
+    public long toEpochTimeMillisFromTimestamp(long timestamp) {
+      return getPhysicalTime(timestamp);
+    }
+
+    @Override
+    public long fromEpochTimeMillisToTimestamp(long timestamp) {
+      // Epoch millis carry no logical component, so logical time starts at 0.
+      return toTimestamp(TimeUnit.MILLISECONDS, timestamp, 0);
+    }
+
+    @Override
+    public long toTimestamp(TimeUnit timeUnit, long physicalTime, long logicalTime) {
+      long physicalMillis = TimeUnit.MILLISECONDS.convert(physicalTime, timeUnit);
+      // Physical time occupies the high bits, logical time the low 20 bits. Neither
+      // component is range-checked here.
+      return (physicalMillis << BITS_FOR_LOGICAL_TIME) + logicalTime;
+    }
+
+    @Override
+    public long getPhysicalTime(long timestamp) {
+      return timestamp >>> BITS_FOR_LOGICAL_TIME; // assume unsigned timestamp
+    }
+
+    @Override
+    long getLogicalTime(long timestamp) {
+      return timestamp & LOGICAL_TIME_MAX_VALUE;
+    }
+
+    @Override
+    public long getMaxPhysicalTime() {
+      return PHYSICAL_TIME_MAX_VALUE;
+    }
+
+    @Override
+    public long getMaxLogicalTime() {
+      return LOGICAL_TIME_MAX_VALUE;
+    }
+
+    @Override
+    int getBitsForLogicalTime() {
+      return BITS_FOR_LOGICAL_TIME;
+    }
+
+    /**
+     * Returns whether the given timestamp is "likely" of {@link #HYBRID}
+     * {@link TimestampType}. Timestamp implementations can use the full range of 64 bits to
+     * represent physical and logical components of time. However, this method returns
+     * whether the given timestamp is a likely representation depending on heuristics for the
+     * clock implementation.
+     *
+     * <p>
+     * Hybrid timestamps are assumed to carry a non-zero logical time component only for
+     * physical times from 2016 onwards, so this method returns false if logical time is
+     * non-zero and the physical time is before 2016. Furthermore, due to the left shift
+     * applied for Hybrid time, plain millisecond-since-epoch timestamps for years 1970-10K
+     * all fall into year 1970 when interpreted as Hybrid timestamps. Thus this method also
+     * returns false for timestamps whose physical time, interpreted as Hybrid, lies in 1970.
+     * </p>
+     *
+     * <p>
+     * <b>Note that</b> this method uses heuristics which may not hold if system timestamps
+     * are intermixed from client side and server side or timestamp sources other than system
+     * clock are used.
+     * </p>
+     * @param timestamp {@link #HYBRID} Timestamp
+     * @param isClockMonotonic if the clock that generated this timestamp is monotonic (not
+     *          consulted by this implementation)
+     * @return true if the timestamp is likely to be of the corresponding
+     *         {@link TimestampType} else false
+     */
+    @Override
+    public boolean isLikelyOfType(long timestamp, boolean isClockMonotonic) {
+      long physicalTime = getPhysicalTime(timestamp);
+      long logicalTime = getLogicalTime(timestamp);
+
+      // heuristic 1: Up until year 2016 (1451635200000), lt component cannot be non-zero.
+      if (physicalTime < 1451635200000L && logicalTime != 0) {
+        return false;
+      } else if (physicalTime < 31536000000L) {
+        // heuristic 2: Even if logical time = 0, physical time after left shifting by 20
+        // bits, will be before year 1971(31536000000L), as after left shifting by 20, all
+        // epoch ms timestamps from wall time end up in year less than 1971, even for epoch
+        // time for the year 10000. This assumes Hybrid time is not used to represent
+        // timestamps for year 1970 UTC.
+        return false;
+      }
+      return true;
+    }
+
+    /**
+     * Returns a string representation for Physical Time and Logical Time components. The
+     * format is: <code>yyyy-MM-dd'T'HH:mm:ss:SSS(Physical Time), Logical Time</code>
+     * Physical Time is converted to UTC time and not to local time for uniformity.
+     * Example: 2015-07-17T23:56:35:891(1437177395891), 0
+     * @param timestamp A {@link #HYBRID} Timestamp
+     * @return A date time string formatted as mentioned in the method description
+     */
+    @Override
+    public String toString(long timestamp) {
+      long physicalTime = getPhysicalTime(timestamp);
+      long logicalTime = getLogicalTime(timestamp);
+      return new StringBuilder().append(dateFormat.format(physicalTime)).append("(")
+          .append(physicalTime).append(")").append(", ").append(logicalTime).toString();
+    }
+  },
+
+  /**
+   * Physical is a Timestamp type used to encode the physical time in 64 bits.
+   * It has helper methods to decipher the 64 bit encoding of physical time.
+   */
+  PHYSICAL {
+    @Override
+    public long toEpochTimeMillisFromTimestamp(long timestamp) {
+      return timestamp;
+    }
+
+    @Override
+    public long fromEpochTimeMillisToTimestamp(long timestamp) {
+      return timestamp;
+    }
+
+    @Override
+    public long toTimestamp(TimeUnit timeUnit, long physicalTime, long logicalTime) {
+      // logicalTime is ignored: this representation has no logical component.
+      return TimeUnit.MILLISECONDS.convert(physicalTime, timeUnit);
+    }
+
+    @Override
+    public long getPhysicalTime(long timestamp) {
+      return timestamp;
+    }
+
+    @Override
+    long getLogicalTime(long timestamp) {
+      return 0;
+    }
+
+    @Override
+    public long getMaxPhysicalTime() {
+      return Long.MAX_VALUE;
+    }
+
+    @Override
+    public long getMaxLogicalTime() {
+      return 0;
+    }
+
+    @Override
+    int getBitsForLogicalTime() {
+      return 0;
+    }
+
+    @Override
+    public boolean isLikelyOfType(long timestamp, boolean isClockMonotonic) {
+      // The range heuristic below is only applied to timestamps of monotonic clocks.
+      if (!isClockMonotonic) {
+        return true;
+      }
+      // heuristic: the timestamp should be up to year 3K (32503680000000L).
+      return timestamp < 32503680000000L;
+    }
+
+    /**
+     * Returns a string representation for Physical Time; the logical component is always
+     * printed as 0. The format is: <code>yyyy-MM-dd'T'HH:mm:ss:SSS(Physical Time), 0</code>
+     * Physical Time is converted to UTC time and not to local time for uniformity.
+     * Example: 2015-07-17T23:56:35:891(1437177395891), 0
+     * @param timestamp epoch time in milliseconds
+     * @return A date time string formatted as mentioned in the method description
+     */
+    @Override
+    public String toString(long timestamp) {
+      long physicalTime = timestamp;
+      return new StringBuilder().append(dateFormat.format(physicalTime)).append("(")
+          .append(physicalTime).append(")").append(", ").append("0").toString();
+    }
+  };
+
+  /**
+   * Used internally by the {@code toString(long)} implementations of the Hybrid and Physical
+   * Timestamp types to render the physical time. UTC is used instead of the local time zone
+   * for convenience and uniformity.
+   */
+  private static final FastDateFormat dateFormat =
+      FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss:SSS", TimeZone.getTimeZone("UTC"));
+
+  /**
+   * Converts the given timestamp to the unix epoch timestamp with millisecond resolution.
+   * Returned timestamp is compatible with System.currentTimeMillis().
+   * @param timestamp {@link #HYBRID} or {@link #PHYSICAL} Timestamp
+   * @return number of milliseconds from epoch
+   */
+  public abstract long toEpochTimeMillisFromTimestamp(long timestamp);
+
+  /**
+   * Converts the given time in milliseconds to the corresponding {@link TimestampType}
+   * representation.
+   * @param timeInMillis epoch time in {@link TimeUnit#MILLISECONDS}
+   * @return a timestamp representation corresponding to {@link TimestampType}.
+   */
+  public abstract long fromEpochTimeMillisToTimestamp(long timeInMillis);
+
+  /**
+   * Converts the given physical clock in the given {@link TimeUnit} to a 64-bit timestamp
+   * @param timeUnit a time unit as in the enum {@link TimeUnit}
+   * @param physicalTime physical time
+   * @param logicalTime logical time
+   * @return a timestamp in 64 bits
+   */
+  public abstract long toTimestamp(TimeUnit timeUnit, long physicalTime, long logicalTime);
+
+  /**
+   * Extracts and returns the physical time from the timestamp
+   * @param timestamp {@link #HYBRID} or {@link #PHYSICAL} Timestamp
+   * @return physical time in {@link TimeUnit#MILLISECONDS}
+   */
+  public abstract long getPhysicalTime(long timestamp);
+
+  /**
+   * Extracts and returns the logical time from the timestamp
+   * @param timestamp {@link #HYBRID} or {@link #PHYSICAL} Timestamp
+   * @return logical time
+   */
+  abstract long getLogicalTime(long timestamp);
+
+  /**
+   * @return the maximum possible physical time in {@link TimeUnit#MILLISECONDS}
+   */
+  public abstract long getMaxPhysicalTime();
+
+  /**
+   * @return the maximum possible logical time
+   */
+  public abstract long getMaxLogicalTime();
+
+  /**
+   * @return number of least significant bits allocated for logical time
+   */
+  abstract int getBitsForLogicalTime();
+
+  /**
+   * Heuristically checks whether the given timestamp is likely to have been encoded with
+   * this {@link TimestampType}.
+   * @param timestamp {@link #HYBRID} or {@link #PHYSICAL} Timestamp to check
+   * @param isClockMonotonic if the clock that generated this timestamp is monotonic
+   * @return true if the timestamp is likely to be of this {@link TimestampType}, else false
+   */
+  public abstract boolean isLikelyOfType(long timestamp, boolean isClockMonotonic);
+
+  /**
+   * Formats the given timestamp as the UTC date-time of its physical time, followed by the
+   * physical time in parentheses and the logical time.
+   * @param timestamp {@link #HYBRID} or {@link #PHYSICAL} Timestamp
+   * @return a human readable representation of the timestamp
+   */
+  public abstract String toString(long timestamp);
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9fe94c11/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java
new file mode 100644
index 0000000..295812d
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestClock.java
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+
+import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+import java.util.List;
+
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import org.apache.hadoop.hbase.TimestampType;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+
+@Category(SmallTests.class)
+public class TestClock {
+
+ /*
+ * utils: fails unless the list is non-empty and every timestamp is greater than (when
+ * strictlyIncreasing is true) or not smaller than (otherwise) the one before it. The first
+ * element is compared against 0.
+ */
+ private void assertTimestampsMonotonic(List<Long> timestamps, boolean
+ strictlyIncreasing) {
+ assertTrue(timestamps.size() > 0);
+
+ long prev = 0; // baseline the first timestamp is compared against
+ for (long timestamp : timestamps) {
+ if (strictlyIncreasing) {
+ assertTrue(timestamps.toString(), timestamp > prev);
+ } else {
+ assertTrue(timestamps.toString(), timestamp >= prev);
+ }
+ prev = timestamp;
+ }
+ }
+
+ // All Clocks Tests
+
+ /**
+ * Asserts that the System, SystemMonotonic and HLC clocks all report
+ * {@link TimeUnit#MILLISECONDS} as their time unit.
+ * Remove this test if moving away from millis resolution for physical time. Be sure to change
+ * {@link TimestampType} methods which assume millisecond resolution.
+ */
+ @Test public void TestClocksPhysicalTimeResolution() {
+ Clock.System systemClock = new Clock.System();
+ Clock.SystemMonotonic systemMonotonicClock = new Clock.SystemMonotonic();
+ Clock.HLC hybridLogicalClock = new Clock.HLC();
+ assertTrue(systemClock.getTimeUnit() == systemMonotonicClock.getTimeUnit()
+ && systemClock.getTimeUnit() == hybridLogicalClock.getTimeUnit()
+ && TimeUnit.MILLISECONDS == systemClock.getTimeUnit());
+ }
+
+ // All System Clock Tests: the system clock is expected to report neither monotonic property.
+ @Test public void TestSystemClockIsMonotonic() {
+ Clock.System systemClock = new Clock.System();
+ assertFalse(systemClock.isMonotonic());
+ }
+
+ @Test public void testSystemClockIsMonotonicallyIncreasing() {
+ Clock.System systemClock = new Clock.System();
+ assertFalse(systemClock.isMonotonicallyIncreasing());
+ }
+
+ // All System Monotonic Clock Tests
+
+ @Test public void testSystemMonotonicClockIsMonotonic() {
+ Clock.SystemMonotonic systemMonotonicClock = new Clock.SystemMonotonic();
+ assertTrue(systemMonotonicClock.isMonotonic());
+ }
+
+ @Test public void testSystemMonotonicClockIsMonotonicallyIncreasing() {
+ Clock.SystemMonotonic systemMonotonicClock = new Clock.SystemMonotonic();
+ assertFalse(systemMonotonicClock.isMonotonicallyIncreasing()); // monotonic, but not strictly
+ }
+
+ @Test public void testSystemMonotonicNow() {
+ ArrayList<Long> timestamps = new ArrayList<Long>(3);
+ long timestamp;
+ Clock.PhysicalClock physicalClock = mock(Clock.PhysicalClock.class);
+ when(physicalClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
+ Clock.SystemMonotonic systemMonotonic = new Clock.SystemMonotonic(physicalClock, 30000);
+
+ // case 1: Set time and assert
+ when(physicalClock.now()).thenReturn(100L);
+ timestamp = systemMonotonic.now();
+ timestamps.add(timestamp);
+
+ assertEquals(100, timestamp);
+
+ // case 2: Physical time goes back; the clock is expected to keep returning 100.
+ when(physicalClock.now()).thenReturn(99L);
+ timestamp = systemMonotonic.now();
+ timestamps.add(timestamp);
+
+ assertEquals(100, timestamp);
+
+ // case 3: system time goes ahead compared to previous timestamp.
+ when(physicalClock.now()).thenReturn(101L);
+ timestamp = systemMonotonic.now();
+ timestamps.add(timestamp);
+
+ assertEquals(101, timestamp);
+
+ assertTimestampsMonotonic(timestamps, false);
+ }
+
+ @Test public void testSystemMonotonicUpdate() {
+ ArrayList<Long> timestamps = new ArrayList<Long>(7);
+ long timestamp;
+ Clock.PhysicalClock physicalClock = mock(Clock.PhysicalClock.class);
+ when(physicalClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
+ Clock.SystemMonotonic systemMonotonic = new Clock.SystemMonotonic(physicalClock, 30000);
+
+ // Set Time
+ when(physicalClock.now()).thenReturn(99L);
+ timestamp = systemMonotonic.now();
+ timestamps.add(timestamp);
+
+ // case 1: Message timestamp is greater than current System Monotonic Time,
+ // physical time now at 100.
+ when(physicalClock.now()).thenReturn(100L);
+ timestamp = systemMonotonic.update(102);
+ timestamps.add(timestamp);
+
+ assertEquals(102, timestamp);
+
+ // case 2: Message timestamp is greater than current System Monotonic Time,
+ // physical time at 100 still.
+ when(physicalClock.now()).thenReturn(100L);
+ timestamp = systemMonotonic.update(103);
+ timestamps.add(timestamp);
+
+ assertEquals(103, timestamp);
+
+ // case 3: Message timestamp is less than current System Monotonic Time, greater than current
+ // physical time which is 100.
+ timestamp = systemMonotonic.update(101);
+ timestamps.add(timestamp);
+
+ assertEquals(103, timestamp);
+
+ // case 4: Message timestamp is less than current System Monotonic Time, less than current
+ // physical time which is 100.
+ timestamp = systemMonotonic.update(99);
+ timestamps.add(timestamp);
+
+ assertEquals(103, timestamp);
+
+ // case 5: Message timestamp < System monotonic time and both less than current Physical Time
+ when(physicalClock.now()).thenReturn(106L);
+ timestamp = systemMonotonic.update(102);
+ timestamps.add(timestamp);
+
+ assertEquals(106, timestamp);
+
+ // case 6: Message timestamp > System monotonic time and both less than current Physical Time
+ when(physicalClock.now()).thenReturn(109L);
+ timestamp = systemMonotonic.update(108);
+ timestamps.add(timestamp);
+
+ assertEquals(109, timestamp);
+
+ assertTimestampsMonotonic(timestamps, false);
+ }
+
+ @Test public void testSystemMonotonicUpdateMaxClockSkew() throws Clock.ClockException {
+ long maxClockSkew = 1000;
+ Clock.PhysicalClock physicalClock = mock(Clock.PhysicalClock.class);
+ Clock.SystemMonotonic systemMonotonic = new Clock.SystemMonotonic(physicalClock, maxClockSkew);
+ when(physicalClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
+
+ // Set Current Time.
+ when(physicalClock.now()).thenReturn(100L);
+ systemMonotonic.now();
+
+ systemMonotonic.update(maxClockSkew+100-1); // 1099: expected to be accepted
+
+ try{
+ systemMonotonic.update(maxClockSkew+101); // 1101: expected to be rejected
+ fail("Should have thrown Clock Exception");
+ } catch (Clock.ClockException e){
+ assertTrue(true); // expected path: the exception was thrown
+ }
+ }
+
+
+ // All Hybrid Logical Clock Tests
+ @Test public void testHLCIsMonotonic() {
+ Clock.HLC hybridLogicalClock = new Clock.HLC();
+ assertTrue(hybridLogicalClock.isMonotonic());
+ }
+
+ @Test public void testHLCIsMonotonicallyIncreasing() {
+ Clock.HLC hybridLogicalClock = new Clock.HLC();
+ assertTrue(hybridLogicalClock.isMonotonicallyIncreasing());
+ }
+
+ @Test public void testHLCNow() throws Clock.ClockException {
+ ArrayList<Long> timestamps = new ArrayList<Long>(5);
+ long timestamp;
+ Clock.PhysicalClock physicalClock = mock(Clock.PhysicalClock.class);
+ when(physicalClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
+ Clock.HLC hybridLogicalClock = new Clock.HLC(physicalClock, 30000);
+
+
+ // case 1: Test if it returns correct time based on current physical time.
+ // Remember, initially logical time = 0
+ when(physicalClock.now()).thenReturn(100L);
+ timestamp = hybridLogicalClock.now();
+ timestamps.add(timestamp);
+
+ assertEquals(100, hybridLogicalClock.getTimestampType().getPhysicalTime(timestamp));
+ assertEquals(0, hybridLogicalClock.getTimestampType().getLogicalTime(timestamp));
+
+ // case 2: physical time doesn't change, logical time should increment.
+ when(physicalClock.now()).thenReturn(100L);
+ timestamp = hybridLogicalClock.now();
+ timestamps.add(timestamp);
+
+ assertEquals(100, hybridLogicalClock.getTimestampType().getPhysicalTime(timestamp));
+ assertEquals(1, hybridLogicalClock.getTimestampType().getLogicalTime(timestamp));
+
+ // case 3: physical time doesn't change still, logical time should increment again
+ when(physicalClock.now()).thenReturn(100L);
+ timestamp = hybridLogicalClock.now();
+ timestamps.add(timestamp);
+
+ assertEquals(100, hybridLogicalClock.getTimestampType().getPhysicalTime(timestamp));
+ assertEquals(2, hybridLogicalClock.getTimestampType().getLogicalTime(timestamp));
+
+ // case 4: physical time moves forward, logical time should reset to 0.
+ when(physicalClock.now()).thenReturn(101L);
+ timestamp = hybridLogicalClock.now();
+ timestamps.add(timestamp);
+
+ assertEquals(101, hybridLogicalClock.getTimestampType().getPhysicalTime(timestamp));
+ assertEquals(0, hybridLogicalClock.getTimestampType().getLogicalTime(timestamp));
+
+ // case 5: Monotonic increasing check, physical time goes back; the clock should stay at
+ // physical time 101 and advance logical time instead.
+ when(physicalClock.now()).thenReturn(99L);
+ timestamp = hybridLogicalClock.now();
+ timestamps.add(timestamp);
+
+ assertEquals(101, hybridLogicalClock.getTimestampType().getPhysicalTime(timestamp));
+ assertEquals(1, hybridLogicalClock.getTimestampType().getLogicalTime(timestamp));
+
+ // Check if all timestamps generated in the process are strictly monotonic.
+ assertTimestampsMonotonic(timestamps, true);
+ }
+
+ @Test public void testHLCUNowLogicalTimeOverFlow() throws Clock.ClockException {
+ Clock.PhysicalClock physicalClock = mock(Clock.PhysicalClock.class);
+ Clock.HLC hybridLogicalClock = new Clock.HLC(physicalClock, 100);
+ when(physicalClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
+
+ // Set Current Time, with the logical component already at its maximum value.
+ when(physicalClock.now()).thenReturn(100L);
+ hybridLogicalClock.setPhysicalTime(100);
+ hybridLogicalClock.setLogicalTime(TimestampType.HYBRID.getMaxLogicalTime());
+
+ try{
+ hybridLogicalClock.now(); // physical time unchanged: expected to throw
+ fail("Should have thrown Clock Exception");
+ } catch (Clock.ClockException e){
+ assertTrue(true); // expected path: the exception was thrown
+ }
+ }
+
+ @Test public void testHLCUpdate() throws Clock.ClockException {
+ ArrayList<Long> timestamps = new ArrayList<Long>(5);
+ long timestamp, messageTimestamp;
+ Clock.PhysicalClock physicalClock = mock(Clock.PhysicalClock.class);
+ Clock.HLC hybridLogicalClock = new Clock.HLC(physicalClock, 100);
+ when(physicalClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
+
+ // Set Current Time.
+ when(physicalClock.now()).thenReturn(100L);
+ timestamp = hybridLogicalClock.now();
+ timestamps.add(timestamp);
+
+ // case 1: Message physical timestamp is lower than current physical time.
+ messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 99, 1);
+ when(physicalClock.now()).thenReturn(101L);
+ timestamp = hybridLogicalClock.update(messageTimestamp);
+ timestamps.add(timestamp);
+
+ assertEquals(101, hybridLogicalClock.getTimestampType().getPhysicalTime(timestamp));
+ assertEquals(0, hybridLogicalClock.getTimestampType().getLogicalTime(timestamp));
+
+ // case 2: Message physical timestamp is greater than HLC physical time.
+ messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 3);
+ when(physicalClock.now()).thenReturn(102L);
+ timestamp = hybridLogicalClock.update(messageTimestamp);
+ timestamps.add(timestamp);
+
+ assertEquals(105, hybridLogicalClock.getTimestampType().getPhysicalTime(timestamp));
+ assertEquals(4, hybridLogicalClock.getTimestampType().getLogicalTime(timestamp));
+
+ // case 3: Message timestamp is less than HLC timestamp
+ messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 104 , 4);
+ when(physicalClock.now()).thenReturn(103L);
+ timestamp = hybridLogicalClock.update(messageTimestamp);
+ timestamps.add(timestamp);
+
+ assertEquals(105, hybridLogicalClock.getTimestampType().getPhysicalTime(timestamp));
+ assertEquals(5, hybridLogicalClock.getTimestampType().getLogicalTime(timestamp));
+
+ //case 4: Message timestamp with same physical time as HLC, but lower logical time
+ messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 2);
+ when(physicalClock.now()).thenReturn(101L);
+ timestamp = hybridLogicalClock.update(messageTimestamp);
+ timestamps.add(timestamp);
+
+ assertEquals(105, hybridLogicalClock.getTimestampType().getPhysicalTime(timestamp));
+ assertEquals(6, hybridLogicalClock.getTimestampType().getLogicalTime(timestamp));
+
+ //case 5: Message timestamp with same physical time as HLC, but higher logical time
+ messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 8);
+ when(physicalClock.now()).thenReturn(102L);
+ timestamp = hybridLogicalClock.update(messageTimestamp);
+ timestamps.add(timestamp);
+
+ assertEquals(105, hybridLogicalClock.getTimestampType().getPhysicalTime(timestamp));
+ assertEquals(9, hybridLogicalClock.getTimestampType().getLogicalTime(timestamp));
+
+ //case 6: Actual Physical Time greater than message physical timestamp and HLC physical time.
+ messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 105 , 10);
+ when(physicalClock.now()).thenReturn(110L);
+ timestamp = hybridLogicalClock.update(messageTimestamp);
+ timestamps.add(timestamp);
+
+ assertEquals(110, hybridLogicalClock.getTimestampType().getPhysicalTime(timestamp));
+ assertEquals(0, hybridLogicalClock.getTimestampType().getLogicalTime(timestamp));
+
+ // Check if all timestamps generated in the process are strictly monotonic.
+ assertTimestampsMonotonic(timestamps, true);
+ }
+
+ @Test public void testHLCUpdateLogicalTimeOverFlow() throws Clock.ClockException {
+ long messageTimestamp;
+ Clock.PhysicalClock physicalClock = mock(Clock.PhysicalClock.class);
+ Clock.HLC hybridLogicalClock = new Clock.HLC(physicalClock, 100);
+ when(physicalClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
+
+ // Set Current Time.
+ when(physicalClock.now()).thenReturn(100L);
+ hybridLogicalClock.now();
+
+ try{
+ // Message carries the same physical time (100) with logical time at its maximum value.
+ messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, 100,
+ TimestampType.HYBRID.getMaxLogicalTime());
+ hybridLogicalClock.update(messageTimestamp);
+ fail("Should have thrown Clock Exception");
+ } catch (Clock.ClockException e){
+ assertTrue(true); // expected path: the exception was thrown
+ }
+ }
+
+ @Test public void testHLCUpdateMaxClockSkew() throws Clock.ClockException {
+ long messageTimestamp, maxClockSkew = 1000;
+ Clock.PhysicalClock physicalClock = mock(Clock.PhysicalClock.class);
+ Clock.HLC hybridLogicalClock = new Clock.HLC(physicalClock, maxClockSkew);
+ when(physicalClock.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
+
+ // Set Current Time.
+ when(physicalClock.now()).thenReturn(100L);
+ hybridLogicalClock.now();
+ messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS,
+ maxClockSkew-100, 0);
+ hybridLogicalClock.update(messageTimestamp); // physical time 900: expected to be accepted
+
+ try{
+ messageTimestamp = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS,
+ maxClockSkew+101, 0);
+ hybridLogicalClock.update(messageTimestamp); // physical time 1101: expected to be rejected
+ fail("Should have thrown Clock Exception");
+ } catch (Clock.ClockException e){
+ assertTrue(true); // expected path: the exception was thrown
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9fe94c11/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java
new file mode 100644
index 0000000..01c8314
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTimestampType.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+@Category(SmallTests.class)
+public class TestTimestampType {
+
+ private static long testPhysicalTime = 1234567890123L;
+ private static long testLogicalTime = 12;
+
+ /*
+ * Tests for TimestampType enum
+ */
+
+ /**
+  * For every {@link TimestampType}, converts the current wall time to a timestamp and back,
+  * and expects the original epoch-millisecond value.
+  */
+ @Test
+ public void testFromToEpoch() {
+   for (TimestampType type : TimestampType.values()) {
+     long epochMillis = System.currentTimeMillis();
+     long asTimestamp = type.fromEpochTimeMillisToTimestamp(epochMillis);
+     long roundTripped = type.toEpochTimeMillisFromTimestamp(asTimestamp);
+
+     assertEquals(epochMillis, roundTripped);
+   }
+ }
+
+ /* Tests for the HYBRID timestamp type */
+ /**
+  * Checks the HYBRID limits: the maximum physical time fills the 63 non-sign bits left over
+  * after the logical bits, the maximum logical time fills all logical bits, and combining
+  * both maxima yields {@code Long.MAX_VALUE}.
+  */
+ @Test
+ public void testHybridMaxValues() {
+   final TimestampType hybrid = TimestampType.HYBRID;
+   final long logicalBits = hybrid.getBitsForLogicalTime();
+
+   // Physical time occupies (63 - logicalBits) bits; the sign bit is never used.
+   long expectedMaxPhysical = (1L << (63 - logicalBits)) - 1;
+   assertEquals(expectedMaxPhysical, hybrid.getMaxPhysicalTime());
+
+   // Logical time occupies the low logicalBits bits.
+   long expectedMaxLogical = (1L << logicalBits) - 1;
+   assertEquals(expectedMaxLogical, hybrid.getMaxLogicalTime());
+
+   // The largest representable timestamp is Long.MAX_VALUE (assuming signed comparison).
+   long largestTimestamp = hybrid.toTimestamp(TimeUnit.MILLISECONDS,
+       hybrid.getMaxPhysicalTime(),
+       hybrid.getMaxLogicalTime());
+   assertEquals(Long.MAX_VALUE, largestTimestamp);
+ }
+
+ /** The physical component packed into a HYBRID timestamp must be recoverable unchanged. */
+ @Test
+ public void testHybridGetPhysicalTime() {
+   final TimestampType hybrid = TimestampType.HYBRID;
+   long packed = hybrid.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime);
+   long extractedPhysical = hybrid.getPhysicalTime(packed);
+   assertEquals(testPhysicalTime, extractedPhysical);
+ }
+
+ /** The logical component packed into a HYBRID timestamp must be recoverable unchanged. */
+ @Test
+ public void testHybridGetLogicalTime() {
+   final TimestampType hybrid = TimestampType.HYBRID;
+   long packed = hybrid.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime);
+   long extractedLogical = hybrid.getLogicalTime(packed);
+   assertEquals(testLogicalTime, extractedLogical);
+ }
+
+ /**
+  * Pins the textual rendering of a HYBRID timestamp: formatted date, raw physical
+  * milliseconds in parentheses, then the logical time.
+  */
+ @Test
+ public void testHybridToString() {
+   final TimestampType hybrid = TimestampType.HYBRID;
+   long packed = hybrid.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime);
+   String rendered = hybrid.toString(packed);
+
+   assertEquals("2009-02-13T23:31:30:123(1234567890123), 12", rendered);
+ }
+
+ /**
+  * Checks that HYBRID places the physical time above the low
+  * {@code getBitsForLogicalTime()} bits and the logical time in those low bits, and that
+  * the same instant given in nanoseconds produces the same timestamp.
+  */
+ @Test
+ public void testHybridToTimestamp() {
+   long expected = (testPhysicalTime << TimestampType.HYBRID.getBitsForLogicalTime()) + testLogicalTime;
+   // test millisecond
+   long ts = TimestampType.HYBRID.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime);
+   // JUnit's contract is assertEquals(expected, actual); keeping that order makes failure
+   // messages report the two values correctly.
+   assertEquals(expected, ts);
+
+   // test nanosecond
+   ts = TimestampType.HYBRID.toTimestamp(TimeUnit.NANOSECONDS, TimeUnit.MILLISECONDS.toNanos(testPhysicalTime), testLogicalTime);
+   assertEquals(expected, ts);
+ }
+
+ /**
+  * Exercises {@code HYBRID.isLikelyOfType}: hybrid-encoded timestamps must be recognised,
+  * while plain epoch-millisecond values must be rejected.
+  */
+ @Test
+ public void testHybridIsLikelyOfType() throws ParseException {
+   final TimestampType hybrid = TimestampType.HYBRID;
+   final TimestampType physical = TimestampType.PHYSICAL;
+   SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss:SSS Z");
+
+   // Hybrid timestamps for every year 1971..2248 with lt = 0 are recognised as hybrid.
+   for (int year = 1971; year <= 2248; year++) {
+     long physicalMillis = parser.parse(year + "-01-01T11:22:33:444 UTC").getTime();
+
+     long ts = hybrid.toTimestamp(TimeUnit.MILLISECONDS, physicalMillis, 0);
+     System.out.println(hybrid.toString(ts));
+
+     assertTrue(hybrid.isLikelyOfType(ts, true));
+   }
+
+   // Hybrid timestamps for every year 2016..2248 with lt = 123 are recognised as hybrid.
+   for (int year = 2016; year <= 2248; year++) {
+     long physicalMillis = parser.parse(year + "-01-01T11:22:33:444 UTC").getTime();
+
+     long ts = hybrid.toTimestamp(TimeUnit.MILLISECONDS, physicalMillis, 123);
+     System.out.println(hybrid.toString(ts));
+
+     assertTrue(hybrid.isLikelyOfType(ts, true));
+   }
+
+   // Plain epoch-millisecond values, every 10 years from 1970 to 10000, are not hybrid.
+   for (int year = 1970; year <= 10000; year += 10) {
+     long ts = parser.parse(year + "-01-01T00:00:00:000 UTC").getTime();
+     System.out.println(physical.toString(ts));
+     System.out.println(physical.toString(hybrid.getPhysicalTime(ts)));
+
+     assertFalse(hybrid.isLikelyOfType(ts, true));
+   }
+
+   // Epoch-millisecond values for 1970..2016 are not hybrid even when their low
+   // "logical" bits are cleared (lt = 0).
+   for (int year = 1970; year <= 2016; year++) {
+     long epochMillis = parser.parse(year + "-01-01T11:22:33:444 UTC").getTime();
+
+     // Shift right then left by the logical bit width to zero the low bits.
+     long withoutLowBits = epochMillis >> hybrid.getBitsForLogicalTime();
+     long ts = withoutLowBits << hybrid.getBitsForLogicalTime();
+     System.out.println(Long.toHexString(ts));
+
+     System.out.println(physical.toString(ts));
+     System.out.println(physical.toString(hybrid.getPhysicalTime(ts)));
+
+     assertFalse(hybrid.isLikelyOfType(ts, true));
+   }
+
+   // The current wall-clock time in epoch milliseconds is not hybrid either.
+   long wallClockNow = System.currentTimeMillis();
+   System.out.println(physical.toString(wallClockNow));
+   System.out.println(physical.toString((hybrid.getPhysicalTime(wallClockNow))));
+   assertFalse(hybrid.isLikelyOfType(wallClockNow, true));
+ }
+
+
+ /**
+  * PHYSICAL timestamps use the full positive {@code long} range for physical time and have
+  * no logical component.
+  */
+ @Test
+ public void testPhysicalMaxValues() {
+   final TimestampType physical = TimestampType.PHYSICAL;
+
+   // Long.MAX_VALUE is the same value as (1L << 63) - 1.
+   assertEquals(Long.MAX_VALUE, physical.getMaxPhysicalTime());
+
+   assertEquals(0, physical.getMaxLogicalTime());
+ }
+
+ /** The physical time given to a PHYSICAL timestamp must be recoverable unchanged. */
+ @Test
+ public void testPhysicalGetPhysicalTime() {
+   final TimestampType physical = TimestampType.PHYSICAL;
+   long packed = physical.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime);
+   long extractedPhysical = physical.getPhysicalTime(packed);
+   assertEquals(testPhysicalTime, extractedPhysical);
+ }
+
+ /** A PHYSICAL timestamp reports logical time 0 even when a logical value was supplied. */
+ @Test
+ public void testPhysicalGetLogicalTime() {
+   final TimestampType physical = TimestampType.PHYSICAL;
+   long packed = physical.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime);
+   long extractedLogical = physical.getLogicalTime(packed);
+   assertEquals(0, extractedLogical);
+ }
+
+ /**
+  * Pins the textual rendering of a PHYSICAL timestamp: formatted date, raw milliseconds in
+  * parentheses, then a logical time of 0.
+  */
+ @Test
+ public void testPhysicalToString() {
+   final TimestampType physical = TimestampType.PHYSICAL;
+   long packed = physical.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime);
+   String rendered = physical.toString(packed);
+
+   assertEquals("2009-02-13T23:31:30:123(1234567890123), 0", rendered);
+ }
+
+ /**
+  * Checks that a PHYSICAL timestamp equals the physical time in milliseconds, whether the
+  * input is given in milliseconds or nanoseconds, and regardless of the logical time passed.
+  */
+ @Test
+ public void testPhysicalToTimestamp() {
+   // test millisecond
+   long ts = TimestampType.PHYSICAL.toTimestamp(TimeUnit.MILLISECONDS, testPhysicalTime, testLogicalTime);
+   // JUnit's contract is assertEquals(expected, actual); keeping that order makes failure
+   // messages report the two values correctly.
+   assertEquals(testPhysicalTime, ts);
+
+   // test nanosecond
+   ts = TimestampType.PHYSICAL.toTimestamp(TimeUnit.NANOSECONDS, TimeUnit.MILLISECONDS.toNanos(testPhysicalTime), testLogicalTime);
+   assertEquals(testPhysicalTime, ts);
+ }
+
+ /**
+  * Exercises {@code PHYSICAL.isLikelyOfType}: plain epoch-millisecond values must be
+  * recognised, while hybrid-encoded timestamps must be rejected.
+  */
+ @Test
+ public void testPhysicalIsLikelyOfType() throws ParseException {
+   final TimestampType hybrid = TimestampType.HYBRID;
+   final TimestampType physical = TimestampType.PHYSICAL;
+   SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss:SSS Z");
+
+   // Epoch-millisecond values, every 10 years from 1970 up to (not including) 3000, are
+   // of Physical type.
+   for (int year = 1970; year < 3000; year += 10) {
+     long ts = parser.parse(year + "-01-01T00:00:00:000 UTC").getTime();
+     System.out.println(physical.toString(ts));
+     System.out.println(physical.toString(hybrid.getPhysicalTime(ts)));
+
+     assertTrue(physical.isLikelyOfType(ts, true));
+   }
+
+   // The current wall-clock time in epoch milliseconds is of Physical type.
+   long wallClockNow = System.currentTimeMillis();
+   System.out.println(physical.toString(wallClockNow));
+   assertTrue(physical.isLikelyOfType(wallClockNow, true));
+
+   // Hybrid timestamps for every year 1970..2248 with lt = 0 are not of Physical type.
+   for (int year = 1970; year <= 2248; year++) {
+     long physicalMillis = parser.parse(year + "-01-01T11:22:33:444 UTC").getTime();
+
+     long ts = hybrid.toTimestamp(TimeUnit.MILLISECONDS, physicalMillis, 0);
+     System.out.println(hybrid.toString(ts));
+     System.out.println(physical.toString(ts));
+
+     assertFalse(physical.isLikelyOfType(ts, true));
+   }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/9fe94c11/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index 20a6a03..6d42c06 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -256,6 +256,10 @@ public class ModifyTableProcedure
throw new IOException("REGION_REPLICATION change is not supported for enabled tables");
}
}
+ // do not allow changing of clock type.
+ if (modifiedHTableDescriptor.getClockType() != unmodifiedHTableDescriptor.getClockType()) {
+ throw new IOException("Clock Type change is not supported for tables");
+ }
// Find out whether all column families in unmodifiedHTableDescriptor also exists in
// the modifiedHTableDescriptor. This is to determine whether we are safe to rollback.
http://git-wip-us.apache.org/repos/asf/hbase/blob/9fe94c11/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b460d1a..b45a150 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -94,6 +94,7 @@ import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ClockType;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -102,6 +103,8 @@ import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagUtil;
+import org.apache.hadoop.hbase.Clock;
+import org.apache.hadoop.hbase.TimestampType;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Append;
@@ -379,6 +382,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return minimumReadPoint;
}
+ /**
+  * Returns the clock held by this region. When no clock is set, falls back to the region
+  * server's clock for the clock type declared in this region's table descriptor.
+  */
+ @Override
+ public Clock getClock() {
+   if (this.clock != null) {
+     return this.clock;
+   }
+   return this.getRegionServerServices().getRegionServerClock(this.getTableDesc().getClockType());
+ }
+
+ /**
+ * Only for the purpose of testing. Replaces the clock that {@link #getClock()} returns for
+ * this region.
+ * @param clock the clock to use from now on; when {@code null}, {@link #getClock()} falls
+ *   back to the region server clock for the table's clock type
+ */
+ @VisibleForTesting
+ public void setClock(Clock clock) {
+ this.clock = clock;
+ }
+
/*
* Data structure of write state flags used coordinating flushes,
* compactions and closes.
@@ -616,6 +636,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap = new ConcurrentHashMap<>();
final RegionServerServices rsServices;
+ private Clock clock;
private RegionServerAccounting rsAccounting;
private long flushCheckInterval;
// flushPerChanges is to prevent too many changes in memstore
@@ -774,6 +795,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
? DEFAULT_DURABILITY
: htd.getDurability();
if (rsServices != null) {
+ this.clock = rsServices.getRegionServerClock(htd.getClockType());
this.rsAccounting = this.rsServices.getRegionServerAccounting();
// don't initialize coprocessors if not running within a regionserver
// TODO: revisit if coprocessors should load in other cases
@@ -788,6 +810,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
recoveringRegions.put(encodedName, this);
}
} else {
+ Clock systemClock = new Clock.System();
+ this.clock = systemClock;
this.metricsRegionWrapper = null;
this.metricsRegion = null;
}
@@ -2789,8 +2813,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return getScanner(scan, additionalScanners, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
+ /**
+  * Rewrites the scan's time range in place, from epoch milliseconds to the timestamp
+  * representation of this region's clock. Clients use physical timestamps when setting time
+  * ranges; tables that use HLCs must map the physical timestamp to HLC time.
+  *
+  * @param scan scan whose time range is converted; left untouched when it covers all time
+  * @throws IOException if {@code Scan.setTimeRange} rejects the converted range
+  */
+ private void mapTimeRangesWithRespectToClock(Scan scan) throws IOException {
+   TimeRange tr = scan.getTimeRange();
+   if (tr.isAllTime()) {
+     return;
+   }
+   TimestampType timestampType = getClock().getTimestampType();
+   // Clip time range max to prevent overflow when converting from epoch time to timestamp time
+   long trMaxClipped = Math.min(tr.getMax(), timestampType.getMaxPhysicalTime());
+   // Propagate a rejected range rather than printing the stack trace and carrying on:
+   // swallowing it would leave the scan filtering with the unmapped physical range.
+   scan.setTimeRange(timestampType.fromEpochTimeMillisToTimestamp(tr.getMin()),
+       timestampType.fromEpochTimeMillisToTimestamp(trMaxClipped));
+ }
+
private RegionScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners,
long nonceGroup, long nonce) throws IOException {
+ if (getClock().clockType == ClockType.HLC) {
+ mapTimeRangesWithRespectToClock(scan);
+ }
startRegionOperation(Operation.SCAN);
try {
// Verify families are all valid
@@ -3212,7 +3259,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
// STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.
int numReadyToWrite = 0;
- long now = EnvironmentEdgeManager.currentTime();
+ long now = clock.now();
while (lastIndexExclusive < batchOp.operations.length) {
if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now, observedExceptions)) {
lastIndexExclusive++;
@@ -3250,7 +3297,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// STEP 2. Update any LATEST_TIMESTAMP timestamps
// We should record the timestamp only after we have acquired the rowLock,
// otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
- now = EnvironmentEdgeManager.currentTime();
+ now = clock.now();
byte[] byteNow = Bytes.toBytes(now);
// Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
@@ -3749,8 +3796,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// non-decreasing (see HBASE-14070) we should make sure that the mutation has a
// larger timestamp than what was observed via Get. doBatchMutate already does this, but
// there is no way to pass the cellTs. See HBASE-14054.
- long now = EnvironmentEdgeManager.currentTime();
- long ts = Math.max(now, cellTs); // ensure write is not eclipsed
+ long now = clock.now();
+ long ts = clock.isMonotonic() ? now : Math.max(now, cellTs); // ensure write is not eclipsed
byte[] byteTs = Bytes.toBytes(ts);
if (mutation != null) {
if (mutation instanceof Put) {
@@ -4031,7 +4078,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
return;
}
- long maxTs = now + timestampSlop;
+ long maxTs = clock.getTimestampType().getPhysicalTime(now) + timestampSlop;
for (List<Cell> kvs : familyMap.values()) {
assert kvs instanceof RandomAccess;
int listSize = kvs.size();
@@ -7091,7 +7138,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Short circuit the read only case
if (processor.readOnly()) {
try {
- long now = EnvironmentEdgeManager.currentTime();
+ long now = clock.now();
doProcessRowWithTimeout(processor, now, this, null, null, timeout);
processor.postProcess(this, walEdit, true);
} finally {
@@ -7121,7 +7168,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// STEP 3. Region lock
lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
locked = true;
- long now = EnvironmentEdgeManager.currentTime();
+ long now = clock.now();
// STEP 4. Let the processor scan the rows, generate mutations and add waledits
doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout);
if (!mutations.isEmpty()) {
@@ -7430,7 +7477,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
final List<Cell> results)
throws IOException {
WALEdit walEdit = null;
- long now = EnvironmentEdgeManager.currentTime();
+ long now = clock.now();
final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;
// Process a Store/family at a time.
for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {
@@ -7546,7 +7593,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long ts = now;
if (currentValue != null) {
tags = TagUtil.carryForwardTags(tags, currentValue);
- ts = Math.max(now, currentValue.getTimestamp() + 1);
+ if (this.getClock().clockType == ClockType.SYSTEM) {
+ ts = Math.max(now, currentValue.getTimestamp() + 1);
+ }
newValue += getLongValue(currentValue);
}
// Now make up the new Cell. TODO: FIX. This is carnel knowledge of how KeyValues are made...
@@ -7572,7 +7621,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
byte [] row = mutation.getRow();
if (currentValue != null) {
tags = TagUtil.carryForwardTags(tags, currentValue);
- ts = Math.max(now, currentValue.getTimestamp() + 1);
+ if (this.getClock().clockType == ClockType.SYSTEM) {
+ ts = Math.max(now, currentValue.getTimestamp() + 1);
+ }
tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());
byte[] tagBytes = TagUtil.fromList(tags);
// Allocate an empty cell and copy in all parts.
@@ -7675,7 +7726,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 49 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+ 50 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(15 * Bytes.SIZEOF_LONG) +
6 * Bytes.SIZEOF_BOOLEAN);
http://git-wip-us.apache.org/repos/asf/hbase/blob/9fe94c11/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index f0537e0..59a7498 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -71,6 +71,8 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Clock;
+import org.apache.hadoop.hbase.ClockType;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
@@ -328,6 +330,10 @@ public class HRegionServer extends HasThread implements
// debugging and unit tests.
private volatile boolean abortRequested;
+ final protected Clock hybridLogicalClock;
+ final protected Clock systemMonotonicClock;
+ final protected Clock systemClock;
+
ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<>();
// A state before we go into stopped state. At this stage we're closing user
@@ -576,6 +582,10 @@ public class HRegionServer extends HasThread implements
this.abortRequested = false;
this.stopped = false;
+ this.hybridLogicalClock = new Clock.HLC();
+ this.systemMonotonicClock = new Clock.SystemMonotonic();
+ this.systemClock = new Clock.System();
+
rpcServices = createRpcServices();
this.startcode = System.currentTimeMillis();
if (this instanceof HMaster) {
@@ -2062,6 +2072,17 @@ public class HRegionServer extends HasThread implements
}
@Override
+ public Clock getRegionServerClock(ClockType clockType) {
+   // Hand out this server's clock field that matches the requested type; any type other
+   // than HLC or SYSTEM_MONOTONIC gets the plain system clock.
+   if (clockType.equals(ClockType.HLC)) {
+     return this.hybridLogicalClock;
+   }
+   if (clockType.equals(ClockType.SYSTEM_MONOTONIC)) {
+     return this.systemMonotonicClock;
+   }
+   return this.systemClock;
+ }
+
+ @Override
public Connection getConnection() {
return getClusterConnection();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9fe94c11/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 9ab52c3..d7c74d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -92,6 +92,8 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
+import org.apache.hadoop.hbase.TimestampType;
+import org.apache.hadoop.hbase.Clock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -340,10 +342,10 @@ public class HStore implements Store {
/**
* @param family
- * @return TTL in seconds of the specified family
+ * @return TTL in milli seconds of the specified family
*/
public static long determineTTLFromFamily(final HColumnDescriptor family) {
- // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
+ // HColumnDescriptor.getTimeToLive returns ttl in seconds. Convert to milliseconds.
long ttl = family.getTimeToLive();
if (ttl == HConstants.FOREVER) {
// Default is unlimited ttl.
@@ -401,6 +403,10 @@ public class HStore implements Store {
return this.memstore.getFlushableSize();
}
+ /**
+  * @return the clock of the region this store belongs to
+  */
+ @Override
+ public Clock getClock() {
+   return region.getClock();
+ }
+
@Override
@Deprecated
public long getSnapshotSize() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/9fe94c11/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 63e18c3..7bcacf3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Clock;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
@@ -81,6 +82,11 @@ public interface Region extends ConfigurationObserver {
/** @return table descriptor for this region */
HTableDescriptor getTableDesc();
+ /** @return clock of the Region Server corresponding to the clock type used by the
+ * table contained in this region.
+ */
+ Clock getClock();
+
/** @return true if region is available (not closed and not closing) */
boolean isAvailable();
http://git-wip-us.apache.org/repos/asf/hbase/blob/9fe94c11/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 5afa652..5c37136 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.Clock;
+import org.apache.hadoop.hbase.ClockType;
import org.apache.zookeeper.KeeperException;
import com.google.protobuf.Service;
@@ -58,6 +60,8 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
* default (common) WAL */
WAL getWAL(HRegionInfo regionInfo) throws IOException;
+ /**
+  * @param clockType the type of clock being requested
+  * @return this region server's clock for the given clock type
+  */
+ Clock getRegionServerClock(ClockType clockType);
+
/** @return the List of WALs that are used by this server
* Doesn't include the meta WAL
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/9fe94c11/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index c0df66a..aa4c257 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Clock;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Scan;
@@ -346,6 +347,12 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
MemstoreSize getSizeToFlush();
/**
+ * @return clock of the Region Server corresponding to the clock type used by the
+ * table referred to by this store.
+ */
+ Clock getClock();
+
+ /**
* Returns the memstore snapshot size
* @return size of the memstore snapshot
* @deprecated Since 2.0 and will be removed in 3.0. Use {@link #getSizeOfSnapshot()} instead.