You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2010/05/15 02:18:37 UTC

svn commit: r944529 [2/2] - in /hadoop/hbase/trunk/core/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/filter/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/had...

Modified: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java Sat May 15 00:18:37 2010
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
 
 import junit.framework.TestCase;
 
@@ -47,10 +48,12 @@ public class TestMemStore extends TestCa
   private static final byte [] CONTENTS = Bytes.toBytes("contents");
   private static final byte [] BASIC = Bytes.toBytes("basic");
   private static final String CONTENTSTR = "contentstr";
+  private ReadWriteConsistencyControl rwcc;
 
   @Override
   public void setUp() throws Exception {
     super.setUp();
+    this.rwcc = new ReadWriteConsistencyControl();
     this.memstore = new MemStore();
   }
 
@@ -76,6 +79,7 @@ public class TestMemStore extends TestCa
     KeyValueScanner [] memstorescanners = this.memstore.getScanners();
     Scan scan = new Scan();
     List<KeyValue> result = new ArrayList<KeyValue>();
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
     StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
       this.memstore.comparator, null, memstorescanners);
     int count = 0;
@@ -94,6 +98,8 @@ public class TestMemStore extends TestCa
     for (int i = 0; i < memstorescanners.length; i++) {
       memstorescanners[0].close();
     }
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
     memstorescanners = this.memstore.getScanners();
     // Now assert can count same number even if a snapshot mid-scan.
     s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
@@ -138,9 +144,9 @@ public class TestMemStore extends TestCa
         if (count == snapshotIndex) {
           this.memstore.snapshot();
           this.memstore.clearSnapshot(this.memstore.getSnapshot());
-          // Added more rows into kvset.
+          // Added more rows into kvset.  But the scanner wont see these rows.
           addRows(this.memstore, ts);
-          LOG.info("Snapshotted, cleared it and then added values");
+          LOG.info("Snapshotted, cleared it and then added values (which wont be seen)");
         }
         result.clear();
       }
