You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2010/04/19 21:05:29 UTC
svn commit: r935708 [2/2] - in /hadoop/hbase/branches/0.20: ./
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/filter/
src/java/org/apache/hadoop/hbase/regionserver/ src/java/org/apache/hadoop...
Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java Mon Apr 19 19:05:29 2010
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
@@ -46,10 +47,12 @@ public class TestMemStore extends TestCa
private static final byte [] FAMILY = Bytes.toBytes("column");
private static final byte [] CONTENTS_BASIC = Bytes.toBytes("contents:basic");
private static final String CONTENTSTR = "contentstr";
+ private ReadWriteConsistencyControl rwcc;
@Override
public void setUp() throws Exception {
super.setUp();
+ this.rwcc = new ReadWriteConsistencyControl();
this.memstore = new MemStore();
}
@@ -75,6 +78,7 @@ public class TestMemStore extends TestCa
KeyValueScanner [] memstorescanners = this.memstore.getScanners();
Scan scan = new Scan();
List<KeyValue> result = new ArrayList<KeyValue>();
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
this.memstore.comparator, null, memstorescanners);
int count = 0;
@@ -93,6 +97,8 @@ public class TestMemStore extends TestCa
for (int i = 0; i < memstorescanners.length; i++) {
memstorescanners[0].close();
}
+
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
memstorescanners = this.memstore.getScanners();
// Now assert can count same number even if a snapshot mid-scan.
s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
@@ -137,9 +143,9 @@ public class TestMemStore extends TestCa
if (count == snapshotIndex) {
this.memstore.snapshot();
this.memstore.clearSnapshot(this.memstore.getSnapshot());
- // Added more rows into kvset.
+ // Added more rows into kvset. But the scanner wont see these rows.
addRows(this.memstore, ts);
- LOG.info("Snapshotted, cleared it and then added values");
+ LOG.info("Snapshotted, cleared it and then added values (which wont be seen)");
}
result.clear();
}
@@ -149,6 +155,181 @@ public class TestMemStore extends TestCa
assertEquals(rowCount, count);
}
+ /**
+ * Verifies the 3 possible placements of two KeyValues across kvset and snapshot when scanning.
+ */
+ public void testScanAcrossSnapshot2() {
+ // We scan across the snapshot boundary with two kvs in three different layouts;
+ // in every layout kv1 must be returned before kv2.
+ final byte[] one = Bytes.toBytes(1);
+ final byte[] two = Bytes.toBytes(2);
+ final byte[] f = Bytes.toBytes("f");
+ final byte[] q = Bytes.toBytes("q");
+ final byte[] v = Bytes.toBytes(3);
+
+ final KeyValue kv1 = new KeyValue(one, f, q, v);
+ final KeyValue kv2 = new KeyValue(two, f, q, v);
+
+ // use case 1: both kvs in kvset
+ this.memstore.add(kv1.clone());
+ this.memstore.add(kv2.clone());
+ verifyScanAcrossSnapshot2(kv1, kv2);
+
+ // use case 2: both kvs in snapshot (snapshot() is taken after both adds)
+ this.memstore.snapshot();
+ verifyScanAcrossSnapshot2(kv1, kv2);
+
+ // use case 3: first in snapshot, second in kvset (starting from a fresh MemStore)
+ this.memstore = new MemStore();
+ this.memstore.add(kv1.clone());
+ this.memstore.snapshot();
+ this.memstore.add(kv2.clone());
+ verifyScanAcrossSnapshot2(kv1, kv2);
+ }
+
+ private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) {
+   // Expect a single memstore scanner that yields kv1, then kv2, then nothing.
+   ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+   final KeyValueScanner[] scanners = this.memstore.getScanners();
+   assertEquals(1, scanners.length);
+   scanners[0].seek(KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW));
+   assertEquals(kv1, scanners[0].next());
+   assertEquals(kv2, scanners[0].next());
+   assertNull(scanners[0].next());
+ }
+
+ private void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected) {
+   // Seeks to the first key, then requires exactly the expected KeyValues, in order.
+   scanner.seek(KeyValue.createFirstOnRow(new byte[]{}));
+   for (KeyValue kv : expected) {
+     assertNotNull("Scanner exhausted before " + kv, scanner.peek());
+     assertEquals("Unexpected KeyValue", 0, KeyValue.COMPARATOR.compare(kv, scanner.next()));
+   }
+   assertNull("Scanner returned more than expected", scanner.peek());
+ }
+
+ public void testMemstoreConcurrentControl() {
+ final byte[] row = Bytes.toBytes(1);
+ final byte[] f = Bytes.toBytes("family");
+ final byte[] q1 = Bytes.toBytes("q1");
+ final byte[] q2 = Bytes.toBytes("q2");
+ final byte[] v = Bytes.toBytes("value");
+ // Phase 1: begin (but do not yet complete) an insert and add kv1 under its write number.
+ ReadWriteConsistencyControl.WriteEntry w =
+ rwcc.beginMemstoreInsert();
+
+ KeyValue kv1 = new KeyValue(row, f, q1, v);
+ kv1.setMemstoreTS(w.getWriteNumber());
+ memstore.add(kv1);
+ // The insert is still open, so a scanner opened now is expected to return nothing.
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ KeyValueScanner[] s = this.memstore.getScanners();
+ assertScannerResults(s[0], new KeyValue[]{});
+ // Once the insert is completed, a newly opened scanner is expected to return kv1.
+ rwcc.completeMemstoreInsert(w);
+
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ s = this.memstore.getScanners();
+ assertScannerResults(s[0], new KeyValue[]{kv1});
+ // Phase 2: same sequence for kv2 (same row, second qualifier).
+ w = rwcc.beginMemstoreInsert();
+ KeyValue kv2 = new KeyValue(row, f, q2, v);
+ kv2.setMemstoreTS(w.getWriteNumber());
+ memstore.add(kv2);
+ // kv2's insert is still open: only kv1 is expected.
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ s = this.memstore.getScanners();
+ assertScannerResults(s[0], new KeyValue[]{kv1});
+ // After completion both KeyValues are expected, kv1 first.
+ rwcc.completeMemstoreInsert(w);
+
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ s = this.memstore.getScanners();
+ assertScannerResults(s[0], new KeyValue[]{kv1, kv2});
+ }
+
+ /**
+  * Worker thread that repeatedly inserts a KeyValue for its own row and checks that a
+  * freshly opened scanner returns it; the first failure lands in the shared reference.
+  */
+ private static class ReadOwnWritesTester extends Thread {
+   static final int NUM_TRIES = 1000;
+
+   final int id;
+   final byte[] row;
+   final byte[] f = Bytes.toBytes("family");
+   final byte[] q1 = Bytes.toBytes("q1");
+
+   final ReadWriteConsistencyControl rwcc;
+   final MemStore memstore;
+   final AtomicReference<Throwable> caughtException;
+
+   public ReadOwnWritesTester(int id,
+       MemStore memstore,
+       ReadWriteConsistencyControl rwcc,
+       AtomicReference<Throwable> caughtException) {
+     this.id = id;
+     this.rwcc = rwcc;
+     this.memstore = memstore;
+     this.caughtException = caughtException;
+     this.row = Bytes.toBytes(id);
+   }
+
+   @Override
+   public void run() {
+     try {
+       internalRun();
+     } catch (Throwable t) {
+       // Keep only the first failure; every worker's loop checks this and stops early.
+       caughtException.compareAndSet(null, t);
+     }
+   }
+
+   private void internalRun() {
+     for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
+       ReadWriteConsistencyControl.WriteEntry w = rwcc.beginMemstoreInsert();
+
+       // Insert the sequence value (i); i doubles as the KeyValue timestamp.
+       byte[] v = Bytes.toBytes(i);
+       KeyValue kv = new KeyValue(row, f, q1, i, v);
+       kv.setMemstoreTS(w.getWriteNumber());
+       memstore.add(kv);
+       rwcc.completeMemstoreInsert(w);
+
+       // Refresh this thread's read point before scanning, as every other scan in this
+       // test class does. NOTE(review): presumably needed to see the write above - confirm.
+       ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+       KeyValueScanner s = this.memstore.getScanners()[0];
+       s.seek(kv);
+
+       KeyValue ret = s.next();
+       assertNotNull("Didnt find own write at all", ret);
+       assertEquals("Didnt read own writes", kv.getTimestamp(), ret.getTimestamp());
+     }
+   }
+ }
+
+ public void no_testReadOwnWritesUnderConcurrency() throws Throwable {
+   // The no_ prefix keeps this method from matching the test* naming convention.
+   final int workerCount = 8;
+   final AtomicReference<Throwable> firstFailure = new AtomicReference<Throwable>();
+   final ReadOwnWritesTester[] workers = new ReadOwnWritesTester[workerCount];
+
+   // Launch every worker against the shared memstore and consistency control.
+   for (int idx = 0; idx < workers.length; idx++) {
+     workers[idx] = new ReadOwnWritesTester(idx, memstore, rwcc, firstFailure);
+     workers[idx].start();
+   }
+
+   for (ReadOwnWritesTester worker : workers) {
+     worker.join();
+   }
+   Throwable failure = firstFailure.get();
+   if (failure != null) {
+     throw failure;
+   }
+ }
+
/**
* Test memstore snapshots
* @throws IOException
@@ -442,9 +623,10 @@ public class TestMemStore extends TestCa
List<KeyValue> expected = new ArrayList<KeyValue>();
expected.add(put3);
expected.add(del2);
+ expected.add(put2);
expected.add(put1);
-
- assertEquals(3, memstore.kvset.size());
+
+ assertEquals(4, memstore.kvset.size());
int i = 0;
for(KeyValue kv : memstore.kvset) {
assertEquals(expected.get(i++), kv);
@@ -476,8 +658,11 @@ public class TestMemStore extends TestCa
List<KeyValue> expected = new ArrayList<KeyValue>();
expected.add(put3);
expected.add(del2);
+ expected.add(put2);
+ expected.add(put1);
+
- assertEquals(2, memstore.kvset.size());
+ assertEquals(4, memstore.kvset.size());
int i = 0;
for (KeyValue kv: memstore.kvset) {
assertEquals(expected.get(i++), kv);
@@ -510,9 +695,14 @@ public class TestMemStore extends TestCa
List<KeyValue> expected = new ArrayList<KeyValue>();
expected.add(del);
+ expected.add(put1);
+ expected.add(put2);
expected.add(put4);
+ expected.add(put3);
+
+
- assertEquals(2, memstore.kvset.size());
+ assertEquals(5, memstore.kvset.size());
int i = 0;
for (KeyValue kv: memstore.kvset) {
assertEquals(expected.get(i++), kv);
@@ -528,7 +718,7 @@ public class TestMemStore extends TestCa
memstore.add(new KeyValue(row, fam, qf, ts, val));
KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val);
memstore.delete(delete);
- assertEquals(1, memstore.kvset.size());
+ assertEquals(2, memstore.kvset.size());
assertEquals(delete, memstore.kvset.first());
}
@@ -541,7 +731,7 @@ public class TestMemStore extends TestCa
"row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care");
memstore.delete(delete);
- assertEquals(1, memstore.kvset.size());
+ assertEquals(2, memstore.kvset.size());
assertEquals(delete, memstore.kvset.first());
}
public void testRetainsDeleteColumn() throws IOException {
@@ -553,7 +743,7 @@ public class TestMemStore extends TestCa
KeyValue.Type.DeleteColumn, "dont-care");
memstore.delete(delete);
- assertEquals(1, memstore.kvset.size());
+ assertEquals(2, memstore.kvset.size());
assertEquals(delete, memstore.kvset.first());
}
public void testRetainsDeleteFamily() throws IOException {
@@ -565,7 +755,7 @@ public class TestMemStore extends TestCa
KeyValue.Type.DeleteFamily, "dont-care");
memstore.delete(delete);
- assertEquals(1, memstore.kvset.size());
+ assertEquals(2, memstore.kvset.size());
assertEquals(delete, memstore.kvset.first());
}
@@ -573,13 +763,13 @@ public class TestMemStore extends TestCa
//////////////////////////////////////////////////////////////////////////////
// Helpers
//////////////////////////////////////////////////////////////////////////////
- private byte [] makeQualifier(final int i1, final int i2){
+ private static byte [] makeQualifier(final int i1, final int i2){
return Bytes.toBytes(Integer.toString(i1) + ";" +
Integer.toString(i2));
}
/**
- * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT}
+ * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT}
* @param hmc Instance to add rows to.
* @return How many rows we added.
* @throws IOException
@@ -589,7 +779,7 @@ public class TestMemStore extends TestCa
}
/**
- * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT}
+ * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT}
* @param hmc Instance to add rows to.
* @return How many rows we added.
* @throws IOException
@@ -643,4 +833,57 @@ public class TestMemStore extends TestCa
return new KeyValue(row, Bytes.toBytes("test_col:"),
HConstants.LATEST_TIMESTAMP, value);
}
+ private static void addRows(int count, final MemStore mem) {
+ long nanos = System.nanoTime();
+
+ for (int i = 0 ; i < count ; i++) {
+ if (i % 1000 == 0) {
+
+ System.out.println(i + " Took for 1k usec: " + (System.nanoTime() - nanos)/1000);
+ nanos = System.nanoTime();
+ }
+ long timestamp = System.currentTimeMillis();
+
+ for (int ii = 0; ii < QUALIFIER_COUNT ; ii++) {
+ byte [] row = Bytes.toBytes(i);
+ byte [] qf = makeQualifier(i, ii);
+ mem.add(new KeyValue(row, FAMILY, qf, timestamp, qf));
+ }
+ }
+ }
+
+
+ static void doScan(MemStore ms, int iteration) {
+ long nanos = System.nanoTime();
+ KeyValueScanner [] ss = ms.getScanners();
+ KeyValueScanner s = ss[0];
+ s.seek(KeyValue.createFirstOnRow(new byte[]{}));
+
+ System.out.println(iteration + " create/seek took: " + (System.nanoTime() - nanos)/1000);
+ int cnt=0;
+ while(s.next() != null) ++cnt;
+
+ System.out.println(iteration + " took usec: " + (System.nanoTime() - nanos)/1000 + " for: " + cnt);
+
+ }
+
+ public static void main(String [] args) {
+   // Ad-hoc micro-benchmark: load 25000 rows, then time 50 full memstore scans.
+   ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
+   MemStore ms = new MemStore();
+
+   final long insertStart = System.nanoTime();
+   addRows(25000, ms);
+   final long insertUsec = (System.nanoTime() - insertStart) / 1000;
+   System.out.println("Took for insert: " + insertUsec);
+   System.out.println("foo");
+   ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+   int iteration = 0;
+   while (iteration < 50) {
+     doScan(ms, iteration);
+     iteration++;
+   }
+ }
+
+
}
Added: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java?rev=935708&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java (added)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java Mon Apr 19 19:05:29 2010
@@ -0,0 +1,109 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import junit.framework.TestCase;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TestReadWriteConsistencyControl extends TestCase {
+ static class Writer implements Runnable {
+ final AtomicBoolean finished;
+ final ReadWriteConsistencyControl rwcc;
+ final AtomicBoolean status;
+
+ Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, AtomicBoolean status) {
+ this.finished = finished;
+ this.rwcc = rwcc;
+ this.status = status;
+ }
+ private Random rnd = new Random();
+ public boolean failed = false;
+
+ public void run() {
+ while (!finished.get()) {
+ ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert();
+// System.out.println("Begin write: " + e.getWriteNumber());
+ // 10 usec - 500usec (including 0)
+ int sleepTime = rnd.nextInt(500);
+ // 500 * 1000 = 500,000ns = 500 usec
+ // 1 * 100 = 100ns = 1usec
+ try {
+ if (sleepTime > 0)
+ Thread.sleep(0, sleepTime * 1000);
+ } catch (InterruptedException e1) {
+ }
+ try {
+ rwcc.completeMemstoreInsert(e);
+ } catch (RuntimeException ex) {
+ // got failure
+ System.out.println(ex.toString());
+ ex.printStackTrace();
+ status.set(false);
+ return;
+ // Report failure if possible.
+ }
+ }
+ }
+ }
+
+ public void testParallelism() throws Exception {
+ final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
+
+ final AtomicBoolean finished = new AtomicBoolean(false);
+
+ // fail flag for the reader thread
+ final AtomicBoolean readerFailed = new AtomicBoolean(false);
+ final AtomicLong failedAt = new AtomicLong();
+ Runnable reader = new Runnable() {
+ public void run() {
+ long prev = rwcc.memstoreReadPoint();
+ while (!finished.get()) {
+ long newPrev = rwcc.memstoreReadPoint();
+ if (newPrev < prev) {
+ // serious problem.
+ System.out.println("Reader got out of order, prev: " +
+ prev + " next was: " + newPrev);
+ readerFailed.set(true);
+ // might as well give up
+ failedAt.set(newPrev);
+ return;
+ }
+ }
+ }
+ };
+
+ // writer thread parallelism.
+ int n = 20;
+ Thread [] writers = new Thread[n];
+ AtomicBoolean [] statuses = new AtomicBoolean[n];
+ Thread readThread = new Thread(reader);
+
+ for (int i = 0 ; i < n ; ++i ) {
+ statuses[i] = new AtomicBoolean(true);
+ writers[i] = new Thread(new Writer(finished, rwcc, statuses[i]));
+ writers[i].start();
+ }
+ readThread.start();
+
+ try {
+ Thread.sleep(10 * 1000);
+ } catch (InterruptedException ex) {
+ }
+
+ finished.set(true);
+
+ readThread.join();
+ for (int i = 0; i < n; ++i) {
+ writers[i].join();
+ }
+
+ // check failure.
+ assertFalse(readerFailed.get());
+ for (int i = 0; i < n; ++i) {
+ assertTrue(statuses[i].get());
+ }
+
+
+ }
+}
Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java Mon Apr 19 19:05:29 2010
@@ -1,5 +1,5 @@
/*
- * Copyright 2009 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -20,24 +20,23 @@
package org.apache.hadoop.hbase.regionserver;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
import junit.framework.TestCase;
-
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
public class TestStoreScanner extends TestCase {
private final String CF_STR = "cf";
final byte [] CF = Bytes.toBytes(CF_STR);
- /**
+ /*
* Test utility for building a NavigableSet for scanners.
* @param strCols
* @return
@@ -128,7 +127,7 @@ public class TestStoreScanner extends Te
assertEquals(kvs[0], results.get(0));
}
- /**
+ /*
* Test test shows exactly how the matcher's return codes confuses the StoreScanner
* and prevent it from doing the right thing. Seeking once, then nexting twice
* should return R1, then R2, but in this case it doesnt.
@@ -189,7 +188,7 @@ public class TestStoreScanner extends Te
assertEquals(0, results.size());
}
- /**
+ /*
* Test the case where there is a delete row 'in front of' the next row, the scanner
* will move to the next row.
*/
@@ -408,7 +407,7 @@ public class TestStoreScanner extends Te
assertEquals(false, scan.next(results));
}
- /**
+ /*
* Test expiration of KeyValues in combination with a configured TTL for
* a column family (as should be triggered in a major compaction).
*/