Posted to commits@hbase.apache.org by st...@apache.org on 2017/01/09 21:25:06 UTC

[1/2] hbase git commit: HBASE-17434 New Synchronization Scheme for Compaction Pipeline (Eshcar Hillel)

Repository: hbase
Updated Branches:
  refs/heads/master 9cbeba6c3 -> dd1ae3714


HBASE-17434 New Synchronization Scheme for Compaction Pipeline (Eshcar Hillel)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/15762691
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/15762691
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/15762691

Branch: refs/heads/master
Commit: 1576269123f18c9eb21b04a800e81952ec52c04d
Parents: 9cbeba6
Author: Michael Stack <st...@apache.org>
Authored: Mon Jan 9 10:46:34 2017 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Mon Jan 9 10:46:34 2017 -0800

----------------------------------------------------------------------
 .../hbase/regionserver/CompactingMemStore.java  |  6 +-
 .../hbase/regionserver/CompactionPipeline.java  | 76 ++++++++++++--------
 .../apache/hadoop/hbase/io/TestHeapSize.java    |  2 +
 3 files changed, 51 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/15762691/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index e1289f8..99c1685 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -217,8 +217,8 @@ public class CompactingMemStore extends AbstractMemStore {
   @VisibleForTesting
   @Override
   protected List<Segment> getSegments() {
-    List<Segment> pipelineList = pipeline.getSegments();
-    List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2);
+    List<? extends Segment> pipelineList = pipeline.getSegments();
+    List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
     list.add(this.active);
     list.addAll(pipelineList);
     list.add(this.snapshot);
@@ -264,7 +264,7 @@ public class CompactingMemStore extends AbstractMemStore {
    * Scanners are ordered from 0 (oldest) to newest in increasing order.
    */
   public List<KeyValueScanner> getScanners(long readPt) throws IOException {
-    List<Segment> pipelineList = pipeline.getSegments();
+    List<? extends Segment> pipelineList = pipeline.getSegments();
     long order = pipelineList.size();
     // The list of elements in pipeline + the active element + the snapshot segment
     // TODO : This will change when the snapshot is made of more than one element

http://git-wip-us.apache.org/repos/asf/hbase/blob/15762691/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 9d5df77..ebc8c4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -25,50 +25,63 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 
 /**
 * The compaction pipeline of a {@link CompactingMemStore} is a FIFO queue of segments.
- * It supports pushing a segment at the head of the pipeline and pulling a segment from the
- * tail to flush to disk.
- * It also supports swap operation to allow the compactor swap a subset of the segments with a new
- * (compacted) one. This swap succeeds only if the version number passed with the list of segments
- * to swap is the same as the current version of the pipeline.
- * The pipeline version is updated whenever swapping segments or pulling the segment at the tail.
+ * It supports pushing a segment at the head of the pipeline and removing a segment from the
+ * tail when it is flushed to disk.
+ * It also supports a swap method that allows the in-memory compaction to swap a subset of the
+ * segments at the tail of the pipeline with a new (compacted) one. This swap succeeds only if
+ * the version number passed with the list of segments to swap is the same as the current
+ * version of the pipeline.
+ * Essentially, there are two methods which can change the structure of the pipeline: pushHead()
+ * and swap(); the latter is used both by a flush to disk and by an in-memory compaction.
+ * The pipeline version is updated by swap(); it makes it possible to identify conflicting
+ * operations at the suffix of the pipeline.
+ *
+ * The synchronization model is copy-on-write. Methods which change the structure of the
+ * pipeline (pushHead() and swap()) apply their changes in the context of a lock. They also make
+ * a read-only copy of the pipeline's list. Read methods read from a read-only copy. If a read
+ * method accesses the read-only copy more than once, it makes a local copy of it to ensure
+ * it accesses the same copy.
+ *
+ * The methods getVersionedList(), getVersionedTail(), and flattenYoungestSegment() are also
+ * protected by a lock since they need to have a consistent (atomic) view of the pipeline list
+ * and version number.
  */
 @InterfaceAudience.Private
 public class CompactionPipeline {
   private static final Log LOG = LogFactory.getLog(CompactionPipeline.class);
 
   public final static long FIXED_OVERHEAD = ClassSize
-      .align(ClassSize.OBJECT + (2 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
-  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.LINKEDLIST;
+      .align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
+  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + (2 * ClassSize.LINKEDLIST);
 
   private final RegionServicesForStores region;
-  private LinkedList<ImmutableSegment> pipeline;
-  private long version;
+  private LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
+  private LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
+  private volatile long version = 0;
 
   public CompactionPipeline(RegionServicesForStores region) {
     this.region = region;
-    this.pipeline = new LinkedList<>();
-    this.version = 0;
   }
 
   public boolean pushHead(MutableSegment segment) {
     ImmutableSegment immutableSegment = SegmentFactory.instance().
         createImmutableSegment(segment);
     synchronized (pipeline){
-      return addFirst(immutableSegment);
+      boolean res = addFirst(immutableSegment);
+      readOnlyCopy = new LinkedList<>(pipeline);
+      return res;
     }
   }
 
   public VersionedSegmentsList getVersionedList() {
     synchronized (pipeline){
-      List<ImmutableSegment> segmentList = new ArrayList<>(pipeline);
-      return new VersionedSegmentsList(segmentList, version);
+      return new VersionedSegmentsList(readOnlyCopy, version);
     }
   }
 
@@ -93,8 +106,10 @@ public class CompactionPipeline {
    *        During index merge op this will be false and for compaction it will be true.
    * @return true iff swapped tail with new segment
    */
-  public boolean swap(
-      VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) {
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
+        justification="Increment is done under a synchronized block so it is safe")
+  public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment,
+      boolean closeSuffix) {
     if (versionedList.getVersion() != version) {
       return false;
     }
@@ -115,6 +130,8 @@ public class CompactionPipeline {
             + ", and the number of cells in new segment is:" + count);
       }
       swapSuffix(suffix, segment, closeSuffix);
+      readOnlyCopy = new LinkedList<>(pipeline);
+      version++;
     }
     if (closeSuffix && region != null) {
       // update the global memstore size counter
@@ -193,35 +210,34 @@ public class CompactionPipeline {
   }
 
   public boolean isEmpty() {
-    return pipeline.isEmpty();
+    return readOnlyCopy.isEmpty();
   }
 
-  public List<Segment> getSegments() {
-    synchronized (pipeline){
-      return new LinkedList<>(pipeline);
-    }
+  public List<? extends Segment> getSegments() {
+    return readOnlyCopy;
   }
 
   public long size() {
-    return pipeline.size();
+    return readOnlyCopy.size();
   }
 
   public long getMinSequenceId() {
     long minSequenceId = Long.MAX_VALUE;
-    if (!isEmpty()) {
-      minSequenceId = pipeline.getLast().getMinSequenceId();
+    LinkedList<? extends Segment> localCopy = readOnlyCopy;
+    if (!localCopy.isEmpty()) {
+      minSequenceId = localCopy.getLast().getMinSequenceId();
     }
     return minSequenceId;
   }
 
   public MemstoreSize getTailSize() {
-    if (isEmpty()) return MemstoreSize.EMPTY_SIZE;
-    return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead());
+    LinkedList<? extends Segment> localCopy = readOnlyCopy;
+    if (localCopy.isEmpty()) return MemstoreSize.EMPTY_SIZE;
+    return new MemstoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapOverhead());
   }
 
-  private void swapSuffix(List<ImmutableSegment> suffix, ImmutableSegment segment,
+  private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
       boolean closeSegmentsInSuffix) {
-    version++;
     // During index merge we won't be closing the segments undergoing the merge. Segment#close()
     // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
     // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
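
The copy-on-write scheme described in the CompactionPipeline javadoc above can be condensed into a standalone sketch. The class below (CowPipeline, with methods pushHead/swapTail/getSegments) is a hypothetical illustration of the same model, not HBase code: structural writers mutate the backing LinkedList only inside a synchronized block and then republish a fresh read-only copy, while readers use the published copy without ever taking the lock.

  import java.util.Collections;
  import java.util.LinkedList;
  import java.util.List;

  // Minimal copy-on-write FIFO sketch. CowPipeline and its method names are
  // hypothetical stand-ins, not HBase classes.
  class CowPipeline<S> {
    private final LinkedList<S> pipeline = new LinkedList<>();       // mutated only under the lock
    private volatile List<S> readOnlyCopy = Collections.emptyList(); // published snapshot
    private volatile long version = 0;

    public void pushHead(S segment) {
      synchronized (pipeline) {
        pipeline.addFirst(segment);
        readOnlyCopy = new LinkedList<>(pipeline); // republish after every structural change
      }
    }

    // 'suffix' is assumed to be the current tail of the pipeline, as read earlier
    // together with 'expectedVersion' (compare VersionedSegmentsList in the patch).
    public boolean swapTail(long expectedVersion, List<S> suffix, S compacted) {
      synchronized (pipeline) {
        if (expectedVersion != version) {
          return false; // a conflicting flush or compaction changed the suffix first
        }
        pipeline.removeAll(suffix);
        pipeline.addLast(compacted);
        readOnlyCopy = new LinkedList<>(pipeline);
        version++; // invalidates every versioned view handed out before the swap
        return true;
      }
    }

    // Readers never take the lock; they see some consistent, immutable snapshot.
    public List<S> getSegments() {
      return readOnlyCopy;
    }
  }

This mirrors why the patch suppresses the VO_VOLATILE_INCREMENT FindBugs warning (the volatile version is incremented only under the lock) and why read methods such as getMinSequenceId() first copy the readOnlyCopy reference into a local variable, so repeated accesses see the same snapshot.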

http://git-wip-us.apache.org/repos/asf/hbase/blob/15762691/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 6e8f831..ceaadbe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -325,6 +325,7 @@ public class TestHeapSize  {
     expected += ClassSize.estimateBase(AtomicBoolean.class, false);
     expected += ClassSize.estimateBase(CompactionPipeline.class, false);
     expected += ClassSize.estimateBase(LinkedList.class, false);
+    expected += ClassSize.estimateBase(LinkedList.class, false);
     expected += ClassSize.estimateBase(MemStoreCompactor.class, false);
     expected += ClassSize.estimateBase(AtomicBoolean.class, false);
     if (expected != actual) {
@@ -333,6 +334,7 @@ public class TestHeapSize  {
       ClassSize.estimateBase(AtomicBoolean.class, true);
       ClassSize.estimateBase(CompactionPipeline.class, true);
       ClassSize.estimateBase(LinkedList.class, true);
+      ClassSize.estimateBase(LinkedList.class, true);
       ClassSize.estimateBase(MemStoreCompactor.class, true);
       ClassSize.estimateBase(AtomicBoolean.class, true);
       assertEquals(expected, actual);


[2/2] hbase git commit: HBASE-12148 Remove TimeRangeTracker as point of contention when many threads writing a Store (Huaxiang Sun)

Posted by st...@apache.org.
HBASE-12148 Remove TimeRangeTracker as point of contention when many threads writing a Store (Huaxiang Sun)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/dd1ae371
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/dd1ae371
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/dd1ae371

Branch: refs/heads/master
Commit: dd1ae37148c13e79ec37817f3953a79dd40e8e87
Parents: 1576269
Author: Michael Stack <st...@apache.org>
Authored: Mon Jan 9 13:24:53 2017 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Mon Jan 9 13:24:53 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/util/ClassSize.java |   3 +-
 .../hbase/regionserver/TimeRangeTracker.java    | 113 ++++++++++---------
 .../regionserver/TestTimeRangeTracker.java      |  92 ++++++++++++++-
 3 files changed, 154 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/dd1ae371/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
index 85a6483..465bd9c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
@@ -313,8 +313,7 @@ public class ClassSize {
 
     TIMERANGE = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2 + Bytes.SIZEOF_BOOLEAN);
 
-    TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2);
-
+    TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * REFERENCE);
     CELL_SET = align(OBJECT + REFERENCE);
 
     STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG);

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd1ae371/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
index 1ea3c70..95c7c4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -28,9 +29,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Writable;
-
 /**
- * Stores minimum and maximum timestamp values.
+ * Stores minimum and maximum timestamp values; in interval notation it is
+ * [minimumTimestamp, maximumTimestamp].
  * Use this class at write-time ONLY. Too much synchronization to use at read time
  * (TODO: there are two scenarios writing, once when lots of concurrency as part of memstore
  * updates but then later we can make one as part of a compaction when there is only one thread
@@ -46,8 +47,9 @@ import org.apache.hadoop.io.Writable;
 public class TimeRangeTracker implements Writable {
   static final long INITIAL_MIN_TIMESTAMP = Long.MAX_VALUE;
   static final long INITIAL_MAX_TIMESTAMP = -1L;
-  long minimumTimestamp = INITIAL_MIN_TIMESTAMP;
-  long maximumTimestamp = INITIAL_MAX_TIMESTAMP;
+
+  AtomicLong minimumTimestamp = new AtomicLong(INITIAL_MIN_TIMESTAMP);
+  AtomicLong maximumTimestamp = new AtomicLong(INITIAL_MAX_TIMESTAMP);
 
   /**
    * Default constructor.
@@ -60,26 +62,13 @@ public class TimeRangeTracker implements Writable {
    * @param trt source TimeRangeTracker
    */
   public TimeRangeTracker(final TimeRangeTracker trt) {
-    set(trt.getMin(), trt.getMax());
+    minimumTimestamp.set(trt.getMin());
+    maximumTimestamp.set(trt.getMax());
   }
 
   public TimeRangeTracker(long minimumTimestamp, long maximumTimestamp) {
-    set(minimumTimestamp, maximumTimestamp);
-  }
-
-  private void set(final long min, final long max) {
-    this.minimumTimestamp = min;
-    this.maximumTimestamp = max;
-  }
-
-  /**
-   * @param l
-   * @return True if we initialized values
-   */
-  private boolean init(final long l) {
-    if (this.minimumTimestamp != INITIAL_MIN_TIMESTAMP) return false;
-    set(l, l);
-    return true;
+    this.minimumTimestamp.set(minimumTimestamp);
+    this.maximumTimestamp.set(maximumTimestamp);
   }
 
   /**
@@ -102,23 +91,44 @@ public class TimeRangeTracker implements Writable {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
       justification="Intentional")
   void includeTimestamp(final long timestamp) {
-    // Do test outside of synchronization block.  Synchronization in here can be problematic
-    // when many threads writing one Store -- they can all pile up trying to add in here.
-    // Happens when doing big write upload where we are hammering on one region.
-    if (timestamp < this.minimumTimestamp) {
-      synchronized (this) {
-        if (!init(timestamp)) {
-          if (timestamp < this.minimumTimestamp) {
-            this.minimumTimestamp = timestamp;
-          }
+    long initialMinTimestamp = this.minimumTimestamp.get();
+    if (timestamp < initialMinTimestamp) {
+      long curMinTimestamp = initialMinTimestamp;
+      while (timestamp < curMinTimestamp) {
+        if (!this.minimumTimestamp.compareAndSet(curMinTimestamp, timestamp)) {
+          curMinTimestamp = this.minimumTimestamp.get();
+        } else {
+          // successfully set minimumTimestamp, break.
+          break;
         }
       }
-    } else if (timestamp > this.maximumTimestamp) {
-      synchronized (this) {
-        if (!init(timestamp)) {
-          if (this.maximumTimestamp < timestamp) {
-            this.maximumTimestamp =  timestamp;
-          }
+
+      // When it reaches here, there are two possibilities:
+      //  1). timestamp >= curMinTimestamp: someone else already set the minimumTimestamp. In this
+      //      case, it still needs to check if initialMinTimestamp == INITIAL_MIN_TIMESTAMP to see
+      //      whether maximumTimestamp needs to be updated. Someone may already have set both
+      //      minimumTimestamp/maximumTimestamp to the same value (curMinTimestamp), in which
+      //      case maximumTimestamp may still need to be updated.
+      //  2). timestamp < curMinTimestamp: it set the minimumTimestamp successfully.
+      //      In this case, it still needs to check if initialMinTimestamp == INITIAL_MIN_TIMESTAMP
+      //      to see if it needs to set maximumTimestamp.
+      if (initialMinTimestamp != INITIAL_MIN_TIMESTAMP) {
+        // Someone already set minimumTimestamp, and timestamp is less than minimumTimestamp.
+        // In this case, there is no need to set maximumTimestamp, as it is already at least
+        // initialMinTimestamp.
+        return;
+      }
+    }
+
+    long curMaxTimestamp = this.maximumTimestamp.get();
+
+    if (timestamp > curMaxTimestamp) {
+      while (timestamp > curMaxTimestamp) {
+        if (!this.maximumTimestamp.compareAndSet(curMaxTimestamp, timestamp)) {
+          curMaxTimestamp = this.maximumTimestamp.get();
+        } else {
+          // successfully set maximumTimestamp, break
+          break;
         }
       }
     }
@@ -126,40 +136,41 @@ public class TimeRangeTracker implements Writable {
 
   /**
    * Check if the range has ANY overlap with TimeRange
-   * @param tr TimeRange
+   * @param tr TimeRange, expected to be of the form [minStamp, maxStamp)
    * @return True if there is overlap, false otherwise
    */
-  public synchronized boolean includesTimeRange(final TimeRange tr) {
-    return (this.minimumTimestamp < tr.getMax() && this.maximumTimestamp >= tr.getMin());
+  public boolean includesTimeRange(final TimeRange tr) {
+    return (this.minimumTimestamp.get() < tr.getMax() && this.maximumTimestamp.get() >= tr.getMin());
   }
 
   /**
    * @return the minimumTimestamp
    */
-  public synchronized long getMin() {
-    return minimumTimestamp;
+  public long getMin() {
+    return minimumTimestamp.get();
   }
 
   /**
    * @return the maximumTimestamp
    */
-  public synchronized long getMax() {
-    return maximumTimestamp;
+  public long getMax() {
+    return maximumTimestamp.get();
   }
 
-  public synchronized void write(final DataOutput out) throws IOException {
-    out.writeLong(minimumTimestamp);
-    out.writeLong(maximumTimestamp);
+  public void write(final DataOutput out) throws IOException {
+    out.writeLong(minimumTimestamp.get());
+    out.writeLong(maximumTimestamp.get());
   }
 
-  public synchronized void readFields(final DataInput in) throws IOException {
-    this.minimumTimestamp = in.readLong();
-    this.maximumTimestamp = in.readLong();
+  public void readFields(final DataInput in) throws IOException {
+    this.minimumTimestamp.set(in.readLong());
+    this.maximumTimestamp.set(in.readLong());
   }
 
   @Override
-  public synchronized String toString() {
-    return "[" + minimumTimestamp + "," + maximumTimestamp + "]";
+  public String toString() {
+    return "[" + minimumTimestamp.get() + "," + maximumTimestamp.get() + "]";
   }
 
   /**
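
The rewritten includeTimestamp() above replaces coarse synchronization with compare-and-swap retry loops on AtomicLong fields. Stripped of the early-return shortcut, the pattern reduces to the minimal sketch below; CasMinMax is a hypothetical illustration class, and on Java 8 the loops could arguably be replaced by AtomicLong.accumulateAndGet(ts, Math::min) and accumulateAndGet(ts, Math::max).

  import java.util.concurrent.atomic.AtomicLong;

  // Minimal sketch of CAS-based min/max tracking in the spirit of the patched
  // TimeRangeTracker. CasMinMax is a hypothetical illustration class.
  class CasMinMax {
    private final AtomicLong min = new AtomicLong(Long.MAX_VALUE);
    private final AtomicLong max = new AtomicLong(-1L);

    void include(long ts) {
      // Lower 'min' if needed: retry until our CAS wins or another thread
      // has installed a value at least as small as 'ts'.
      long curMin = min.get();
      while (ts < curMin && !min.compareAndSet(curMin, ts)) {
        curMin = min.get(); // CAS lost the race; reread and re-test
      }
      // Raise 'max' symmetrically.
      long curMax = max.get();
      while (ts > curMax && !max.compareAndSet(curMax, ts)) {
        curMax = max.get();
      }
    }

    long getMin() { return min.get(); }
    long getMax() { return max.get(); }
  }

The patch layers one optimization on top of this pattern: when the tracker was already initialized (initialMinTimestamp != INITIAL_MIN_TIMESTAMP) and the new timestamp lowered the minimum, the maximum is already at least initialMinTimestamp and cannot be raised by this timestamp, so includeTimestamp() returns without touching maximumTimestamp.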

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd1ae371/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java
index 2803baf..3ee6114 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java
@@ -29,10 +29,12 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
 
 @Category({RegionServerTests.class, SmallTests.class})
 public class TestTimeRangeTracker {
+  private static final int NUM_KEYS = 10000000;
+
   @Test
   public void testExtreme() {
     TimeRange tr = new TimeRange();
@@ -119,6 +121,7 @@ public class TestTimeRangeTracker {
     for (int i = 0; i < threads.length; i++) {
       threads[i].join();
     }
+
     assertTrue(trr.getMax() == calls * threadCount);
     assertTrue(trr.getMin() == 0);
   }
@@ -156,6 +159,93 @@ public class TestTimeRangeTracker {
     assertFalse(twoArgRange3.isAllTime());
   }
 
+  final static int NUM_OF_THREADS = 20;
+
+  class RandomTestData {
+    private long[] keys = new long[NUM_KEYS];
+    private long min = Long.MAX_VALUE;
+    private long max = 0;
+
+    public RandomTestData() {
+      if (ThreadLocalRandom.current().nextInt(NUM_OF_THREADS) % 2 == 0) {
+        for (int i = 0; i < NUM_KEYS; i++) {
+          keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
+          if (keys[i] < min) min = keys[i];
+          if (keys[i] > max) max = keys[i];
+        }
+      } else {
+        for (int i = NUM_KEYS - 1; i >= 0; i--) {
+          keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
+          if (keys[i] < min) min = keys[i];
+          if (keys[i] > max) max = keys[i];
+        }
+      }
+    }
+
+    public long getMax() {
+      return this.max;
+    }
+
+    public long getMin() {
+      return this.min;
+    }
+  }
+
+  class TrtUpdateRunnable implements Runnable {
+
+    private TimeRangeTracker trt;
+    private RandomTestData data;
+    public TrtUpdateRunnable(final TimeRangeTracker trt, final RandomTestData data) {
+      this.trt = trt;
+      this.data = data;
+    }
+
+    public void run() {
+      for (long key : data.keys) {
+        trt.includeTimestamp(key);
+      }
+    }
+  }
+
+  /**
+   * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive
+   * at the right range. The data chosen is going to ensure that there are lots of collisions,
+   * i.e., other threads may already have updated the value while one thread tries to update the
+   * min/max value.
+   */
+  @Test
+  public void testConcurrentIncludeTimestampCorrectness() {
+    RandomTestData[] testData = new RandomTestData[NUM_OF_THREADS];
+    long min = Long.MAX_VALUE, max = 0;
+    for (int i = 0; i < NUM_OF_THREADS; i ++) {
+      testData[i] = new RandomTestData();
+      if (testData[i].getMin() < min) {
+        min = testData[i].getMin();
+      }
+      if (testData[i].getMax() > max) {
+        max = testData[i].getMax();
+      }
+    }
+
+    TimeRangeTracker trt = new TimeRangeTracker();
+
+    Thread[] t = new Thread[NUM_OF_THREADS];
+    for (int i = 0; i < NUM_OF_THREADS; i++) {
+      t[i] = new Thread(new TrtUpdateRunnable(trt, testData[i]));
+      t[i].start();
+    }
+
+    for (Thread thread : t) {
+      try {
+        thread.join();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    assertTrue(min == trt.getMin());
+    assertTrue(max == trt.getMax());
+  }
+
   /**
    * Bit of code to test concurrent access on this class.
    * @param args