You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/01/09 21:25:07 UTC

[2/2] hbase git commit: HBASE-12148 Remove TimeRangeTracker as point of contention when many threads writing a Store (Huaxiang Sun)

HBASE-12148 Remove TimeRangeTracker as point of contention when many threads writing a Store (Huaxiang Sun)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/dd1ae371
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/dd1ae371
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/dd1ae371

Branch: refs/heads/master
Commit: dd1ae37148c13e79ec37817f3953a79dd40e8e87
Parents: 1576269
Author: Michael Stack <st...@apache.org>
Authored: Mon Jan 9 13:24:53 2017 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Mon Jan 9 13:24:53 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/util/ClassSize.java |   3 +-
 .../hbase/regionserver/TimeRangeTracker.java    | 113 ++++++++++---------
 .../regionserver/TestTimeRangeTracker.java      |  92 ++++++++++++++-
 3 files changed, 154 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/dd1ae371/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
index 85a6483..465bd9c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
@@ -313,8 +313,7 @@ public class ClassSize {
 
     TIMERANGE = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2 + Bytes.SIZEOF_BOOLEAN);
 
-    TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2);
-
+    TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * REFERENCE); // 2 AtomicLong refs; shallow size
     CELL_SET = align(OBJECT + REFERENCE);
 
     STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG);

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd1ae371/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
index 1ea3c70..95c7c4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -28,9 +29,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Writable;
-
 /**
- * Stores minimum and maximum timestamp values.
+ * Stores minimum and maximum timestamp values, i.e. the closed interval
+ * [minimumTimestamp, maximumTimestamp].
  * Use this class at write-time ONLY. Too much synchronization to use at read time
  * (TODO: there are two scenarios writing, once when lots of concurrency as part of memstore
  * updates but then later we can make one as part of a compaction when there is only one thread
@@ -46,8 +47,9 @@ import org.apache.hadoop.io.Writable;
 public class TimeRangeTracker implements Writable {
   static final long INITIAL_MIN_TIMESTAMP = Long.MAX_VALUE;
   static final long INITIAL_MAX_TIMESTAMP = -1L;
-  long minimumTimestamp = INITIAL_MIN_TIMESTAMP;
-  long maximumTimestamp = INITIAL_MAX_TIMESTAMP;
+  // AtomicLong lets includeTimestamp() update each bound with compareAndSet instead of a lock.
+  AtomicLong minimumTimestamp = new AtomicLong(INITIAL_MIN_TIMESTAMP);
+  AtomicLong maximumTimestamp = new AtomicLong(INITIAL_MAX_TIMESTAMP);
 
   /**
    * Default constructor.
@@ -60,26 +62,13 @@ public class TimeRangeTracker implements Writable {
    * @param trt source TimeRangeTracker
    */
   public TimeRangeTracker(final TimeRangeTracker trt) {
-    set(trt.getMin(), trt.getMax());
+    minimumTimestamp.set(trt.getMin());
+    maximumTimestamp.set(trt.getMax());
   }
 
   public TimeRangeTracker(long minimumTimestamp, long maximumTimestamp) {
-    set(minimumTimestamp, maximumTimestamp);
-  }
-
-  private void set(final long min, final long max) {
-    this.minimumTimestamp = min;
-    this.maximumTimestamp = max;
-  }
-
-  /**
-   * @param l
-   * @return True if we initialized values
-   */
-  private boolean init(final long l) {
-    if (this.minimumTimestamp != INITIAL_MIN_TIMESTAMP) return false;
-    set(l, l);
-    return true;
+    this.minimumTimestamp.set(minimumTimestamp);
+    this.maximumTimestamp.set(maximumTimestamp);
   }
 
   /**
@@ -102,23 +91,44 @@ public class TimeRangeTracker implements Writable {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
       justification="Intentional")
   void includeTimestamp(final long timestamp) {
-    // Do test outside of synchronization block.  Synchronization in here can be problematic
-    // when many threads writing one Store -- they can all pile up trying to add in here.
-    // Happens when doing big write upload where we are hammering on one region.
-    if (timestamp < this.minimumTimestamp) {
-      synchronized (this) {
-        if (!init(timestamp)) {
-          if (timestamp < this.minimumTimestamp) {
-            this.minimumTimestamp = timestamp;
-          }
+    long initialMinTimestamp = this.minimumTimestamp.get();
+    if (timestamp < initialMinTimestamp) {
+      long curMinTimestamp = initialMinTimestamp;
+      while (timestamp < curMinTimestamp) {
+        if (!this.minimumTimestamp.compareAndSet(curMinTimestamp, timestamp)) {
+          curMinTimestamp = this.minimumTimestamp.get();
+        } else {
+          // successfully set minimumTimestamp, break.
+          break;
         }
       }
-    } else if (timestamp > this.maximumTimestamp) {
-      synchronized (this) {
-        if (!init(timestamp)) {
-          if (this.maximumTimestamp < timestamp) {
-            this.maximumTimestamp =  timestamp;
-          }
+
+      // When it reaches here, there are two possibilities:
+      //  1). timestamp >= curMinTimestamp: another thread already set minimumTimestamp to a
+      //      value <= timestamp. If initialMinTimestamp == INITIAL_MIN_TIMESTAMP, that thread
+      //      may have set both minimumTimestamp/maximumTimestamp to the same value
+      //      (curMinTimestamp), so maximumTimestamp still needs to be checked against
+      //      timestamp below.
+      //  2). timestamp < curMinTimestamp: this thread set minimumTimestamp successfully.
+      //      In this case, it still needs to check if initialMinTimestamp == INITIAL_MIN_TIMESTAMP
+      //      to see if it needs to set maximumTimestamp.
+      if (initialMinTimestamp != INITIAL_MIN_TIMESTAMP) {
+        // minimumTimestamp was already initialized when this call started and timestamp is
+        // below that initial value, so maximumTimestamp needs no update: it is (or will be,
+        // once the initializing thread finishes) at least initialMinTimestamp > timestamp.
+        return;
+      }
+    }
+
+    long curMaxTimestamp = this.maximumTimestamp.get();
+
+    if (timestamp > curMaxTimestamp) {
+      while (timestamp > curMaxTimestamp) {
+        if (!this.maximumTimestamp.compareAndSet(curMaxTimestamp, timestamp)) {
+          curMaxTimestamp = this.maximumTimestamp.get();
+        } else {
+          // successfully set maximumTimestamp, break.
+          break;
         }
       }
     }
@@ -126,40 +136,41 @@ public class TimeRangeTracker implements Writable {
 
   /**
    * Check if the range has ANY overlap with TimeRange
-   * @param tr TimeRange
+   * @param tr TimeRange, treated as the half-open interval [minStamp, maxStamp)
    * @return True if there is overlap, false otherwise
    */