@@ -151,6 +157,181 @@ public class TestMemStore extends TestCa
   }
 
   /**
+   * A simple test which verifies the 3 possible states when scanning across snapshot.
+   */
+  public void testScanAcrossSnapshot2() {
+    // we are going to the scanning across snapshot with two kvs
+    // kv1 should always be returned before kv2
+    final byte[] one = Bytes.toBytes(1);
+    final byte[] two = Bytes.toBytes(2);
+    final byte[] f = Bytes.toBytes("f");
+    final byte[] q = Bytes.toBytes("q");
+    final byte[] v = Bytes.toBytes(3);
+
+    final KeyValue kv1 = new KeyValue(one, f, q, v);
+    final KeyValue kv2 = new KeyValue(two, f, q, v);
+
+    // use case 1: both kvs in kvset
+    this.memstore.add(kv1.clone());
+    this.memstore.add(kv2.clone());
+    verifyScanAcrossSnapshot2(kv1, kv2);
+
+    // use case 2: both kvs in snapshot
+    this.memstore.snapshot();
+    verifyScanAcrossSnapshot2(kv1, kv2);
+
+    // use case 3: first in snapshot second in kvset
+    this.memstore = new MemStore();
+    this.memstore.add(kv1.clone());
+    this.memstore.snapshot();
+    this.memstore.add(kv2.clone());
+    verifyScanAcrossSnapshot2(kv1, kv2);
+  }
+
+  private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) {
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    KeyValueScanner[] memstorescanners = this.memstore.getScanners();
+    assertEquals(1, memstorescanners.length);
+    final KeyValueScanner scanner = memstorescanners[0];
+    scanner.seek(KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW));
+    assertEquals(kv1, scanner.next());
+    assertEquals(kv2, scanner.next());
+    assertNull(scanner.next());
+  }
+
+  private void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected) {
+    scanner.seek(KeyValue.createFirstOnRow(new byte[]{}));
+    for (KeyValue kv : expected) {
+      assertTrue(0 ==
+          KeyValue.COMPARATOR.compare(kv,
+              scanner.next()));
+    }
+    assertNull(scanner.peek());
+  }
+
+  public void testMemstoreConcurrentControl() {
+    final byte[] row = Bytes.toBytes(1);
+    final byte[] f = Bytes.toBytes("family");
+    final byte[] q1 = Bytes.toBytes("q1");
+    final byte[] q2 = Bytes.toBytes("q2");
+    final byte[] v = Bytes.toBytes("value");
+
+    ReadWriteConsistencyControl.WriteEntry w =
+        rwcc.beginMemstoreInsert();
+
+    KeyValue kv1 = new KeyValue(row, f, q1, v);
+    kv1.setMemstoreTS(w.getWriteNumber());
+    memstore.add(kv1);
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    KeyValueScanner[] s = this.memstore.getScanners();
+    assertScannerResults(s[0], new KeyValue[]{});
+
+    rwcc.completeMemstoreInsert(w);
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    s = this.memstore.getScanners();
+    assertScannerResults(s[0], new KeyValue[]{kv1});
+
+    w = rwcc.beginMemstoreInsert();
+    KeyValue kv2 = new KeyValue(row, f, q2, v);
+    kv2.setMemstoreTS(w.getWriteNumber());
+    memstore.add(kv2);
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    s = this.memstore.getScanners();
+    assertScannerResults(s[0], new KeyValue[]{kv1});
+
+    rwcc.completeMemstoreInsert(w);
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    s = this.memstore.getScanners();
+    assertScannerResults(s[0], new KeyValue[]{kv1, kv2});
+  }
+
+  private static class ReadOwnWritesTester extends Thread {
+    final int id;
+    static final int NUM_TRIES = 1000;
+
+    final byte[] row;
+
+    final byte[] f = Bytes.toBytes("family");
+    final byte[] q1 = Bytes.toBytes("q1");
+
+    final ReadWriteConsistencyControl rwcc;
+    final MemStore memstore;
+
+    AtomicReference<Throwable> caughtException;
+
+
+    public ReadOwnWritesTester(int id,
+                               MemStore memstore,
+                               ReadWriteConsistencyControl rwcc,
+                               AtomicReference<Throwable> caughtException)
+    {
+      this.id = id;
+      this.rwcc = rwcc;
+      this.memstore = memstore;
+      this.caughtException = caughtException;
+      row = Bytes.toBytes(id);
+    }
+
+    public void run() {
+      try {
+        internalRun();
+      } catch (Throwable t) {
+        caughtException.compareAndSet(null, t);
+      }
+    }
+
+    private void internalRun() {
+      for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
+        ReadWriteConsistencyControl.WriteEntry w =
+          rwcc.beginMemstoreInsert();
+
+        // Insert the sequence value (i)
+        byte[] v = Bytes.toBytes(i);
+
+        KeyValue kv = new KeyValue(row, f, q1, i, v);
+        kv.setMemstoreTS(w.getWriteNumber());
+        memstore.add(kv);
+        rwcc.completeMemstoreInsert(w);
+
+        // Assert that we can read back
+
+        KeyValueScanner s = this.memstore.getScanners()[0];
+        s.seek(kv);
+
+        KeyValue ret = s.next();
+        assertNotNull("Didnt find own write at all", ret);
+        assertEquals("Didnt read own writes",
+                     kv.getTimestamp(), ret.getTimestamp());
+      }
+    }
+  }
+
+  public void no_testReadOwnWritesUnderConcurrency() throws Throwable {
+
+    int NUM_THREADS = 8;
+
+    ReadOwnWritesTester threads[] = new ReadOwnWritesTester[NUM_THREADS];
+    AtomicReference<Throwable> caught = new AtomicReference<Throwable>();
+
+    for (int i = 0; i < NUM_THREADS; i++) {
+      threads[i] = new ReadOwnWritesTester(i, memstore, rwcc, caught);
+      threads[i].start();
+    }
+
+    for (int i = 0; i < NUM_THREADS; i++) {
+      threads[i].join();
+    }
+
+    if (caught.get() != null) {
+      throw caught.get();
+    }
+  }
+
+  /** 
    * Test memstore snapshots
    * @throws IOException
    */
@@ -443,9 +624,10 @@ public class TestMemStore extends TestCa
     List<KeyValue> expected = new ArrayList<KeyValue>();
     expected.add(put3);
     expected.add(del2);
+    expected.add(put2);
     expected.add(put1);
 
