You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2016/05/11 18:06:31 UTC
[24/50] [abbrv] incubator-omid git commit: analyze the transactional
anomalies described by P. Baillis et al.
analyze the transactional anomalies described by P. Baillis et al.
Change-Id: I584f3b5f0cab9a9867f8c47a52acc2fc6df07645
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/cad28cc9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/cad28cc9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/cad28cc9
Branch: refs/heads/master
Commit: cad28cc9d4a1f9aaf03ec7584764dd385d02494a
Parents: bcdd3d1
Author: Igor Katkov <ka...@yahoo-inc.com>
Authored: Wed Apr 27 18:15:08 2016 -0700
Committer: Igor Katkov <ka...@yahoo-inc.com>
Committed: Thu Apr 28 17:07:23 2016 -0700
----------------------------------------------------------------------
.../apache/omid/benchmarks/tso/RawTxRunner.java | 2 +-
.../TestBaillisAnomaliesWithTXs.java | 601 +++++++++++++++++++
2 files changed, 602 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/cad28cc9/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
----------------------------------------------------------------------
diff --git a/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java b/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
index ef7bb06..f18fb56 100644
--- a/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
+++ b/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
@@ -163,7 +163,7 @@ class RawTxRunner implements Runnable {
}
- public void stop() {
+ void stop() {
isRunning = false;
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/cad28cc9/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java
new file mode 100644
index 0000000..5bfc8d5
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.testng.ITestContext;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.slf4j.LoggerFactory.getLogger;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
+
+/**
+ * These tests try to analyze the transactional anomalies described by P. Baillis et al. in
+ * http://arxiv.org/pdf/1302.0309.pdf
+ *
+ * These tests try to model what project Hermitage is trying to do to compare the behavior of different DBMSs on these
+ * anomalies depending on the different isolation levels they offer. For more info on the Hermitage project, please
+ * refer to: https://github.com/ept/hermitage
+ *
+ * Transactional histories have been translated to HBase from the ones done for Postgresql in the Hermitage project:
+ * https://github.com/ept/hermitage/blob/master/postgres.md
+ *
+ * The "repeatable read" Postgresql isolation level is equivalent to "snapshot isolation", so we include the experiments
+ * for that isolation level
+ *
+ * With the HBase 0.98 interfaces it is not possible to execute updates/deletes based on predicates, so the examples
+ * here are not exactly the same as in Postgres
+ */
+@Test(groups = "sharedHBase")
+public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
+
+ // Logger used by the tests to trace the rows they read, update and delete
+ private static final Logger LOG = getLogger(TestBaillisAnomaliesWithTXs.class);
+ // Qualifier of the single column that plays the role of the "value" column of the SQL histories
+ private static final String TEST_COLUMN = "baillis-col";
+
+
+ // Data used in the tests
+ // TEST_FAMILY is not declared in this class (presumably inherited from OmidTestBase -- verify)
+ private byte[] famName = Bytes.toBytes(TEST_FAMILY);
+ private byte[] colName = Bytes.toBytes(TEST_COLUMN);
+
+ // Row keys standing for ids 1, 2 and 3 of the SQL histories; rows 1 and 2 are loaded before every test method
+ private byte[] rowId1 = Bytes.toBytes("row1");
+ private byte[] rowId2 = Bytes.toBytes("row2");
+ private byte[] rowId3 = Bytes.toBytes("row3");
+
+ // Values are the byte encoding of ints (Bytes.toBytes(int)), so they must be decoded with Bytes.toInt
+ private byte[] dataValue1 = Bytes.toBytes(10);
+ private byte[] dataValue2 = Bytes.toBytes(20);
+ private byte[] dataValue3 = Bytes.toBytes(30);
+
+
+    @Test
+    public void testSIPreventsPredicateManyPrecedersForReadPredicates(ITestContext context) throws Exception {
+        // History reproduced here (PMP, read-predicate variant):
+        //   T1, T2: begin; set transaction isolation level repeatable read;
+        //   T1: select * from test where value = 30;         -> returns nothing
+        //   T2: insert into test (id, value) values(3, 30);
+        //   T2: commit;
+        //   T1: select * from test where value % 3 = 0;      -> still returns nothing
+        //   T1: commit;
+
+        TransactionManager txManager = newTransactionManager(context);
+        TTable table = new TTable(hbaseConf, TEST_TABLE);
+
+        // Both transactions take their snapshot before any of them touches the table
+        Transaction t1 = txManager.begin();
+        Transaction t2 = txManager.begin();
+
+        // T1 searches for rows whose value is 30: the base data only holds 10 and 20, so nothing comes back
+        Scan valueIs30 = new Scan();
+        valueIs30.setFilter(
+                new SingleColumnValueFilter(famName, colName, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(30)));
+        ResultScanner firstRead = table.getScanner(t1, valueIs30);
+        assertNull(firstRead.next());
+
+        // T2 inserts row3 => 30 and commits
+        Put row3 = new Put(rowId3);
+        row3.add(famName, colName, dataValue3);
+        table.put(t2, row3);
+        txManager.commit(t2);
+
+        // T1 repeats the predicate read (the modulo predicate is approximated with the same equality filter):
+        // the row committed by T2 must stay invisible to T1
+        ResultScanner secondRead = table.getScanner(t1, valueIs30);
+        assertNull(secondRead.next());
+
+        // T1 finishes
+        txManager.commit(t1);
+    }
+
+ @Test
+ public void testSIPreventsPredicateManyPrecedersForWritePredicates(ITestContext context) throws Exception {
+ // TX History for PMP for Write Predicate:
+ // begin; set transaction isolation level repeatable read; -- T1
+ // begin; set transaction isolation level repeatable read; -- T2
+ // update test set value = value + 10; -- T1
+ // delete from test where value = 20; -- T2, BLOCKS
+ // commit; -- T1. T2 now prints out "ERROR: could not serialize access due to concurrent update"
+ // abort; -- T2. There's nothing else we can do, this transaction has failed
+
+ // 0) Start transactions
+ TransactionManager tm = newTransactionManager(context);
+ TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ Transaction tx1 = tm.begin();
+ Transaction tx2 = tm.begin();
+
+ // 1) update test set value = value + 10; -- T1
+ Scan updateScan = new Scan();
+ ResultScanner tx1Scanner = txTable.getScanner(tx2, updateScan);
+ Result updateRes = tx1Scanner.next();
+ int count = 0;
+ while (updateRes != null) {
+ LOG.info("RESSS {}", updateRes);
+ Put row = new Put(updateRes.getRow());
+ int val = Bytes.toInt(updateRes.getValue(famName, colName));
+ LOG.info("Updating row id {} with value {}", Bytes.toString(updateRes.getRow()), val);
+ row.add(famName, colName, Bytes.toBytes(val + 10));
+ txTable.put(tx1, row);
+ updateRes = tx1Scanner.next();
+ count++;
+ }
+ assertEquals(count, 2);
+
+ // 2) delete from test where value = 20; -- T2, BLOCKS
+ Scan scan = new Scan();
+ Filter f = new SingleColumnValueFilter(famName, colName, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(20));
+ scan.setFilter(f);
+ ResultScanner tx2Scanner = txTable.getScanner(tx2, scan);
+ // assertEquals(tx2Scanner.next(100).length, 1);
+ Result res = tx2Scanner.next();
+ int count20 = 0;
+ while (res != null) {
+ LOG.info("RESSS {}", res);
+ LOG.info("Deleting row id {} with value {}", Bytes.toString(res.getRow()),
+ Bytes.toInt(res.getValue(famName, colName)));
+ Delete delete20 = new Delete(res.getRow());
+ txTable.delete(tx2, delete20);
+ res = tx2Scanner.next();
+ count20++;
+ }
+ assertEquals(count20, 1);
+ // 3) commit TX 1
+ tm.commit(tx1);
+
+ tx2Scanner = txTable.getScanner(tx2, scan);
+ assertNull(tx2Scanner.next());
+
+ // 4) commit TX 2 -> Should be rolled-back
+ try {
+ tm.commit(tx2);
+ fail();
+ } catch (RollbackException e) {
+ // Expected
+ }
+
+ }
+
+    @Test
+    public void testSIPreventsLostUpdates(ITestContext context) throws Exception {
+        // TX History for P4:
+        // begin; set transaction isolation level repeatable read; -- T1
+        // begin; set transaction isolation level repeatable read; -- T2
+        // select * from test where id = 1; -- T1
+        // select * from test where id = 1; -- T2
+        // update test set value = 11 where id = 1; -- T1
+        // update test set value = 11 where id = 1; -- T2, BLOCKS
+        // commit; -- T1. T2 now prints out "ERROR: could not serialize access due to concurrent update"
+        // abort; -- T2. There's nothing else we can do, this transaction has failed
+
+        // 0) Start transactions
+        TransactionManager tm = newTransactionManager(context);
+        TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+        Transaction tx1 = tm.begin();
+        Transaction tx2 = tm.begin();
+
+        Scan scan = new Scan(rowId1, rowId1);
+        scan.addColumn(famName, colName);
+
+        // 1) select * from test where id = 1; -- T1
+        ResultScanner tx1Scanner = txTable.getScanner(tx1, scan);
+        int count = 0;
+        for (Result res = tx1Scanner.next(); res != null; res = tx1Scanner.next()) {
+            LOG.info("RESSS {}", res);
+            assertEquals(res.getRow(), rowId1);
+            assertEquals(res.getValue(famName, colName), dataValue1);
+            // Values are int-encoded, so they are decoded with Bytes.toInt (after the assertion has validated them)
+            LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
+                     Bytes.toInt(res.getValue(famName, colName)));
+            count++;
+        }
+        assertEquals(count, 1);
+
+        // 2) select * from test where id = 1; -- T2
+        ResultScanner tx2Scanner = txTable.getScanner(tx2, scan);
+        count = 0;
+        for (Result res = tx2Scanner.next(); res != null; res = tx2Scanner.next()) {
+            LOG.info("RESSS {}", res);
+            assertEquals(res.getRow(), rowId1);
+            assertEquals(res.getValue(famName, colName), dataValue1);
+            LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
+                     Bytes.toInt(res.getValue(famName, colName)));
+            count++;
+        }
+        assertEquals(count, 1);
+
+        // 3) update test set value = 11 where id = 1; -- T1
+        // The new value is written int-encoded, like the rest of the data in the table
+        Put updateRow1Tx1 = new Put(rowId1);
+        updateRow1Tx1.add(famName, colName, Bytes.toBytes(11));
+        txTable.put(tx1, updateRow1Tx1);
+
+        // 4) update test set value = 11 where id = 1; -- T2
+        Put updateRow1Tx2 = new Put(rowId1);
+        updateRow1Tx2.add(famName, colName, Bytes.toBytes(11));
+        txTable.put(tx2, updateRow1Tx2);
+
+        // 5) commit -- T1
+        tm.commit(tx1);
+
+        // 6) commit -- T2 --> should be rolled-back, otherwise the update of T1 would be lost
+        try {
+            tm.commit(tx2);
+            fail("T2 should have been rolled back due to the write-write conflict with T1");
+        } catch (RollbackException e) {
+            // Expected
+        }
+
+    }
+
+    @Test
+    public void testSIPreventsReadSkew(ITestContext context) throws Exception {
+        // TX History for G-single:
+        // begin; set transaction isolation level repeatable read; -- T1
+        // begin; set transaction isolation level repeatable read; -- T2
+        // select * from test where id = 1; -- T1. Shows 1 => 10
+        // select * from test where id = 1; -- T2
+        // select * from test where id = 2; -- T2
+        // update test set value = 12 where id = 1; -- T2
+        // update test set value = 18 where id = 2; -- T2
+        // commit; -- T2
+        // select * from test where id = 2; -- T1. Shows 2 => 20
+        // commit; -- T1
+
+        // 0) Start transactions
+        TransactionManager tm = newTransactionManager(context);
+        TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+        Transaction tx1 = tm.begin();
+        Transaction tx2 = tm.begin();
+
+        Scan rowId1Scan = new Scan(rowId1, rowId1);
+        rowId1Scan.addColumn(famName, colName);
+        Scan rowId2Scan = new Scan(rowId2, rowId2);
+        rowId2Scan.addColumn(famName, colName);
+
+        // 1) select * from test where id = 1; -- T1. Shows 1 => 10
+        ResultScanner tx1Scanner = txTable.getScanner(tx1, rowId1Scan);
+        int count = 0;
+        for (Result res = tx1Scanner.next(); res != null; res = tx1Scanner.next()) {
+            LOG.info("RESSS {}", res);
+            assertEquals(res.getRow(), rowId1);
+            assertEquals(res.getValue(famName, colName), dataValue1);
+            // Values are int-encoded, so they are decoded with Bytes.toInt (after the assertion has validated them)
+            LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
+                     Bytes.toInt(res.getValue(famName, colName)));
+            count++;
+        }
+        assertEquals(count, 1);
+
+        // 2) select * from test where id = 1; -- T2
+        ResultScanner tx2Scanner = txTable.getScanner(tx2, rowId1Scan);
+        count = 0;
+        for (Result res = tx2Scanner.next(); res != null; res = tx2Scanner.next()) {
+            LOG.info("RESSS {}", res);
+            assertEquals(res.getRow(), rowId1);
+            assertEquals(res.getValue(famName, colName), dataValue1);
+            LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
+                     Bytes.toInt(res.getValue(famName, colName)));
+            count++;
+        }
+        // Without this check the read would pass vacuously if the scan returned no rows
+        assertEquals(count, 1);
+
+        // 3) select * from test where id = 2; -- T2
+        tx2Scanner = txTable.getScanner(tx2, rowId2Scan);
+        count = 0;
+        for (Result res = tx2Scanner.next(); res != null; res = tx2Scanner.next()) {
+            LOG.info("RESSS {}", res);
+            assertEquals(res.getRow(), rowId2);
+            assertEquals(res.getValue(famName, colName), dataValue2);
+            LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
+                     Bytes.toInt(res.getValue(famName, colName)));
+            count++;
+        }
+        assertEquals(count, 1);
+
+        // 4) update test set value = 12 where id = 1; -- T2
+        // This write belongs to T2 according to the history, so it must be issued with tx2 (not tx1)
+        Put updateRow1Tx2 = new Put(rowId1);
+        updateRow1Tx2.add(famName, colName, Bytes.toBytes(12));
+        txTable.put(tx2, updateRow1Tx2);
+
+        // 5) update test set value = 18 where id = 2; -- T2
+        Put updateRow2Tx2 = new Put(rowId2);
+        updateRow2Tx2.add(famName, colName, Bytes.toBytes(18));
+        txTable.put(tx2, updateRow2Tx2);
+
+        // 6) commit -- T2
+        tm.commit(tx2);
+
+        // 7) select * from test where id = 2; -- T1. Shows 2 => 20
+        // T1 must keep reading from its snapshot, i.e. it must not observe the 18 committed by T2
+        tx1Scanner = txTable.getScanner(tx1, rowId2Scan);
+        count = 0;
+        for (Result res = tx1Scanner.next(); res != null; res = tx1Scanner.next()) {
+            LOG.info("RESSS {}", res);
+            assertEquals(res.getRow(), rowId2);
+            assertEquals(res.getValue(famName, colName), dataValue2);
+            LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()),
+                     Bytes.toInt(res.getValue(famName, colName)));
+            count++;
+        }
+        assertEquals(count, 1);
+
+        // 8) commit -- T1
+        tm.commit(tx1);
+
+    }
+
+    @Test
+    public void testSIPreventsReadSkewUsingWritePredicate(ITestContext context) throws Exception {
+        // History reproduced here (G-single, write-predicate variant):
+        //   T1, T2: begin; set transaction isolation level repeatable read;
+        //   T1: select * from test where id = 1;           -> shows 1 => 10
+        //   T2: select * from test;
+        //   T2: update test set value = 12 where id = 1;
+        //   T2: update test set value = 18 where id = 2;
+        //   T2: commit;
+        //   T1: delete from test where value = 20;         -> "ERROR: could not serialize access due to
+        //                                                      concurrent update"
+        //   T1: abort;                                     -> nothing else can be done, the transaction failed
+
+        TransactionManager txManager = newTransactionManager(context);
+        TTable table = new TTable(hbaseConf, TEST_TABLE);
+        Transaction t1 = txManager.begin();
+        Transaction t2 = txManager.begin();
+
+        // Both transactions read the whole table and must see the two rows of the base data
+        assertNumberOfRows(table, t1, 2, new Scan());
+        assertNumberOfRows(table, t2, 2, new Scan());
+
+        // T2 rewrites both rows (1 => 12, 2 => 18) in a single batch and commits
+        Put row1To12 = new Put(rowId1);
+        row1To12.add(famName, colName, Bytes.toBytes(12));
+        Put row2To18 = new Put(rowId2);
+        row2To18.add(famName, colName, Bytes.toBytes(18));
+        table.put(t2, Arrays.asList(row1To12, row2To18));
+        txManager.commit(t2);
+
+        // T1 deletes every row that, in its own snapshot, still holds the value 20
+        Scan valueIs20 = new Scan();
+        valueIs20.setFilter(
+                new SingleColumnValueFilter(famName, colName, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(20)));
+        ResultScanner matches = table.getScanner(t1, valueIs20);
+        for (Result match = matches.next(); match != null; match = matches.next()) {
+            LOG.info("RESSS {}", match);
+            LOG.info("Deleting row id {} with value {}", Bytes.toString(match.getRow()), Bytes.toInt(match.getValue(famName, colName)));
+            table.delete(t1, new Delete(match.getRow()));
+        }
+
+        // T1 wrote a row that T2 already modified and committed, so its commit has to be rejected
+        try {
+            txManager.commit(t1);
+            fail("Should be aborted");
+        } catch (RollbackException expected) {
+            // This is the outcome the test is looking for
+        }
+
+    }
+
+    // This test shows that Omid does not provide a serializable level of isolation; otherwise, the last commit would
+    // have failed
+    @Test
+    public void testSIDoesNotPreventWriteSkew(ITestContext context) throws Exception {
+        // TX History for G2-item:
+        // begin; set transaction isolation level repeatable read; -- T1
+        // begin; set transaction isolation level repeatable read; -- T2
+        // select * from test where id in (1,2); -- T1
+        // select * from test where id in (1,2); -- T2
+        // update test set value = 11 where id = 1; -- T1
+        // update test set value = 21 where id = 2; -- T2
+        // commit; -- T1
+        // commit; -- T2
+
+        // 0) Start transactions
+        TransactionManager tm = newTransactionManager(context);
+        TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+        Transaction tx1 = tm.begin();
+        Transaction tx2 = tm.begin();
+
+        // The stop row of a scan is exclusive, so [row1, row3) selects exactly row1 and row2
+        Scan rowId12Scan = new Scan(rowId1, rowId3);
+        rowId12Scan.addColumn(famName, colName);
+
+        // 1) select * from test where id in (1,2); -- T1
+        ResultScanner tx1Scanner = txTable.getScanner(tx1, rowId12Scan);
+        int count = 0;
+        for (Result res = tx1Scanner.next(); res != null; res = tx1Scanner.next()) {
+            LOG.info("RESSS {}", res);
+            LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()), Bytes.toInt(res.getValue(famName, colName)));
+            switch (count) {
+                case 0:
+                    assertEquals(res.getRow(), rowId1);
+                    assertEquals(res.getValue(famName, colName), dataValue1);
+                    break;
+                case 1:
+                    assertEquals(res.getRow(), rowId2);
+                    assertEquals(res.getValue(famName, colName), dataValue2);
+                    break;
+                default:
+                    fail("T1 read more rows than expected");
+            }
+            count++;
+        }
+        assertEquals(count, 2);
+
+        // 2) select * from test where id in (1,2); -- T2
+        // This read belongs to T2 according to the history, so the scanner must be opened with tx2 (not tx1)
+        ResultScanner tx2Scanner = txTable.getScanner(tx2, rowId12Scan);
+        count = 0;
+        for (Result res = tx2Scanner.next(); res != null; res = tx2Scanner.next()) {
+            LOG.info("RESSS {}", res);
+            LOG.info("Row id {} with value {}", Bytes.toString(res.getRow()), Bytes.toInt(res.getValue(famName, colName)));
+            switch (count) {
+                case 0:
+                    assertEquals(res.getRow(), rowId1);
+                    assertEquals(res.getValue(famName, colName), dataValue1);
+                    break;
+                case 1:
+                    assertEquals(res.getRow(), rowId2);
+                    assertEquals(res.getValue(famName, colName), dataValue2);
+                    break;
+                default:
+                    fail("T2 read more rows than expected");
+            }
+            count++;
+        }
+        assertEquals(count, 2);
+
+        // 3) update test set value = 11 where id = 1; -- T1
+        // The new values are written int-encoded, like the rest of the data in the table
+        Put updateRow1Tx1 = new Put(rowId1);
+        updateRow1Tx1.add(famName, colName, Bytes.toBytes(11));
+        txTable.put(tx1, updateRow1Tx1);
+
+        // 4) update test set value = 21 where id = 2; -- T2
+        Put updateRow2Tx2 = new Put(rowId2);
+        updateRow2Tx2.add(famName, colName, Bytes.toBytes(21));
+        txTable.put(tx2, updateRow2Tx2);
+
+        // 5) commit; -- T1
+        tm.commit(tx1);
+
+        // 6) commit; -- T2. The write sets are disjoint, so snapshot isolation lets this commit succeed
+        tm.commit(tx2);
+    }
+
+ // this test shows that Omid does not provide serilizable level of isolation other wise last commit would have failed
+ @Test
+ public void testSIDoesNotPreventAntiDependencyCycles(ITestContext context) throws Exception {
+ // TX History for G2:
+ // begin; set transaction isolation level repeatable read; -- T1
+ // begin; set transaction isolation level repeatable read; -- T2
+ // select * from test where value % 3 = 0; -- T1
+ // select * from test where value % 3 = 0; -- T2
+ // insert into test (id, value) values(3, 30); -- T1
+ // insert into test (id, value) values(4, 42); -- T2
+ // commit; -- T1
+ // commit; -- T2
+ // select * from test where value % 3 = 0; -- Either. Returns 3 => 30, 4 => 42
+
+ // 0) Start transactions
+ TransactionManager tm = newTransactionManager(context);
+ TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ Transaction tx1 = tm.begin();
+ Transaction tx2 = tm.begin();
+
+ Filter f = new SingleColumnValueFilter(famName, colName, CompareFilter.CompareOp.EQUAL, Bytes.toBytes("30"));
+ Scan value30 = new Scan();
+ value30.setFilter(f);
+ value30.addColumn(famName, colName);
+
+ // 1) select * from test where value % 3 = 0; -- T1
+ assertNumberOfRows(txTable, tx1, 0, value30);
+
+
+ // 2) select * from test where value % 3 = 0; -- T2
+ assertNumberOfRows(txTable, tx2, 0, value30);
+
+
+ // 3) insert into test (id, value) values(3, 30); -- T1
+ Put insertRow3Tx1 = new Put(rowId1);
+ insertRow3Tx1.add(famName, colName, Bytes.toBytes("30"));
+ txTable.put(tx1, insertRow3Tx1);
+
+ // 4) insert into test (id, value) values(4, 42); -- T2
+ Put updateRow4Tx2 = new Put(rowId2);
+ updateRow4Tx2.add(famName, colName, Bytes.toBytes("42"));
+ txTable.put(tx2, updateRow4Tx2);
+
+ // 5) commit; -- T1
+ tm.commit(tx1);
+
+ // 6) commit; -- T2
+ tm.commit(tx2);
+
+ // 7) select * from test where value % 3 = 0; -- Either. Returns 3 => 30, 4 => 42
+ }
+
+    /**
+     * Loads, before every test method, the same initial data that the Hermitage project uses for Postgres in:
+     * https://github.com/ept/hermitage/blob/master/postgres.md
+     *
+     * create table test (id int primary key, value int);
+     * insert into test (id, value) values (1, 10), (2, 20);
+     */
+    @BeforeMethod(alwaysRun = true)
+    private void loadBaseDataOnTestTable(ITestContext context) throws Exception {
+
+        TransactionManager txManager = newTransactionManager(context);
+        TTable table = new TTable(hbaseConf, TEST_TABLE);
+
+        // row1 => 10 and row2 => 20 are written and committed within a single transaction
+        Put firstRow = new Put(rowId1);
+        firstRow.add(famName, colName, dataValue1);
+        Put secondRow = new Put(rowId2);
+        secondRow.add(famName, colName, dataValue2);
+
+        Transaction loadTx = txManager.begin();
+        table.put(loadTx, firstRow);
+        table.put(loadTx, secondRow);
+        txManager.commit(loadTx);
+    }
+
+
+    /**
+     * Runs the scan in the context of the given transaction, logs every row returned and asserts that the number of
+     * rows is exactly the expected one (despite its name, maxCount is checked for equality, not as an upper bound).
+     */
+    private void assertNumberOfRows(TTable txTable, Transaction tx2, int maxCount, Scan scan) throws IOException {
+        ResultScanner rows = txTable.getScanner(tx2, scan);
+        int rowsSeen = 0;
+        for (Result row = rows.next(); row != null; row = rows.next()) {
+            LOG.info("RESSS {}", row);
+            LOG.info("Row id {} with value {}", Bytes.toString(row.getRow()), Bytes.toInt(row.getValue(famName, colName)));
+            rowsSeen++;
+        }
+        assertEquals(rowsSeen, maxCount);
+    }
+
+
+}