You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/01/09 21:25:06 UTC
[1/2] hbase git commit: HBASE-17434 New Synchronization Scheme for
Compaction Pipeline (Eshcar Hillel)
Repository: hbase
Updated Branches:
refs/heads/master 9cbeba6c3 -> dd1ae3714
HBASE-17434 New Synchronization Scheme for Compaction Pipeline (Eshcar Hillel)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/15762691
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/15762691
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/15762691
Branch: refs/heads/master
Commit: 1576269123f18c9eb21b04a800e81952ec52c04d
Parents: 9cbeba6
Author: Michael Stack <st...@apache.org>
Authored: Mon Jan 9 10:46:34 2017 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Mon Jan 9 10:46:34 2017 -0800
----------------------------------------------------------------------
.../hbase/regionserver/CompactingMemStore.java | 6 +-
.../hbase/regionserver/CompactionPipeline.java | 76 ++++++++++++--------
.../apache/hadoop/hbase/io/TestHeapSize.java | 2 +
3 files changed, 51 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/15762691/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index e1289f8..99c1685 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -217,8 +217,8 @@ public class CompactingMemStore extends AbstractMemStore {
@VisibleForTesting
@Override
protected List<Segment> getSegments() {
- List<Segment> pipelineList = pipeline.getSegments();
- List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2);
+ List<? extends Segment> pipelineList = pipeline.getSegments();
+ List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
list.add(this.active);
list.addAll(pipelineList);
list.add(this.snapshot);
@@ -264,7 +264,7 @@ public class CompactingMemStore extends AbstractMemStore {
* Scanners are ordered from 0 (oldest) to newest in increasing order.
*/
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
- List<Segment> pipelineList = pipeline.getSegments();
+ List<? extends Segment> pipelineList = pipeline.getSegments();
long order = pipelineList.size();
// The list of elements in pipeline + the active element + the snapshot segment
// TODO : This will change when the snapshot is made of more than one element
http://git-wip-us.apache.org/repos/asf/hbase/blob/15762691/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 9d5df77..ebc8c4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -25,50 +25,63 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
/**
* The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments.
- * It supports pushing a segment at the head of the pipeline and pulling a segment from the
- * tail to flush to disk.
- * It also supports swap operation to allow the compactor swap a subset of the segments with a new
- * (compacted) one. This swap succeeds only if the version number passed with the list of segments
- * to swap is the same as the current version of the pipeline.
- * The pipeline version is updated whenever swapping segments or pulling the segment at the tail.
+ * It supports pushing a segment at the head of the pipeline and removing a segment from the
+ * tail when it is flushed to disk.
+ * It also supports swap method to allow the in-memory compaction swap a subset of the segments
+ * at the tail of the pipeline with a new (compacted) one. This swap succeeds only if the version
+ * number passed with the list of segments to swap is the same as the current version of the
+ * pipeline.
+ * Essentially, there are two methods which can change the structure of the pipeline: pushHead()
+ * and swap(), the latter is used both by a flush to disk and by an in-memory compaction.
+ * The pipeline version is updated by swap(); it allows to identify conflicting operations at the
+ * suffix of the pipeline.
+ *
+ * The synchronization model is copy-on-write. Methods which change the structure of the
+ * pipeline (pushHead() and swap()) apply their changes in the context of a lock. They also make
+ * a read-only copy of the pipeline's list. Read methods read from a read-only copy. If a read
+ * method accesses the read-only copy more than once it makes a local copy of it
+ * to ensure it accesses the same copy.
+ *
+ * The methods getVersionedList(), getVersionedTail(), and flattenYoungestSegment() are also
+ * protected by a lock since they need to have a consistent (atomic) view of the pipeline list
+ * and version number.
*/
@InterfaceAudience.Private
public class CompactionPipeline {
private static final Log LOG = LogFactory.getLog(CompactionPipeline.class);
public final static long FIXED_OVERHEAD = ClassSize
- .align(ClassSize.OBJECT + (2 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
- public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.LINKEDLIST;
+ .align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
+ public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + (2 * ClassSize.LINKEDLIST);
private final RegionServicesForStores region;
- private LinkedList<ImmutableSegment> pipeline;
- private long version;
+ private LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
+ private LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
+ private volatile long version = 0;
public CompactionPipeline(RegionServicesForStores region) {
this.region = region;
- this.pipeline = new LinkedList<>();
- this.version = 0;
}
public boolean pushHead(MutableSegment segment) {
ImmutableSegment immutableSegment = SegmentFactory.instance().
createImmutableSegment(segment);
synchronized (pipeline){
- return addFirst(immutableSegment);
+ boolean res = addFirst(immutableSegment);
+ readOnlyCopy = new LinkedList<>(pipeline);
+ return res;
}
}
public VersionedSegmentsList getVersionedList() {
synchronized (pipeline){
- List<ImmutableSegment> segmentList = new ArrayList<>(pipeline);
- return new VersionedSegmentsList(segmentList, version);
+ return new VersionedSegmentsList(readOnlyCopy, version);
}
}
@@ -93,8 +106,10 @@ public class CompactionPipeline {
* During index merge op this will be false and for compaction it will be true.
* @return true iff swapped tail with new segment
*/
- public boolean swap(
- VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) {
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
+ justification="Increment is done under a synchronize block so safe")
+ public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment,
+ boolean closeSuffix) {
if (versionedList.getVersion() != version) {
return false;
}
@@ -115,6 +130,8 @@ public class CompactionPipeline {
+ ", and the number of cells in new segment is:" + count);
}
swapSuffix(suffix, segment, closeSuffix);
+ readOnlyCopy = new LinkedList<>(pipeline);
+ version++;
}
if (closeSuffix && region != null) {
// update the global memstore size counter
@@ -193,35 +210,34 @@ public class CompactionPipeline {
}
public boolean isEmpty() {
- return pipeline.isEmpty();
+ return readOnlyCopy.isEmpty();
}
- public List<Segment> getSegments() {
- synchronized (pipeline){
- return new LinkedList<>(pipeline);
- }
+ public List<? extends Segment> getSegments() {
+ return readOnlyCopy;
}
public long size() {
- return pipeline.size();
+ return readOnlyCopy.size();
}
public long getMinSequenceId() {
long minSequenceId = Long.MAX_VALUE;
- if (!isEmpty()) {
- minSequenceId = pipeline.getLast().getMinSequenceId();
+ LinkedList<? extends Segment> localCopy = readOnlyCopy;
+ if (!localCopy.isEmpty()) {
+ minSequenceId = localCopy.getLast().getMinSequenceId();
}
return minSequenceId;
}
public MemstoreSize getTailSize() {
- if (isEmpty()) return MemstoreSize.EMPTY_SIZE;
- return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead());
+ LinkedList<? extends Segment> localCopy = readOnlyCopy;
+ if (localCopy.isEmpty()) return MemstoreSize.EMPTY_SIZE;
+ return new MemstoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapOverhead());
}
- private void swapSuffix(List<ImmutableSegment> suffix, ImmutableSegment segment,
+ private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
boolean closeSegmentsInSuffix) {
- version++;
// During index merge we won't be closing the segments undergoing the merge. Segment#close()
// will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
// from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
http://git-wip-us.apache.org/repos/asf/hbase/blob/15762691/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 6e8f831..ceaadbe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -325,6 +325,7 @@ public class TestHeapSize {
expected += ClassSize.estimateBase(AtomicBoolean.class, false);
expected += ClassSize.estimateBase(CompactionPipeline.class, false);
expected += ClassSize.estimateBase(LinkedList.class, false);
+ expected += ClassSize.estimateBase(LinkedList.class, false);
expected += ClassSize.estimateBase(MemStoreCompactor.class, false);
expected += ClassSize.estimateBase(AtomicBoolean.class, false);
if (expected != actual) {
@@ -333,6 +334,7 @@ public class TestHeapSize {
ClassSize.estimateBase(AtomicBoolean.class, true);
ClassSize.estimateBase(CompactionPipeline.class, true);
ClassSize.estimateBase(LinkedList.class, true);
+ ClassSize.estimateBase(LinkedList.class, true);
ClassSize.estimateBase(MemStoreCompactor.class, true);
ClassSize.estimateBase(AtomicBoolean.class, true);
assertEquals(expected, actual);
[2/2] hbase git commit: HBASE-12148 Remove TimeRangeTracker as point
of contention when many threads writing a Store (Huaxiang Sun)
Posted by st...@apache.org.
HBASE-12148 Remove TimeRangeTracker as point of contention when many threads writing a Store (Huaxiang Sun)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/dd1ae371
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/dd1ae371
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/dd1ae371
Branch: refs/heads/master
Commit: dd1ae37148c13e79ec37817f3953a79dd40e8e87
Parents: 1576269
Author: Michael Stack <st...@apache.org>
Authored: Mon Jan 9 13:24:53 2017 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Mon Jan 9 13:24:53 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/util/ClassSize.java | 3 +-
.../hbase/regionserver/TimeRangeTracker.java | 113 ++++++++++---------
.../regionserver/TestTimeRangeTracker.java | 92 ++++++++++++++-
3 files changed, 154 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd1ae371/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
index 85a6483..465bd9c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
@@ -313,8 +313,7 @@ public class ClassSize {
TIMERANGE = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2 + Bytes.SIZEOF_BOOLEAN);
- TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2);
-
+ TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * REFERENCE);
CELL_SET = align(OBJECT + REFERENCE);
STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG);
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd1ae371/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
index 1ea3c70..95c7c4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -28,9 +29,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Writable;
-
/**
- * Stores minimum and maximum timestamp values.
+ * Stores minimum and maximum timestamp values, it is [minimumTimestamp, maximumTimestamp] in
+ * interval notation.
* Use this class at write-time ONLY. Too much synchronization to use at read time
* (TODO: there are two scenarios writing, once when lots of concurrency as part of memstore
* updates but then later we can make one as part of a compaction when there is only one thread
@@ -46,8 +47,9 @@ import org.apache.hadoop.io.Writable;
public class TimeRangeTracker implements Writable {
static final long INITIAL_MIN_TIMESTAMP = Long.MAX_VALUE;
static final long INITIAL_MAX_TIMESTAMP = -1L;
- long minimumTimestamp = INITIAL_MIN_TIMESTAMP;
- long maximumTimestamp = INITIAL_MAX_TIMESTAMP;
+
+ AtomicLong minimumTimestamp = new AtomicLong(INITIAL_MIN_TIMESTAMP);
+ AtomicLong maximumTimestamp = new AtomicLong(INITIAL_MAX_TIMESTAMP);
/**
* Default constructor.
@@ -60,26 +62,13 @@ public class TimeRangeTracker implements Writable {
* @param trt source TimeRangeTracker
*/
public TimeRangeTracker(final TimeRangeTracker trt) {
- set(trt.getMin(), trt.getMax());
+ minimumTimestamp.set(trt.getMin());
+ maximumTimestamp.set(trt.getMax());
}
public TimeRangeTracker(long minimumTimestamp, long maximumTimestamp) {
- set(minimumTimestamp, maximumTimestamp);
- }
-
- private void set(final long min, final long max) {
- this.minimumTimestamp = min;
- this.maximumTimestamp = max;
- }
-
- /**
- * @param l
- * @return True if we initialized values
- */
- private boolean init(final long l) {
- if (this.minimumTimestamp != INITIAL_MIN_TIMESTAMP) return false;
- set(l, l);
- return true;
+ this.minimumTimestamp.set(minimumTimestamp);
+ this.maximumTimestamp.set(maximumTimestamp);
}
/**
@@ -102,23 +91,44 @@ public class TimeRangeTracker implements Writable {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
justification="Intentional")
void includeTimestamp(final long timestamp) {
- // Do test outside of synchronization block. Synchronization in here can be problematic
- // when many threads writing one Store -- they can all pile up trying to add in here.
- // Happens when doing big write upload where we are hammering on one region.
- if (timestamp < this.minimumTimestamp) {
- synchronized (this) {
- if (!init(timestamp)) {
- if (timestamp < this.minimumTimestamp) {
- this.minimumTimestamp = timestamp;
- }
+ long initialMinTimestamp = this.minimumTimestamp.get();
+ if (timestamp < initialMinTimestamp) {
+ long curMinTimestamp = initialMinTimestamp;
+ while (timestamp < curMinTimestamp) {
+ if (!this.minimumTimestamp.compareAndSet(curMinTimestamp, timestamp)) {
+ curMinTimestamp = this.minimumTimestamp.get();
+ } else {
+ // successfully set minimumTimestamp, break.
+ break;
}
}
- } else if (timestamp > this.maximumTimestamp) {
- synchronized (this) {
- if (!init(timestamp)) {
- if (this.maximumTimestamp < timestamp) {
- this.maximumTimestamp = timestamp;
- }
+
+ // When it reaches here, there are two possibilities:
+ // 1). timestamp >= curMinTimestamp, someone already sets the minimumTimestamp. In this case,
+ // it still needs to check if initialMinTimestamp == INITIAL_MIN_TIMESTAMP to see
+ // if it needs to update minimumTimestamp. Someone may already set both
+ // minimumTimestamp/maximumTimestamp to the same value (curMinTimestamp),
+ // need to check if maximumTimestamp needs to be updated.
+ // 2). timestamp < curMinTimestamp, it sets the minimumTimestamp successfully.
+ // In this case, it still needs to check if initialMinTimestamp == INITIAL_MIN_TIMESTAMP
+ // to see if it needs to set maximumTimestamp.
+ if (initialMinTimestamp != INITIAL_MIN_TIMESTAMP) {
+ // Someone already sets minimumTimestamp and timestamp is less than minimumTimestamp.
+ // In this case, no need to set maximumTimestamp as it will be set to at least
+ // initialMinTimestamp.
+ return;
+ }
+ }
+
+ long curMaxTimestamp = this.maximumTimestamp.get();
+
+ if (timestamp > curMaxTimestamp) {
+ while (timestamp > curMaxTimestamp) {
+ if (!this.maximumTimestamp.compareAndSet(curMaxTimestamp, timestamp)) {
+ curMaxTimestamp = this.maximumTimestamp.get();
+ } else {
+ // successfully set maximumTimestamp, break
+ break;
}
}
}
@@ -126,40 +136,41 @@ public class TimeRangeTracker implements Writable {
/**
* Check if the range has ANY overlap with TimeRange
- * @param tr TimeRange
+ * @param tr TimeRange, it expects [minStamp, maxStamp)
* @return True if there is overlap, false otherwise
*/
- public synchronized boolean includesTimeRange(final TimeRange tr) {
- return (this.minimumTimestamp < tr.getMax() && this.maximumTimestamp >= tr.getMin());
+ public boolean includesTimeRange(final TimeRange tr) {
+ return (this.minimumTimestamp.get() < tr.getMax() && this.maximumTimestamp.get() >= tr.getMin());
}
/**
* @return the minimumTimestamp
*/
- public synchronized long getMin() {
- return minimumTimestamp;
+ public long getMin() {
+ return minimumTimestamp.get();
}
/**
* @return the maximumTimestamp
*/
- public synchronized long getMax() {
- return maximumTimestamp;
+ public long getMax() {
+ return maximumTimestamp.get();
}
- public synchronized void write(final DataOutput out) throws IOException {
- out.writeLong(minimumTimestamp);
- out.writeLong(maximumTimestamp);
+ public void write(final DataOutput out) throws IOException {
+ out.writeLong(minimumTimestamp.get());
+ out.writeLong(maximumTimestamp.get());
}
- public synchronized void readFields(final DataInput in) throws IOException {
- this.minimumTimestamp = in.readLong();
- this.maximumTimestamp = in.readLong();
+ public void readFields(final DataInput in) throws IOException {
+
+ this.minimumTimestamp.set(in.readLong());
+ this.maximumTimestamp.set(in.readLong());
}
@Override
- public synchronized String toString() {
- return "[" + minimumTimestamp + "," + maximumTimestamp + "]";
+ public String toString() {
+ return "[" + minimumTimestamp.get() + "," + maximumTimestamp.get() + "]";
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd1ae371/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java
index 2803baf..3ee6114 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java
@@ -29,10 +29,12 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
@Category({RegionServerTests.class, SmallTests.class})
public class TestTimeRangeTracker {
+ private static final int NUM_KEYS = 10000000;
+
@Test
public void testExtreme() {
TimeRange tr = new TimeRange();
@@ -119,6 +121,7 @@ public class TestTimeRangeTracker {
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
+
assertTrue(trr.getMax() == calls * threadCount);
assertTrue(trr.getMin() == 0);
}
@@ -156,6 +159,93 @@ public class TestTimeRangeTracker {
assertFalse(twoArgRange3.isAllTime());
}
+ final static int NUM_OF_THREADS = 20;
+
+ class RandomTestData {
+ private long[] keys = new long[NUM_KEYS];
+ private long min = Long.MAX_VALUE;
+ private long max = 0;
+
+ public RandomTestData() {
+ if (ThreadLocalRandom.current().nextInt(NUM_OF_THREADS) % 2 == 0) {
+ for (int i = 0; i < NUM_KEYS; i++) {
+ keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
+ if (keys[i] < min) min = keys[i];
+ if (keys[i] > max) max = keys[i];
+ }
+ } else {
+ for (int i = NUM_KEYS - 1; i >= 0; i--) {
+ keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
+ if (keys[i] < min) min = keys[i];
+ if (keys[i] > max) max = keys[i];
+ }
+ }
+ }
+
+ public long getMax() {
+ return this.max;
+ }
+
+ public long getMin() {
+ return this.min;
+ }
+ }
+
+ class TrtUpdateRunnable implements Runnable {
+
+ private TimeRangeTracker trt;
+ private RandomTestData data;
+ public TrtUpdateRunnable(final TimeRangeTracker trt, final RandomTestData data) {
+ this.trt = trt;
+ this.data = data;
+ }
+
+ public void run() {
+ for (long key : data.keys) {
+ trt.includeTimestamp(key);
+ }
+ }
+ }
+
+ /**
+ * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive
+ * at right range. The data chosen is going to ensure that there are lots of collisions, i.e.,
+ * some other threads may already update the value while one tries to update min/max value.
+ */
+ @Test
+ public void testConcurrentIncludeTimestampCorrectness() {
+ RandomTestData[] testData = new RandomTestData[NUM_OF_THREADS];
+ long min = Long.MAX_VALUE, max = 0;
+ for (int i = 0; i < NUM_OF_THREADS; i ++) {
+ testData[i] = new RandomTestData();
+ if (testData[i].getMin() < min) {
+ min = testData[i].getMin();
+ }
+ if (testData[i].getMax() > max) {
+ max = testData[i].getMax();
+ }
+ }
+
+ TimeRangeTracker trt = new TimeRangeTracker();
+
+ Thread[] t = new Thread[NUM_OF_THREADS];
+ for (int i = 0; i < NUM_OF_THREADS; i++) {
+ t[i] = new Thread(new TrtUpdateRunnable(trt, testData[i]));
+ t[i].start();
+ }
+
+ for (Thread thread : t) {
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ assertTrue(min == trt.getMin());
+ assertTrue(max == trt.getMax());
+ }
+
/**
* Bit of code to test concurrent access on this class.
* @param args