You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by ik...@apache.org on 2016/04/21 02:02:52 UTC
[35/52] [abbrv] incubator-omid git commit: Move com.yahoo ->
org.apache
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestEndToEndScenariosWithHA.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestEndToEndScenariosWithHA.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestEndToEndScenariosWithHA.java
new file mode 100644
index 0000000..2c7f9f9
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestEndToEndScenariosWithHA.java
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import com.google.common.base.Charsets;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.omid.TestUtils;
+import org.apache.omid.tso.LeaseManagement;
+import org.apache.omid.tso.PausableLeaseManager;
+import org.apache.omid.tso.TSOServer;
+import org.apache.omid.tso.TSOServerConfig;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig.DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME;
+import static org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig.DEFAULT_TIMESTAMP_STORAGE_CF_NAME;
+import static org.apache.omid.tso.client.OmidClientConfiguration.ConnType.HA;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+@Test(groups = "sharedHBase")
+public class TestEndToEndScenariosWithHA extends OmidTestBase {
+
+ private static final int TEST_LEASE_PERIOD_MS = 5_000; // lease period handed to both TSOs' lease modules in setup()
+ private static final String CURRENT_TSO_PATH = "/CURRENT_TSO_PATH"; // znode watched in setup() for the published TSO entry
+ private static final String TSO_LEASE_PATH = "/TSO_LEASE_PATH"; // lease znode; deleted again in cleanup()
+ private static final String NAMESPACE = "omid"; // ZK namespace shared by the test's Curator client, the TSOs and the TM
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestEndToEndScenariosWithHA.class);
+ // Qualifiers, rows and cell values written/expected by the scenarios below
+ private static final byte[] qualifier1 = Bytes.toBytes("test-q1");
+ private static final byte[] qualifier2 = Bytes.toBytes("test-q2l"); // NOTE(review): trailing 'l' looks accidental - confirm before changing
+ private static final byte[] row1 = Bytes.toBytes("row1");
+ private static final byte[] row2 = Bytes.toBytes("row2");
+ private static final byte[] initialData = Bytes.toBytes("testWrite-0");
+ private static final byte[] data1_q1 = Bytes.toBytes("testWrite-1-q1");
+ private static final byte[] data1_q2 = Bytes.toBytes("testWrite-1-q2");
+ private static final byte[] data2_q1 = Bytes.toBytes("testWrite-2-q1");
+ private static final byte[] data2_q2 = Bytes.toBytes("testWrite-2-q2");
+ private static final int TSO1_PORT = 2223;
+ private static final int TSO2_PORT = 4321;
+ // Counted down by the NodeCache listener in setup() once an entry ending in "#0" shows up in CURRENT_TSO_PATH
+ private CountDownLatch barrierTillTSOAddressPublication;
+
+ private CuratorFramework zkClient;
+
+ private TSOServer tso1;
+ private TSOServer tso2;
+ // Lease manager obtained from TSO 1's injector; testScenario1 pauses and resumes it
+ private PausableLeaseManager leaseManager1;
+
+ private TransactionManager tm;
+
+ /**
+ * Starts two TSO servers that share the same lease and current-TSO znodes, waits until one of them
+ * has published an entry ending in "#0" under CURRENT_TSO_PATH and then builds the HA transaction manager.
+ *
+ * @throws Exception if the ZooKeeper client, a TSO server or the transaction manager cannot be started
+ */
+ @BeforeMethod(alwaysRun = true, timeOut = 30_000)
+ public void setup() throws Exception {
+ // Get the zkConnection string from minicluster
+ String zkConnection = "localhost:" + hBaseUtils.getZkCluster().getClientPort();
+
+ zkClient = provideInitializedZookeeperClient(zkConnection);
+
+ // Synchronize TSO start
+ barrierTillTSOAddressPublication = new CountDownLatch(1);
+ final NodeCache currentTSOZNode = new NodeCache(zkClient, CURRENT_TSO_PATH);
+ currentTSOZNode.getListenable().addListener(new NodeCacheListener() {
+
+ @Override
+ public void nodeChanged() throws Exception {
+ // NodeCache reports null data while the znode does not exist (not created yet or deleted),
+ // so read it once and bail out instead of failing with a NullPointerException
+ org.apache.curator.framework.recipes.cache.ChildData currentData = currentTSOZNode.getCurrentData();
+ byte[] currentTSOAndEpochAsBytes = (currentData == null) ? null : currentData.getData();
+ if (currentTSOAndEpochAsBytes == null) {
+ return;
+ }
+ String currentTSOAndEpoch = new String(currentTSOAndEpochAsBytes, Charsets.UTF_8);
+ if (currentTSOAndEpoch.endsWith("#0")) { // Wait till a TSO instance publishes the epoch
+ barrierTillTSOAddressPublication.countDown();
+ }
+ }
+
+ });
+ currentTSOZNode.start(true);
+
+ // Configure TSO 1
+ TSOServerConfig config1 = new TSOServerConfig();
+ config1.setPort(TSO1_PORT);
+ config1.setMaxItems(1000);
+ config1.setLeaseModule(new TestHALeaseManagementModule(TEST_LEASE_PERIOD_MS, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkConnection, NAMESPACE));
+ Injector injector1 = Guice.createInjector(new TestTSOModule(hbaseConf, config1));
+ LOG.info("===================== Starting TSO 1 =====================");
+ tso1 = injector1.getInstance(TSOServer.class);
+ // Keep a handle on TSO 1's lease manager so that a test can pause/resume it
+ leaseManager1 = (PausableLeaseManager) injector1.getInstance(LeaseManagement.class);
+ tso1.startAndWait();
+ TestUtils.waitForSocketListening("localhost", TSO1_PORT, 100);
+ LOG.info("================ Finished loading TSO 1 ==================");
+
+ // Configure TSO 2
+ TSOServerConfig config2 = new TSOServerConfig();
+ config2.setPort(TSO2_PORT);
+ config2.setMaxItems(1000);
+ config2.setLeaseModule(new TestHALeaseManagementModule(TEST_LEASE_PERIOD_MS, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkConnection, NAMESPACE));
+ Injector injector2 = Guice.createInjector(new TestTSOModule(hbaseConf, config2));
+ LOG.info("===================== Starting TSO 2 =====================");
+ tso2 = injector2.getInstance(TSOServer.class);
+ injector2.getInstance(LeaseManagement.class);
+ tso2.startAndWait();
+ // Don't wait for TSO2_PORT here; presumably the backup TSO does not listen until it becomes
+ // master (testScenario1 waits for that port only after the failover) - TODO confirm
+ LOG.info("================ Finished loading TSO 2 ==================");
+
+ // Wait till the master TSO is up
+ barrierTillTSOAddressPublication.await();
+ currentTSOZNode.close();
+
+ // Configure HBase TM
+ LOG.info("===================== Starting TM =====================");
+ HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
+ hbaseOmidClientConf.setConnectionType(HA);
+ hbaseOmidClientConf.setConnectionString(zkConnection);
+ hbaseOmidClientConf.getOmidClientConfiguration().setZkCurrentTsoPath(CURRENT_TSO_PATH);
+ hbaseOmidClientConf.getOmidClientConfiguration().setZkNamespace(NAMESPACE);
+ hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
+ hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3);
+ tm = HBaseTransactionManager.builder(hbaseOmidClientConf).build();
+ LOG.info("===================== TM Started =========================");
+ }
+
+
+ /**
+ * Recreates the timestamp storage table, stops both TSO servers and removes the lease and
+ * current-TSO znodes before closing the ZooKeeper client.
+ *
+ * @throws Exception if HBase, a TSO server or ZooKeeper fails while cleaning up
+ */
+ @AfterMethod(alwaysRun = true, timeOut = 60_000)
+ public void cleanup() throws Exception {
+ LOG.info("Cleanup");
+ HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
+ deleteTable(admin, TableName.valueOf(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME));
+ // Bytes.toBytes encodes as UTF-8; String.getBytes() would depend on the platform default charset
+ hBaseUtils.createTable(Bytes.toBytes(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME),
+ new byte[][]{Bytes.toBytes(DEFAULT_TIMESTAMP_STORAGE_CF_NAME)},
+ Integer.MAX_VALUE);
+ tso1.stopAndWait();
+ TestUtils.waitForSocketNotListening("localhost", TSO1_PORT, 100);
+ tso2.stopAndWait();
+ TestUtils.waitForSocketNotListening("localhost", TSO2_PORT, 100);
+
+ zkClient.delete().forPath(TSO_LEASE_PATH);
+ LOG.info("ZKPath {} deleted", TSO_LEASE_PATH);
+ zkClient.delete().forPath(CURRENT_TSO_PATH);
+ LOG.info("ZKPaths {} deleted", CURRENT_TSO_PATH);
+
+ zkClient.close();
+ }
+
+ //
+ // TSO 1 is MASTER & TSO 2 is BACKUP
+ // Setup: TX 0 -> Add initial data to cells R1C1 (v0) & R2C2 (v0)
+ // TX 1 starts (TSO1)
+ // TX 1 modifies cells R1C1 & R2C2 (v1)
+ // Interleaved Read TX -IR TX- starts (TSO1)
+ // TSO 1 PAUSES -> TSO 2 becomes MASTER
+ // IR TX reads R1C1 -> should get v0
+ // TX 1 tries to commit -> should abort because was started in TSO 1
+ // IR TX reads R2C2 -> should get v0
+ // IR TX tries to commit -> should commit (as COMMITTED_RO) because a read-only TX does not have to contact the TSO
+ // End of Test state: R1C1 & R2C2 (v0)
+ @Test(timeOut = 60_000) // Mastership change caused by pausing (not stopping) TSO 1 while two transactions are open
+ public void testScenario1() throws Exception {
+ try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
+
+ // Write initial values for the test
+ HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
+ long initialEpoch = tx0.getEpoch();
+ LOG.info("Starting Tx {} writing initial values for cells ({}) ", tx0, Bytes.toString(initialData));
+ Put putInitialDataRow1 = new Put(row1);
+ putInitialDataRow1.add(TEST_FAMILY.getBytes(), qualifier1, initialData);
+ txTable.put(tx0, putInitialDataRow1);
+ Put putInitialDataRow2 = new Put(row2);
+ putInitialDataRow2.add(TEST_FAMILY.getBytes(), qualifier2, initialData);
+ txTable.put(tx0, putInitialDataRow2);
+ tm.commit(tx0);
+
+ // Initial checks
+ checkRowValues(txTable, initialData, initialData);
+ // tx1 overwrites both cells but is deliberately left uncommitted until after the pause
+ HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+ LOG.info("Starting Tx {} writing values for cells ({}, {}) ", tx1, Bytes.toString(data1_q1),
+ Bytes.toString(data1_q2));
+ Put putData1R1Q1 = new Put(row1);
+ putData1R1Q1.add(TEST_FAMILY.getBytes(), qualifier1, data1_q1);
+ txTable.put(tx1, putData1R1Q1);
+ Put putData1R2Q2 = new Put(row2);
+ putData1R2Q2.add(TEST_FAMILY.getBytes(), qualifier2, data1_q2);
+ txTable.put(tx1, putData1R2Q2);
+ // The interleaved read tx also starts before the pause, i.e. in the initial epoch
+ Transaction interleavedReadTx = tm.begin();
+
+ LOG.info("Starting Interleaving Read Tx {} for checking cell values", interleavedReadTx.getTransactionId());
+
+ // Simulate a GC pause to change mastership (should throw a ServiceUnavailable exception)
+ LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
+ LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
+ LOG.info("++++++++++++++++++++ PAUSING TSO 1 +++++++++++++++++++");
+ LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
+ LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
+ leaseManager1.pausedInStillInLeasePeriod();
+
+ // Read interleaved: R1Q1 must still hold the initial value, not the one written by tx 1
+ Get getRow1 = new Get(row1).setMaxVersions(1);
+ getRow1.addColumn(TEST_FAMILY.getBytes(), qualifier1);
+ Result r = txTable.get(interleavedReadTx, getRow1);
+ assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier1), initialData,
+ "Unexpected value for SI read R1Q1" + interleavedReadTx + ": "
+ + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier1)));
+
+ // Try to commit, but it should abort due to the change in mastership
+ try {
+ tm.commit(tx1);
+ fail();
+ } catch (RollbackException e) {
+ // Expected
+ LOG.info("Rollback cause for Tx {}: ", tx1, e.getCause());
+ assertEquals(tx1.getStatus(), Transaction.Status.ROLLEDBACK);
+ assertEquals(tx1.getEpoch(), initialEpoch);
+ }
+
+ // Read interleaved again: R2Q2 must also still hold the initial value after tx 1 was rolled back
+ Get getRow2 = new Get(row2).setMaxVersions(1);
+ r = txTable.get(interleavedReadTx, getRow2);
+ assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier2), initialData,
+ "Unexpected value for SI read R2Q2" + interleavedReadTx + ": "
+ + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier2)));
+
+ // Should commit: it is a read-only tx, so it does not have to contact the TSO
+ tm.commit(interleavedReadTx);
+ assertEquals(interleavedReadTx.getEpoch(), initialEpoch);
+ assertEquals(interleavedReadTx.getStatus(), Transaction.Status.COMMITTED_RO);
+
+ LOG.info("Wait till the client is informed about the connection parameters of the new TSO");
+ TestUtils.waitForSocketListening("localhost", TSO2_PORT, 100);
+
+ checkRowValues(txTable, initialData, initialData);
+
+ // Need to resume to let other test progress
+ leaseManager1.resume();
+
+ }
+
+ }
+
+ //
+ // TSO 1 is MASTER & TSO 2 is BACKUP
+ // Setup: TX 0 -> Add initial data to cells R1C1 (v0) & R2C2 (v0)
+ // TX 1 starts (TSO1)
+ // TX 1 modifies cells R1C1 & R2C2 (v1)
+ // TSO 1 is KILLED -> TSO 2 becomes MASTER
+ // TX 1 tries to commit -> should abort because was started in TSO 1
+ // TX 2 starts (TSO2)
+ // TX 2 reads R1C1 -> should get v0
+ // TX 2 reads R2C2 -> should get v0
+ // TX 2 modifies cells R1C1 & R2C2 (v2)
+ // TX 2 commits
+ // End of Test state: R1C1 & R2C2 (v2)
+ // Mastership change caused by stopping TSO 1: tx1 (started before the stop) must roll back,
+ // while tx2 (started afterwards) must read the initial values and commit its own writes.
+ @Test(timeOut = 60_000)
+ public void testScenario2() throws Exception {
+ try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
+
+ // Write initial values for the test
+ HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
+ long initialEpoch = tx0.getEpoch();
+ LOG.info("Starting Tx {} writing initial values for cells ({}) ", tx0, Bytes.toString(initialData));
+ Put putInitialDataRow1 = new Put(row1);
+ putInitialDataRow1.add(TEST_FAMILY.getBytes(), qualifier1, initialData);
+ txTable.put(tx0, putInitialDataRow1);
+ Put putInitialDataRow2 = new Put(row2);
+ putInitialDataRow2.add(TEST_FAMILY.getBytes(), qualifier2, initialData);
+ txTable.put(tx0, putInitialDataRow2);
+ tm.commit(tx0);
+
+ HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+ LOG.info("Starting Tx {} writing values for cells ({}, {}) ", tx1, Bytes.toString(data1_q1),
+ Bytes.toString(data1_q2));
+ Put putData1R1Q1 = new Put(row1);
+ putData1R1Q1.add(TEST_FAMILY.getBytes(), qualifier1, data1_q1);
+ txTable.put(tx1, putData1R1Q1);
+ Put putData1R2Q2 = new Put(row2);
+ putData1R2Q2.add(TEST_FAMILY.getBytes(), qualifier2, data1_q2);
+ txTable.put(tx1, putData1R2Q2);
+
+ // Provoke change in mastership (should throw a Connection exception)
+ LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
+ LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
+ LOG.info("++++++++++++++++++++ KILLING TSO 1 +++++++++++++++++++");
+ LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
+ LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
+ tso1.stopAndWait();
+ TestUtils.waitForSocketNotListening("localhost", TSO1_PORT, 100);
+
+ // Try to commit, but it should abort due to the change in mastership
+ try {
+ tm.commit(tx1);
+ String failMsg = String.format("%s should not commit. Initial epoch was: %d", tx1, initialEpoch);
+ fail(failMsg);
+ } catch (RollbackException e) {
+ // Expected
+ LOG.info("Rollback cause for Tx {}: ", tx1, e.getCause());
+ assertEquals(tx1.getStatus(), Transaction.Status.ROLLEDBACK);
+ assertEquals(tx1.getEpoch(), initialEpoch);
+ }
+
+ // The trailing space keeps the two concatenated fragments from running together ("aboutthe")
+ LOG.info("Sleep some time till the client is informed about "
+ + "the new TSO connection parameters and how can connect");
+ TimeUnit.SECONDS.sleep(10 + 2);
+
+ // tx2 writes the data2_* values, so log those rather than tx1's
+ HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
+ LOG.info("Starting Tx {} writing values for cells ({}, {}) ", tx2, Bytes.toString(data2_q1),
+ Bytes.toString(data2_q2));
+ Get getData1R1Q1 = new Get(row1).setMaxVersions(1);
+ Result r = txTable.get(tx2, getData1R1Q1);
+ assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier1), initialData,
+ "Unexpected value for SI read R1Q1" + tx2 + ": "
+ + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier1)));
+ Get getData1R2Q2 = new Get(row2).setMaxVersions(1);
+ r = txTable.get(tx2, getData1R2Q2);
+ assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier2), initialData,
+ "Unexpected value for SI read R2Q2" + tx2 + ": "
+ + Bytes.toString(r.getValue(TEST_FAMILY.getBytes(), qualifier2)));
+
+ Put putData2R1Q1 = new Put(row1);
+ putData2R1Q1.add(TEST_FAMILY.getBytes(), qualifier1, data2_q1);
+ txTable.put(tx2, putData2R1Q1);
+ Put putData2R2Q2 = new Put(row2);
+ putData2R2Q2.add(TEST_FAMILY.getBytes(), qualifier2, data2_q2);
+ txTable.put(tx2, putData2R2Q2);
+ // This one should commit in the new TSO
+ tm.commit(tx2);
+
+ assertEquals(tx2.getStatus(), Transaction.Status.COMMITTED);
+ assertTrue(tx2.getEpoch() > tx0.getCommitTimestamp());
+
+ checkRowValues(txTable, data2_q1, data2_q2);
+ }
+
+ }
+
+ private void checkRowValues(TTable txTable, byte[] expectedDataR1Q1, byte[] expectedDataR2Q2)
+ throws TransactionException, IOException, RollbackException {
+ Transaction readTx = tm.begin();
+ LOG.info("Starting Read Tx {} for checking cell values", readTx.getTransactionId());
+ Get getRow1 = new Get(row1).setMaxVersions(1);
+ getRow1.addColumn(TEST_FAMILY.getBytes(), qualifier1);
+ Result r = txTable.get(readTx, getRow1);
+ assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier1), expectedDataR1Q1,
+ "Unexpected value for SI read R1Q1" + readTx + ": " + Bytes
+ .toString(r.getValue(TEST_FAMILY.getBytes(), qualifier1)));
+ Get getRow2 = new Get(row2).setMaxVersions(1);
+ r = txTable.get(readTx, getRow2);
+ assertEquals(r.getValue(TEST_FAMILY.getBytes(), qualifier2), expectedDataR2Q2,
+ "Unexpected value for SI read R2Q2" + readTx + ": " + Bytes
+ .toString(r.getValue(TEST_FAMILY.getBytes(), qualifier2)));
+ tm.commit(readTx);
+ }
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // Helpers
+ // ----------------------------------------------------------------------------------------------------------------
+
+ private static CuratorFramework provideInitializedZookeeperClient(String zkConnection) throws Exception {
+
+ LOG.info("Creating Zookeeper Client connecting to {}", zkConnection);
+
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ CuratorFramework zkClient = CuratorFrameworkFactory
+ .builder()
+ .namespace(NAMESPACE)
+ .connectString(zkConnection)
+ .retryPolicy(retryPolicy).build();
+
+ LOG.info("Connecting to ZK cluster {}", zkClient.getState());
+ zkClient.start();
+ zkClient.blockUntilConnected();
+ LOG.info("Connection to ZK cluster {}", zkClient.getState());
+
+ return zkClient;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
new file mode 100644
index 0000000..5e3d7c0
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.ValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.ITestContext;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertNull;
+
+/**
+ * Tests to verify that Get and Scan filters still work with transactional tables
+ */
+@Test(groups = "sharedHBase")
+public class TestFilters extends OmidTestBase {
+
+ byte[] family = Bytes.toBytes(TEST_FAMILY); // column family used for every cell in this class
+ private byte[] row1 = Bytes.toBytes("row1"); // written by writeRows() with both columns
+ private byte[] row2 = Bytes.toBytes("row2"); // written with both columns while updateShadowCells is stubbed out
+ private byte[] row3 = Bytes.toBytes("row3"); // written with col2 only
+ private byte[] prefix = Bytes.toBytes("foo"); // prefix of col1 ("foobar") but not of col2 ("boofar")
+ private byte[] col1 = Bytes.toBytes("foobar"); // also stored as the value of the col1 cells
+ private byte[] col2 = Bytes.toBytes("boofar"); // also stored as the value of the col2 cells
+
+ @Test(timeOut = 60_000) // Runs the shared Get checks with a filter that keeps only qualifiers starting with "foo"
+ public void testGetWithColumnPrefixFilter(ITestContext context) throws Exception {
+ testGet(context, new ColumnPrefixFilter(prefix));
+ }
+
+ @Test(timeOut = 60_000) // Runs the shared Get checks with a filter that keeps only cells whose value equals col1's bytes
+ public void testGetWithValueFilter(ITestContext context) throws Exception {
+ testGet(context, new ValueFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(col1)));
+ }
+
+ /**
+ * Writes the three fixture rows and checks that a transactional Get with the given filter returns
+ * col1 but not col2 for row1 and row2, and does not return col2 for row3.
+ *
+ * @param context TestNG context used to obtain the commit table
+ * @param f filter applied to every Get
+ * @throws Exception if the fixture cannot be written or a read fails
+ */
+ private void testGet(ITestContext context, Filter f) throws Exception {
+ CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
+
+ HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
+ hbaseOmidClientConf.setConnectionString("localhost:1234");
+ hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
+
+ // try-with-resources releases the table even when one of the assertions below fails
+ try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+ // The post-committer is a spy so that writeRows() can stub out the shadow cell update
+ PostCommitActions syncPostCommitter = spy(
+ new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+ AbstractTransactionManager tm = HBaseTransactionManager.builder(hbaseOmidClientConf)
+ .commitTableClient(commitTableClient)
+ .postCommitter(syncPostCommitter)
+ .build();
+
+ writeRows(table, tm, syncPostCommitter);
+
+ Transaction t = tm.begin();
+ Get g = new Get(row1);
+ g.setFilter(f);
+
+ Result r = table.get(t, g);
+ assertEquals("should exist in result", 1, r.getColumnCells(family, col1).size());
+ assertEquals("shouldn't exist in result", 0, r.getColumnCells(family, col2).size());
+
+ g = new Get(row2);
+ g.setFilter(f);
+ r = table.get(t, g);
+ assertEquals("should exist in result", 1, r.getColumnCells(family, col1).size());
+ assertEquals("shouldn't exist in result", 0, r.getColumnCells(family, col2).size());
+
+ g = new Get(row3);
+ g.setFilter(f);
+ r = table.get(t, g);
+ assertEquals("shouldn't exist in result", 0, r.getColumnCells(family, col2).size());
+ }
+ }
+
+ @Test(timeOut = 60_000) // Runs the shared Scan checks with a filter that keeps only qualifiers starting with "foo"
+ public void testScanWithColumnPrefixFilter(ITestContext context) throws Exception {
+ testScan(context, new ColumnPrefixFilter(prefix));
+ }
+
+ @Test(timeOut = 60_000) // Runs the shared Scan checks with a filter that keeps only cells whose value equals col1's bytes
+ public void testScanWithValueFilter(ITestContext context) throws Exception {
+ testScan(context, new ValueFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(col1)));
+ }
+
+ /**
+ * Writes the three fixture rows and checks that a transactional Scan with the given filter yields
+ * exactly two rows, each containing col1 but not col2.
+ *
+ * @param context TestNG context used to obtain the commit table
+ * @param f filter applied to the Scan
+ * @throws Exception if the fixture cannot be written or the scan fails
+ */
+ private void testScan(ITestContext context, Filter f) throws Exception {
+ CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
+
+ HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
+ hbaseOmidClientConf.getOmidClientConfiguration().setConnectionString("localhost:1234");
+ hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
+ // try-with-resources releases the table even when one of the assertions below fails
+ try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+ // The post-committer is a spy so that writeRows() can stub out the shadow cell update
+ PostCommitActions syncPostCommitter = spy(
+ new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+ AbstractTransactionManager tm = HBaseTransactionManager.builder(hbaseOmidClientConf)
+ .commitTableClient(commitTableClient)
+ .postCommitter(syncPostCommitter)
+ .build();
+
+ writeRows(table, tm, syncPostCommitter);
+
+ Transaction t = tm.begin();
+ Scan s = new Scan().setFilter(f);
+
+ // The scanner is closed as well so that it does not outlive the test
+ try (ResultScanner rs = table.getScanner(t, s)) {
+
+ Result r = rs.next();
+ assertEquals("should exist in result", 1, r.getColumnCells(family, col1).size());
+ assertEquals("shouldn't exist in result", 0, r.getColumnCells(family, col2).size());
+
+ r = rs.next();
+ assertEquals("should exist in result", 1, r.getColumnCells(family, col1).size());
+ assertEquals("shouldn't exist in result", 0, r.getColumnCells(family, col2).size());
+
+ r = rs.next();
+ assertNull("Last row shouldn't exist", r);
+ }
+ }
+ }
+
+
+ private void writeRows(TTable table, TransactionManager tm, PostCommitActions postCommitter)
+ throws Exception { // Fixture: row1 (col1+col2), row2 (col1+col2, shadow cell update stubbed out), row3 (col2 only)
+ // create normal row with both cells
+ Transaction t = tm.begin();
+ Put p = new Put(row1);
+ p.add(family, col1, col1);
+ p.add(family, col2, col2);
+ table.put(t, p);
+ tm.commit(t);
+
+ // create normal row, but fail to update shadow cells
+ doAnswer(new Answer<ListenableFuture<Void>>() {
+ public ListenableFuture<Void> answer(InvocationOnMock invocation) {
+ // Do not invoke the real method; the returned future is never completed here
+ return SettableFuture.create();
+ }
+ }).when(postCommitter).updateShadowCells(any(HBaseTransaction.class));
+ // From here on postCommitter (which must be a Mockito spy/mock) no longer runs the real updateShadowCells
+ t = tm.begin();
+ p = new Put(row2);
+ p.add(family, col1, col1);
+ p.add(family, col2, col2);
+ table.put(t, p);
+ try {
+ tm.commit(t);
+ } catch (TransactionException e) {
+ // Tolerated: the shadow cell update was stubbed out above
+ }
+
+ // create normal row with only one cell
+ t = tm.begin();
+ p = new Put(row3);
+ p.add(family, col2, col2);
+ table.put(t, p);
+ try {
+ tm.commit(t);
+ } catch (TransactionException e) {
+ // Tolerated as well: the updateShadowCells stub installed for row2 is still active for this commit
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestHALeaseManagementModule.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestHALeaseManagementModule.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestHALeaseManagementModule.java
new file mode 100644
index 0000000..761fa52
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestHALeaseManagementModule.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import org.apache.omid.timestamp.storage.ZKModule;
+import org.apache.omid.tso.LeaseManagement;
+import org.apache.omid.tso.Panicker;
+import org.apache.omid.tso.PausableLeaseManager;
+import org.apache.omid.tso.TSOChannelHandler;
+import org.apache.omid.tso.TSOStateManager;
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Named;
+import javax.inject.Singleton;
+
+import static org.apache.omid.tso.TSOServer.TSO_HOST_AND_PORT_KEY;
+
+class TestHALeaseManagementModule extends AbstractModule {
+ // Guice module for tests: installs ZKModule and provides a singleton PausableLeaseManager as LeaseManagement
+ private static final Logger LOG = LoggerFactory.getLogger(TestHALeaseManagementModule.class);
+ private final long leasePeriodInMs;
+ private final String tsoLeasePath;
+ private final String currentTsoPath;
+ private final String zkCluster;
+ private final String zkNamespace;
+ // Stores the settings only; they are consumed later by configure() and provideLeaseManager()
+ TestHALeaseManagementModule(long leasePeriodInMs, String tsoLeasePath, String currentTsoPath,
+ String zkCluster, String zkNamespace) {
+ this.leasePeriodInMs = leasePeriodInMs;
+ this.tsoLeasePath = tsoLeasePath;
+ this.currentTsoPath = currentTsoPath;
+ this.zkCluster = zkCluster;
+ this.zkNamespace = zkNamespace;
+ }
+
+ @Override
+ protected void configure() {
+ install(new ZKModule(zkCluster, zkNamespace)); // presumably the source of the CuratorFramework injected below - confirm
+ }
+ // Builds the lease manager from the injected collaborators plus the lease period and znode paths given to this module
+ @Provides
+ @Singleton
+ LeaseManagement provideLeaseManager(@Named(TSO_HOST_AND_PORT_KEY) String tsoHostAndPort,
+ TSOChannelHandler tsoChannelHandler,
+ TSOStateManager stateManager,
+ CuratorFramework zkClient,
+ Panicker panicker)
+ throws LeaseManagement.LeaseManagementException {
+
+ LOG.info("Connection to ZK cluster [{}]", zkClient.getState());
+ return new PausableLeaseManager(tsoHostAndPort, tsoChannelHandler, stateManager, leasePeriodInMs,
+ tsoLeasePath, currentTsoPath, zkClient, panicker);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseOmidClientConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseOmidClientConfiguration.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseOmidClientConfiguration.java
new file mode 100644
index 0000000..ff98c41
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseOmidClientConfiguration.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+public class TestHBaseOmidClientConfiguration {
+
+ // A configuration created with the no-arg constructor must expose all of its sections
+ @Test
+ public void testYamlReading() {
+ HBaseOmidClientConfiguration defaultConf = new HBaseOmidClientConfiguration();
+ assertAllSectionsPresent(defaultConf);
+ }
+
+ // Same check for a configuration created from an explicitly named resource
+ @Test
+ public void testYamlReadingFromFile() {
+ HBaseOmidClientConfiguration fileConf = new HBaseOmidClientConfiguration("/test-hbase-omid-client-config.yml");
+ assertAllSectionsPresent(fileConf);
+ }
+
+ // Asserts, in a fixed order, that none of the configuration's sections is null
+ private static void assertAllSectionsPresent(HBaseOmidClientConfiguration conf) {
+ Assert.assertNotNull(conf.getCommitTableName());
+ Assert.assertNotNull(conf.getHBaseConfiguration());
+ Assert.assertNotNull(conf.getMetrics());
+ Assert.assertNotNull(conf.getOmidClientConfiguration());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
new file mode 100644
index 0000000..8be7950
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
@@ -0,0 +1,465 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.committable.CommitTable.CommitTimestamp;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.apache.omid.transaction.HBaseTransactionManager.CommitTimestampLocatorImpl;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.testng.ITestContext;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.CACHE;
+import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.COMMIT_TABLE;
+import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.NOT_PRESENT;
+import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.SHADOW_CELL;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+
+@Test(groups = "sharedHBase")
+public class TestHBaseTransactionClient extends OmidTestBase {
+
+ private static final byte[] row1 = Bytes.toBytes("test-is-committed1");
+ private static final byte[] row2 = Bytes.toBytes("test-is-committed2");
+ private static final byte[] family = Bytes.toBytes(TEST_FAMILY);
+ private static final byte[] qualifier = Bytes.toBytes("testdata");
+ private static final byte[] data1 = Bytes.toBytes("testWrite-1");
+
+ @Test(timeOut = 30_000)
+ public void testIsCommitted(ITestContext context) throws Exception {
+ TransactionManager tm = newTransactionManager(context);
+ TTable table = new TTable(hbaseConf, TEST_TABLE);
+
+ HBaseTransaction t1 = (HBaseTransaction) tm.begin();
+
+ Put put = new Put(row1);
+ put.add(family, qualifier, data1);
+ table.put(t1, put);
+ tm.commit(t1);
+
+ HBaseTransaction t2 = (HBaseTransaction) tm.begin();
+ put = new Put(row2);
+ put.add(family, qualifier, data1);
+ table.put(t2, put);
+ table.getHTable().flushCommits();
+
+ HBaseTransaction t3 = (HBaseTransaction) tm.begin();
+ put = new Put(row2);
+ put.add(family, qualifier, data1);
+ table.put(t3, put);
+ tm.commit(t3);
+
+ HTable htable = new HTable(hbaseConf, TEST_TABLE);
+ HBaseCellId hBaseCellId1 = new HBaseCellId(htable, row1, family, qualifier, t1.getStartTimestamp());
+ HBaseCellId hBaseCellId2 = new HBaseCellId(htable, row2, family, qualifier, t2.getStartTimestamp());
+ HBaseCellId hBaseCellId3 = new HBaseCellId(htable, row2, family, qualifier, t3.getStartTimestamp());
+
+ HBaseTransactionClient hbaseTm = (HBaseTransactionClient) newTransactionManager(context);
+ assertTrue("row1 should be committed", hbaseTm.isCommitted(hBaseCellId1));
+ assertFalse("row2 should not be committed for kv2", hbaseTm.isCommitted(hBaseCellId2));
+ assertTrue("row2 should be committed for kv3", hbaseTm.isCommitted(hBaseCellId3));
+ }
+
+ @Test(timeOut = 30_000)
+ public void testCrashAfterCommit(ITestContext context) throws Exception {
+ PostCommitActions syncPostCommitter =
+ spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
+ AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
+ // The following line emulates a crash after commit that is observed in (*) below
+ doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
+
+ TTable table = new TTable(hbaseConf, TEST_TABLE);
+
+ HBaseTransaction t1 = (HBaseTransaction) tm.begin();
+
+ // Test shadow cell are created properly
+ Put put = new Put(row1);
+ put.add(family, qualifier, data1);
+ table.put(t1, put);
+ try {
+ tm.commit(t1);
+ } catch (Exception e) { // (*) crash
+ // Do nothing
+ }
+
+ assertTrue("Cell should be there",
+ CellUtils.hasCell(row1,
+ family,
+ qualifier,
+ t1.getStartTimestamp(),
+ new TTableCellGetterAdapter(table)));
+ assertFalse("Shadow cell should not be there",
+ CellUtils.hasShadowCell(row1,
+ family,
+ qualifier,
+ t1.getStartTimestamp(),
+ new TTableCellGetterAdapter(table)));
+
+ HTable htable = new HTable(hbaseConf, TEST_TABLE);
+ HBaseCellId hBaseCellId = new HBaseCellId(htable, row1, family, qualifier, t1.getStartTimestamp());
+
+ HBaseTransactionClient hbaseTm = (HBaseTransactionClient) newTransactionManager(context);
+ assertTrue("row1 should be committed", hbaseTm.isCommitted(hBaseCellId));
+ }
+
+ /**
+ * Reads commit timestamps through the commit table client for three cases: an unknown start
+ * timestamp, an invalidated transaction and a transaction whose commit hit the stubbed
+ * updateShadowCells() failure.
+ */
+ @Test(timeOut = 30_000)
+ public void testReadCommitTimestampFromCommitTable(ITestContext context) throws Exception {
+
+ final long NON_EXISTING_CELL_TS = 1000L;
+
+ PostCommitActions syncPostCommitter =
+ spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
+ AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
+ // The following line emulates a crash after commit that is observed in (*) below
+ doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
+
+ // Test that a non-existing cell timestamp returns an empty result
+ Optional<CommitTimestamp> optionalCT = tm.commitTableClient.getCommitTimestamp(NON_EXISTING_CELL_TS).get();
+ assertFalse(optionalCT.isPresent());
+
+ try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+ // Test that we get an invalidation mark for an invalidated transaction
+
+ // Start a transaction and invalidate it before committing it
+ HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+ Put put = new Put(row1);
+ put.add(family, qualifier, data1);
+ table.put(tx1, put);
+
+ assertTrue(tm.commitTableClient.tryInvalidateTransaction(tx1.getStartTimestamp()).get());
+ optionalCT = tm.commitTableClient.getCommitTimestamp(tx1.getStartTimestamp()).get();
+ assertTrue(optionalCT.isPresent());
+ CommitTimestamp ct = optionalCT.get();
+ assertFalse(ct.isValid());
+ assertEquals(CommitTable.INVALID_TRANSACTION_MARKER, ct.getValue());
+ // assertEquals reports both locations on failure, unlike assertTrue(compareTo(..) == 0)
+ assertEquals(COMMIT_TABLE, ct.getLocation());
+
+ // Finally test that we get the right commit timestamp for a committed tx
+ // that couldn't update its shadow cells
+ HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
+ Put otherPut = new Put(row1);
+ otherPut.add(family, qualifier, data1);
+ table.put(tx2, otherPut);
+ try {
+ tm.commit(tx2);
+ } catch (Exception e) { // (*) crash
+ // Do nothing
+ }
+
+ optionalCT = tm.commitTableClient.getCommitTimestamp(tx2.getStartTimestamp()).get();
+ assertTrue(optionalCT.isPresent());
+ ct = optionalCT.get();
+ assertTrue(ct.isValid());
+ assertEquals(tx2.getCommitTimestamp(), ct.getValue());
+ assertEquals(COMMIT_TABLE, ct.getLocation());
+ }
+ }
+
+ /**
+ * Checks readCommitTimestampFromShadowCell(): absent for a start timestamp that was never
+ * written, and the transaction's commit timestamp once a transaction has been committed.
+ */
+ @Test(timeOut = 30_000)
+ public void testReadCommitTimestampFromShadowCell(ITestContext context) throws Exception {
+
+ final long NON_EXISTING_CELL_TS = 1L;
+
+ HBaseTransactionManager tm = (HBaseTransactionManager) newTransactionManager(context);
+
+ try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+
+ // First test that a non-existent cell timestamp cannot be found
+ HBaseCellId hBaseCellId = new HBaseCellId(table.getHTable(), row1, family, qualifier, NON_EXISTING_CELL_TS);
+ // Pass an empty cache so that the cache check is bypassed
+ CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
+ Maps.<Long, Long>newHashMap());
+ Optional<CommitTimestamp> optionalCT = tm
+ .readCommitTimestampFromShadowCell(NON_EXISTING_CELL_TS, ctLocator);
+ assertFalse(optionalCT.isPresent());
+
+ // Then test that for a transaction committed, we get the right CT
+ HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+ Put put = new Put(row1);
+ put.add(family, qualifier, data1);
+ table.put(tx1, put);
+ tm.commit(tx1);
+ // Upon commit, the commit data should be in the shadow cells, so test it
+ // NOTE(review): ctLocator still wraps the cell id built with NON_EXISTING_CELL_TS; confirm the
+ // lookup keys on the start timestamp argument and not on the cell id's timestamp
+ optionalCT = tm.readCommitTimestampFromShadowCell(tx1.getStartTimestamp(), ctLocator);
+ assertTrue(optionalCT.isPresent());
+ CommitTimestamp ct = optionalCT.get();
+ assertTrue(ct.isValid());
+ assertEquals(tx1.getCommitTimestamp(), ct.getValue());
+ // assertEquals reports both locations on failure, unlike assertTrue(compareTo(..) == 0)
+ assertEquals(SHADOW_CELL, ct.getLocation());
+
+ }
+
+ }
+
+ // Tests step 1 in AbstractTransactionManager.locateCellCommitTimestamp()
+ // A start/commit timestamp pair is placed in the map handed to the locator and the
+ // test expects locateCellCommitTimestamp() to answer with that pair and location CACHE
+ @Test(timeOut = 30_000)
+ public void testCellCommitTimestampIsLocatedInCache(ITestContext context) throws Exception {
+
+ final long CELL_ST = 1L;
+ final long CELL_CT = 2L;
+
+ HBaseTransactionManager tm = (HBaseTransactionManager) newTransactionManager(context);
+
+ // The HTable is opened in try-with-resources so it gets closed even when an assertion fails
+ try (HTable table = new HTable(hbaseConf, TEST_TABLE)) {
+ // Pre-load the element to look for in the cache
+ HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier, CELL_ST);
+ Map<Long, Long> fakeCache = Maps.newHashMap();
+ fakeCache.put(CELL_ST, CELL_CT);
+
+ // Then test that locator finds it in the cache
+ CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId, fakeCache);
+ CommitTimestamp ct = tm.locateCellCommitTimestamp(CELL_ST, tm.tsoClient.getEpoch(), ctLocator);
+ assertTrue(ct.isValid());
+ // AssertJUnit takes the expected value first, as the other tests in this class do
+ assertEquals(CELL_CT, ct.getValue());
+ // assertEquals reports both locations on failure, unlike assertTrue(compareTo(..) == 0)
+ assertEquals(CACHE, ct.getLocation());
+ }
+
+ }
+
+ // Tests step 2 in AbstractTransactionManager.locateCellCommitTimestamp()
+ // Note: This test is very similar to testCrashAfterCommit() above so
+ // maybe we should merge them in this test, adding the missing assertions
+ @Test(timeOut = 30_000)
+ public void testCellCommitTimestampIsLocatedInCommitTable(ITestContext context) throws Exception {
+
+ PostCommitActions syncPostCommitter =
+ spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
+ AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
+ // The following line emulates a crash after commit that is observed in (*) below
+ doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
+
+ try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+ // Commit a transaction that is broken on commit to avoid
+ // write to the shadow cells and avoid cleaning the commit table
+ HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+ Put put = new Put(row1);
+ put.add(family, qualifier, data1);
+ table.put(tx1, put);
+ try {
+ tm.commit(tx1);
+ } catch (Exception e) { // (*) crash
+ // Do nothing
+ }
+
+ // Test the locator finds the appropriate data in the commit table
+ HBaseCellId hBaseCellId = new HBaseCellId(table.getHTable(), row1, family, qualifier,
+ tx1.getStartTimestamp());
+ CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
+ Maps.<Long, Long>newHashMap());
+ CommitTimestamp ct = tm.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
+ ctLocator);
+ assertTrue(ct.isValid());
+ // NOTE(review): assumes the commit timestamp is the start timestamp + 1; the sibling
+ // tests compare against tx.getCommitTimestamp() instead - confirm which one is intended
+ long expectedCommitTS = tx1.getStartTimestamp() + 1;
+ assertEquals(expectedCommitTS, ct.getValue());
+ // assertEquals reports both locations on failure, unlike assertTrue(compareTo(..) == 0)
+ assertEquals(COMMIT_TABLE, ct.getLocation());
+ }
+
+ }
+
+ // Tests step 3 in AbstractTransactionManager.locateCellCommitTimestamp()
+ // A transaction is committed normally and the locator, given an empty cache map,
+ // is expected to report the commit timestamp with location SHADOW_CELL
+ @Test(timeOut = 30_000)
+ public void testCellCommitTimestampIsLocatedInShadowCells(ITestContext context) throws Exception {
+
+ HBaseTransactionManager tm = (HBaseTransactionManager) newTransactionManager(context);
+
+ try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+ // Commit a transaction to add ST/CT in commit table
+ HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+ Put put = new Put(row1);
+ put.add(family, qualifier, data1);
+ table.put(tx1, put);
+ tm.commit(tx1);
+ // Upon commit, the commit data should be in the shadow cells
+
+ // Test the locator finds the appropriate data in the shadow cells
+ HBaseCellId hBaseCellId = new HBaseCellId(table.getHTable(), row1, family, qualifier,
+ tx1.getStartTimestamp());
+ CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
+ Maps.<Long, Long>newHashMap());
+ CommitTimestamp ct = tm.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
+ ctLocator);
+ assertTrue(ct.isValid());
+ assertEquals(tx1.getCommitTimestamp(), ct.getValue());
+ // assertEquals reports both locations on failure, unlike assertTrue(compareTo(..) == 0)
+ assertEquals(SHADOW_CELL, ct.getLocation());
+ }
+
+ }
+
+ // Tests step 4 in AbstractTransactionManager.locateCellCommitTimestamp()
+ // Both the commit table lookup and the shadow cell lookup are stubbed to return absent,
+ // and the locator is called with a fake current epoch of 1000
+ @Test(timeOut = 30_000)
+ public void testCellFromTransactionInPreviousEpochGetsInvalidComitTimestamp(ITestContext context) throws Exception {
+
+ final long CURRENT_EPOCH_FAKE = 1000L;
+
+ CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
+ AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, commitTableClient));
+ // The following lines allow to reach step 4)
+ // in AbstractTransactionManager.locateCellCommitTimestamp()
+ SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
+ f.set(Optional.<CommitTimestamp>absent());
+ doReturn(f).when(commitTableClient).getCommitTimestamp(any(Long.class));
+ doReturn(Optional.<CommitTimestamp>absent()).when(tm).readCommitTimestampFromShadowCell(any(Long.class),
+ any(CommitTimestampLocator.class));
+
+ try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+
+ // Commit a transaction to add ST/CT in commit table
+ HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+ Put put = new Put(row1);
+ put.add(family, qualifier, data1);
+ table.put(tx1, put);
+ tm.commit(tx1);
+ // Upon commit, the commit data should be in the shadow cells
+
+ // Test a transaction in the previous epoch gets an InvalidCommitTimestamp class
+ HBaseCellId hBaseCellId = new HBaseCellId(table.getHTable(), row1, family, qualifier,
+ tx1.getStartTimestamp());
+ CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
+ Maps.<Long, Long>newHashMap());
+ // Fake the current epoch to simulate a newer TSO
+ CommitTimestamp ct = tm.locateCellCommitTimestamp(tx1.getStartTimestamp(), CURRENT_EPOCH_FAKE, ctLocator);
+ assertFalse(ct.isValid());
+ assertEquals(CommitTable.INVALID_TRANSACTION_MARKER, ct.getValue());
+ // assertEquals reports both locations on failure, unlike assertTrue(compareTo(..) == 0)
+ assertEquals(COMMIT_TABLE, ct.getLocation());
+ }
+ }
+
+ // Tests step 5 in AbstractTransactionManager.locateCellCommitTimestamp()
+ // The first commit table lookup is stubbed to return absent and later ones call the real
+ // method; the shadow cell lookup is stubbed to always return absent
+ @Test(timeOut = 30_000)
+ public void testCellCommitTimestampIsLocatedInCommitTableAfterNotBeingInvalidated(ITestContext context) throws Exception {
+
+ CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
+ PostCommitActions syncPostCommitter =
+ spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+ // NOTE(review): the spied commitTableClient is only handed to the post committer here;
+ // confirm tm ends up using this same client, otherwise the getCommitTimestamp() stub is never hit
+ AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, syncPostCommitter));
+
+ // The following line emulates a crash after commit that is observed in (*) below
+ doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
+ // The next two lines avoid steps 2) and 3) and go directly to step 5)
+ // in AbstractTransactionManager.locateCellCommitTimestamp()
+ SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
+ f.set(Optional.<CommitTimestamp>absent());
+ doReturn(f).doCallRealMethod().when(commitTableClient).getCommitTimestamp(any(Long.class));
+ doReturn(Optional.<CommitTimestamp>absent()).when(tm).readCommitTimestampFromShadowCell(any(Long.class),
+ any(CommitTimestampLocator.class));
+
+ try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+
+ // Commit a transaction that is broken on commit to avoid
+ // write to the shadow cells and avoid cleaning the commit table
+ HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+ Put put = new Put(row1);
+ put.add(family, qualifier, data1);
+ table.put(tx1, put);
+ try {
+ tm.commit(tx1);
+ } catch (Exception e) { // (*) crash
+ // Do nothing
+ }
+
+ // Test the locator finds the appropriate data in the commit table
+ HBaseCellId hBaseCellId = new HBaseCellId(table.getHTable(), row1, family, qualifier,
+ tx1.getStartTimestamp());
+ CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
+ Maps.<Long, Long>newHashMap());
+ CommitTimestamp ct = tm.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
+ ctLocator);
+ assertTrue(ct.isValid());
+ assertEquals(tx1.getCommitTimestamp(), ct.getValue());
+ // assertEquals reports both locations on failure, unlike assertTrue(compareTo(..) == 0)
+ assertEquals(COMMIT_TABLE, ct.getLocation());
+ }
+
+ }
+
+ // Tests step 6 in AbstractTransactionManager.locateCellCommitTimestamp()
+ // The commit table lookup is stubbed to always return absent; the shadow cell lookup is
+ // stubbed to return absent on its first call and to call the real method afterwards
+ @Test(timeOut = 30_000)
+ public void testCellCommitTimestampIsLocatedInShadowCellsAfterNotBeingInvalidated(ITestContext context) throws Exception {
+
+ CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
+ AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, commitTableClient));
+ // The next two lines avoid steps 2), 3) and 5) and go directly to step 6)
+ // in AbstractTransactionManager.locateCellCommitTimestamp()
+ SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
+ f.set(Optional.<CommitTimestamp>absent());
+ doReturn(f).when(commitTableClient).getCommitTimestamp(any(Long.class));
+ doReturn(Optional.<CommitTimestamp>absent()).doCallRealMethod()
+ .when(tm).readCommitTimestampFromShadowCell(any(Long.class), any(CommitTimestampLocator.class));
+
+ try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+
+ // Commit a transaction to add ST/CT in commit table
+ HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+ Put put = new Put(row1);
+ put.add(family, qualifier, data1);
+ table.put(tx1, put);
+ tm.commit(tx1);
+ // Upon commit, the commit data should be in the shadow cells
+
+ // Test the locator finds the appropriate data in the shadow cells
+ HBaseCellId hBaseCellId = new HBaseCellId(table.getHTable(), row1, family, qualifier,
+ tx1.getStartTimestamp());
+ CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
+ Maps.<Long, Long>newHashMap());
+ CommitTimestamp ct = tm.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
+ ctLocator);
+ assertTrue(ct.isValid());
+ assertEquals(tx1.getCommitTimestamp(), ct.getValue());
+ // assertEquals reports both locations on failure, unlike assertTrue(compareTo(..) == 0)
+ assertEquals(SHADOW_CELL, ct.getLocation());
+ }
+
+ }
+
+ // Tests last step in AbstractTransactionManager.locateCellCommitTimestamp()
+ // With both lookups stubbed to return absent and no transaction written, the test expects
+ // a valid CommitTimestamp with value -1 and location NOT_PRESENT
+ @Test(timeOut = 30_000)
+ public void testCTLocatorReturnsAValidCTWhenNotPresent(ITestContext context) throws Exception {
+
+ final long CELL_TS = 1L;
+
+ CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
+ AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, commitTableClient));
+ // The following lines allow to reach the last return statement
+ SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
+ f.set(Optional.<CommitTimestamp>absent());
+ doReturn(f).when(commitTableClient).getCommitTimestamp(any(Long.class));
+ doReturn(Optional.<CommitTimestamp>absent()).when(tm).readCommitTimestampFromShadowCell(any(Long.class),
+ any(CommitTimestampLocator.class));
+
+ try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+ HBaseCellId hBaseCellId = new HBaseCellId(table.getHTable(), row1, family, qualifier, CELL_TS);
+ CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
+ Maps.<Long, Long>newHashMap());
+ CommitTimestamp ct = tm.locateCellCommitTimestamp(CELL_TS, tm.tsoClient.getEpoch(), ctLocator);
+ assertTrue(ct.isValid());
+ assertEquals(-1L, ct.getValue());
+ // assertEquals reports both locations on failure, unlike assertTrue(compareTo(..) == 0)
+ assertEquals(NOT_PRESENT, ct.getLocation());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java
new file mode 100644
index 0000000..d65d27e
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import org.apache.omid.tso.client.TSOClient;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.testng.ITestContext;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anySetOf;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+// TODO These tests should be adapted to a future test for AbstractTransactionManager as they should be DB independent
+@Test(groups = "sharedHBase")
+public class TestHBaseTransactionManager extends OmidTestBase {
+
+ private static final int FAKE_EPOCH_INCREMENT = 100;
+
+ private final byte[] row1 = Bytes.toBytes(TestHBaseTransactionManager.class.getCanonicalName());
+ private final byte[] testFamily = Bytes.toBytes(TEST_FAMILY);
+ private final byte[] qualifier = Bytes.toBytes("TEST_Q");
+ private final byte[] data1 = Bytes.toBytes("test_data1");
+
+
+ /**
+ * Stubs getEpoch() on a spied TSO client with a value FAKE_EPOCH_INCREMENT above a freshly
+ * requested start timestamp, and expects the first transaction id to equal that value.
+ */
+ @Test(timeOut = 20_000)
+ public void testTxManagerGetsTimestampsInTheRightEpoch(ITestContext context) throws Exception {
+
+ TSOClient spiedTsoClient = spy(getClient(context));
+
+ // The fake epoch lies FAKE_EPOCH_INCREMENT beyond the timestamp the TSO hands out now
+ long currentTimestamp = spiedTsoClient.getNewStartTimestamp().get();
+ long fakeEpoch = currentTimestamp + FAKE_EPOCH_INCREMENT;
+
+ // The epoch has to be stubbed before begin() is exercised
+ doReturn(fakeEpoch).when(spiedTsoClient).getEpoch();
+
+ AbstractTransactionManager txManager =
+ spy((AbstractTransactionManager) newTransactionManager(context, spiedTsoClient));
+
+ // The id of the first transaction must match the fake epoch
+ Transaction firstTx = txManager.begin();
+ assertEquals(firstTx.getTransactionId(), fakeEpoch);
+ // getEpoch() is expected to have been called FAKE_EPOCH_INCREMENT times
+ verify(spiedTsoClient, timeout(100).times(FAKE_EPOCH_INCREMENT)).getEpoch();
+
+ }
+
+ @Test(timeOut = 20_000)
+ public void testReadOnlyTransactionsDoNotContactTSOServer(ITestContext context) throws Exception {
+
+ final int EXPECTED_INVOCATIONS_FOR_COMMIT = 1; // Test specific checks
+
+ TSOClient tsoClient = spy(getClient(context));
+ TransactionManager tm = newTransactionManager(context, tsoClient);
+
+ try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
+
+ // Add initial data in a transactional context
+ Transaction tx1 = tm.begin();
+ Put put = new Put(row1);
+ put.add(testFamily, qualifier, data1);
+ txTable.put(tx1, put);
+ tm.commit(tx1);
+
+ verify(tsoClient, times(EXPECTED_INVOCATIONS_FOR_COMMIT)).commit(anyLong(), anySetOf(HBaseCellId.class));
+
+ // Create a read-only tx and verify that commit has not been invoked again in the TSOClient
+ AbstractTransaction readOnlyTx = (AbstractTransaction) tm.begin();
+ Get get = new Get(row1);
+ Result r = txTable.get(readOnlyTx, get);
+ assertTrue(Bytes.equals(r.getValue(testFamily, qualifier), data1), "Wrong value for RO-TX " + readOnlyTx);
+ assertTrue(readOnlyTx.getWriteSet().isEmpty());
+ tm.commit(readOnlyTx);
+
+ verify(tsoClient, times(EXPECTED_INVOCATIONS_FOR_COMMIT)).commit(anyLong(), anySetOf(HBaseCellId.class));
+ assertEquals(readOnlyTx.getStatus(), Transaction.Status.COMMITTED_RO);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestMultiplePut.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestMultiplePut.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestMultiplePut.java
new file mode 100644
index 0000000..04689ac
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestMultiplePut.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.ITestContext;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+@Test(groups = "sharedHBase")
+public class TestMultiplePut extends OmidTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestMultiplePut.class);
+
+ private static final byte[] testTable = Bytes.toBytes(TEST_TABLE);
+ private static final byte[] family = Bytes.toBytes(TEST_FAMILY);
+ private static final byte[] col1 = Bytes.toBytes("col1");
+ private static final byte[] col2 = Bytes.toBytes("col2");
+ private static final byte[] data = Bytes.toBytes("testData");
+
+ /**
+ * Writes the same data to col1 and col2 of one row with two separate puts in a single
+ * transaction, commits, and verifies both values with verifyValue().
+ */
+ @Test(timeOut = 30_000)
+ public void testMultiPutInTwoDifferentColsOfSameRowAreInTheTableAfterCommit(ITestContext context) throws Exception {
+
+ TransactionManager txManager = newTransactionManager(context);
+
+ try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+
+ Transaction tx = txManager.begin();
+
+ byte[] rowToAdd = Bytes.toBytes(1000);
+ byte[][] columns = {col1, col2};
+
+ // One put per column, both against the same row and transaction
+ for (byte[] column : columns) {
+ Put put = new Put(rowToAdd);
+ put.add(family, column, data);
+ table.put(tx, put);
+ }
+
+ txManager.commit(tx);
+
+ // Both columns must hold the data once the transaction is committed
+ for (byte[] column : columns) {
+ assertTrue(verifyValue(testTable, rowToAdd, family, column, data), "Invalid value in table");
+ }
+ }
+
+ }
+
+ /**
+ * Writes NUM_ROWS_TO_ADD rows (keys 0 to NUM_ROWS_TO_ADD - 1) in one transaction, commits,
+ * and verifies the first, the middle and the last row with verifyValue().
+ */
+ @Test(timeOut = 30_000)
+ public void testManyManyPutsInDifferentRowsAreInTheTableAfterCommit(ITestContext context) throws Exception {
+
+ final int NUM_ROWS_TO_ADD = 50;
+
+ TransactionManager tm = newTransactionManager(context);
+
+ try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
+
+ Transaction tx = tm.begin();
+
+ // Exclusive upper bound: exactly NUM_ROWS_TO_ADD rows are written
+ for (int i = 0; i < NUM_ROWS_TO_ADD; i++) {
+ byte[] rowToAdd = Bytes.toBytes(i);
+ byte[] dataForRowCol = Bytes.toBytes("testData" + i);
+ Put put = new Put(rowToAdd);
+ put.add(family, col1, dataForRowCol);
+ txTable.put(tx, put);
+ }
+
+ tm.commit(tx);
+
+ // Check some of the added values are there in the table: first, middle and last row
+ for (int rowIdx : new int[] {0, NUM_ROWS_TO_ADD / 2, NUM_ROWS_TO_ADD - 1}) {
+ byte[] rowToCheck = Bytes.toBytes(rowIdx);
+ byte[] dataToCheck = Bytes.toBytes("testData" + rowIdx);
+ assertTrue(verifyValue(testTable, rowToCheck, family, col1, dataToCheck), "Invalid value in table");
+ }
+
+ }
+ }
+
+ /**
+ * Writes rows 0 to NUM_ROWS_TO_ADD - 1 in one transaction and, before committing, reads a
+ * row key that was not among them, expecting an empty result.
+ */
+ @Test(timeOut = 30_000)
+ public void testGetFromNonExistentRowAfterMultiplePutsReturnsNoResult(ITestContext context) throws Exception {
+
+ final int NUM_ROWS_TO_ADD = 10;
+
+ TransactionManager txManager = newTransactionManager(context);
+
+ try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+
+ Transaction tx = txManager.begin();
+
+ for (int rowIdx = 0; rowIdx < NUM_ROWS_TO_ADD; rowIdx++) {
+ Put put = new Put(Bytes.toBytes(rowIdx));
+ put.add(family, col1, Bytes.toBytes("testData" + rowIdx));
+ table.put(tx, put);
+ }
+
+ // This key lies beyond every row written above
+ byte[] nonExistentRow = Bytes.toBytes(NUM_ROWS_TO_ADD + 5);
+ Result result = table.get(tx, new Get(nonExistentRow));
+
+ assertTrue(result.isEmpty(), "Found a row that should not exist");
+
+ txManager.commit(tx);
+
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestReadPath.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestReadPath.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestReadPath.java
new file mode 100644
index 0000000..dd621ca
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestReadPath.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.testng.ITestContext;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertNotNull;
+import static org.testng.AssertJUnit.assertTrue;
+
+ /**
+  * Read-path tests: checks which written values a transaction gets back when it reads a row.
+  */
+ @Test(groups = "sharedHBase")
+ public class TestReadPath extends OmidTestBase {
+
+     final byte[] family = Bytes.toBytes(TEST_FAMILY);
+     final byte[] row = Bytes.toBytes("row");
+     private final byte[] col = Bytes.toBytes("col1");
+     final byte[] data = Bytes.toBytes("data");
+     private final byte[] uncommitted = Bytes.toBytes("uncommitted");
+
+     /** A transaction begun before another one commits must not read that other transaction's write. */
+     @Test
+     public void testReadInterleaved(ITestContext context) throws Exception {
+         TransactionManager tm = newTransactionManager(context);
+         // try-with-resources: the table is released even when the assertion below fails
+         try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+             // Both transactions begin before t1 writes and commits
+             Transaction t1 = tm.begin();
+             Transaction t2 = tm.begin();
+
+             Put put = new Put(row);
+             put.add(family, col, data);
+             table.put(t1, put);
+             tm.commit(t1);
+
+             Result result = table.get(t2, new Get(row));
+             assertFalse("Should be unable to read column", result.containsColumn(family, col));
+         }
+     }
+
+     /** Ten uncommitted writes on top of a committed value must be skipped by a later reader. */
+     @Test
+     public void testReadWithSeveralUncommitted(ITestContext context) throws Exception {
+         TransactionManager tm = newTransactionManager(context);
+         List<Transaction> running = new ArrayList<>();
+         try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+             // Put some data on the DB
+             Transaction t = tm.begin();
+             Put put = new Put(row);
+             put.add(family, col, data);
+             table.put(t, put);
+             tm.commit(t);
+
+             // Shade the data with uncommitted data
+             for (int i = 0; i < 10; ++i) {
+                 t = tm.begin();
+                 put = new Put(row);
+                 put.add(family, col, uncommitted);
+                 table.put(t, put);
+                 running.add(t);
+             }
+
+             // Try to read from row, it should ignore the uncommitted data and return the original committed value
+             t = tm.begin();
+             Result result = table.get(t, new Get(row));
+             Cell cell = result.getColumnLatestCell(family, col);
+             assertNotNull("KeyValue is null", cell);
+             assertTrue("Read data doesn't match", Arrays.equals(data, CellUtil.cloneValue(cell)));
+             tm.commit(t);
+         }
+
+         // The table is already closed here; the transactions left open above are rolled back last
+         for (Transaction r : running) {
+             tm.rollback(r);
+         }
+     }
+ }