You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/04/15 00:18:32 UTC

svn commit: r934221 - in /hadoop/hbase/branches/0.20_pre_durability/src: java/org/apache/hadoop/hbase/regionserver/ test/org/apache/hadoop/hbase/regionserver/

Author: stack
Date: Wed Apr 14 22:18:32 2010
New Revision: 934221

URL: http://svn.apache.org/viewvc?rev=934221&view=rev
Log:
HBASE-2248 Provide new non-copy mechanism to assure atomic reads in get and scan

Added:
    hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java
    hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
    hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java

Added: hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java?rev=934221&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java (added)
+++ hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java Wed Apr 14 22:18:32 2010
@@ -0,0 +1,80 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Debugging aid that echoes messages to stdout or, once enabled, collects
+ * them in an in-memory buffer that can later be dumped to a file.
+ */
+public class DebugPrint {
+
+  // When false, println() passes messages straight through to System.out.
+  private static final AtomicBoolean enabled = new AtomicBoolean(false);
+  // Lock taken by this class around every access to 'out'.
+  private static final Object sync = new Object();
+  // Captured messages. Public and non-final, so outside code can touch it
+  // without taking the lock above.
+  public static StringBuilder out = new StringBuilder();
+
+  /** Switches println() from stdout pass-through to in-memory capture. */
+  static public void enable() {
+    enabled.set(true);
+  }
+
+  /** Switches println() back to writing directly to stdout. */
+  static public void disable() {
+    enabled.set(false);
+  }
+
+  /** Enables capture and discards everything captured so far. */
+  static public void reset() {
+    synchronized (sync) {
+      enable(); // someone wants us enabled basically.
+
+      out = new StringBuilder();
+    }
+  }
+
+  /**
+   * Writes the captured messages to the named file.
+   *
+   * @param file path of the file to write
+   * @throws IOException if the file cannot be opened, written or closed
+   */
+  static public void dumpToFile(String file) throws IOException {
+    FileWriter f = new FileWriter(file);
+    try {
+      synchronized (sync) {
+        f.write(out.toString());
+      }
+    } finally {
+      // Close even when write() throws so the file handle is not leaked.
+      f.close();
+    }
+  }
+
+  /**
+   * Records one message. While capture is disabled the message goes to
+   * stdout unchanged; while enabled it is appended to the buffer as
+   * {@code <thread-name> message} followed by a newline.
+   *
+   * @param m the message text
+   */
+  public static void println(String m) {
+    if (!enabled.get()) {
+      System.out.println(m);
+      return;
+    }
+
+    synchronized (sync) {
+      String threadName = Thread.currentThread().getName();
+      out.append("<");
+      out.append(threadName);
+      out.append("> ");
+      out.append(m);
+      out.append("\n");
+    }
+  }
+}

Added: hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java?rev=934221&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (added)
+++ hadoop/hbase/branches/0.20_pre_durability/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java Wed Apr 14 22:18:32 2010
@@ -0,0 +1,151 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Manages the read/write consistency within memstore. This provides
+ * an interface for readers to determine what entries to ignore, and
+ * a mechanism for writers to obtain new write numbers, then "commit"
+ * the new writes for readers to read (thus forming atomic transactions).
+ */
+public class ReadWriteConsistencyControl {
+  // Write number up to which all inserts have completed; readers use this.
+  private final AtomicLong memstoreRead = new AtomicLong();
+  // Last write number handed out by beginMemstoreInsert().
+  private final AtomicLong memstoreWrite = new AtomicLong();
+  // This is the pending queue of writes. It doubles as the lock that
+  // serializes beginMemstoreInsert() and completeMemstoreInsert().
+  private final LinkedList<WriteEntry> writeQueue =
+      new LinkedList<WriteEntry>();
+
+  // Read point most recently captured by each thread.
+  private static final ThreadLocal<Long> perThreadReadPoint =
+      new ThreadLocal<Long>();
+
+  /**
+   * Returns the read point last captured by the calling thread.
+   *
+   * @return the calling thread's read point
+   * @throws NullPointerException if the calling thread has never called
+   *         {@link #resetThreadReadPoint(ReadWriteConsistencyControl)}
+   */
+  public static long getThreadReadPoint() {
+    Long readPoint = perThreadReadPoint.get();
+    if (readPoint == null) {
+      // Same exception type the implicit unboxing used to raise, but with
+      // a message that tells the caller what is missing.
+      throw new NullPointerException(
+          "no read point for this thread; call resetThreadReadPoint first");
+    }
+    return readPoint.longValue();
+  }
+
+  /**
+   * Captures the current read point of {@code rwcc} for the calling thread.
+   *
+   * @param rwcc instance whose read point is captured
+   * @return the read point that was stored
+   */
+  public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) {
+    perThreadReadPoint.set(rwcc.memstoreReadPoint());
+    return getThreadReadPoint();
+  }
+
+  /**
+   * Allocates the next write number and queues it as a pending write.
+   *
+   * @return the entry to hand to {@link #completeMemstoreInsert(WriteEntry)}
+   */
+  public WriteEntry beginMemstoreInsert() {
+    synchronized (writeQueue) {
+      long nextWriteNumber = memstoreWrite.incrementAndGet();
+      WriteEntry e = new WriteEntry(nextWriteNumber);
+      writeQueue.add(e);
+      return e;
+    }
+  }
+
+  /**
+   * Marks {@code e} completed, advances the read point over every completed
+   * entry at the head of the queue, then spins until the read point has
+   * reached {@code e}'s write number.
+   *
+   * @param e an entry obtained from {@link #beginMemstoreInsert()}
+   * @throws RuntimeException if the pending queue is empty, or if two
+   *         entries dequeued back to back do not have consecutive numbers
+   */
+  public void completeMemstoreInsert(WriteEntry e) {
+    synchronized (writeQueue) {
+      e.markCompleted();
+
+      if (writeQueue.isEmpty()) {
+        throw new RuntimeException("never was a first");
+      }
+
+      // Stays -1 unless at least one completed entry is dequeued below.
+      long nextReadValue = -1;
+      while (!writeQueue.isEmpty()) {
+        WriteEntry queueFirst = writeQueue.getFirst();
+
+        if (nextReadValue > 0) {
+          if (nextReadValue+1 != queueFirst.getWriteNumber()) {
+            throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
+                + nextReadValue + " next: " + queueFirst.getWriteNumber());
+          }
+        }
+
+        if (queueFirst.isCompleted()) {
+          nextReadValue = queueFirst.getWriteNumber();
+          writeQueue.removeFirst();
+        } else {
+          // The head write is still in flight; its own completion call will
+          // dequeue it together with the completed entries behind it.
+          break;
+        }
+      }
+
+      if (nextReadValue > 0) {
+        memstoreRead.set(nextReadValue);
+      }
+    }
+
+    // Spin until any other concurrent puts have finished. This makes sure that
+    // if we move on to construct a scanner, we'll get read-your-own-writes
+    // consistency. We anticipate that since puts to the memstore are very fast,
+    // this will be on the order of microseconds - so spinning should be faster
+    // than a condition variable.
+    int spun = 0;
+    while (memstoreRead.get() < e.getWriteNumber()) {
+      spun++;
+    }
+    // Could potentially expose spun as a metric
+  }
+
+  /**
+   * Returns the current read point.
+   *
+   * @return the value last published by completeMemstoreInsert(), initially 0
+   */
+  public long memstoreReadPoint() {
+    return memstoreRead.get();
+  }
+
+  /** One pending write: its number plus a completion flag. */
+  public static class WriteEntry {
+    private final long writeNumber;
+    private boolean completed = false;
+    WriteEntry(long writeNumber) {
+      this.writeNumber = writeNumber;
+    }
+    void markCompleted() {
+      this.completed = true;
+    }
+    boolean isCompleted() {
+      return this.completed;
+    }
+    long getWriteNumber() {
+      return this.writeNumber;
+    }
+  }
+}

