You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by ik...@apache.org on 2016/04/21 02:02:51 UTC
[34/52] [abbrv] incubator-omid git commit: Move com.yahoo ->
org.apache
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
new file mode 100644
index 0000000..03599f7
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
@@ -0,0 +1,508 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.omid.committable.CommitTable;
+
+import org.apache.omid.metrics.NullMetricsProvider;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.ITestContext;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.omid.transaction.CellUtils.hasCell;
+import static org.apache.omid.transaction.CellUtils.hasShadowCell;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+@Test(groups = "sharedHBase")
+public class TestShadowCells extends OmidTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestShadowCells.class);
+ // Host and port joined as "host:port" into the connection string of the Omid client configurations built below
+ private static final String TSO_SERVER_HOST = "localhost";
+ private static final int TSO_SERVER_PORT = 1234;
+ // Name of the table every test opens through TTable, and the name of the column family backing `family`
+ private static final String TEST_TABLE = "test";
+ private static final String TEST_FAMILY = "data";
+ // Row keys, column coordinates and cell value shared by the tests of this class
+ static final byte[] row = Bytes.toBytes("test-sc");
+ private static final byte[] row1 = Bytes.toBytes("test-sc1");
+ private static final byte[] row2 = Bytes.toBytes("test-sc2");
+ private static final byte[] row3 = Bytes.toBytes("test-sc3");
+ static final byte[] family = Bytes.toBytes(TEST_FAMILY);
+ private static final byte[] qualifier = Bytes.toBytes("testdata");
+ private static final byte[] data1 = Bytes.toBytes("testWrite-1");
+
+
+ @Test(timeOut = 60_000)
+ public void testShadowCellsBasics(ITestContext context) throws Exception {
+
+ TransactionManager tm = newTransactionManager(context);
+
+ TTable table = new TTable(hbaseConf, TEST_TABLE);
+
+ HBaseTransaction t1 = (HBaseTransaction) tm.begin();
+
+ // Test shadow cells are created properly
+ Put put = new Put(row);
+ put.add(family, qualifier, data1);
+ table.put(t1, put);
+
+ // Before commit test that only the cell is there
+ assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
+ "Cell should be there");
+ assertFalse(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
+ "Shadow cell shouldn't be there");
+
+ tm.commit(t1);
+
+ // After commit test that both cell and shadow cell are there
+ assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
+ "Cell should be there");
+ assertTrue(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
+ "Shadow cell should be there");
+
+ // Test that we can make a valid read after adding a shadow cell without hitting the commit table
+ CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
+
+ HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
+ hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+ hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
+ TransactionManager tm2 = HBaseTransactionManager.builder(hbaseOmidClientConf)
+ .commitTableClient(commitTableClient)
+ .build();
+
+ Transaction t2 = tm2.begin();
+ Get get = new Get(row);
+ get.addColumn(family, qualifier);
+
+ Result getResult = table.get(t2, get);
+ assertTrue(Arrays.equals(data1, getResult.getValue(family, qualifier)), "Values should be the same");
+ verify(commitTableClient, never()).getCommitTimestamp(anyLong());
+ }
+
+    /**
+     * Emulates a client crash right after the commit is decided: the post-commit shadow cell update throws, so
+     * the data cell is left without a shadow cell and a later read has to consult the commit table exactly once.
+     */
+    @Test(timeOut = 60_000)
+    public void testCrashingAfterCommitDoesNotWriteShadowCells(ITestContext context) throws Exception {
+        CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
+        HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
+        hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+        hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
+        PostCommitActions syncPostCommitter = spy(
+                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+        AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager
+                .builder(hbaseOmidClientConf)
+                .postCommitter(syncPostCommitter)
+                .commitTableClient(commitTableClient)
+                .build());
+
+        // The following line emulates a crash after commit that is observed in (*) below
+        doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
+
+        TTable table = new TTable(hbaseConf, TEST_TABLE);
+        HBaseTransaction t1 = (HBaseTransaction) tm.begin();
+        Put put = new Put(row);
+        put.add(family, qualifier, data1);
+        table.put(t1, put);
+        try {
+            tm.commit(t1);
+        } catch (Exception ignored) { // (*) crash
+            // Swallowed on purpose: if the stubbed updateShadowCells() failure propagates, it is the emulated crash
+        }
+
+        // After commit with the emulated crash, test that only the cell is there
+        assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
+                   "Cell should be there");
+        assertFalse(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
+                    "Shadow cell should not be there");
+
+        // The value must still be readable; without a shadow cell that costs exactly one commit table lookup
+        Transaction t2 = tm.begin();
+        Get get = new Get(row);
+        get.addColumn(family, qualifier);
+        Result getResult = table.get(t2, get);
+        assertTrue(Arrays.equals(data1, getResult.getValue(family, qualifier)), "Values should be the same");
+        verify(commitTableClient, times(1)).getCommitTimestamp(anyLong());
+    }
+
+ @Test(timeOut = 60_000)
+ public void testShadowCellIsHealedAfterCommitCrash(ITestContext context) throws Exception {
+
+ CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
+
+ HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
+ hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+ hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
+ PostCommitActions syncPostCommitter = spy(
+ new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+ AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
+ .postCommitter(syncPostCommitter)
+ .commitTableClient(commitTableClient)
+ .build());
+
+ // The following line emulates a crash after commit that is observed in (*) below
+ doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
+
+ TTable table = new TTable(hbaseConf, TEST_TABLE);
+
+ HBaseTransaction t1 = (HBaseTransaction) tm.begin();
+
+ // Test shadow cell are created properly
+ Put put = new Put(row);
+ put.add(family, qualifier, data1);
+ table.put(t1, put);
+ try {
+ tm.commit(t1);
+ } catch (Exception e) { // (*) Crash
+ // Do nothing
+ }
+
+ assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
+ "Cell should be there");
+ assertFalse(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
+ "Shadow cell should not be there");
+
+ Transaction t2 = tm.begin();
+ Get get = new Get(row);
+ get.addColumn(family, qualifier);
+
+ // This get should heal the shadow cell
+ Result getResult = table.get(t2, get);
+ assertTrue(Arrays.equals(data1, getResult.getValue(family, qualifier)), "Values should be the same");
+ verify(commitTableClient, times(1)).getCommitTimestamp(anyLong());
+
+ assertTrue(hasCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
+ "Cell should be there");
+ assertTrue(hasShadowCell(row, family, qualifier, t1.getStartTimestamp(), new TTableCellGetterAdapter(table)),
+ "Shadow cell should be there after being healed");
+
+ // As the shadow cell is healed, this get shouldn't have to hit the storage,
+ // so the number of invocations to commitTableClient.getCommitTimestamp()
+ // should remain the same
+ getResult = table.get(t2, get);
+ assertTrue(Arrays.equals(data1, getResult.getValue(family, qualifier)), "Values should be the same");
+ verify(commitTableClient, times(1)).getCommitTimestamp(anyLong());
+ }
+
+ @Test(timeOut = 60_000)
+ public void testTransactionNeverCompletesWhenAnExceptionIsThrownUpdatingShadowCells(ITestContext context)
+ throws Exception {
+
+ CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
+
+ HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
+ hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+ hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
+ PostCommitActions syncPostCommitter = spy(
+ new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+ AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
+ .postCommitter(syncPostCommitter)
+ .commitTableClient(commitTableClient)
+ .build());
+
+ final TTable table = new TTable(hbaseConf, TEST_TABLE);
+
+ HBaseTransaction tx = (HBaseTransaction) tm.begin();
+
+ Put put = new Put(row);
+ put.add(family, qualifier, data1);
+ table.put(tx, put);
+
+ // This line emulates an error accessing the target table by disabling it
+ doAnswer(new Answer<ListenableFuture<Void>>() {
+ @Override
+ public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
+ table.flushCommits();
+ HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
+ admin.disableTable(table.getTableName());
+ return (ListenableFuture<Void>) invocation.callRealMethod();
+ }
+ }).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
+
+ // When committing, an IOException should be thrown in syncPostCommitter.updateShadowCells() and placed in the
+ // future as a TransactionManagerException. However, the exception is never retrieved in the
+ // AbstractTransactionManager as the future is never checked.
+ // This requires to set the HConstants.HBASE_CLIENT_RETRIES_NUMBER in the HBase config to a finite number:
+ // e.g -> hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3); Otherwise it will get stuck in tm.commit();
+
+ tm.commit(tx); // Tx effectively commits but the post Commit Actions failed when updating the shadow cells
+
+ // Re-enable table to allow the required checks below
+ HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
+ admin.enableTable(table.getTableName());
+
+ // 1) check that shadow cell is not created...
+ assertTrue(hasCell(row, family, qualifier, tx.getStartTimestamp(), new TTableCellGetterAdapter(table)),
+ "Cell should be there");
+ assertFalse(hasShadowCell(row, family, qualifier, tx.getStartTimestamp(), new TTableCellGetterAdapter(table)),
+ "Shadow cell should not be there");
+ // 2) and thus, completeTransaction() was never called on the commit table...
+ verify(commitTableClient, times(0)).completeTransaction(anyLong());
+ // 3) ...and commit value still in commit table
+ assertTrue(commitTableClient.getCommitTimestamp(tx.getStartTimestamp()).get().isPresent());
+
+ }
+
+ /**
+ * Interleaves a committing writer and a reader thread using three latches; the read must still return the
+ * written value and find its shadow cell (failures are recorded in readFailed and asserted at the end).
+ */
+ @Test(timeOut = 60_000)
+ public void testRaceConditionBetweenReaderAndWriterThreads(final ITestContext context) throws Exception {
+ final CountDownLatch readAfterCommit = new CountDownLatch(1);
+ final CountDownLatch postCommitBegin = new CountDownLatch(1);
+ final CountDownLatch postCommitEnd = new CountDownLatch(1);
+ final AtomicBoolean readFailed = new AtomicBoolean(false);
+ PostCommitActions syncPostCommitter =
+ spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
+ AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
+ // Stubbed updateShadowCells(): release the reader, block on postCommitBegin, run the real update, signal the end
+ doAnswer(new Answer<ListenableFuture<Void>>() {
+ @Override
+ public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
+ LOG.info("Releasing readAfterCommit barrier");
+ readAfterCommit.countDown();
+ LOG.info("Waiting postCommitBegin barrier");
+ postCommitBegin.await();
+ ListenableFuture<Void> result = (ListenableFuture<Void>) invocation.callRealMethod();
+ LOG.info("Releasing postCommitEnd barrier");
+ postCommitEnd.countDown();
+ return result;
+ }
+ }).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
+
+ // Start transaction on write thread
+ TTable table = new TTable(hbaseConf, TEST_TABLE);
+
+ final HBaseTransaction t1 = (HBaseTransaction) tm.begin();
+ // Start read thread
+ Thread readThread = new Thread("Read Thread") {
+ @Override
+ public void run() {
+ LOG.info("Waiting readAfterCommit barrier");
+ try {
+ readAfterCommit.await();
+ final TTable table = spy(new TTable(hbaseConf, TEST_TABLE));
+ doAnswer(new Answer<List<KeyValue>>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<KeyValue> answer(InvocationOnMock invocation) throws Throwable {
+ LOG.info("Release postCommitBegin barrier");
+ postCommitBegin.countDown();
+ LOG.info("Waiting postCommitEnd barrier");
+ postCommitEnd.await();
+ return (List<KeyValue>) invocation.callRealMethod();
+ }
+ }).when(table).filterCellsForSnapshot(Matchers.<List<Cell>>any(),
+ any(HBaseTransaction.class), anyInt());
+ // No shadow cell expected yet: the writer still waits on postCommitBegin (assuming this raw check
+ // does not go through filterCellsForSnapshot -- TODO confirm)
+ TransactionManager tm = newTransactionManager(context);
+ if (hasShadowCell(row,
+ family,
+ qualifier,
+ t1.getStartTimestamp(),
+ new TTableCellGetterAdapter(table))) {
+ readFailed.set(true);
+ }
+ // Transactional read; presumably this get triggers the filterCellsForSnapshot() stub set up above
+ Transaction t = tm.begin();
+ Get get = new Get(row);
+ get.addColumn(family, qualifier);
+
+ Result getResult = table.get(t, get);
+ Cell cell = getResult.getColumnLatestCell(family, qualifier);
+ if (!Arrays.equals(data1, CellUtil.cloneValue(cell))
+ || !hasShadowCell(row,
+ family,
+ qualifier,
+ cell.getTimestamp(),
+ new TTableCellGetterAdapter(table))) {
+ readFailed.set(true);
+ } else {
+ LOG.info("Read succeeded");
+ }
+ } catch (Throwable e) {
+ readFailed.set(true);
+ LOG.error("Error whilst reading", e);
+ }
+ }
+ };
+ readThread.start();
+ // NOTE(review): if the reader fails before releasing postCommitBegin, commit() below blocks until the timeout
+ // Write data
+ Put put = new Put(row);
+ put.add(family, qualifier, data1);
+ table.put(t1, put);
+ tm.commit(t1);
+
+ readThread.join();
+ assertFalse(readFailed.get(), "Read should have succeeded");
+ }
+
+ // TODO: After removing the legacy shadow cell suffix, maybe we should mix the assertions in this test with
+ // the ones in the previous tests in a further commit
+
+ /**
+ * Test that the new client can read shadow cells written by the old client.
+ */
+ @Test
+ public void testGetOldShadowCells(ITestContext context) throws Exception {
+
+ TransactionManager tm = newTransactionManager(context);
+
+ TTable table = new TTable(hbaseConf, TEST_TABLE);
+ HTableInterface htable = table.getHTable();
+
+ // Test shadow cell are created properly
+ HBaseTransaction t1 = (HBaseTransaction) tm.begin();
+ Put put = new Put(row1);
+ put.add(family, qualifier, data1);
+ table.put(t1, put);
+ tm.commit(t1);
+
+ HBaseTransaction t2 = (HBaseTransaction) tm.begin();
+ put = new Put(row2);
+ put.add(family, qualifier, data1);
+ table.put(t2, put);
+ tm.commit(t2);
+
+ HBaseTransaction t3 = (HBaseTransaction) tm.begin();
+ put = new Put(row3);
+ put.add(family, qualifier, data1);
+ table.put(t3, put);
+ tm.commit(t3);
+
+ // ensure that transaction is no longer in commit table
+ // the only place that should have the mapping is the shadow cells
+ CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
+ Optional<CommitTable.CommitTimestamp> ct1 = commitTableClient.getCommitTimestamp(t1.getStartTimestamp()).get();
+ Optional<CommitTable.CommitTimestamp> ct2 = commitTableClient.getCommitTimestamp(t2.getStartTimestamp()).get();
+ Optional<CommitTable.CommitTimestamp> ct3 = commitTableClient.getCommitTimestamp(t3.getStartTimestamp()).get();
+ assertFalse(ct1.isPresent(), "Shouldn't exist in commit table");
+ assertFalse(ct2.isPresent(), "Shouldn't exist in commit table");
+ assertFalse(ct3.isPresent(), "Shouldn't exist in commit table");
+
+ // delete new shadow cell
+ Delete del = new Delete(row2);
+ del.deleteColumn(family, CellUtils.addShadowCellSuffix(qualifier));
+ htable.delete(del);
+ htable.flushCommits();
+
+ // verify that we can't read now (since shadow cell is missing)
+ Transaction t4 = tm.begin();
+ Get get = new Get(row2);
+ get.addColumn(family, qualifier);
+
+ Result getResult = table.get(t4, get);
+ assertTrue(getResult.isEmpty(), "Should have nothing");
+
+ Transaction t5 = tm.begin();
+ Scan s = new Scan();
+ ResultScanner scanner = table.getScanner(t5, s);
+ Result result1 = scanner.next();
+ Result result2 = scanner.next();
+ Result result3 = scanner.next();
+ scanner.close();
+
+ assertNull(result3);
+ assertTrue(Arrays.equals(result1.getRow(), row1), "Should have first row");
+ assertTrue(Arrays.equals(result2.getRow(), row3), "Should have third row");
+ assertTrue(result1.containsColumn(family, qualifier), "Should have column family");
+ assertTrue(result2.containsColumn(family, qualifier), "Should have column family");
+
+ // now add in the previous legacy shadow cell for that row
+ put = new Put(row2);
+ put.add(family,
+ addLegacyShadowCellSuffix(qualifier),
+ t2.getStartTimestamp(),
+ Bytes.toBytes(t2.getCommitTimestamp()));
+ htable.put(put);
+
+ // we should NOT be able to read that row now, even though
+ // it has a legacy shadow cell
+ Transaction t6 = tm.begin();
+ get = new Get(row2);
+ get.addColumn(family, qualifier);
+
+ getResult = table.get(t6, get);
+ assertFalse(getResult.containsColumn(family, qualifier), "Should NOT have column");
+
+ Transaction t7 = tm.begin();
+ s = new Scan();
+ scanner = table.getScanner(t7, s);
+ result1 = scanner.next();
+ result2 = scanner.next();
+ result3 = scanner.next();
+ scanner.close();
+
+ assertNull(result3, "There should only be 2 rows");
+ assertTrue(Arrays.equals(result1.getRow(), row1), "Should have first row");
+ assertTrue(Arrays.equals(result2.getRow(), row3), "Should have third row");
+ assertTrue(result1.containsColumn(family, qualifier), "Should have column family");
+ assertTrue(result2.containsColumn(family, qualifier), "Should have column family");
+ }
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // Helper methods
+ // ----------------------------------------------------------------------------------------------------------------
+
+ private static final byte[] LEGACY_SHADOW_CELL_SUFFIX = ":OMID_CTS".getBytes(Charsets.UTF_8);
+ /** Returns the concatenation of {@code qualifier} and the UTF-8 bytes of the legacy suffix ":OMID_CTS". */
+ private static byte[] addLegacyShadowCellSuffix(byte[] qualifier) {
+ return com.google.common.primitives.Bytes.concat(qualifier, LEGACY_SHADOW_CELL_SUFFIX);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestSingleColumnFamily.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestSingleColumnFamily.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestSingleColumnFamily.java
new file mode 100644
index 0000000..c76085f
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestSingleColumnFamily.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.ITestContext;
+import org.testng.annotations.Test;
+
+import static org.testng.AssertJUnit.assertTrue;
+
+@Test(groups = "sharedHBase")
+public class TestSingleColumnFamily extends OmidTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(TestSingleColumnFamily.class);
+
+
+ @Test
+ public void testSingleColumnFamily(ITestContext context) throws Exception {
+ TransactionManager tm = newTransactionManager(context);
+ TTable table1 = new TTable(hbaseConf, TEST_TABLE);
+ int num = 10;
+ Transaction t = tm.begin();
+ for (int j = 0; j < num; j++) {
+ byte[] data = Bytes.toBytes(j);
+ Put put = new Put(data);
+ put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes("value1"), data);
+ put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes("value2"), data);
+ table1.put(t, put);
+ }
+ //tm.tryCommit(t);
+ //t=tm.beginTransaction(); //Visible if in a different transaction
+ Scan s = new Scan();
+ ResultScanner res = table1.getScanner(t, s);
+ Result rr;
+ int count = 0;
+ while ((rr = res.next()) != null) {
+ int tmp1 = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes("value1")));
+ int tmp2 = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes("value2")));
+ LOG.info("RES:" + tmp1 + ";" + tmp2);
+ count++;
+ }
+ assertTrue("Can't see puts. I should see "
+ + num + " but I see " + count
+ , num == count);
+
+ tm.commit(t);
+ t = tm.begin();
+
+ for (int j = 0; j < num / 2; j++) {
+ byte[] data = Bytes.toBytes(j);
+ byte[] ndata = Bytes.toBytes(j * 10);
+ Put put = new Put(data);
+ put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes("value2"), ndata);
+ table1.put(t, put);
+ }
+ tm.commit(t);
+ t = tm.begin();
+ s = new Scan();
+ res = table1.getScanner(t, s);
+ count = 0;
+ int modified = 0, notmodified = 0;
+ while ((rr = res.next()) != null) {
+ int tmp1 = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes("value1")));
+ int tmp2 = Bytes.toInt(rr.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes("value2")));
+ LOG.info("RES:" + tmp1 + ";" + tmp2);
+ count++;
+
+ if (tmp2 == Bytes.toInt(rr.getRow()) * 10) {
+ modified++;
+ } else {
+ notmodified++;
+ }
+ if (count == 8) {
+ LOG.debug("stop");
+ }
+ }
+ assertTrue("Can't see puts. I should see "
+ + num + " but I see " + count
+ , num == count);
+ assertTrue("Half of rows should equal row id, half not ("
+ + modified + ", " + notmodified + ")"
+ , modified == notmodified && notmodified == (num / 2));
+
+ tm.commit(t);
+ LOG.info("End commiting");
+ table1.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
new file mode 100644
index 0000000..601c6ca
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.committable.hbase.HBaseCommitTable;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.apache.omid.timestamp.storage.HBaseTimestampStorage;
+import org.apache.omid.timestamp.storage.TimestampStorage;
+import org.apache.omid.tso.DisruptorModule;
+import org.apache.omid.tso.MockPanicker;
+import org.apache.omid.tso.NetworkInterfaceUtils;
+import org.apache.omid.tso.Panicker;
+import org.apache.omid.tso.PausableTimestampOracle;
+import org.apache.omid.tso.TSOChannelHandler;
+import org.apache.omid.tso.TSOServerConfig;
+import org.apache.omid.tso.TSOStateManager;
+import org.apache.omid.tso.TSOStateManagerImpl;
+import org.apache.omid.tso.TimestampOracle;
+import org.apache.hadoop.conf.Configuration;
+
+import javax.inject.Named;
+import javax.inject.Singleton;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+
+import static org.apache.omid.tso.TSOServer.TSO_HOST_AND_PORT_KEY;
+
+/**
+ * Guice module used by tests to wire a TSO server: it binds {@link HBaseCommitTable}, {@link HBaseTimestampStorage},
+ * {@link PausableTimestampOracle} and {@link MockPanicker}, and provides a {@link NullMetricsProvider} registry.
+ */
+class TestTSOModule extends AbstractModule {
+    private final Configuration hBaseConfig;
+    private final TSOServerConfig config;
+
+    TestTSOModule(Configuration hBaseConfig, TSOServerConfig config) {
+        this.hBaseConfig = hBaseConfig;
+        this.config = config;
+    }
+
+    @Override
+    protected void configure() {
+        // Every binding below is singleton-scoped
+        bind(TSOChannelHandler.class).in(Singleton.class);
+        bind(TSOStateManager.class).to(TSOStateManagerImpl.class).in(Singleton.class);
+        bind(CommitTable.class).to(HBaseCommitTable.class).in(Singleton.class);
+        bind(TimestampStorage.class).to(HBaseTimestampStorage.class).in(Singleton.class);
+        bind(TimestampOracle.class).to(PausableTimestampOracle.class).in(Singleton.class);
+        bind(Panicker.class).to(MockPanicker.class).in(Singleton.class);
+        install(new DisruptorModule()); // Disruptor setup
+        install(config.getLeaseModule()); // LeaseManagement setup
+    }
+
+    /** Hands out the HBase configuration this module was constructed with. */
+    @Provides
+    Configuration provideHBaseConfig() {
+        return this.hBaseConfig;
+    }
+
+    /** Hands out the TSO server configuration this module was constructed with. */
+    @Provides
+    TSOServerConfig provideTSOServerConfig() {
+        return this.config;
+    }
+
+    /** Supplies a singleton-scoped {@link NullMetricsProvider} as the metrics registry. */
+    @Provides
+    @Singleton
+    MetricsRegistry provideMetricsRegistry() {
+        return new NullMetricsProvider();
+    }
+
+    /** Supplies the string named {@code TSO_HOST_AND_PORT_KEY}, as resolved by {@link NetworkInterfaceUtils}. */
+    @Provides
+    @Named(TSO_HOST_AND_PORT_KEY)
+    String provideTSOHostAndPort() throws SocketException, UnknownHostException {
+        return NetworkInterfaceUtils.getTSOHostAndPort(this.config);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestTTableBehaviour.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTTableBehaviour.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTTableBehaviour.java
new file mode 100644
index 0000000..9906e4b
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTTableBehaviour.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import com.google.common.base.Charsets;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.fail;
+
+@Test(groups = "noHBase")
+public class TestTTableBehaviour {
+
+ private byte[] row = Bytes.toBytes("1row");
+ private byte[] famName = Bytes.toBytes("tf");
+ private byte[] colName = Bytes.toBytes("tc");
+ private byte[] dataValue = Bytes.toBytes("test-data");
+
+ @Test(timeOut = 10_000)
+ public void testUserOperationsDontAllowTimestampSpecification() throws Exception {
+
+ // Component under test
+ TTable tt = new TTable(Mockito.mock(HTableInterface.class), Mockito.mock(HTableInterface.class));
+
+ long randomTimestampValue = Bytes.toLong("deadbeef".getBytes());
+
+ Transaction tx = Mockito.mock(Transaction.class);
+
+ // Test put fails when a timestamp is specified in the put
+ Put put = new Put(row, randomTimestampValue);
+ put.add(famName, colName, dataValue);
+ try {
+ tt.put(tx, put);
+ fail("Should have thrown an IllegalArgumentException due to timestamp specification");
+ } catch (IllegalArgumentException e) {
+ // Continue
+ }
+
+ // Test put fails when a timestamp is specified in a qualifier
+ put = new Put(row);
+ put.add(famName, colName, randomTimestampValue, dataValue);
+ try {
+ tt.put(tx, put);
+ fail("Should have thrown an IllegalArgumentException due to timestamp specification");
+ } catch (IllegalArgumentException e) {
+ // Continue
+ }
+
+ // Test that get fails when a timestamp is specified
+ Get get = new Get(row);
+ get.setTimeStamp(randomTimestampValue);
+ try {
+ tt.get(tx, get);
+ fail("Should have thrown an IllegalArgumentException due to timestamp specification");
+ } catch (IllegalArgumentException e) {
+ // Continue
+ }
+
+ // Test scan fails when a timerange is specified
+ Scan scan = new Scan(get);
+ try {
+ tt.getScanner(tx, scan);
+ fail("Should have thrown an IllegalArgumentException due to timestamp specification");
+ } catch (IllegalArgumentException e) {
+ // Continue
+ }
+
+ // Test delete fails when a timestamp is specified
+ Delete delete = new Delete(row);
+ delete.setTimestamp(randomTimestampValue);
+ try {
+ tt.delete(tx, delete);
+ fail("Should have thrown an IllegalArgumentException due to timestamp specification");
+ } catch (IllegalArgumentException e) {
+ // Continue
+ }
+
+ // Test delete fails when a timestamp is specified in a qualifier
+ delete = new Delete(row);
+ delete.deleteColumn(famName, colName, randomTimestampValue);
+ try {
+ tt.delete(tx, delete);
+ fail("Should have thrown an IllegalArgumentException due to timestamp specification");
+ } catch (IllegalArgumentException e) {
+ // Continue
+ }
+
+ }
+
+ /**
+ * Test that we cannot use reserved names for shadow cell identifiers as qualifiers in user operations
+ */
+ @Test(timeOut = 10_000)
+ public void testReservedNamesForShadowCellsCanNotBeUsedAsQualifiersInUserOperations() throws Exception {
+ byte[] nonValidQualifier1 = "blahblah\u0080".getBytes(Charsets.UTF_8);
+ byte[] validQualifierIncludingOldShadowCellSuffix = "blahblah:OMID_CTS".getBytes(Charsets.UTF_8);
+
+ TTable table = new TTable(Mockito.mock(HTableInterface.class), Mockito.mock(HTableInterface.class));
+
+ HBaseTransaction t1 = Mockito.mock(HBaseTransaction.class);
+ Put put = new Put(row);
+ put.add(famName, nonValidQualifier1, dataValue);
+ try {
+ table.put(t1, put);
+ fail("Shouldn't be able to put this");
+ } catch (IllegalArgumentException iae) {
+ // correct
+ }
+ Delete del = new Delete(row);
+ del.deleteColumn(famName, nonValidQualifier1);
+ try {
+ table.delete(t1, del);
+ fail("Shouldn't be able to delete this");
+ } catch (IllegalArgumentException iae) {
+ // correct
+ }
+
+ put = new Put(row);
+ put.add(famName, validQualifierIncludingOldShadowCellSuffix, dataValue);
+ try {
+ table.put(t1, put);
+ } catch (IllegalArgumentException iae) {
+ fail("Qualifier shouldn't be rejected anymore");
+ }
+ del = new Delete(row);
+ del.deleteColumn(famName, validQualifierIncludingOldShadowCellSuffix);
+ try {
+ table.delete(t1, del);
+ } catch (IllegalArgumentException iae) {
+ fail("Qualifier shouldn't be rejected anymore");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionCleanup.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionCleanup.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionCleanup.java
new file mode 100644
index 0000000..6b66710
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionCleanup.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.omid.tso.client.AbortException;
+import org.apache.omid.tso.client.ForwardingTSOFuture;
+import org.apache.omid.tso.client.TSOClient;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.ITestContext;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.anySetOf;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.testng.AssertJUnit.assertEquals;
+
+@Test(groups = "sharedHBase")
+public class TestTransactionCleanup extends OmidTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestTransactionCleanup.class);
+
+ private static final long START_TS = 1L;
+
+ private byte[] row = Bytes.toBytes("row");
+ private byte[] family = Bytes.toBytes(TEST_FAMILY);
+ private byte[] qual = Bytes.toBytes("qual");
+ private byte[] data = Bytes.toBytes("data");
+
+ // NOTE: This test may be redundant with the runTestCleanupAfterConflict()
+ // and testCleanupWithDeleteRow() tests in the TestTransactionConflict class.
+ // The code in TestTransactionConflict is a little more difficult to follow,
+ // lacks some assertions and includes some magic numbers, so we should
+ // try to review and improve the tests in these two classes in a future
+ // commit.
+ @Test
+ public void testTransactionIsCleanedUpAfterBeingAborted(ITestContext context) throws Exception {
+
+ final int ROWS_MODIFIED = 1;
+
+ // Prepare the mocking results
+ SettableFuture<Long> startTSF = SettableFuture.create();
+ startTSF.set(START_TS);
+ ForwardingTSOFuture<Long> stFF = new ForwardingTSOFuture<>(startTSF);
+
+ SettableFuture<Long> abortingF = SettableFuture.create();
+ abortingF.setException(new AbortException());
+ ForwardingTSOFuture<Long> abortingFF = new ForwardingTSOFuture<>(abortingF);
+
+ // Mock the TSO Client setting the right method responses
+ TSOClient mockedTSOClient = mock(TSOClient.class);
+
+ doReturn(stFF)
+ .when(mockedTSOClient).getNewStartTimestamp();
+
+ doReturn(abortingFF)
+ .when(mockedTSOClient).commit(eq(START_TS), anySetOf(HBaseCellId.class));
+
+ try (TransactionManager tm = newTransactionManager(context, mockedTSOClient);
+ TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
+
+ // Start a transaction and put some data in a column
+ Transaction tx = tm.begin();
+
+ Put put = new Put(row);
+ put.add(family, qual, data);
+ txTable.put(tx, put);
+
+ // Abort transaction when committing, so the cleanup
+ // process we want to test is triggered
+ try {
+ tm.commit(tx);
+ } catch (RollbackException e) {
+ // Expected
+ }
+
+ // So now we have to check that the Delete marker introduced by the
+ // cleanup process is there
+ Scan scan = new Scan(row);
+ scan.setRaw(true); // Raw scan to obtain the deleted cells
+ ResultScanner resultScanner = txTable.getHTable().getScanner(scan);
+ int resultCount = 0;
+ for (Result result : resultScanner) {
+ assertEquals(2, result.size()); // Size == 2, including the put and delete from cleanup
+ LOG.trace("Result {}", result);
+ // The last element of the qualifier should have the Delete marker
+ byte encodedType = result.getColumnLatestCell(family, qual).getTypeByte();
+ assertEquals(KeyValue.Type.Delete,
+ KeyValue.Type.codeToType(encodedType));
+ resultCount++;
+ }
+ assertEquals(ROWS_MODIFIED, resultCount);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java
new file mode 100644
index 0000000..229acee
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.ITestContext;
+import org.testng.annotations.Test;
+
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+
+@Test(groups = "sharedHBase")
+public class TestTransactionConflict extends OmidTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestTransactionConflict.class);
+
+
+ @Test
+ public void runTestWriteWriteConflict(ITestContext context) throws Exception {
+ TransactionManager tm = newTransactionManager(context);
+ TTable tt = new TTable(hbaseConf, TEST_TABLE);
+
+ Transaction t1 = tm.begin();
+ LOG.info("Transaction created " + t1);
+
+ Transaction t2 = tm.begin();
+ LOG.info("Transaction created" + t2);
+
+ byte[] row = Bytes.toBytes("test-simple");
+ byte[] fam = Bytes.toBytes(TEST_FAMILY);
+ byte[] col = Bytes.toBytes("testdata");
+ byte[] data1 = Bytes.toBytes("testWrite-1");
+ byte[] data2 = Bytes.toBytes("testWrite-2");
+
+ Put p = new Put(row);
+ p.add(fam, col, data1);
+ tt.put(t1, p);
+
+ Put p2 = new Put(row);
+ p2.add(fam, col, data2);
+ tt.put(t2, p2);
+
+ tm.commit(t2);
+
+ try {
+ tm.commit(t1);
+ Assert.fail("Transaction should not commit successfully");
+ } catch (RollbackException e) {
+ }
+ }
+
+ @Test
+ public void runTestMultiTableConflict(ITestContext context) throws Exception {
+ TransactionManager tm = newTransactionManager(context);
+ TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ String table2 = TEST_TABLE + 2;
+ TableName table2Name = TableName.valueOf(table2);
+
+ HBaseAdmin admin = new HBaseAdmin(hbaseConf);
+
+ if (!admin.tableExists(table2)) {
+ HTableDescriptor desc = new HTableDescriptor(table2Name);
+ HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
+ datafam.setMaxVersions(Integer.MAX_VALUE);
+ desc.addFamily(datafam);
+
+ admin.createTable(desc);
+ }
+
+ if (admin.isTableDisabled(table2)) {
+ admin.enableTable(table2);
+ }
+ admin.close();
+
+ TTable tt2 = new TTable(hbaseConf, table2);
+
+ Transaction t1 = tm.begin();
+ LOG.info("Transaction created " + t1);
+
+ Transaction t2 = tm.begin();
+ LOG.info("Transaction created" + t2);
+
+ byte[] row = Bytes.toBytes("test-simple");
+ byte[] row2 = Bytes.toBytes("test-simple2");
+ byte[] fam = Bytes.toBytes(TEST_FAMILY);
+ byte[] col = Bytes.toBytes("testdata");
+ byte[] data1 = Bytes.toBytes("testWrite-1");
+ byte[] data2 = Bytes.toBytes("testWrite-2");
+
+ Put p = new Put(row);
+ p.add(fam, col, data1);
+ tt.put(t1, p);
+ tt2.put(t1, p);
+
+ Put p2 = new Put(row);
+ p2.add(fam, col, data2);
+ tt.put(t2, p2);
+ p2 = new Put(row2);
+ p2.add(fam, col, data2);
+ tt2.put(t2, p2);
+
+ tm.commit(t2);
+
+ boolean aborted = false;
+ try {
+ tm.commit(t1);
+ assertTrue("Transaction commited successfully", false);
+ } catch (RollbackException e) {
+ aborted = true;
+ }
+ assertTrue("Transaction didn't raise exception", aborted);
+
+ ResultScanner rs = tt2.getHTable().getScanner(fam, col);
+
+ int count = 0;
+ Result r;
+ while ((r = rs.next()) != null) {
+ count += r.size();
+ }
+ assertEquals("Should have cell", 1, count);
+ }
+
+ @Test
+ public void runTestCleanupAfterConflict(ITestContext context) throws Exception {
+ TransactionManager tm = newTransactionManager(context);
+ TTable tt = new TTable(hbaseConf, TEST_TABLE);
+
+ Transaction t1 = tm.begin();
+ LOG.info("Transaction created " + t1);
+
+ Transaction t2 = tm.begin();
+ LOG.info("Transaction created" + t2);
+
+ byte[] row = Bytes.toBytes("test-simple");
+ byte[] fam = Bytes.toBytes(TEST_FAMILY);
+ byte[] col = Bytes.toBytes("testdata");
+ byte[] data1 = Bytes.toBytes("testWrite-1");
+ byte[] data2 = Bytes.toBytes("testWrite-2");
+
+ Put p = new Put(row);
+ p.add(fam, col, data1);
+ tt.put(t1, p);
+
+ Get g = new Get(row).setMaxVersions();
+ g.addColumn(fam, col);
+ Result r = tt.getHTable().get(g);
+ assertEquals("Unexpected size for read.", 1, r.size());
+ assertTrue("Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)),
+ Bytes.equals(data1, r.getValue(fam, col)));
+
+ Put p2 = new Put(row);
+ p2.add(fam, col, data2);
+ tt.put(t2, p2);
+
+ r = tt.getHTable().get(g);
+ assertEquals("Unexpected size for read.", 2, r.size());
+ r = tt.get(t2, g);
+ assertEquals("Unexpected size for read.", 1, r.size());
+ assertTrue("Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)),
+ Bytes.equals(data2, r.getValue(fam, col)));
+
+ tm.commit(t1);
+
+ boolean aborted = false;
+ try {
+ tm.commit(t2);
+ assertTrue("Transaction commited successfully", false);
+ } catch (RollbackException e) {
+ aborted = true;
+ }
+ assertTrue("Transaction didn't raise exception", aborted);
+
+ r = tt.getHTable().get(g);
+ assertEquals("Unexpected size for read.", 1, r.size());
+ assertTrue("Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)),
+ Bytes.equals(data1, r.getValue(fam, col)));
+ }
+
+ @Test
+ public void testCleanupWithDeleteRow(ITestContext context) throws Exception {
+ try {
+ TransactionManager tm = newTransactionManager(context);
+ TTable tt = new TTable(hbaseConf, TEST_TABLE);
+
+ Transaction t1 = tm.begin();
+ LOG.info("Transaction created " + t1);
+
+ int rowcount = 10;
+ int count = 0;
+
+ byte[] fam = Bytes.toBytes(TEST_FAMILY);
+ byte[] col = Bytes.toBytes("testdata");
+ byte[] data1 = Bytes.toBytes("testWrite-1");
+ byte[] data2 = Bytes.toBytes("testWrite-2");
+
+ byte[] modrow = Bytes.toBytes("test-del" + 3);
+ for (int i = 0; i < rowcount; i++) {
+ byte[] row = Bytes.toBytes("test-del" + i);
+
+ Put p = new Put(row);
+ p.add(fam, col, data1);
+ tt.put(t1, p);
+ }
+ tm.commit(t1);
+
+ Transaction t2 = tm.begin();
+ LOG.info("Transaction created " + t2);
+ Delete d = new Delete(modrow);
+ tt.delete(t2, d);
+
+ ResultScanner rs = tt.getScanner(t2, new Scan());
+ Result r = rs.next();
+ count = 0;
+ while (r != null) {
+ count++;
+ LOG.trace("row: " + Bytes.toString(r.getRow()) + " count: " + count);
+ r = rs.next();
+ }
+ assertEquals("Wrong count", rowcount - 1, count);
+
+ Transaction t3 = tm.begin();
+ LOG.info("Transaction created " + t3);
+ Put p = new Put(modrow);
+ p.add(fam, col, data2);
+ tt.put(t3, p);
+
+ tm.commit(t3);
+
+ boolean aborted = false;
+ try {
+ tm.commit(t2);
+ assertTrue("Didn't abort", false);
+ } catch (RollbackException e) {
+ aborted = true;
+ }
+ assertTrue("Didn't raise exception", aborted);
+
+ Transaction tscan = tm.begin();
+ rs = tt.getScanner(tscan, new Scan());
+ r = rs.next();
+ count = 0;
+ while (r != null) {
+ count++;
+ r = rs.next();
+ }
+ assertEquals("Wrong count", rowcount, count);
+
+ } catch (Exception e) {
+ LOG.error("Exception occurred", e);
+ throw e;
+ }
+ }
+
+ @Test
+ public void testMultipleCellChangesOnSameRow(ITestContext context) throws Exception {
+ TransactionManager tm = newTransactionManager(context);
+ TTable tt = new TTable(hbaseConf, TEST_TABLE);
+
+ Transaction t1 = tm.begin();
+ Transaction t2 = tm.begin();
+ LOG.info("Transactions created " + t1 + " " + t2);
+
+ byte[] row = Bytes.toBytes("row");
+ byte[] fam = Bytes.toBytes(TEST_FAMILY);
+ byte[] col1 = Bytes.toBytes("testdata1");
+ byte[] col2 = Bytes.toBytes("testdata2");
+ byte[] data = Bytes.toBytes("testWrite-1");
+
+ Put p2 = new Put(row);
+ p2.add(fam, col1, data);
+ tt.put(t2, p2);
+ tm.commit(t2);
+
+ Put p1 = new Put(row);
+ p1.add(fam, col2, data);
+ tt.put(t1, p1);
+ tm.commit(t1);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestTxMgrFailover.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTxMgrFailover.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTxMgrFailover.java
new file mode 100644
index 0000000..13428cd
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTxMgrFailover.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.omid.TestUtils;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.committable.CommitTable.CommitTimestamp;
+import org.apache.omid.committable.InMemoryCommitTable;
+import org.apache.omid.transaction.Transaction.Status;
+import org.apache.omid.tso.ProgrammableTSOServer;
+import org.apache.omid.tso.client.TSOClient;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.COMMIT_TABLE;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+@Test(groups = "sharedHBase")
+public class TestTxMgrFailover extends OmidTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestTxMgrFailover.class);
+
+ private static final int TSO_SERVER_PORT = 3333;
+ private static final String TSO_SERVER_HOST = "localhost";
+
+ private static final long TX1_ST = 1L;
+ private static final long TX1_CT = 2L;
+
+ private static final byte[] qualifier = Bytes.toBytes("test-qual");
+ private static final byte[] row1 = Bytes.toBytes("row1");
+ private static final byte[] data1 = Bytes.toBytes("testWrite-1");
+
+ // Used in test assertions
+ private InMemoryCommitTable commitTable;
+
+ private CommitTable.Client commitTableClient;
+ private CommitTable.Writer commitTableWriter;
+
+ // Allows us to prepare the required responses to client request operations
+ private ProgrammableTSOServer tso;
+
+ // The transaction manager under test
+ private HBaseTransactionManager tm;
+
+ @BeforeClass(alwaysRun = true)
+ public void beforeClass() throws Exception {
+ // ------------------------------------------------------------------------------------------------------------
+ // ProgrammableTSOServer setup
+ // ------------------------------------------------------------------------------------------------------------
+ tso = new ProgrammableTSOServer(TSO_SERVER_PORT);
+ TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
+ }
+
+ @BeforeMethod(alwaysRun = true, timeOut = 30_000)
+ public void beforeMethod() throws IOException, InterruptedException {
+
+ commitTable = new InMemoryCommitTable(); // Use an in-memory commit table to speed up tests
+ commitTableClient = spy(commitTable.getClient());
+ commitTableWriter = spy(commitTable.getWriter());
+
+ HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
+ hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+ hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
+ TSOClient tsoClientForTM = spy(TSOClient.newInstance(hbaseOmidClientConf.getOmidClientConfiguration()));
+
+ tm = spy(HBaseTransactionManager.builder(hbaseOmidClientConf)
+ .tsoClient(tsoClientForTM)
+ .commitTableClient(commitTableClient)
+ .build());
+ }
+
+ @Test(timeOut = 30_000)
+ public void testAbortResponseFromTSOThrowsRollbackExceptionInClient() throws Exception {
+ // Program the TSO to return an ad-hoc Timestamp and an abort response for tx 1
+ tso.queueResponse(new ProgrammableTSOServer.TimestampResponse(TX1_ST));
+ tso.queueResponse(new ProgrammableTSOServer.AbortResponse(TX1_ST));
+
+ try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
+ HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+ assertEquals(tx1.getStartTimestamp(), TX1_ST);
+ Put put = new Put(row1);
+ put.add(TEST_FAMILY.getBytes(), qualifier, data1);
+ txTable.put(tx1, put);
+ assertEquals(hBaseUtils.countRows(new HTable(hbaseConf, TEST_TABLE)), 1, "Rows should be 1!");
+ checkOperationSuccessOnCell(KeyValue.Type.Put, data1, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
+ qualifier);
+
+ try {
+ tm.commit(tx1);
+ fail();
+ } catch (RollbackException e) {
+ // Expected!
+
+ }
+
+ // Check transaction status
+ assertEquals(tx1.getStatus(), Status.ROLLEDBACK);
+ assertEquals(tx1.getCommitTimestamp(), 0);
+ // Check the cleanup process did its job and the uncommitted data is NOT there
+ checkOperationSuccessOnCell(KeyValue.Type.Delete, null, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
+ qualifier);
+ }
+
+ }
+
+ @Test(timeOut = 30_000)
+ public void testClientReceivesSuccessfulCommitForNonInvalidatedTxCommittedByPreviousTSO() throws Exception {
+
+ // Program the TSO to return an ad-hoc Timestamp and a commit response with heuristic actions
+ tso.queueResponse(new ProgrammableTSOServer.TimestampResponse(TX1_ST));
+ tso.queueResponse(new ProgrammableTSOServer.CommitResponse(true, TX1_ST, TX1_CT));
+ // Simulate that tx1 was committed by writing to commit table
+ commitTableWriter.addCommittedTransaction(TX1_ST, TX1_CT);
+ commitTableWriter.flush();
+ assertEquals(commitTable.countElements(), 1, "Rows should be 1!");
+
+ try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
+ HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+ assertEquals(tx1.getStartTimestamp(), TX1_ST);
+ Put put = new Put(row1);
+ put.add(TEST_FAMILY.getBytes(), qualifier, data1);
+ txTable.put(tx1, put);
+ // Should succeed
+ tm.commit(tx1);
+
+ // Check transaction status
+ assertEquals(tx1.getStatus(), Status.COMMITTED);
+ assertEquals(tx1.getCommitTimestamp(), TX1_CT);
+ // Check the cleanup process did its job and the committed data is there
+ // Note that now we do not clean up the commit table when exercising the heuristic actions
+ assertEquals(commitTable.countElements(), 1,
+ "Rows should be 1! We don't have to clean CT in this case");
+ Optional<CommitTimestamp>
+ optionalCT =
+ tm.commitTableClient.getCommitTimestamp(TX1_ST).get();
+ assertTrue(optionalCT.isPresent());
+ checkOperationSuccessOnCell(KeyValue.Type.Put, data1, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
+ qualifier);
+ }
+
+ }
+
+ @Test(timeOut = 30_000)
+ public void testClientReceivesRollbackExceptionForInvalidatedTxCommittedByPreviousTSO() throws Exception {
+
+ // Program the TSO to return an ad-hoc Timestamp and a commit response with heuristic actions
+ tso.queueResponse(new ProgrammableTSOServer.TimestampResponse(TX1_ST));
+ tso.queueResponse(new ProgrammableTSOServer.CommitResponse(true, TX1_ST, TX1_CT));
+ // Simulate that tx1 was committed by writing to commit table but was later invalidated
+ commitTableClient.tryInvalidateTransaction(TX1_ST);
+ assertEquals(commitTable.countElements(), 1, "Rows should be 1!");
+
+ executeTxAndCheckRollback();
+
+ }
+
+ @Test(timeOut = 30_000)
+ public void testClientReceivesNotificationOfANewTSOCanInvalidateTransaction() throws Exception {
+
+ // Program the TSO to return an ad-hoc Timestamp and a commit response with heuristic actions
+ tso.queueResponse(new ProgrammableTSOServer.TimestampResponse(TX1_ST));
+ tso.queueResponse(new ProgrammableTSOServer.CommitResponse(true, TX1_ST, TX1_CT));
+
+ assertEquals(commitTable.countElements(), 0, "Rows should be 0!");
+
+ executeTxAndCheckRollback();
+
+ }
+
+ private void executeTxAndCheckRollback() throws IOException, TransactionException, InterruptedException, java.util.concurrent.ExecutionException {
+ try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
+ HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+ assertEquals(tx1.getStartTimestamp(), TX1_ST);
+ Put put = new Put(row1);
+ put.add(TEST_FAMILY.getBytes(), qualifier, data1);
+ txTable.put(tx1, put);
+ try {
+ tm.commit(tx1);
+ fail();
+ } catch (RollbackException e) {
+ // Expected
+ }
+
+ // Check transaction status
+ assertEquals(tx1.getStatus(), Status.ROLLEDBACK);
+ assertEquals(tx1.getCommitTimestamp(), 0);
+ // Check the cleanup process did its job and the uncommitted data is NOT there
+ assertEquals(commitTable.countElements(), 1, "Rows should be 1! Dirty data should be there");
+ Optional<CommitTimestamp>
+ optionalCT =
+ tm.commitTableClient.getCommitTimestamp(TX1_ST).get();
+ assertTrue(optionalCT.isPresent());
+ assertFalse(optionalCT.get().isValid());
+ checkOperationSuccessOnCell(KeyValue.Type.Delete, null, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
+ qualifier);
+ }
+ }
+
+ @Test(timeOut = 30_000)
+ public void testClientSuccessfullyCommitsWhenReceivingNotificationOfANewTSOAandCANTInvalidateTransaction()
+ throws Exception {
+
+ // Program the TSO to return an ad-hoc Timestamp and a commit response with heuristic actions
+ tso.queueResponse(new ProgrammableTSOServer.TimestampResponse(TX1_ST));
+ tso.queueResponse(new ProgrammableTSOServer.CommitResponse(true, TX1_ST, TX1_CT));
+
+ // Simulate that the original TSO was able to add the tx to commit table in the meantime
+ commitTableWriter.addCommittedTransaction(TX1_ST, TX1_CT);
+ commitTableWriter.flush();
+ assertEquals(commitTable.countElements(), 1, "Rows should be 1!");
+ SettableFuture<Optional<CommitTimestamp>> f1 = SettableFuture.create();
+ f1.set(Optional.<CommitTimestamp>absent());
+ SettableFuture<Optional<CommitTimestamp>> f2 = SettableFuture.create();
+ f2.set(Optional.of(new CommitTimestamp(COMMIT_TABLE, TX1_CT, true)));
+ doReturn(f1).doReturn(f2).when(commitTableClient).getCommitTimestamp(TX1_ST);
+
+ try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
+ HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+ assertEquals(tx1.getStartTimestamp(), TX1_ST);
+ Put put = new Put(row1);
+ put.add(TEST_FAMILY.getBytes(), qualifier, data1);
+ txTable.put(tx1, put);
+
+ tm.commit(tx1);
+
+ // Check transaction status
+ assertEquals(tx1.getStatus(), Status.COMMITTED);
+ assertEquals(tx1.getCommitTimestamp(), TX1_CT);
+ // Check the cleanup process did its job and the committed data is there
+ // Note that now we do not clean up the commit table when exercising the heuristic actions
+ assertEquals(commitTable.countElements(), 1,
+ "Rows should be 1! We don't have to clean CT in this case");
+ checkOperationSuccessOnCell(KeyValue.Type.Put, data1, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
+ qualifier);
+ }
+
+ }
+
+ @Test(timeOut = 30_000)
+ public void testClientReceivesATransactionExceptionWhenReceivingNotificationOfANewTSOAndCANTInvalidateTransactionAndCTCheckIsUnsuccessful()
+ throws Exception {
+
+ // Program the TSO to return an ad-hoc Timestamp and a commit response with heuristic actions
+ tso.queueResponse(new ProgrammableTSOServer.TimestampResponse(TX1_ST));
+ tso.queueResponse(new ProgrammableTSOServer.CommitResponse(true, TX1_ST, TX1_CT));
+
+ // Simulate that the tx can't be invalidated (tryInvalidateTransaction returns false) while the commit table is empty
+ SettableFuture<Boolean> f = SettableFuture.create();
+ f.set(false);
+ doReturn(f).when(commitTableClient).tryInvalidateTransaction(TX1_ST);
+
+ assertEquals(commitTable.countElements(), 0, "Rows should be 0!");
+
+ try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
+ HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+ assertEquals(tx1.getStartTimestamp(), TX1_ST);
+ Put put = new Put(row1);
+ put.add(TEST_FAMILY.getBytes(), qualifier, data1);
+ txTable.put(tx1, put);
+ try {
+ tm.commit(tx1);
+ fail();
+ } catch (TransactionException e) {
+ // Expected but is not good because we're not able to determine the tx outcome
+ }
+
+ // Check transaction status
+ assertEquals(tx1.getStatus(), Status.RUNNING);
+ assertEquals(tx1.getCommitTimestamp(), 0);
+ }
+
+ }
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // Helper methods
+ // ----------------------------------------------------------------------------------------------------------------
+
+ protected void checkOperationSuccessOnCell(KeyValue.Type targetOp,
+ @Nullable byte[] expectedValue,
+ byte[] tableName,
+ byte[] row,
+ byte[] fam,
+ byte[] col) {
+
+ try (HTable table = new HTable(hbaseConf, tableName)) {
+ Get get = new Get(row).setMaxVersions(1);
+ Result result = table.get(get);
+ Cell latestCell = result.getColumnLatestCell(fam, col);
+
+ switch (targetOp) {
+ case Put:
+ assertEquals(latestCell.getTypeByte(), targetOp.getCode());
+ assertEquals(CellUtil.cloneValue(latestCell), expectedValue);
+ LOG.trace("Value for " + Bytes.toString(tableName) + ":"
+ + Bytes.toString(row) + ":" + Bytes.toString(fam) + ":"
+ + Bytes.toString(col) + "=>" + Bytes.toString(CellUtil.cloneValue(latestCell))
+ + " (" + Bytes.toString(expectedValue) + " expected)");
+ break;
+ case Delete:
+ LOG.trace("Value for " + Bytes.toString(tableName) + ":"
+ + Bytes.toString(row) + ":" + Bytes.toString(fam)
+ + Bytes.toString(col) + " deleted");
+ assertNull(latestCell);
+ break;
+ default:
+ fail();
+ }
+ } catch (IOException e) {
+ LOG.error("Error reading row " + Bytes.toString(tableName) + ":"
+ + Bytes.toString(row) + ":" + Bytes.toString(fam)
+ + Bytes.toString(col), e);
+ fail();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/java/org/apache/omid/transaction/TestUpdateScan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestUpdateScan.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestUpdateScan.java
new file mode 100644
index 0000000..342f349
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestUpdateScan.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.ITestContext;
+import org.testng.annotations.Test;
+
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+
+ /**
+  * Tests for transactional gets and scans through {@link TTable}.
+  * <p>
+  * Every row written by these tests uses the 4-byte encoding of an int as row key and stores
+  * the same bytes as the cell value.
+  */
+ @Test(groups = "sharedHBase")
+ public class TestUpdateScan extends OmidTestBase {
+     private static final Logger LOG = LoggerFactory.getLogger(TestUpdateScan.class);
+
+     private static final String TEST_COL = "value";
+     private static final String TEST_COL_2 = "col_2";
+
+     /**
+      * Writes a set of rows, reads one of them back with a Get inside the writing transaction and,
+      * after committing, checks that a row-filtered scan starting at that key returns exactly one row.
+      */
+     @Test
+     public void testGet(ITestContext context) throws Exception {
+         // Exceptions are left to propagate: catching and only logging them would let the test
+         // pass even though nothing was verified.
+         TransactionManager tm = newTransactionManager(context);
+         try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+             Transaction t = tm.begin();
+             int[] lInts = new int[]{100, 243, 2342, 22, 1, 5, 43, 56};
+             putRows(table, t, lInts, TEST_COL);
+
+             int startKeyValue = lInts[3];
+             int stopKeyValue = lInts[3];
+             byte[] startKey = Bytes.toBytes(startKeyValue);
+             byte[] stopKey = Bytes.toBytes(stopKeyValue);
+
+             Get g = new Get(startKey);
+             Result r = table.get(t, g);
+             if (r.isEmpty()) {
+                 Assert.fail("Bad result");
+             }
+             int tmp = Bytes.toInt(r.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL)));
+             LOG.info("Result:" + tmp);
+             assertTrue("Bad value, should be " + startKeyValue + " but is " + tmp, tmp == startKeyValue);
+             tm.commit(t);
+
+             Scan s = new Scan(startKey);
+             applyRowRangeFilter(s, startKey, stopKey, true);
+
+             t = tm.begin();
+             int count = countRows(table, t, s);
+             assertEquals("Count is wrong", 1, count);
+             LOG.info("Rows found " + count);
+             tm.commit(t);
+         }
+     }
+
+     /**
+      * Scans two explicitly added columns, first inside the writing transaction and then again
+      * from a fresh transaction after commit; both scans must return every written row.
+      */
+     @Test
+     public void testScan(ITestContext context) throws Exception {
+
+         try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+             TransactionManager tm = newTransactionManager(context);
+             Transaction t = tm.begin();
+             int[] lInts = new int[]{100, 243, 2342, 22, 1, 5, 43, 56};
+             putRows(table, t, lInts, TEST_COL, TEST_COL_2);
+
+             Scan s = new Scan();
+             // Adding two columns to the scanner should not throw a
+             // ConcurrentModificationException when getting the scanner
+             s.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL));
+             s.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL_2));
+
+             int count = countRows(table, t, s);
+             assertEquals("Wrong number of rows before commit", lInts.length, count);
+             LOG.info("Rows found " + count);
+
+             tm.commit(t);
+
+             t = tm.begin();
+             count = countRows(table, t, s);
+             assertEquals("Wrong number of rows after commit", lInts.length, count);
+             LOG.info("Rows found " + count);
+             tm.commit(t);
+         }
+
+     }
+
+     /**
+      * Writes three disjoint sets of rows, the second one in a transaction that is never committed,
+      * and checks that a later full scan returns only the rows of the two committed sets.
+      */
+     @Test
+     public void testScanUncommitted(ITestContext context) throws Exception {
+         // Exceptions are left to propagate so that a broken setup fails the test instead of
+         // being logged and ignored.
+         TransactionManager tm = newTransactionManager(context);
+         try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+             Transaction t = tm.begin();
+             int[] lIntsA = new int[]{100, 243, 2342, 22, 1, 5, 43, 56};
+             putRows(table, t, lIntsA, TEST_COL);
+             tm.commit(t);
+
+             // This transaction is intentionally left open: its rows must not show up in the scan
+             Transaction tu = tm.begin();
+             int[] lIntsB = new int[]{105, 24, 4342, 32, 7, 3, 30, 40};
+             putRows(table, tu, lIntsB, TEST_COL);
+
+             t = tm.begin();
+             int[] lIntsC = new int[]{109, 224, 242, 2, 16, 59, 23, 26};
+             putRows(table, t, lIntsC, TEST_COL);
+             tm.commit(t);
+
+             t = tm.begin();
+             int count = countRows(table, t, new Scan());
+             assertEquals("Wrong number of rows", lIntsA.length + lIntsC.length, count);
+             LOG.info("Rows found " + count);
+             tm.commit(t);
+         }
+     }
+
+     // ----------------------------------------------------------------------------------------------------------------
+     // Helper methods
+     // ----------------------------------------------------------------------------------------------------------------
+
+     /**
+      * Writes one row per key within the given transaction. The row key and the value of every
+      * requested column are the byte encoding of the key itself.
+      */
+     private void putRows(TTable table, Transaction tx, int[] keys, String... columns) throws Exception {
+         for (int key : keys) {
+             byte[] data = Bytes.toBytes(key);
+             Put put = new Put(data);
+             for (String column : columns) {
+                 put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(column), data);
+             }
+             table.put(tx, put);
+         }
+     }
+
+     /**
+      * Runs the scan within the given transaction, logs the {@code TEST_COL} value of every row
+      * returned and closes the scanner.
+      *
+      * @return the number of rows returned by the scan
+      */
+     private int countRows(TTable table, Transaction tx, Scan scan) throws Exception {
+         int count = 0;
+         try (ResultScanner scanner = table.getScanner(tx, scan)) {
+             Result row;
+             while ((row = scanner.next()) != null) {
+                 int value = Bytes.toInt(row.getValue(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL)));
+                 LOG.info("Result: " + value);
+                 count++;
+             }
+         }
+         return count;
+     }
+
+     /**
+      * Installs a row filter on the scan that keeps matching while the row key is less than or equal
+      * to {@code stopKey} (prefix comparison). When {@code startInclusive} is false, rows must in
+      * addition compare greater than {@code startKey}.
+      */
+     private static void applyRowRangeFilter(Scan scan, byte[] startKey, byte[] stopKey, boolean startInclusive) {
+         RowFilter toFilter =
+                 new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryPrefixComparator(stopKey));
+         if (startInclusive) {
+             scan.setFilter(new WhileMatchFilter(toFilter));
+         } else {
+             FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+             filters.addFilter(new RowFilter(CompareFilter.CompareOp.GREATER,
+                     new BinaryPrefixComparator(startKey)));
+             filters.addFilter(new WhileMatchFilter(toFilter));
+             scan.setFilter(filters);
+         }
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/resources/log4j.properties b/hbase-client/src/test/resources/log4j.properties
index 90954b7..6380e8a 100644
--- a/hbase-client/src/test/resources/log4j.properties
+++ b/hbase-client/src/test/resources/log4j.properties
@@ -36,13 +36,12 @@ log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss,SSS} [%t] %
log4j.logger.org.apache.zookeeper=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.hadoop.ipc=ERROR
-
-log4j.logger.com.yahoo.omid=INFO
-#log4j.logger.com.yahoo.omid.regionserver.TransactionalRegionServer=TRACE
-#log4j.logger.com.yahoo.omid.TestBasicTransaction=TRACE
-#log4j.logger.com.yahoo.omid.client.TSOClient=TRACE
-#log4j.logger.com.yahoo.omid.client.TransactionState=TRACE
-#log4j.logger.com.yahoo.omid.tso.ThroughputMonitor=TRACE
+log4j.logger.org.apache.omid=INFO
+#log4j.logger.org.apache.omid.regionserver.TransactionalRegionServer=TRACE
+#log4j.logger.org.apache.omid.TestBasicTransaction=TRACE
+#log4j.logger.org.apache.omid.client.TSOClient=TRACE
+#log4j.logger.org.apache.omid.client.TransactionState=TRACE
+#log4j.logger.org.apache.omid.tso.ThroughputMonitor=TRACE
#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
# Make these two classes INFO-level. Make them DEBUG to see more zk debug.
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-client/src/test/resources/test-hbase-omid-client-config.yml
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/resources/test-hbase-omid-client-config.yml b/hbase-client/src/test/resources/test-hbase-omid-client-config.yml
index 9a95df3..8b434da 100644
--- a/hbase-client/src/test/resources/test-hbase-omid-client-config.yml
+++ b/hbase-client/src/test/resources/test-hbase-omid-client-config.yml
@@ -2,10 +2,10 @@
commitTableName: OMID_COMMIT_TABLE1
#TSO/HA connection
-omidClientConfiguration: !!com.yahoo.omid.tso.client.OmidClientConfiguration
+omidClientConfiguration: !!org.apache.omid.tso.client.OmidClientConfiguration
#TSO/HA connection
connectionString: "somehost:54758"
- connectionType: !!com.yahoo.omid.tso.client.OmidClientConfiguration$ConnType HA
+ connectionType: !!org.apache.omid.tso.client.OmidClientConfiguration$ConnType HA
zkConnectionTimeoutInSecs: 11
#TSO related
@@ -16,18 +16,18 @@ omidClientConfiguration: !!com.yahoo.omid.tso.client.OmidClientConfiguration
executorThreads: 4
#Instrumentation
-metrics: !!com.yahoo.omid.metrics.CodahaleMetricsProvider [
- !!com.yahoo.omid.metrics.CodahaleMetricsConfig {
+metrics: !!org.apache.omid.metrics.CodahaleMetricsProvider [
+ !!org.apache.omid.metrics.CodahaleMetricsConfig {
outputFreqInSecs: 1,
reporters: !!set {
- !!com.yahoo.omid.metrics.CodahaleMetricsConfig$Reporter CSV,
- !!com.yahoo.omid.metrics.CodahaleMetricsConfig$Reporter SLF4J,
- !!com.yahoo.omid.metrics.CodahaleMetricsConfig$Reporter GRAPHITE,
- !!com.yahoo.omid.metrics.CodahaleMetricsConfig$Reporter CONSOLE
+ !!org.apache.omid.metrics.CodahaleMetricsConfig$Reporter CSV,
+ !!org.apache.omid.metrics.CodahaleMetricsConfig$Reporter SLF4J,
+ !!org.apache.omid.metrics.CodahaleMetricsConfig$Reporter GRAPHITE,
+ !!org.apache.omid.metrics.CodahaleMetricsConfig$Reporter CONSOLE
},
csvDir: "some/folder",
prefix: "somePrefix",
- slf4jLogger: "com.yahoo",
+ slf4jLogger: "org.apache",
graphiteHostConfig: "somehost:1234"
}
]
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/hbase-commit-table/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-commit-table/pom.xml b/hbase-commit-table/pom.xml
index 0ddfafd..82dc0f7 100644
--- a/hbase-commit-table/pom.xml
+++ b/hbase-commit-table/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
- <groupId>com.yahoo.omid</groupId>
+ <groupId>org.apache.omid</groupId>
<artifactId>omid</artifactId>
<version>0.8.1.37-SNAPSHOT</version>
</parent>
@@ -17,17 +17,17 @@
<!-- Dependencies on Omid modules -->
<dependency>
- <groupId>com.yahoo.omid</groupId>
+ <groupId>org.apache.omid</groupId>
<artifactId>commit-table</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>com.yahoo.omid</groupId>
+ <groupId>org.apache.omid</groupId>
<artifactId>common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>com.yahoo.omid</groupId>
+ <groupId>org.apache.omid</groupId>
<artifactId>hbase-common</artifactId>
<version>${project.version}</version>
</dependency>
@@ -129,7 +129,7 @@
</activation>
<dependencies>
<dependency>
- <groupId>com.yahoo.omid</groupId>
+ <groupId>org.apache.omid</groupId>
<artifactId>hbase0-shims</artifactId>
<version>${project.version}</version>
</dependency>
@@ -140,7 +140,7 @@
<id>hbase-1</id>
<dependencies>
<dependency>
- <groupId>com.yahoo.omid</groupId>
+ <groupId>org.apache.omid</groupId>
<artifactId>hbase1-shims</artifactId>
<version>${project.version}</version>
</dependency>