-  public synchronized boolean includesTimeRange(final TimeRange tr) {
-    return (this.minimumTimestamp < tr.getMax() && this.maximumTimestamp >= tr.getMin());
+  public boolean includesTimeRange(final TimeRange tr) {
+    return (this.minimumTimestamp.get() < tr.getMax() && this.maximumTimestamp.get() >= tr.getMin());
   }
 
   /**
    * @return the minimumTimestamp
    */
-  public synchronized long getMin() {
-    return minimumTimestamp;
+  public long getMin() {
+    return minimumTimestamp.get();
   }
 
   /**
    * @return the maximumTimestamp
    */
-  public synchronized long getMax() {
-    return maximumTimestamp;
+  public long getMax() {
+    return maximumTimestamp.get();
   }
 
-  public synchronized void write(final DataOutput out) throws IOException {
-    out.writeLong(minimumTimestamp);
-    out.writeLong(maximumTimestamp);
+  public void write(final DataOutput out) throws IOException {
+    out.writeLong(minimumTimestamp.get());
+    out.writeLong(maximumTimestamp.get());
   }
 
-  public synchronized void readFields(final DataInput in) throws IOException {
-    this.minimumTimestamp = in.readLong();
-    this.maximumTimestamp = in.readLong();
+  public void readFields(final DataInput in) throws IOException {
+    // Fields are read in the same order write() emits them: minimum first, then maximum.
+    this.minimumTimestamp.set(in.readLong());
+    this.maximumTimestamp.set(in.readLong());
   }
 
   @Override
-  public synchronized String toString() {
-    return "[" + minimumTimestamp + "," + maximumTimestamp + "]";
+  public String toString() {
+    return "[" + minimumTimestamp.get() + "," + maximumTimestamp.get() + "]";
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/dd1ae371/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java
index 2803baf..3ee6114 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java
@@ -29,10 +29,12 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
 
 @Category({RegionServerTests.class, SmallTests.class})
 public class TestTimeRangeTracker {
+  private static final int NUM_KEYS = 1000000; // per thread; all arrays stay live at once
+
   @Test
   public void testExtreme() {
     TimeRange tr = new TimeRange();
@@ -119,6 +121,7 @@ public class TestTimeRangeTracker {
     for (int i = 0; i < threads.length; i++) {
       threads[i].join();
     }
+
     assertTrue(trr.getMax() == calls * threadCount);
     assertTrue(trr.getMin() == 0);
   }
@@ -156,6 +159,93 @@ public class TestTimeRangeTracker {
     assertFalse(twoArgRange3.isAllTime());
   }
 
+  private static final int NUM_OF_THREADS = 20;
+
+  static class RandomTestData {
+    private final long[] keys = new long[NUM_KEYS];
+    private long min = Long.MAX_VALUE;
+    private long max = 0;
+    // Keys ascend or descend (with jitter) so threads contend on both the min and max bounds.
+    public RandomTestData() {
+      if (ThreadLocalRandom.current().nextBoolean()) {
+        for (int i = 0; i < NUM_KEYS; i++) {
+          keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
+          if (keys[i] < min) min = keys[i];
+          if (keys[i] > max) max = keys[i];
+        }
+      } else {
+        for (int i = 0; i < NUM_KEYS; i++) {
+          keys[i] = (NUM_KEYS - 1 - i) + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
+          if (keys[i] < min) min = keys[i];
+          if (keys[i] > max) max = keys[i];
+        }
+      }
+    }
+
+    public long getMax() {
+      return this.max;
+    }
+
+    public long getMin() {
+      return this.min;
+    }
+  }
+
+  static class TrtUpdateRunnable implements Runnable {
+    private final TimeRangeTracker trt;
+    private final RandomTestData data;
+    public TrtUpdateRunnable(final TimeRangeTracker trt, final RandomTestData data) {
+      this.trt = trt;
+      this.data = data;
+    }
+
+    @Override
+    public void run() {
+      for (long key : data.keys) {
+        trt.includeTimestamp(key);
+      }
+    }
+  }
+
+  /**
+   * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive
+   * at the right range. The data is chosen so that there are lots of collisions, i.e.
+   * another thread may already have updated the value while one tries to update min/max.
+   */
+  @Test
+  public void testConcurrentIncludeTimestampCorrectness() {
+    RandomTestData[] testData = new RandomTestData[NUM_OF_THREADS];
+    long min = Long.MAX_VALUE, max = 0;
+    for (int i = 0; i < NUM_OF_THREADS; i++) {
+      testData[i] = new RandomTestData();
+      if (testData[i].getMin() < min) {
+        min = testData[i].getMin();
+      }
+      if (testData[i].getMax() > max) {
+        max = testData[i].getMax();
+      }
+    }
+
+    TimeRangeTracker trt = new TimeRangeTracker();
+
+    Thread[] t = new Thread[NUM_OF_THREADS];
+    for (int i = 0; i < NUM_OF_THREADS; i++) {
+      t[i] = new Thread(new TrtUpdateRunnable(trt, testData[i]));
+      t[i].start();
+    }
+
+    for (Thread thread : t) {
+      try {
+        thread.join();
+      } catch (InterruptedException e) {
+        throw new AssertionError("Interrupted while waiting for updater threads", e);
+      }
+    }
+
+    assertTrue(min == trt.getMin());
+    assertTrue(max == trt.getMax());
+  }
+
   /**
    * Bit of code to test concurrent access on this class.
    * @param args