Added: hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java?rev=934221&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java (added)
+++ hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java Wed Apr 14 22:18:32 2010
@@ -0,0 +1,109 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import junit.framework.TestCase;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TestReadWriteConsistencyControl extends TestCase {
+  static class Writer implements Runnable {
+    final AtomicBoolean finished;
+    final ReadWriteConsistencyControl rwcc;
+    final AtomicBoolean status;
+
+    Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, AtomicBoolean status) {
+      this.finished = finished;
+      this.rwcc = rwcc;
+      this.status = status;
+    }
+    private Random rnd = new Random();
+    public boolean failed = false;
+
+    public void run() {
+      while (!finished.get()) {
+        ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert();
+//        System.out.println("Begin write: " + e.getWriteNumber());
+        // 10 usec - 500usec (including 0)
+        int sleepTime = rnd.nextInt(500);
+        // 500 * 1000 = 500,000ns = 500 usec
+        // 1 * 100 = 100ns = 1usec
+        try {
+          if (sleepTime > 0)
+            Thread.sleep(0, sleepTime * 1000);
+        } catch (InterruptedException e1) {
+        }
+        try {
+          rwcc.completeMemstoreInsert(e);
+        } catch (RuntimeException ex) {
+          // got failure
+          System.out.println(ex.toString());
+          ex.printStackTrace();
+          status.set(false);
+          return;
+          // Report failure if possible.
+        }
+      }
+    }
+  }
+
+  public void testParallelism() throws Exception {
+    final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
+
+    final AtomicBoolean finished = new AtomicBoolean(false);
+
+    // fail flag for the reader thread
+    final AtomicBoolean readerFailed = new AtomicBoolean(false);
+    final AtomicLong failedAt = new AtomicLong();
+    Runnable reader = new Runnable() {
+      public void run() {
+        long prev = rwcc.memstoreReadPoint();
+        while (!finished.get()) {
+          long newPrev = rwcc.memstoreReadPoint();
+          if (newPrev < prev) {
+            // serious problem.
+            System.out.println("Reader got out of order, prev: " +
+            prev + " next was: " + newPrev);
+            readerFailed.set(true);
+            // might as well give up
+            failedAt.set(newPrev);
+            return;
+          }
+        }
+      }
+    };
+
+    // writer thread parallelism.
+    int n = 20;
+    Thread [] writers = new Thread[n];
+    AtomicBoolean [] statuses = new AtomicBoolean[n];
+    Thread readThread = new Thread(reader);
+
+    for (int i = 0 ; i < n ; ++i ) {
+      statuses[i] = new AtomicBoolean(true);
+      writers[i] = new Thread(new Writer(finished, rwcc, statuses[i]));
+      writers[i].start();
+    }
+    readThread.start();
+
+    try {
+      Thread.sleep(10 * 1000);
+    } catch (InterruptedException ex) {
+    }
+
+    finished.set(true);
+
+    readThread.join();
+    for (int i = 0; i < n; ++i) {
+      writers[i].join();
+    }
+
+    // check failure.
+    assertFalse(readerFailed.get());
+    for (int i = 0; i < n; ++i) {
+      assertTrue(statuses[i].get());
+    }
+
+
+  }
+}