You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/01/09 21:35:34 UTC
hbase git commit: HBASE-12148 Remove TimeRangeTracker as point of
contention when many threads writing a Store (Huaxiang Sun)
Repository: hbase
Updated Branches:
refs/heads/branch-1 9b26c9ff3 -> 6130ea4d5
HBASE-12148 Remove TimeRangeTracker as point of contention when many threads writing a Store (Huaxiang Sun)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6130ea4d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6130ea4d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6130ea4d
Branch: refs/heads/branch-1
Commit: 6130ea4d54d1a904de220ebad57ddea7b45797f0
Parents: 9b26c9f
Author: Michael Stack <st...@apache.org>
Authored: Mon Jan 9 13:24:53 2017 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Mon Jan 9 13:35:20 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/util/ClassSize.java | 2 +-
.../hbase/regionserver/TimeRangeTracker.java | 113 ++++++++++---------
.../regionserver/TestTimeRangeTracker.java | 91 +++++++++++++++
3 files changed, 154 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6130ea4d/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
index c0b3128..92fde2c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
@@ -278,7 +278,7 @@ public class ClassSize {
TIMERANGE = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2 + Bytes.SIZEOF_BOOLEAN);
- TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2);
+ TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * REFERENCE);
CELL_SKIPLIST_SET = align(OBJECT + REFERENCE);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6130ea4d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
index 78950f8..9b2a56a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -28,9 +29,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Writable;
-
/**
- * Stores minimum and maximum timestamp values.
+ * Stores minimum and maximum timestamp values; the tracked range is
+ * [minimumTimestamp, maximumTimestamp] in interval notation.
* Use this class at write-time ONLY. Too much synchronization to use at read time
* (TODO: there are two scenarios writing, once when lots of concurrency as part of memstore
* updates but then later we can make one as part of a compaction when there is only one thread
@@ -46,8 +47,9 @@ import org.apache.hadoop.io.Writable;
public class TimeRangeTracker implements Writable {
static final long INITIAL_MIN_TIMESTAMP = Long.MAX_VALUE;
static final long INITIAL_MAX_TIMESTAMP = -1L;
- long minimumTimestamp = INITIAL_MIN_TIMESTAMP;
- long maximumTimestamp = INITIAL_MAX_TIMESTAMP;
+
+ AtomicLong minimumTimestamp = new AtomicLong(INITIAL_MIN_TIMESTAMP);
+ AtomicLong maximumTimestamp = new AtomicLong(INITIAL_MAX_TIMESTAMP);
/**
* Default constructor.
@@ -60,26 +62,13 @@ public class TimeRangeTracker implements Writable {
* @param trt source TimeRangeTracker
*/
public TimeRangeTracker(final TimeRangeTracker trt) {
- set(trt.getMin(), trt.getMax());
+ minimumTimestamp.set(trt.getMin());
+ maximumTimestamp.set(trt.getMax());
}
public TimeRangeTracker(long minimumTimestamp, long maximumTimestamp) {
- set(minimumTimestamp, maximumTimestamp);
- }
-
- private void set(final long min, final long max) {
- this.minimumTimestamp = min;
- this.maximumTimestamp = max;
- }
-
- /**
- * @param l
- * @return True if we initialized values
- */
- private boolean init(final long l) {
- if (this.minimumTimestamp != INITIAL_MIN_TIMESTAMP) return false;
- set(l, l);
- return true;
+ this.minimumTimestamp.set(minimumTimestamp);
+ this.maximumTimestamp.set(maximumTimestamp);
}
/**
@@ -102,23 +91,44 @@ public class TimeRangeTracker implements Writable {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
justification="Intentional")
void includeTimestamp(final long timestamp) {
- // Do test outside of synchronization block. Synchronization in here can be problematic
- // when many threads writing one Store -- they can all pile up trying to add in here.
- // Happens when doing big write upload where we are hammering on one region.
- if (timestamp < this.minimumTimestamp) {
- synchronized (this) {
- if (!init(timestamp)) {
- if (timestamp < this.minimumTimestamp) {
- this.minimumTimestamp = timestamp;
- }
+ long initialMinTimestamp = this.minimumTimestamp.get();
+ if (timestamp < initialMinTimestamp) {
+ long curMinTimestamp = initialMinTimestamp;
+ while (timestamp < curMinTimestamp) {
+ if (!this.minimumTimestamp.compareAndSet(curMinTimestamp, timestamp)) {
+ curMinTimestamp = this.minimumTimestamp.get();
+ } else {
+ // successfully set minimumTimestamp, break.
+ break;
}
}
- } else if (timestamp > this.maximumTimestamp) {
- synchronized (this) {
- if (!init(timestamp)) {
- if (this.maximumTimestamp < timestamp) {
- this.maximumTimestamp = timestamp;
- }
+
+ // When it reaches here, there are two possibilities:
+ // 1). timestamp >= curMinTimestamp: someone else already set the minimumTimestamp. In this
+ // case, it still needs to check if initialMinTimestamp == INITIAL_MIN_TIMESTAMP to see
+ // if it needs to update maximumTimestamp. Someone may already have set both
+ // minimumTimestamp/maximumTimestamp to the same value (curMinTimestamp), so we
+ // need to check if maximumTimestamp needs to be updated.
+ // 2). timestamp < curMinTimestamp: it set the minimumTimestamp successfully.
+ // In this case, it still needs to check if initialMinTimestamp == INITIAL_MIN_TIMESTAMP
+ // to see if it needs to set maximumTimestamp.
+ if (initialMinTimestamp != INITIAL_MIN_TIMESTAMP) {
+ // Someone already set minimumTimestamp and timestamp is less than minimumTimestamp.
+ // In this case, no need to set maximumTimestamp as it will be set to at least
+ // initialMinTimestamp.
+ return;
+ }
+ }
+
+ long curMaxTimestamp = this.maximumTimestamp.get();
+
+ if (timestamp > curMaxTimestamp) {
+ while (timestamp > curMaxTimestamp) {
+ if (!this.maximumTimestamp.compareAndSet(curMaxTimestamp, timestamp)) {
+ curMaxTimestamp = this.maximumTimestamp.get();
+ } else {
+ // successfully set maximumTimestamp, break
+ break;
}
}
}
@@ -126,40 +136,41 @@ public class TimeRangeTracker implements Writable {
/**
* Check if the range has ANY overlap with TimeRange
- * @param tr TimeRange
+ * @param tr TimeRange; expected to be the half-open interval [minStamp, maxStamp)
* @return True if there is overlap, false otherwise
*/
- public synchronized boolean includesTimeRange(final TimeRange tr) {
- return (this.minimumTimestamp < tr.getMax() && this.maximumTimestamp >= tr.getMin());
+ public boolean includesTimeRange(final TimeRange tr) {
+ return (this.minimumTimestamp.get() < tr.getMax() && this.maximumTimestamp.get() >= tr.getMin());
}
/**
* @return the minimumTimestamp
*/
- public synchronized long getMin() {
- return minimumTimestamp;
+ public long getMin() {
+ return minimumTimestamp.get();
}
/**
* @return the maximumTimestamp
*/
- public synchronized long getMax() {
- return maximumTimestamp;
+ public long getMax() {
+ return maximumTimestamp.get();
}
- public synchronized void write(final DataOutput out) throws IOException {
- out.writeLong(minimumTimestamp);
- out.writeLong(maximumTimestamp);
+ public void write(final DataOutput out) throws IOException {
+ out.writeLong(minimumTimestamp.get());
+ out.writeLong(maximumTimestamp.get());
}
- public synchronized void readFields(final DataInput in) throws IOException {
- this.minimumTimestamp = in.readLong();
- this.maximumTimestamp = in.readLong();
+ public void readFields(final DataInput in) throws IOException {
+
+ this.minimumTimestamp.set(in.readLong());
+ this.maximumTimestamp.set(in.readLong());
}
@Override
- public synchronized String toString() {
- return "[" + minimumTimestamp + "," + maximumTimestamp + "]";
+ public String toString() {
+ return "[" + minimumTimestamp.get() + "," + maximumTimestamp.get() + "]";
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/6130ea4d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java
index 377801c..4e61067 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java
@@ -28,9 +28,12 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Writables;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.util.concurrent.ThreadLocalRandom;
@Category({SmallTests.class})
public class TestTimeRangeTracker {
+ private static final int NUM_KEYS = 10000000;
+
@Test
public void testExtreme() {
TimeRange tr = new TimeRange();
@@ -117,6 +120,7 @@ public class TestTimeRangeTracker {
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
+
assertTrue(trr.getMax() == calls * threadCount);
assertTrue(trr.getMin() == 0);
}
@@ -154,6 +158,93 @@ public class TestTimeRangeTracker {
assertFalse(twoArgRange3.isAllTime());
}
+ final static int NUM_OF_THREADS = 20;
+
+ class RandomTestData {
+ private long[] keys = new long[NUM_KEYS];
+ private long min = Long.MAX_VALUE;
+ private long max = 0;
+
+ public RandomTestData() {
+ if (ThreadLocalRandom.current().nextInt(NUM_OF_THREADS) % 2 == 0) {
+ for (int i = 0; i < NUM_KEYS; i++) {
+ keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
+ if (keys[i] < min) min = keys[i];
+ if (keys[i] > max) max = keys[i];
+ }
+ } else {
+ for (int i = NUM_KEYS - 1; i >= 0; i--) {
+ keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
+ if (keys[i] < min) min = keys[i];
+ if (keys[i] > max) max = keys[i];
+ }
+ }
+ }
+
+ public long getMax() {
+ return this.max;
+ }
+
+ public long getMin() {
+ return this.min;
+ }
+ }
+
+ class TrtUpdateRunnable implements Runnable {
+
+ private TimeRangeTracker trt;
+ private RandomTestData data;
+ public TrtUpdateRunnable(final TimeRangeTracker trt, final RandomTestData data) {
+ this.trt = trt;
+ this.data = data;
+ }
+
+ public void run() {
+ for (long key : data.keys) {
+ trt.includeTimestamp(key);
+ }
+ }
+ }
+
+ /**
+ * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive
+ * at the right range. The data chosen ensures that there are lots of collisions, i.e.,
+ * some other threads may already have updated the value while one tries to update min/max.
+ */
+ @Test
+ public void testConcurrentIncludeTimestampCorrectness() {
+ RandomTestData[] testData = new RandomTestData[NUM_OF_THREADS];
+ long min = Long.MAX_VALUE, max = 0;
+ for (int i = 0; i < NUM_OF_THREADS; i ++) {
+ testData[i] = new RandomTestData();
+ if (testData[i].getMin() < min) {
+ min = testData[i].getMin();
+ }
+ if (testData[i].getMax() > max) {
+ max = testData[i].getMax();
+ }
+ }
+
+ TimeRangeTracker trt = new TimeRangeTracker();
+
+ Thread[] t = new Thread[NUM_OF_THREADS];
+ for (int i = 0; i < NUM_OF_THREADS; i++) {
+ t[i] = new Thread(new TrtUpdateRunnable(trt, testData[i]));
+ t[i].start();
+ }
+
+ for (Thread thread : t) {
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ assertTrue(min == trt.getMin());
+ assertTrue(max == trt.getMax());
+ }
+
/**
* Bit of code to test concurrent access on this class.
* @param args