-    assertEquals(3, memstore.kvset.size());
+    assertEquals(4, memstore.kvset.size());
     int i = 0;
     for(KeyValue kv : memstore.kvset) {
       assertEquals(expected.get(i++), kv);
@@ -477,8 +659,11 @@ public class TestMemStore extends TestCa
     List<KeyValue> expected = new ArrayList<KeyValue>();
     expected.add(put3);
     expected.add(del2);
+    expected.add(put2);
+    expected.add(put1);
 
-    assertEquals(2, memstore.kvset.size());
+    
+    assertEquals(4, memstore.kvset.size());
     int i = 0;
     for (KeyValue kv: memstore.kvset) {
       assertEquals(expected.get(i++), kv);
@@ -511,9 +696,14 @@ public class TestMemStore extends TestCa
 
     List<KeyValue> expected = new ArrayList<KeyValue>();
     expected.add(del);
+    expected.add(put1);
+    expected.add(put2);
     expected.add(put4);
+    expected.add(put3);
 
-    assertEquals(2, memstore.kvset.size());
+
+    
+    assertEquals(5, memstore.kvset.size());
     int i = 0;
     for (KeyValue kv: memstore.kvset) {
       assertEquals(expected.get(i++), kv);
@@ -529,7 +719,7 @@ public class TestMemStore extends TestCa
     memstore.add(new KeyValue(row, fam, qf, ts, val));
     KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val);
     memstore.delete(delete);
-    assertEquals(1, memstore.kvset.size());
+    assertEquals(2, memstore.kvset.size());
     assertEquals(delete, memstore.kvset.first());
   }
 
@@ -542,7 +732,7 @@ public class TestMemStore extends TestCa
         "row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(1, memstore.kvset.size());
+    assertEquals(2, memstore.kvset.size());
     assertEquals(delete, memstore.kvset.first());
   }
   public void testRetainsDeleteColumn() throws IOException {
@@ -554,7 +744,7 @@ public class TestMemStore extends TestCa
         KeyValue.Type.DeleteColumn, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(1, memstore.kvset.size());
+    assertEquals(2, memstore.kvset.size());
     assertEquals(delete, memstore.kvset.first());
   }
   public void testRetainsDeleteFamily() throws IOException {
@@ -566,21 +756,21 @@ public class TestMemStore extends TestCa
         KeyValue.Type.DeleteFamily, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(1, memstore.kvset.size());
+    assertEquals(2, memstore.kvset.size());
     assertEquals(delete, memstore.kvset.first());
   }
 
 
   //////////////////////////////////////////////////////////////////////////////
   // Helpers
