You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by gc...@apache.org on 2012/12/18 22:27:53 UTC
svn commit: r1423671 -
/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java
Author: gchanan
Date: Tue Dec 18 21:27:53 2012
New Revision: 1423671
URL: http://svn.apache.org/viewvc?rev=1423671&view=rev
Log:
HBASE-7377 Clean up TestHBase7051
Modified:
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java?rev=1423671&r1=1423670&r2=1423671&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHBase7051.java Tue Dec 18 21:27:53 2012
@@ -16,6 +16,9 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
@@ -30,19 +33,32 @@ import org.junit.experimental.categories
import com.google.common.collect.Lists;
+/**
+ * Test of HBASE-7051; that checkAndPuts and puts behave atomically with respect to each other.
+ * Rather than perform a bunch of trials to verify atomicity, this test recreates a race condition
+ * that causes the test to fail if checkAndPut doesn't wait for outstanding put transactions
+ * to complete. It does this by invasively overriding HRegion function to affect the timing of
+ * the operations.
+ */
@Category(SmallTests.class)
public class TestHBase7051 {
- private static volatile boolean putCompleted = false;
private static CountDownLatch latch = new CountDownLatch(1);
- private boolean checkAndPutCompleted = false;
- private static int count = 0;
-
+ private enum TestStep {
+ INIT, // initial put of 10 to set value of the cell
+ PUT_STARTED, // began doing a put of 50 to cell
+ PUT_COMPLETED, // put complete (released RowLock, but may not have advanced MVCC).
+ CHECKANDPUT_STARTED, // began checkAndPut: if 10 -> 11
+ CHECKANDPUT_COMPLETED // completed checkAndPut
+ // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
+ }
+ private static volatile TestStep testStep = TestStep.INIT;
+ private final String family = "f1";
+
@Test
public void testPutAndCheckAndPutInParallel() throws Exception {
final String tableName = "testPutAndCheckAndPut";
- final String family = "f1";
Configuration conf = HBaseConfiguration.create();
conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
final MockHRegion region = (MockHRegion) TestHRegion.initHRegion(Bytes.toBytes(tableName),
@@ -57,14 +73,16 @@ public class TestHBase7051 {
putsAndLocks.add(pair);
- count++;
region.batchMutate(putsAndLocks.toArray(new Pair[0]));
- makeCheckAndPut(family, region);
-
- makePut(family, region);
- while (!checkAndPutCompleted) {
+ MultithreadedTestUtil.TestContext ctx =
+ new MultithreadedTestUtil.TestContext(conf);
+ ctx.addThread(new PutThread(ctx, region));
+ ctx.addThread(new CheckAndPutThread(ctx, region));
+ ctx.startThreads();
+ while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
Thread.sleep(100);
}
+ ctx.stop();
Scan s = new Scan();
RegionScanner scanner = region.getScanner(s);
List<KeyValue> results = new ArrayList<KeyValue>();
@@ -75,54 +93,46 @@ public class TestHBase7051 {
}
- private void makePut(final String family, final MockHRegion region) {
- new Thread() {
- public void run() {
- List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList();
- Put[] puts = new Put[1];
- Put put = new Put(Bytes.toBytes("r1"));
- put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
- puts[0] = put;
- try {
- Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[0], null);
- putsAndLocks.add(pair);
- count++;
- region.batchMutate(putsAndLocks.toArray(new Pair[0]));
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }.start();
- }
+ private class PutThread extends TestThread {
+ private MockHRegion region;
+ PutThread(TestContext ctx, MockHRegion region) {
+ super(ctx);
+ this.region = region;
+ }
- private void makeCheckAndPut(final String family, final MockHRegion region) {
- new Thread() {
+ public void doWork() throws Exception {
+ List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList();
+ Put[] puts = new Put[1];
+ Put put = new Put(Bytes.toBytes("r1"));
+ put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
+ puts[0] = put;
+ Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[0], null);
+ putsAndLocks.add(pair);
+ testStep = TestStep.PUT_STARTED;
+ region.batchMutate(putsAndLocks.toArray(new Pair[0]));
+ }
+ }
- public void run() {
- Put[] puts = new Put[1];
- Put put = new Put(Bytes.toBytes("r1"));
- put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
- puts[0] = put;
- try {
- while (putCompleted == false) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- count++;
- region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
- CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put, null, true);
- checkAndPutCompleted = true;
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }.start();
+ private class CheckAndPutThread extends TestThread {
+ private MockHRegion region;
+ CheckAndPutThread(TestContext ctx, MockHRegion region) {
+ super(ctx);
+ this.region = region;
+ }
+
+ public void doWork() throws Exception {
+ Put[] puts = new Put[1];
+ Put put = new Put(Bytes.toBytes("r1"));
+ put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
+ puts[0] = put;
+ while (testStep != TestStep.PUT_COMPLETED) {
+ Thread.sleep(100);
+ }
+ testStep = TestStep.CHECKANDPUT_STARTED;
+ region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
+ CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put, null, true);
+ testStep = TestStep.CHECKANDPUT_COMPLETED;
+ }
}
public static class MockHRegion extends HRegion {
@@ -134,36 +144,39 @@ public class TestHBase7051 {
@Override
public void releaseRowLock(Integer lockId) {
- if (count == 1) {
+ if (testStep == TestStep.INIT) {
super.releaseRowLock(lockId);
return;
}
- if (count == 2) {
+ if (testStep == TestStep.PUT_STARTED) {
try {
- putCompleted = true;
+ testStep = TestStep.PUT_COMPLETED;
super.releaseRowLock(lockId);
+ // put has been written to the memstore and the row lock has been released, but the
+ // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
+ // operations would cause the non-atomicity to show up:
+ // 1) Put releases row lock (where we are now)
+ // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
+ // because the MVCC has not advanced
+ // 3) Put advances MVCC
+ // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
+ // (see below), and then wait some more to give the checkAndPut time to read the old
+ // value.
latch.await();
+ Thread.sleep(1000);
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ Thread.currentThread().interrupt();
}
}
- if (count == 3) {
+ else if (testStep == TestStep.CHECKANDPUT_STARTED) {
super.releaseRowLock(lockId);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- latch.countDown();
}
}
@Override
public Integer getLock(Integer lockid, byte[] row, boolean waitForLock) throws IOException {
- if (count == 3) {
+ if (testStep == TestStep.CHECKANDPUT_STARTED) {
latch.countDown();
}
return super.getLock(lockid, row, waitForLock);