You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/04/15 00:18:32 UTC
svn commit: r934221 - in /hadoop/hbase/branches/0.20_pre_durability/src:
java/org/apache/hadoop/hbase/regionserver/
test/org/apache/hadoop/hbase/regionserver/
Author: stack
Date: Wed Apr 14 22:18:32 2010
New Revision: 934221
URL: http://svn.apache.org/viewvc?rev=934221&view=rev
Log:
HBASE-2248 Provide new non-copy mechanism to assure atomic reads in get and scan
Added:
hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java
hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
Added: hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java?rev=934221&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java (added)
+++ hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java Wed Apr 14 22:18:32 2010
@@ -0,0 +1,80 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Debugging aid that either echoes messages to standard output or, once
+ * enabled, collects them in an in-memory buffer that can be dumped to a file.
+ * Within this class, buffer access is serialized on a private lock.
+ */
+public class DebugPrint {
+
+  /** When false, {@link #println(String)} writes straight to System.out. */
+  private static final AtomicBoolean enabled = new AtomicBoolean(false);
+  /** Held by this class whenever it reads or replaces {@link #out}. */
+  private static final Object sync = new Object();
+  /** Buffer holding the messages captured while enabled. */
+  public static StringBuilder out = new StringBuilder();
+
+  /** Switches println over to in-memory buffering. */
+  static public void enable() {
+    enabled.set(true);
+  }
+
+  /** Switches println back to writing on System.out. */
+  static public void disable() {
+    enabled.set(false);
+  }
+
+  /** Enables buffering and replaces the buffer with an empty one. */
+  static public void reset() {
+    synchronized (sync) {
+      enable(); // someone wants us enabled basically.
+
+      out = new StringBuilder();
+    }
+  }
+
+  /**
+   * Writes the buffered messages to the named file.
+   *
+   * @param file path handed to {@link FileWriter}
+   * @throws IOException if the file cannot be opened, written or closed
+   */
+  static public void dumpToFile(String file) throws IOException {
+    FileWriter f = new FileWriter(file);
+    try {
+      synchronized (sync) {
+        f.write(out.toString());
+      }
+    } finally {
+      // Close even when the write fails so the file handle is not leaked.
+      f.close();
+    }
+  }
+
+  /**
+   * Records one message. While disabled the message is printed on System.out;
+   * while enabled it is appended to the buffer as {@code <thread-name> m}
+   * followed by a newline.
+   *
+   * @param m message text
+   */
+  public static void println(String m) {
+    if (!enabled.get()) {
+      System.out.println(m);
+      return;
+    }
+
+    synchronized (sync) {
+      String threadName = Thread.currentThread().getName();
+      out.append("<");
+      out.append(threadName);
+      out.append("> ");
+      out.append(m);
+      out.append("\n");
+    }
+  }
+}
Added: hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java?rev=934221&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (added)
+++ hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java Wed Apr 14 22:18:32 2010
@@ -0,0 +1,152 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Manages the read/write consistency within memstore. This provides
+ * an interface for readers to determine what entries to ignore, and
+ * a mechanism for writers to obtain new write numbers, then "commit"
+ * the new writes for readers to read (thus forming atomic transactions).
+ */
+public class ReadWriteConsistencyControl {
+  /** Current read point: the last write number published by a completion. */
+  private final AtomicLong memstoreRead = new AtomicLong();
+  /** Last write number handed out by {@link #beginMemstoreInsert()}. */
+  private final AtomicLong memstoreWrite = new AtomicLong();
+  // This is the pending queue of writes; this class holds its monitor on use.
+  private final LinkedList<WriteEntry> writeQueue =
+      new LinkedList<WriteEntry>();
+
+  /** Read point captured per thread by resetThreadReadPoint. */
+  private static final ThreadLocal<Long> perThreadReadPoint =
+      new ThreadLocal<Long>();
+
+  /**
+   * Returns the read point last stored for the calling thread by
+   * {@link #resetThreadReadPoint(ReadWriteConsistencyControl)}.
+   *
+   * @return the calling thread's read point
+   * @throws IllegalStateException if the calling thread has not stored a
+   *         read point yet
+   */
+  public static long getThreadReadPoint() {
+    Long readPoint = perThreadReadPoint.get();
+    if (readPoint == null) {
+      // Unboxing null would otherwise surface as a bare NullPointerException.
+      throw new IllegalStateException("no read point set for thread "
+          + Thread.currentThread().getName()
+          + "; call resetThreadReadPoint first");
+    }
+    return readPoint.longValue();
+  }
+
+  /**
+   * Stores the current read point of {@code rwcc} as the calling thread's
+   * read point.
+   *
+   * @param rwcc instance whose read point is captured
+   * @return the read point that was stored
+   */
+  public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) {
+    perThreadReadPoint.set(rwcc.memstoreReadPoint());
+    return getThreadReadPoint();
+  }
+
+  /**
+   * Allocates the next write number and queues a pending entry for it.
+   *
+   * @return the entry to pass to {@link #completeMemstoreInsert(WriteEntry)}
+   */
+  public WriteEntry beginMemstoreInsert() {
+    synchronized (writeQueue) {
+      long nextWriteNumber = memstoreWrite.incrementAndGet();
+      WriteEntry e = new WriteEntry(nextWriteNumber);
+      writeQueue.add(e);
+      return e;
+    }
+  }
+
+  /**
+   * Marks {@code e} completed, advances the read point past every completed
+   * entry at the head of the queue, then spins until the read point has
+   * reached {@code e}'s write number.
+   *
+   * @param e entry returned by {@link #beginMemstoreInsert()}
+   * @throws RuntimeException if the queue is empty or the entries at its
+   *         head are not numbered consecutively
+   */
+  public void completeMemstoreInsert(WriteEntry e) {
+    synchronized (writeQueue) {
+      e.markCompleted();
+
+      long nextReadValue = -1;
+      boolean ranOnce = false;
+      while (!writeQueue.isEmpty()) {
+        ranOnce = true;
+        WriteEntry queueFirst = writeQueue.getFirst();
+
+        if (nextReadValue > 0) {
+          if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
+            throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
+                + nextReadValue + " next: " + queueFirst.getWriteNumber());
+          }
+        }
+
+        if (queueFirst.isCompleted()) {
+          nextReadValue = queueFirst.getWriteNumber();
+          writeQueue.removeFirst();
+        } else {
+          break;
+        }
+      }
+
+      if (!ranOnce) {
+        throw new RuntimeException("never was a first");
+      }
+
+      if (nextReadValue > 0) {
+        memstoreRead.set(nextReadValue);
+      }
+    }
+
+    // Spin until any other concurrent puts have finished. This makes sure that
+    // if we move on to construct a scanner, we'll get read-your-own-writes
+    // consistency. We anticipate that since puts to the memstore are very fast,
+    // this will be on the order of microseconds - so spinning should be faster
+    // than a condition variable.
+    int spun = 0;
+    while (memstoreRead.get() < e.getWriteNumber()) {
+      spun++;
+    }
+    // Could potentially expose spun as a metric
+  }
+
+  /** Returns the current read point. */
+  public long memstoreReadPoint() {
+    return memstoreRead.get();
+  }
+
+  /** A pending write: its write number plus a completion flag. */
+  public static class WriteEntry {
+    private final long writeNumber;
+    // The enclosing class only touches this while holding writeQueue's monitor.
+    private boolean completed = false;
+
+    WriteEntry(long writeNumber) {
+      this.writeNumber = writeNumber;
+    }
+
+    void markCompleted() {
+      this.completed = true;
+    }
+
+    boolean isCompleted() {
+      return this.completed;
+    }
+
+    long getWriteNumber() {
+      return this.writeNumber;
+    }
+  }
+}
Added: hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java?rev=934221&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java (added)
+++ hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java Wed Apr 14 22:18:32 2010
@@ -0,0 +1,121 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import junit.framework.TestCase;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Stress test: many writer threads begin and complete memstore inserts while
+ * a reader thread checks that the read point never moves backwards.
+ */
+public class TestReadWriteConsistencyControl extends TestCase {
+  /** Repeatedly begins and completes inserts until told to finish. */
+  static class Writer implements Runnable {
+    final AtomicBoolean finished;
+    final ReadWriteConsistencyControl rwcc;
+    final AtomicBoolean status;
+
+    Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, AtomicBoolean status) {
+      this.finished = finished;
+      this.rwcc = rwcc;
+      this.status = status;
+    }
+    private Random rnd = new Random();
+    public boolean failed = false;
+
+    public void run() {
+      while (!finished.get()) {
+        ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert();
+        // 0 usec - 499 usec
+        int sleepTime = rnd.nextInt(500);
+        // 500 * 1000 = 500,000ns = 500 usec
+        // 1 * 1000 = 1,000ns = 1 usec
+        try {
+          if (sleepTime > 0)
+            Thread.sleep(0, sleepTime * 1000);
+        } catch (InterruptedException e1) {
+          // Keep going: the insert begun above must still be completed.
+        }
+        try {
+          rwcc.completeMemstoreInsert(e);
+        } catch (RuntimeException ex) {
+          // got failure; report it to the test through the status flag.
+          System.out.println(ex.toString());
+          ex.printStackTrace();
+          status.set(false);
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Runs 20 writers and one reader for ten seconds, then asserts that the
+   * reader never observed the read point decreasing and that no writer hit an
+   * exception.
+   */
+  public void testParallelism() throws Exception {
+    final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
+
+    final AtomicBoolean finished = new AtomicBoolean(false);
+
+    // fail flag for the reader thread
+    final AtomicBoolean readerFailed = new AtomicBoolean(false);
+    final AtomicLong failedAt = new AtomicLong();
+    Runnable reader = new Runnable() {
+      public void run() {
+        long prev = rwcc.memstoreReadPoint();
+        while (!finished.get()) {
+          long newPrev = rwcc.memstoreReadPoint();
+          if (newPrev < prev) {
+            // serious problem.
+            System.out.println("Reader got out of order, prev: " +
+                prev + " next was: " + newPrev);
+            readerFailed.set(true);
+            // might as well give up
+            failedAt.set(newPrev);
+            return;
+          }
+          // Remember the latest value so each sample is compared with the
+          // one before it, not with the value seen at startup.
+          prev = newPrev;
+        }
+      }
+    };
+
+    // writer thread parallelism.
+    int n = 20;
+    Thread [] writers = new Thread[n];
+    AtomicBoolean [] statuses = new AtomicBoolean[n];
+    Thread readThread = new Thread(reader);
+
+    for (int i = 0 ; i < n ; ++i ) {
+      statuses[i] = new AtomicBoolean(true);
+      writers[i] = new Thread(new Writer(finished, rwcc, statuses[i]));
+      writers[i].start();
+    }
+    readThread.start();
+
+    try {
+      Thread.sleep(10 * 1000);
+    } catch (InterruptedException ex) {
+      // Interrupted early: stop the workers and check what ran so far.
+    }
+
+    finished.set(true);
+
+    readThread.join();
+    for (int i = 0; i < n; ++i) {
+      writers[i].join();
+    }
+
+    // check failure.
+    assertFalse("read point went backwards to " + failedAt.get(),
+        readerFailed.get());
+    for (int i = 0; i < n; ++i) {
+      assertTrue("writer " + i + " hit an exception", statuses[i].get());
+    }
+  }
+}