-  //////////////////////////////////////////////////////////////////////////////
-  private byte [] makeQualifier(final int i1, final int i2){
+  //////////////////////////////////////////////////////////////////////////////  
+  private static byte [] makeQualifier(final int i1, final int i2){
     return Bytes.toBytes(Integer.toString(i1) + ";" +
         Integer.toString(i2));
   }
 
   /**
-   * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT}
+   * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT}
    * @param hmc Instance to add rows to.
    * @return How many rows we added.
    * @throws IOException
@@ -590,7 +780,7 @@ public class TestMemStore extends TestCa
   }
 
   /**
-   * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT}
+   * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT}
    * @param hmc Instance to add rows to.
    * @return How many rows we added.
    * @throws IOException
@@ -644,4 +834,57 @@ public class TestMemStore extends TestCa
     return new KeyValue(row, Bytes.toBytes("test_col"), null,
       HConstants.LATEST_TIMESTAMP, value);
   }
+  private static void addRows(int count, final MemStore mem) {
+    long nanos = System.nanoTime();
+
+    for (int i = 0 ; i < count ; i++) {
+      if (i % 1000 == 0) {
+
+        System.out.println(i + " Took for 1k usec: " + (System.nanoTime() - nanos)/1000);
+        nanos = System.nanoTime();
+      }
+      long timestamp = System.currentTimeMillis();
+
+      for (int ii = 0; ii < QUALIFIER_COUNT ; ii++) {
+        byte [] row = Bytes.toBytes(i);
+        byte [] qf = makeQualifier(i, ii);
+        mem.add(new KeyValue(row, FAMILY, qf, timestamp, qf));
+      }
+    }
+  }
+
+
+  static void doScan(MemStore ms, int iteration) {
+    long nanos = System.nanoTime();
+    KeyValueScanner [] ss = ms.getScanners();
+    KeyValueScanner s = ss[0];
+    s.seek(KeyValue.createFirstOnRow(new byte[]{}));
+
+    System.out.println(iteration + " create/seek took: " + (System.nanoTime() - nanos)/1000);
+    int cnt=0;
+    while(s.next() != null) ++cnt;
+
+    System.out.println(iteration + " took usec: " + (System.nanoTime() - nanos)/1000 + " for: " + cnt);
+
+  }
+
+  public static void main(String [] args) {
+    ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
+    MemStore ms = new MemStore();
+
+    long n1 = System.nanoTime();
+    addRows(25000, ms);
+    System.out.println("Took for insert: " + (System.nanoTime()-n1)/1000);
+
+
+    System.out.println("foo");
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+
+    for (int i = 0 ; i < 50 ; i++)
+      doScan(ms, i);
+
+  }
+
+  
 }

Added: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java?rev=944529&view=auto
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java (added)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java Sat May 15 00:18:37 2010
@@ -0,0 +1,109 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import junit.framework.TestCase;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TestReadWriteConsistencyControl extends TestCase {
+  static class Writer implements Runnable {
+    final AtomicBoolean finished;
+    final ReadWriteConsistencyControl rwcc;
+    final AtomicBoolean status;
+
+    Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, AtomicBoolean status) {
+      this.finished = finished;
+      this.rwcc = rwcc;
+      this.status = status;
+    }
+    private Random rnd = new Random();
+    public boolean failed = false;
+
+    public void run() {
+      while (!finished.get()) {
+        ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert();
+//        System.out.println("Begin write: " + e.getWriteNumber());
+        // 10 usec - 500usec (including 0)
+        int sleepTime = rnd.nextInt(500);
+        // 500 * 1000 = 500,000ns = 500 usec
+        // 1 * 100 = 100ns = 1usec
+        try {
+          if (sleepTime > 0)
+            Thread.sleep(0, sleepTime * 1000);
+        } catch (InterruptedException e1) {
+        }
+        try {
+          rwcc.completeMemstoreInsert(e);
+        } catch (RuntimeException ex) {
+          // got failure
+          System.out.println(ex.toString());
+          ex.printStackTrace();
+          status.set(false);
+          return;
+          // Report failure if possible.
+        }
+      }
+    }
+  }
+
+  public void testParallelism() throws Exception {
+    final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
+
+    final AtomicBoolean finished = new AtomicBoolean(false);
+
+    // fail flag for the reader thread
+    final AtomicBoolean readerFailed = new AtomicBoolean(false);
+    final AtomicLong failedAt = new AtomicLong();
+    Runnable reader = new Runnable() {
+      public void run() {
+        long prev = rwcc.memstoreReadPoint();
+        while (!finished.get()) {
+          long newPrev = rwcc.memstoreReadPoint();
+          if (newPrev < prev) {
+            // serious problem.
+            System.out.println("Reader got out of order, prev: " +
+            prev + " next was: " + newPrev);
+            readerFailed.set(true);
+            // might as well give up
+            failedAt.set(newPrev);
+            return;
+          }
+        }
+      }
+    };
+
+    // writer thread parallelism.
+    int n = 20;
+    Thread [] writers = new Thread[n];
+    AtomicBoolean [] statuses = new AtomicBoolean[n];
+    Thread readThread = new Thread(reader);
+
+    for (int i = 0 ; i < n ; ++i ) {
+      statuses[i] = new AtomicBoolean(true);
+      writers[i] = new Thread(new Writer(finished, rwcc, statuses[i]));
+      writers[i].start();
+    }
+    readThread.start();
+
+    try {
+      Thread.sleep(10 * 1000);
+    } catch (InterruptedException ex) {
+    }
+
+    finished.set(true);
+
+    readThread.join();
+    for (int i = 0; i < n; ++i) {
+      writers[i].join();
+    }
+
+    // check failure.
+    assertFalse(readerFailed.get());
+    for (int i = 0; i < n; ++i) {
+      assertTrue(statuses[i].get());
+    }
+
+
+  }
+}

Modified: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java (original)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java Sat May 15 00:18:37 2010
@@ -1,5 +1,5 @@
 /*
- * Copyright 2009 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -20,24 +20,23 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
 import junit.framework.TestCase;
-
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueTestUtil;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
 public class TestStoreScanner extends TestCase {
   private static final String CF_STR = "cf";
   final byte [] CF = Bytes.toBytes(CF_STR);
 
-  /**
+  /*
    * Test utility for building a NavigableSet for scanners.
    * @param strCols
    * @return
@@ -128,7 +127,7 @@ public class TestStoreScanner extends Te
     assertEquals(kvs[0], results.get(0));
   }
 
-  /**
+  /*
    * Test test shows exactly how the matcher's return codes confuses the StoreScanner
    * and prevent it from doing the right thing.  Seeking once, then nexting twice
    * should return R1, then R2, but in this case it doesnt.
@@ -189,7 +188,7 @@ public class TestStoreScanner extends Te
     assertEquals(0, results.size());
   }
 
-  /**
+  /*
    * Test the case where there is a delete row 'in front of' the next row, the scanner
    * will move to the next row.
    */
@@ -407,9 +406,9 @@ public class TestStoreScanner extends Te
     results.clear();
     assertEquals(false, scan.next(results));
   }
-
-  /**
-   * Test expiration of KeyValues in combination with a configured TTL for
+  
+  /*
+   * Test expiration of KeyValues in combination with a configured TTL for 
    * a column family (as should be triggered in a major compaction).
    */
   public void testWildCardTtlScan() throws IOException {