You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by oh...@apache.org on 2018/09/16 08:23:06 UTC
[2/4] incubator-omid git commit: OMID-107 Replace pre 1.0 deprecated
HBase APIs
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java
index 21745c4..abdd602 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTransactionConflict.java
@@ -17,12 +17,18 @@
*/
package org.apache.omid.transaction;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -33,10 +39,6 @@ import org.slf4j.LoggerFactory;
import org.testng.ITestContext;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
@Test(groups = "sharedHBase")
public class TestTransactionConflict extends OmidTestBase {
@@ -45,7 +47,7 @@ public class TestTransactionConflict extends OmidTestBase {
@Test(timeOut = 10_000)
public void runTestWriteWriteConflict(ITestContext context) throws Exception {
TransactionManager tm = newTransactionManager(context);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction t1 = tm.begin();
LOG.info("Transaction created " + t1);
@@ -60,11 +62,11 @@ public class TestTransactionConflict extends OmidTestBase {
byte[] data2 = Bytes.toBytes("testWrite-2");
Put p = new Put(row);
- p.add(fam, col, data1);
+ p.addColumn(fam, col, data1);
tt.put(t1, p);
Put p2 = new Put(row);
- p2.add(fam, col, data2);
+ p2.addColumn(fam, col, data2);
tt.put(t2, p2);
tm.commit(t2);
@@ -79,27 +81,29 @@ public class TestTransactionConflict extends OmidTestBase {
@Test(timeOut = 10_000)
public void runTestMultiTableConflict(ITestContext context) throws Exception {
TransactionManager tm = newTransactionManager(context);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
String table2 = TEST_TABLE + 2;
TableName table2Name = TableName.valueOf(table2);
- HBaseAdmin admin = new HBaseAdmin(hbaseConf);
-
- if (!admin.tableExists(table2)) {
- HTableDescriptor desc = new HTableDescriptor(table2Name);
- HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
- datafam.setMaxVersions(Integer.MAX_VALUE);
- desc.addFamily(datafam);
-
- admin.createTable(desc);
- }
-
- if (admin.isTableDisabled(table2)) {
- admin.enableTable(table2);
+ try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
+ Admin admin = conn.getAdmin()) {
+ TableName htable2 = TableName.valueOf(table2);
+
+ if (!admin.tableExists(htable2)) {
+ HTableDescriptor desc = new HTableDescriptor(table2Name);
+ HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
+ datafam.setMaxVersions(Integer.MAX_VALUE);
+ desc.addFamily(datafam);
+
+ admin.createTable(desc);
+ }
+
+ if (admin.isTableDisabled(htable2)) {
+ admin.enableTable(htable2);
+ }
}
- admin.close();
- TTable tt2 = new TTable(hbaseConf, table2);
+ TTable tt2 = new TTable(connection, table2);
Transaction t1 = tm.begin();
LOG.info("Transaction created " + t1);
@@ -115,15 +119,15 @@ public class TestTransactionConflict extends OmidTestBase {
byte[] data2 = Bytes.toBytes("testWrite-2");
Put p = new Put(row);
- p.add(fam, col, data1);
+ p.addColumn(fam, col, data1);
tt.put(t1, p);
tt2.put(t1, p);
Put p2 = new Put(row);
- p2.add(fam, col, data2);
+ p2.addColumn(fam, col, data2);
tt.put(t2, p2);
p2 = new Put(row2);
- p2.add(fam, col, data2);
+ p2.addColumn(fam, col, data2);
tt2.put(t2, p2);
tm.commit(t2);
@@ -150,7 +154,7 @@ public class TestTransactionConflict extends OmidTestBase {
@Test(timeOut = 10_000)
public void runTestCleanupAfterConflict(ITestContext context) throws Exception {
TransactionManager tm = newTransactionManager(context);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction t1 = tm.begin();
LOG.info("Transaction created " + t1);
@@ -165,7 +169,7 @@ public class TestTransactionConflict extends OmidTestBase {
byte[] data2 = Bytes.toBytes("testWrite-2");
Put p = new Put(row);
- p.add(fam, col, data1);
+ p.addColumn(fam, col, data1);
tt.put(t1, p);
Get g = new Get(row).setMaxVersions();
@@ -176,7 +180,7 @@ public class TestTransactionConflict extends OmidTestBase {
"Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
Put p2 = new Put(row);
- p2.add(fam, col, data2);
+ p2.addColumn(fam, col, data2);
tt.put(t2, p2);
r = tt.getHTable().get(g);
@@ -207,7 +211,7 @@ public class TestTransactionConflict extends OmidTestBase {
public void testCleanupWithDeleteRow(ITestContext context) throws Exception {
TransactionManager tm = newTransactionManager(context);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction t1 = tm.begin();
LOG.info("Transaction created " + t1);
@@ -225,7 +229,7 @@ public class TestTransactionConflict extends OmidTestBase {
byte[] row = Bytes.toBytes("test-del" + i);
Put p = new Put(row);
- p.add(fam, col, data1);
+ p.addColumn(fam, col, data1);
tt.put(t1, p);
}
tm.commit(t1);
@@ -248,7 +252,7 @@ public class TestTransactionConflict extends OmidTestBase {
Transaction t3 = tm.begin();
LOG.info("Transaction created " + t3);
Put p = new Put(modrow);
- p.add(fam, col, data2);
+ p.addColumn(fam, col, data2);
tt.put(t3, p);
tm.commit(t3);
@@ -277,7 +281,7 @@ public class TestTransactionConflict extends OmidTestBase {
@Test(timeOut = 10_000)
public void testMultipleCellChangesOnSameRow(ITestContext context) throws Exception {
TransactionManager tm = newTransactionManager(context);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction t1 = tm.begin();
Transaction t2 = tm.begin();
@@ -290,12 +294,12 @@ public class TestTransactionConflict extends OmidTestBase {
byte[] data = Bytes.toBytes("testWrite-1");
Put p2 = new Put(row);
- p2.add(fam, col1, data);
+ p2.addColumn(fam, col1, data);
tt.put(t2, p2);
tm.commit(t2);
Put p1 = new Put(row);
- p1.add(fam, col2, data);
+ p1.addColumn(fam, col2, data);
tt.put(t1, p1);
tm.commit(t1);
}
@@ -303,7 +307,7 @@ public class TestTransactionConflict extends OmidTestBase {
@Test(timeOut = 10_000)
public void runTestWriteWriteConflictWithAdditionalConflictFreeWrites(ITestContext context) throws Exception {
TransactionManager tm = newTransactionManager(context);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction t1 = tm.begin();
LOG.info("Transaction created " + t1);
@@ -318,21 +322,21 @@ public class TestTransactionConflict extends OmidTestBase {
byte[] data2 = Bytes.toBytes("testWrite-2");
Put p = new Put(row);
- p.add(fam, col, data1);
+ p.addColumn(fam, col, data1);
tt.put(t1, p);
Put p2 = new Put(row);
- p2.add(fam, col, data2);
+ p2.addColumn(fam, col, data2);
tt.put(t2, p2);
row = Bytes.toBytes("test-simple-cf");
p = new Put(row);
- p.add(fam, col, data1);
+ p.addColumn(fam, col, data1);
tt.markPutAsConflictFreeMutation(p);
tt.put(t1, p);
p2 = new Put(row);
- p2.add(fam, col, data2);
+ p2.addColumn(fam, col, data2);
tt.markPutAsConflictFreeMutation(p2);
tt.put(t2, p2);
@@ -348,7 +352,7 @@ public class TestTransactionConflict extends OmidTestBase {
@Test(timeOut = 10_000)
public void runTestWriteWriteConflictFreeWrites(ITestContext context) throws Exception {
TransactionManager tm = newTransactionManager(context);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction t1 = tm.begin();
LOG.info("Transaction created " + t1);
@@ -363,23 +367,23 @@ public class TestTransactionConflict extends OmidTestBase {
byte[] data2 = Bytes.toBytes("testWrite-2");
Put p = new Put(row);
- p.add(fam, col, data1);
+ p.addColumn(fam, col, data1);
tt.markPutAsConflictFreeMutation(p);
tt.put(t1, p);
Put p2 = new Put(row);
- p2.add(fam, col, data2);
+ p2.addColumn(fam, col, data2);
tt.markPutAsConflictFreeMutation(p2);
tt.put(t2, p2);
row = Bytes.toBytes("test-simple-cf");
p = new Put(row);
- p.add(fam, col, data1);
+ p.addColumn(fam, col, data1);
tt.markPutAsConflictFreeMutation(p);
tt.put(t1, p);
p2 = new Put(row);
- p2.add(fam, col, data2);
+ p2.addColumn(fam, col, data2);
tt.markPutAsConflictFreeMutation(p2);
tt.put(t2, p2);
@@ -395,7 +399,7 @@ public class TestTransactionConflict extends OmidTestBase {
@Test(timeOut = 10_000)
public void runTestWriteWriteConflictFreeWritesWithOtherWrites(ITestContext context) throws Exception {
TransactionManager tm = newTransactionManager(context);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction t1 = tm.begin();
LOG.info("Transaction created " + t1);
@@ -411,21 +415,21 @@ public class TestTransactionConflict extends OmidTestBase {
byte[] data2 = Bytes.toBytes("testWrite-2");
Put p = new Put(row);
- p.add(fam, col, data1);
+ p.addColumn(fam, col, data1);
tt.put(t1, p);
Put p2 = new Put(row1);
- p2.add(fam, col, data2);
+ p2.addColumn(fam, col, data2);
tt.put(t2, p2);
row = Bytes.toBytes("test-simple-cf");
p = new Put(row);
- p.add(fam, col, data1);
+ p.addColumn(fam, col, data1);
tt.markPutAsConflictFreeMutation(p);
tt.put(t1, p);
p2 = new Put(row);
- p2.add(fam, col, data2);
+ p2.addColumn(fam, col, data2);
tt.markPutAsConflictFreeMutation(p2);
tt.put(t2, p2);
@@ -441,7 +445,7 @@ public class TestTransactionConflict extends OmidTestBase {
@Test(timeOut = 10_000)
public void runTestCleanupConflictFreeWritesAfterConflict(ITestContext context) throws Exception {
TransactionManager tm = newTransactionManager(context);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction t1 = tm.begin();
LOG.info("Transaction created " + t1);
@@ -457,7 +461,7 @@ public class TestTransactionConflict extends OmidTestBase {
byte[] data2 = Bytes.toBytes("testWrite-2");
Put p = new Put(row);
- p.add(fam, col, data1);
+ p.addColumn(fam, col, data1);
tt.put(t1, p);
Get g = new Get(row).setMaxVersions();
@@ -468,11 +472,11 @@ public class TestTransactionConflict extends OmidTestBase {
"Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
Put p2 = new Put(row);
- p2.add(fam, col, data2);
+ p2.addColumn(fam, col, data2);
tt.put(t2, p2);
Put p3 = new Put(row1);
- p3.add(fam, col, data2);
+ p3.addColumn(fam, col, data2);
tt.markPutAsConflictFreeMutation(p3);
tt.put(t2, p3);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-client/src/test/java/org/apache/omid/transaction/TestTxMgrFailover.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTxMgrFailover.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTxMgrFailover.java
index d507c24..4dad9bb 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTxMgrFailover.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTxMgrFailover.java
@@ -17,13 +17,22 @@
*/
package org.apache.omid.transaction;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
+
+import java.io.IOException;
+
+import javax.annotation.Nullable;
+
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.TestUtils;
import org.apache.omid.committable.CommitTable;
@@ -37,14 +46,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import javax.annotation.Nullable;
-import java.io.IOException;
-
-import static org.mockito.Mockito.spy;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.fail;
-
@Test(groups = "sharedHBase")
public class TestTxMgrFailover extends OmidTestBase {
@@ -54,7 +55,6 @@ public class TestTxMgrFailover extends OmidTestBase {
private static final String TSO_SERVER_HOST = "localhost";
private static final long TX1_ST = 1L;
- private static final long TX1_CT = 2L;
private static final byte[] qualifier = Bytes.toBytes("test-qual");
private static final byte[] row1 = Bytes.toBytes("row1");
@@ -103,14 +103,14 @@ public class TestTxMgrFailover extends OmidTestBase {
tso.queueResponse(new ProgrammableTSOServer.TimestampResponse(TX1_ST));
tso.queueResponse(new ProgrammableTSOServer.AbortResponse(TX1_ST));
- try (TTable txTable = new TTable(hbaseConf, TEST_TABLE)) {
+ try (TTable txTable = new TTable(connection, TEST_TABLE)) {
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
assertEquals(tx1.getStartTimestamp(), TX1_ST);
Put put = new Put(row1);
- put.add(TEST_FAMILY.getBytes(), qualifier, data1);
+ put.addColumn(TEST_FAMILY.getBytes(), qualifier, data1);
txTable.put(tx1, put);
- assertEquals(hBaseUtils.countRows(new HTable(hbaseConf, TEST_TABLE)), 1, "Rows should be 1!");
- checkOperationSuccessOnCell(KeyValue.Type.Put, data1, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
+ assertEquals(hBaseUtils.countRows(txTable.getHTable()), 1, "Rows should be 1!");
+ checkOperationSuccessOnCell(txTable.getHTable(), KeyValue.Type.Put, data1, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
qualifier);
try {
@@ -125,7 +125,7 @@ public class TestTxMgrFailover extends OmidTestBase {
assertEquals(tx1.getStatus(), Status.ROLLEDBACK);
assertEquals(tx1.getCommitTimestamp(), 0);
// Check the cleanup process did its job and the committed data is NOT there
- checkOperationSuccessOnCell(KeyValue.Type.Delete, null, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
+ checkOperationSuccessOnCell(txTable.getHTable(), KeyValue.Type.Delete, null, TEST_TABLE.getBytes(), row1, TEST_FAMILY.getBytes(),
qualifier);
}
@@ -135,14 +135,15 @@ public class TestTxMgrFailover extends OmidTestBase {
// Helper methods
// ----------------------------------------------------------------------------------------------------------------
- protected void checkOperationSuccessOnCell(KeyValue.Type targetOp,
+ protected void checkOperationSuccessOnCell(Table table,
+ KeyValue.Type targetOp,
@Nullable byte[] expectedValue,
byte[] tableName,
byte[] row,
byte[] fam,
byte[] col) {
- try (HTable table = new HTable(hbaseConf, tableName)) {
+ try {
Get get = new Get(row).setMaxVersions(1);
Result result = table.get(get);
Cell latestCell = result.getColumnLatestCell(fam, col);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-client/src/test/java/org/apache/omid/transaction/TestUpdateScan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestUpdateScan.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestUpdateScan.java
index 0f92d54..255a12d 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestUpdateScan.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestUpdateScan.java
@@ -17,6 +17,9 @@
*/
package org.apache.omid.transaction;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -34,9 +37,6 @@ import org.testng.Assert;
import org.testng.ITestContext;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
@Test(groups = "sharedHBase")
public class TestUpdateScan extends OmidTestBase {
private static final Logger LOG = LoggerFactory.getLogger(TestUpdateScan.class);
@@ -48,13 +48,13 @@ public class TestUpdateScan extends OmidTestBase {
public void testGet(ITestContext context) throws Exception {
try {
TransactionManager tm = newTransactionManager(context);
- TTable table = new TTable(hbaseConf, TEST_TABLE);
+ TTable table = new TTable(connection, TEST_TABLE);
Transaction t = tm.begin();
int[] lInts = new int[]{100, 243, 2342, 22, 1, 5, 43, 56};
for (int i = 0; i < lInts.length; i++) {
byte[] data = Bytes.toBytes(lInts[i]);
Put put = new Put(data);
- put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
+ put.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
table.put(t, put);
}
int startKeyValue = lInts[3];
@@ -105,15 +105,15 @@ public class TestUpdateScan extends OmidTestBase {
@Test(timeOut = 10_000)
public void testScan(ITestContext context) throws Exception {
- try (TTable table = new TTable(hbaseConf, TEST_TABLE)) {
+ try (TTable table = new TTable(connection, TEST_TABLE)) {
TransactionManager tm = newTransactionManager(context);
Transaction t = tm.begin();
int[] lInts = new int[]{100, 243, 2342, 22, 1, 5, 43, 56};
for (int lInt : lInts) {
byte[] data = Bytes.toBytes(lInt);
Put put = new Put(data);
- put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
- put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL_2), data);
+ put.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
+ put.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL_2), data);
table.put(t, put);
}
@@ -154,13 +154,13 @@ public class TestUpdateScan extends OmidTestBase {
public void testScanUncommitted(ITestContext context) throws Exception {
try {
TransactionManager tm = newTransactionManager(context);
- TTable table = new TTable(hbaseConf, TEST_TABLE);
+ TTable table = new TTable(connection, TEST_TABLE);
Transaction t = tm.begin();
int[] lIntsA = new int[]{100, 243, 2342, 22, 1, 5, 43, 56};
for (int aLIntsA : lIntsA) {
byte[] data = Bytes.toBytes(aLIntsA);
Put put = new Put(data);
- put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
+ put.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
table.put(t, put);
}
tm.commit(t);
@@ -170,7 +170,7 @@ public class TestUpdateScan extends OmidTestBase {
for (int aLIntsB : lIntsB) {
byte[] data = Bytes.toBytes(aLIntsB);
Put put = new Put(data);
- put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
+ put.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
table.put(tu, put);
}
@@ -179,7 +179,7 @@ public class TestUpdateScan extends OmidTestBase {
for (int aLIntsC : lIntsC) {
byte[] data = Bytes.toBytes(aLIntsC);
Put put = new Put(data);
- put.add(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
+ put.addColumn(Bytes.toBytes(TEST_FAMILY), Bytes.toBytes(TEST_COL), data);
table.put(t, put);
}
tm.commit(t);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
----------------------------------------------------------------------
diff --git a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
index 89815c4..eb0ba04 100644
--- a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
+++ b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
@@ -17,26 +17,11 @@
*/
package org.apache.omid.committable.hbase;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
-import org.apache.omid.committable.CommitTable;
-import org.apache.omid.committable.CommitTable.CommitTimestamp.Location;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TABLE_QUALIFIER;
+import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.INVALID_TX_QUALIFIER;
+import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.LOW_WATERMARK_QUALIFIER;
+import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.LOW_WATERMARK_ROW;
-import javax.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
@@ -47,16 +32,36 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TABLE_QUALIFIER;
-import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.INVALID_TX_QUALIFIER;
-import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.LOW_WATERMARK_QUALIFIER;
-import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.LOW_WATERMARK_ROW;
+import javax.inject.Inject;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.committable.CommitTable.CommitTimestamp.Location;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
public class HBaseCommitTable implements CommitTable {
private static final Logger LOG = LoggerFactory.getLogger(HBaseCommitTable.class);
- private final Configuration hbaseConfig;
+ private final Connection hbaseConnection;
private final String tableName;
private final byte[] commitTableFamily;
private final byte[] lowWatermarkFamily;
@@ -65,15 +70,16 @@ public class HBaseCommitTable implements CommitTable {
/**
* Create a hbase commit table.
* Note that we do not take ownership of the passed htable, it is just used to construct the writer and client.
+ * @throws IOException if the connection to HBase cannot be created
*/
@Inject
- public HBaseCommitTable(Configuration hbaseConfig, HBaseCommitTableConfig config) {
+ public HBaseCommitTable(Configuration hbaseConfig, HBaseCommitTableConfig config) throws IOException {
this(hbaseConfig, config, KeyGeneratorImplementations.defaultKeyGenerator());
}
- public HBaseCommitTable(Configuration hbaseConfig, HBaseCommitTableConfig config, KeyGenerator keygen) {
+ public HBaseCommitTable(Configuration hbaseConfig, HBaseCommitTableConfig config, KeyGenerator keygen) throws IOException {
- this.hbaseConfig = hbaseConfig;
+ this.hbaseConnection = ConnectionFactory.createConnection(hbaseConfig);
this.tableName = config.getTableName();
this.commitTableFamily = config.getCommitTableFamily();
this.lowWatermarkFamily = config.getLowWatermarkFamily();
@@ -88,13 +94,13 @@ public class HBaseCommitTable implements CommitTable {
private class HBaseWriter implements Writer {
private static final long INITIAL_LWM_VALUE = -1L;
- final HTable table;
+ final Table table;
// Our own buffer for operations
final List<Put> writeBuffer = new LinkedList<>();
volatile long lowWatermarkToStore = INITIAL_LWM_VALUE;
HBaseWriter() throws IOException {
- table = new HTable(hbaseConfig, tableName);
+ table = hbaseConnection.getTable(TableName.valueOf(tableName));
}
@Override
@@ -102,7 +108,7 @@ public class HBaseCommitTable implements CommitTable {
assert (startTimestamp < commitTimestamp);
Put put = new Put(startTimestampToKey(startTimestamp), startTimestamp);
byte[] value = encodeCommitTimestamp(startTimestamp, commitTimestamp);
- put.add(commitTableFamily, COMMIT_TABLE_QUALIFIER, value);
+ put.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER, value);
writeBuffer.add(put);
}
@@ -138,7 +144,7 @@ public class HBaseCommitTable implements CommitTable {
long lowWatermark = lowWatermarkToStore;
if(lowWatermark != INITIAL_LWM_VALUE) {
Put put = new Put(LOW_WATERMARK_ROW);
- put.add(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER, Bytes.toBytes(lowWatermark));
+ put.addColumn(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER, Bytes.toBytes(lowWatermark));
writeBuffer.add(put);
}
}
@@ -147,17 +153,19 @@ public class HBaseCommitTable implements CommitTable {
class HBaseClient implements Client, Runnable {
- final HTable table;
- final HTable deleteTable;
+ final Table table;
+ final Table deleteTable;
final ExecutorService deleteBatchExecutor;
final BlockingQueue<DeleteRequest> deleteQueue;
boolean isClosed = false; // @GuardedBy("this")
final static int DELETE_BATCH_SIZE = 1024;
HBaseClient() throws IOException {
- table = new HTable(hbaseConfig, tableName);
- table.setAutoFlush(false, true);
- deleteTable = new HTable(hbaseConfig, tableName);
+ // TODO: create TTable here instead
+ table = hbaseConnection.getTable(TableName.valueOf(tableName));
+ // FIXME: why is this using autoFlush of false? Why would every Delete
+ // need to be sent through a separate RPC?
+ deleteTable = hbaseConnection.getTable(TableName.valueOf(tableName));
deleteQueue = new ArrayBlockingQueue<>(DELETE_BATCH_SIZE);
deleteBatchExecutor = Executors.newSingleThreadExecutor(
@@ -254,7 +262,7 @@ public class HBaseCommitTable implements CommitTable {
try {
byte[] row = startTimestampToKey(startTimestamp);
Put invalidationPut = new Put(row, startTimestamp);
- invalidationPut.add(commitTableFamily, INVALID_TX_QUALIFIER, null);
+ invalidationPut.addColumn(commitTableFamily, INVALID_TX_QUALIFIER, null);
// We need to write to the invalid column only if the commit timestamp
// is empty. This has to be done atomically. Otherwise, if we first
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
----------------------------------------------------------------------
diff --git a/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java b/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
index 9493a44..5bb860d 100644
--- a/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
+++ b/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
@@ -17,8 +17,13 @@
*/
package org.apache.omid.committable.hbase;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -26,10 +31,12 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
-import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.Client;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
@@ -44,12 +51,8 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
public class TestHBaseCommitTable {
@@ -62,7 +65,7 @@ public class TestHBaseCommitTable {
private static HBaseTestingUtility testutil;
private static MiniHBaseCluster hbasecluster;
protected static Configuration hbaseConf;
- private static AggregationClient aggregationClient;
+ protected static Connection connection;
private byte[] commitTableFamily;
private byte[] lowWatermarkFamily;
@@ -77,8 +80,7 @@ public class TestHBaseCommitTable {
LOG.info("Create hbase");
testutil = new HBaseTestingUtility(hbaseConf);
hbasecluster = testutil.startMiniCluster(1);
- aggregationClient = new AggregationClient(hbaseConf);
-
+ connection = ConnectionFactory.createConnection(hbaseConf);
}
@AfterClass
@@ -90,9 +92,9 @@ public class TestHBaseCommitTable {
@BeforeMethod
public void setUp() throws Exception {
- HBaseAdmin admin = testutil.getHBaseAdmin();
+ Admin admin = testutil.getHBaseAdmin();
- if (!admin.tableExists(TEST_TABLE)) {
+ if (!admin.tableExists(TableName.valueOf(TEST_TABLE))) {
HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
HColumnDescriptor datafam = new HColumnDescriptor(commitTableFamily);
@@ -103,12 +105,13 @@ public class TestHBaseCommitTable {
lowWatermarkFam.setMaxVersions(Integer.MAX_VALUE);
desc.addFamily(lowWatermarkFam);
- desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation");
+ // Move to HBaseSims for 2.0 support
+ // For 2.0, use TableDescriptorBuilder to build TableDescriptor
admin.createTable(desc);
}
- if (admin.isTableDisabled(TEST_TABLE)) {
- admin.enableTable(TEST_TABLE);
+ if (admin.isTableDisabled(TableName.valueOf(TEST_TABLE))) {
+ admin.enableTable(TableName.valueOf(TEST_TABLE));
}
HTableDescriptor[] tables = admin.listTables();
for (HTableDescriptor t : tables) {
@@ -120,9 +123,9 @@ public class TestHBaseCommitTable {
public void tearDown() {
try {
LOG.info("tearing Down");
- HBaseAdmin admin = testutil.getHBaseAdmin();
- admin.disableTable(TEST_TABLE);
- admin.deleteTable(TEST_TABLE);
+ Admin admin = testutil.getHBaseAdmin();
+ admin.disableTable(TableName.valueOf(TEST_TABLE));
+ admin.deleteTable(TableName.valueOf(TEST_TABLE));
} catch (Exception e) {
LOG.error("Error tearing down", e);
@@ -289,10 +292,17 @@ public class TestHBaseCommitTable {
}
- private static long rowCount(TableName table, byte[] family) throws Throwable {
+ private static long rowCount(TableName tableName, byte[] family) throws Throwable { // counts rows having cells in 'family'
Scan scan = new Scan();
scan.addFamily(family);
- return aggregationClient.rowCount(table, new LongColumnInterpreter(), scan);
+ try (Table table = connection.getTable(tableName); // close the Table too, not only the scanner
+ ResultScanner scanner = table.getScanner(scan)) {
+ long count = 0; // long, to match the method's return type
+ while (scanner.next() != null) { // next() returns null once the scan is exhausted
+ count++;
+ }
+ return count;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java b/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
index 04900a6..df22e70 100644
--- a/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
+++ b/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
@@ -17,13 +17,13 @@
*/
package org.apache.omid.transaction;
-import com.google.common.base.Charsets;
-import com.google.common.base.Objects;
-import com.google.common.base.Objects.ToStringHelper;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
@@ -36,13 +36,13 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import com.google.common.base.Charsets;
+import com.google.common.base.Objects;
+import com.google.common.base.Objects.ToStringHelper;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
@SuppressWarnings("all")
public final class CellUtils {
@@ -301,6 +301,8 @@ public final class CellUtils {
*/
public static SortedMap<Cell, Optional<Cell>> mapCellsToShadowCells(List<Cell> cells) {
+ // Move CellComparator to HBaseShims for 2.0 support
+ // In 2.0 it needs to be accessed through CellComparatorImpl.COMPARATOR
SortedMap<Cell, Optional<Cell>> cellToShadowCellMap
= new TreeMap<Cell, Optional<Cell>>(new CellComparator());
@@ -315,7 +317,7 @@ public final class CellUtils {
// TODO: Should we check also here the MVCC and swap if its greater???
// Values are the same, ignore
} else {
- if (cell.getMvccVersion() > storedCell.getMvccVersion()) { // Swap values
+ if (cell.getSequenceId() > storedCell.getSequenceId()) { // Swap values
Optional<Cell> previousValue = cellToShadowCellMap.remove(storedCell);
Preconditions.checkNotNull(previousValue, "Should contain an Optional<Cell> value");
cellIdToCellMap.put(key, cell);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
index 3dfcb2a..2aae1b5 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
@@ -17,9 +17,25 @@
*/
package org.apache.omid.transaction;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -28,17 +44,18 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;
@@ -59,24 +76,9 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
public class TestCompaction {
@@ -96,7 +98,7 @@ public class TestCompaction {
private Injector injector;
- private HBaseAdmin admin;
+ private Admin admin;
private Configuration hbaseConf;
private HBaseTestingUtility hbaseTestUtil;
private MiniHBaseCluster hbaseCluster;
@@ -106,6 +108,7 @@ public class TestCompaction {
private AggregationClient aggregationClient;
private CommitTable commitTable;
private PostCommitActions syncPostCommitter;
+ private Connection connection;
@BeforeClass
public void setupTestCompation() throws Exception {
@@ -122,8 +125,9 @@ public class TestCompaction {
hbaseConf.setInt("hbase.hstore.compaction.max", 2);
setupHBase();
+ connection = ConnectionFactory.createConnection(hbaseConf);
aggregationClient = new AggregationClient(hbaseConf);
- admin = new HBaseAdmin(hbaseConf);
+ admin = connection.getAdmin();
createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig);
setupTSO();
@@ -149,7 +153,7 @@ public class TestCompaction {
}
private void createTableIfNotExists(String tableName, byte[]... families) throws IOException {
- if (!admin.tableExists(tableName)) {
+ if (!admin.tableExists(TableName.valueOf(tableName))) {
LOG.info("Creating {} table...", tableName);
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
@@ -209,7 +213,7 @@ public class TestCompaction {
public void testStandardTXsWithShadowCellsAndWithSTBelowAndAboveLWMArePresevedAfterCompaction() throws Throwable {
String TEST_TABLE = "testStandardTXsWithShadowCellsAndWithSTBelowAndAboveLWMArePresevedAfterCompaction";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
final int ROWS_TO_ADD = 5;
@@ -222,13 +226,13 @@ public class TestCompaction {
LOG.info("AssignedLowWatermark " + fakeAssignedLowWatermark);
}
Put put = new Put(Bytes.toBytes(rowId));
- put.add(fam, qual, data);
+ put.addColumn(fam, qual, data);
txTable.put(tx, put);
tm.commit(tx);
}
LOG.info("Flushing table {}", TEST_TABLE);
- admin.flush(TEST_TABLE);
+ admin.flush(TableName.valueOf(TEST_TABLE));
// Return a LWM that triggers compaction & stays between 1 and the max start timestamp assigned to previous TXs
LOG.info("Regions in table {}: {}", TEST_TABLE, hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).size());
@@ -241,7 +245,7 @@ public class TestCompaction {
doReturn(f).when(commitTableClient).readLowWatermark();
omidCompactor.commitTableClientQueue.add(commitTableClient);
LOG.info("Compacting table {}", TEST_TABLE);
- admin.majorCompact(TEST_TABLE);
+ admin.majorCompact(TableName.valueOf(TEST_TABLE));
LOG.info("Sleeping for 3 secs");
Thread.sleep(3000);
@@ -255,7 +259,7 @@ public class TestCompaction {
public void testTXWithoutShadowCellsAndWithSTBelowLWMGetsShadowCellHealedAfterCompaction() throws Exception {
String TEST_TABLE = "testTXWithoutShadowCellsAndWithSTBelowLWMGetsShadowCellHealedAfterCompaction";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
// The following line emulates a crash after commit that is observed in (*) below
doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
@@ -266,7 +270,7 @@ public class TestCompaction {
// Test shadow cell are created properly
Put put = new Put(Bytes.toBytes(row));
- put.add(fam, qual, data);
+ put.addColumn(fam, qual, data);
txTable.put(problematicTx, put);
try {
tm.commit(problematicTx);
@@ -293,10 +297,10 @@ public class TestCompaction {
omidCompactor.commitTableClientQueue.add(commitTableClient);
LOG.info("Flushing table {}", TEST_TABLE);
- admin.flush(TEST_TABLE);
+ admin.flush(TableName.valueOf(TEST_TABLE));
LOG.info("Compacting table {}", TEST_TABLE);
- admin.majorCompact(TEST_TABLE);
+ admin.majorCompact(TableName.valueOf(TEST_TABLE));
LOG.info("Sleeping for 3 secs");
Thread.sleep(3000);
@@ -317,13 +321,13 @@ public class TestCompaction {
TEST_TABLE =
"testNeverendingTXsWithSTBelowAndAboveLWMAreDiscardedAndPreservedRespectivelyAfterCompaction";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
// The KV in this transaction should be discarded
HBaseTransaction neverendingTxBelowLowWatermark = (HBaseTransaction) tm.begin();
long rowId = randomGenerator.nextLong();
Put put = new Put(Bytes.toBytes(rowId));
- put.add(fam, qual, data);
+ put.addColumn(fam, qual, data);
txTable.put(neverendingTxBelowLowWatermark, put);
assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTxBelowLowWatermark.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
@@ -336,7 +340,7 @@ public class TestCompaction {
HBaseTransaction neverendingTxAboveLowWatermark = (HBaseTransaction) tm.begin();
long anotherRowId = randomGenerator.nextLong();
put = new Put(Bytes.toBytes(anotherRowId));
- put.add(fam, qual, data);
+ put.addColumn(fam, qual, data);
txTable.put(neverendingTxAboveLowWatermark, put);
assertTrue(CellUtils.hasCell(Bytes.toBytes(anotherRowId), fam, qual, neverendingTxAboveLowWatermark.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
@@ -347,7 +351,7 @@ public class TestCompaction {
assertEquals(rowCount(TEST_TABLE, fam), 2, "Rows in table before flushing should be 2");
LOG.info("Flushing table {}", TEST_TABLE);
- admin.flush(TEST_TABLE);
+ admin.flush(TableName.valueOf(TEST_TABLE));
assertEquals(rowCount(TEST_TABLE, fam), 2, "Rows in table after flushing should be 2");
// Return a LWM that triggers compaction and stays between both ST of TXs, so assign 1st TX's start timestamp
@@ -361,7 +365,7 @@ public class TestCompaction {
doReturn(f).when(commitTableClient).readLowWatermark();
omidCompactor.commitTableClientQueue.add(commitTableClient);
LOG.info("Compacting table {}", TEST_TABLE);
- admin.majorCompact(TEST_TABLE);
+ admin.majorCompact(TableName.valueOf(TEST_TABLE));
LOG.info("Sleeping for 3 secs");
Thread.sleep(3000);
@@ -390,14 +394,14 @@ public class TestCompaction {
public void testRowsUnalteredWhenCommitTableCannotBeReached() throws Throwable {
String TEST_TABLE = "testRowsUnalteredWhenCommitTableCannotBeReached";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
// The KV in this transaction should be discarded but in the end should remain there because
// the commit table won't be accessed (simulating an error on access)
HBaseTransaction neverendingTx = (HBaseTransaction) tm.begin();
long rowId = randomGenerator.nextLong();
Put put = new Put(Bytes.toBytes(rowId));
- put.add(fam, qual, data);
+ put.addColumn(fam, qual, data);
txTable.put(neverendingTx, put);
assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
new TTableCellGetterAdapter(txTable)),
@@ -408,7 +412,7 @@ public class TestCompaction {
assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one rows in table before flushing");
LOG.info("Flushing table {}", TEST_TABLE);
- admin.flush(TEST_TABLE);
+ admin.flush(TableName.valueOf(TEST_TABLE));
assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one rows in table after flushing");
// Break access to CommitTable functionality in Compactor
@@ -423,7 +427,7 @@ public class TestCompaction {
omidCompactor.commitTableClientQueue.add(commitTableClient);
LOG.info("Compacting table {}", TEST_TABLE);
- admin.majorCompact(TEST_TABLE); // Should trigger the error when accessing CommitTable funct.
+ admin.majorCompact(TableName.valueOf(TEST_TABLE)); // Should trigger the error when accessing CommitTable funct.
LOG.info("Sleeping for 3 secs");
Thread.sleep(3000);
@@ -443,13 +447,13 @@ public class TestCompaction {
public void testOriginalTableParametersAreAvoidedAlsoWhenCompacting() throws Throwable {
String TEST_TABLE = "testOriginalTableParametersAreAvoidedAlsoWhenCompacting";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
long rowId = randomGenerator.nextLong();
for (int versionCount = 0; versionCount <= (2 * MAX_VERSIONS); versionCount++) {
Transaction tx = tm.begin();
Put put = new Put(Bytes.toBytes(rowId));
- put.add(fam, qual, Bytes.toBytes("testWrite-" + versionCount));
+ put.addColumn(fam, qual, Bytes.toBytes("testWrite-" + versionCount));
txTable.put(tx, put);
tm.commit(tx);
}
@@ -466,7 +470,7 @@ public class TestCompaction {
assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table before flushing");
LOG.info("Flushing table {}", TEST_TABLE);
- admin.flush(TEST_TABLE);
+ admin.flush(TableName.valueOf(TEST_TABLE));
assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table after flushing");
// Return a LWM that triggers compaction
@@ -502,26 +506,26 @@ public class TestCompaction {
public void testOldCellsAreDiscardedAfterCompaction() throws Exception {
String TEST_TABLE = "testOldCellsAreDiscardedAfterCompaction";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
byte[] rowId = Bytes.toBytes("row");
// Create 3 transactions modifying the same cell in a particular row
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
Put put1 = new Put(rowId);
- put1.add(fam, qual, Bytes.toBytes("testValue 1"));
+ put1.addColumn(fam, qual, Bytes.toBytes("testValue 1"));
txTable.put(tx1, put1);
tm.commit(tx1);
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Put put2 = new Put(rowId);
- put2.add(fam, qual, Bytes.toBytes("testValue 2"));
+ put2.addColumn(fam, qual, Bytes.toBytes("testValue 2"));
txTable.put(tx2, put2);
tm.commit(tx2);
HBaseTransaction tx3 = (HBaseTransaction) tm.begin();
Put put3 = new Put(rowId);
- put3.add(fam, qual, Bytes.toBytes("testValue 3"));
+ put3.addColumn(fam, qual, Bytes.toBytes("testValue 3"));
txTable.put(tx3, put3);
tm.commit(tx3);
@@ -566,7 +570,7 @@ public class TestCompaction {
assertEquals(Bytes.toBytes("testValue 3"), result.getValue(fam, qual));
// Write a new value
Put newPut1 = new Put(rowId);
- newPut1.add(fam, qual, Bytes.toBytes("new testValue 1"));
+ newPut1.addColumn(fam, qual, Bytes.toBytes("new testValue 1"));
txTable.put(newTx1, newPut1);
// Start a second new transaction
@@ -608,7 +612,7 @@ public class TestCompaction {
public void testDuplicateDeletes() throws Throwable {
String TEST_TABLE = "testDuplicateDeletes";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
// jump through hoops to trigger a minor compaction.
// a minor compaction will only run if there are enough
@@ -620,7 +624,7 @@ public class TestCompaction {
byte[] firstRow = "FirstRow".getBytes();
HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
Put put0 = new Put(firstRow);
- put0.add(fam, qual, Bytes.toBytes("testWrite-1"));
+ put0.addColumn(fam, qual, Bytes.toBytes("testWrite-1"));
txTable.put(tx0, put0);
tm.commit(tx0);
@@ -631,7 +635,7 @@ public class TestCompaction {
byte[] rowToBeCompactedAway = "compactMe".getBytes();
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
Put put1 = new Put(rowToBeCompactedAway);
- put1.add(fam, qual, Bytes.toBytes("testWrite-1"));
+ put1.addColumn(fam, qual, Bytes.toBytes("testWrite-1"));
txTable.put(tx1, put1);
txTable.flushCommits();
@@ -639,13 +643,13 @@ public class TestCompaction {
byte[] row = "iCauseErrors".getBytes();
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Put put2 = new Put(row);
- put2.add(fam, qual, Bytes.toBytes("testWrite-1"));
+ put2.addColumn(fam, qual, Bytes.toBytes("testWrite-1"));
txTable.put(tx2, put2);
tm.commit(tx2);
HBaseTransaction tx3 = (HBaseTransaction) tm.begin();
Put put3 = new Put(row);
- put3.add(fam, qual, Bytes.toBytes("testWrite-1"));
+ put3.addColumn(fam, qual, Bytes.toBytes("testWrite-1"));
txTable.put(tx3, put3);
txTable.flushCommits();
@@ -655,7 +659,7 @@ public class TestCompaction {
List<HBaseCellId> newWriteSet = new ArrayList<>();
final AtomicBoolean flushFailing = new AtomicBoolean(true);
for (HBaseCellId id : writeSet) {
- HTableInterface failableHTable = spy(id.getTable());
+ TTable failableHTable = spy(id.getTable());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation)
@@ -693,7 +697,7 @@ public class TestCompaction {
byte[] anotherRow = "someotherrow".getBytes();
HBaseTransaction tx4 = (HBaseTransaction) tm.begin();
Put put4 = new Put(anotherRow);
- put4.add(fam, qual, Bytes.toBytes("testWrite-1"));
+ put4.addColumn(fam, qual, Bytes.toBytes("testWrite-1"));
txTable.put(tx4, put4);
tm.commit(tx4);
@@ -702,7 +706,7 @@ public class TestCompaction {
// trigger minor compaction and give it time to run
setCompactorLWM(tx4.getStartTimestamp(), TEST_TABLE);
- admin.compact(TEST_TABLE);
+ admin.compact(TableName.valueOf(TEST_TABLE));
Thread.sleep(3000);
// check if the cell that should be compacted, is compacted
@@ -715,24 +719,24 @@ public class TestCompaction {
public void testNonOmidCFIsUntouched() throws Throwable {
String TEST_TABLE = "testNonOmidCFIsUntouched";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
- admin.disableTable(TEST_TABLE);
+ admin.disableTable(TableName.valueOf(TEST_TABLE));
byte[] nonOmidCF = Bytes.toBytes("nonOmidCF");
byte[] nonOmidQual = Bytes.toBytes("nonOmidCol");
HColumnDescriptor nonomidfam = new HColumnDescriptor(nonOmidCF);
nonomidfam.setMaxVersions(MAX_VERSIONS);
- admin.addColumn(TEST_TABLE, nonomidfam);
- admin.enableTable(TEST_TABLE);
+ admin.addColumn(TableName.valueOf(TEST_TABLE), nonomidfam);
+ admin.enableTable(TableName.valueOf(TEST_TABLE));
byte[] rowId = Bytes.toBytes("testRow");
Transaction tx = tm.begin();
Put put = new Put(rowId);
- put.add(fam, qual, Bytes.toBytes("testValue"));
+ put.addColumn(fam, qual, Bytes.toBytes("testValue"));
txTable.put(tx, put);
Put nonTxPut = new Put(rowId);
- nonTxPut.add(nonOmidCF, nonOmidQual, Bytes.toBytes("nonTxVal"));
+ nonTxPut.addColumn(nonOmidCF, nonOmidQual, Bytes.toBytes("nonTxVal"));
txTable.getHTable().put(nonTxPut);
txTable.flushCommits(); // to make sure it left the client
@@ -759,21 +763,21 @@ public class TestCompaction {
public void testACellDeletedNonTransactionallyDoesNotAppearWhenAMajorCompactionOccurs() throws Throwable {
String TEST_TABLE = "testACellDeletedNonTransactionallyDoesNotAppearWhenAMajorCompactionOccurs";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
- HTable table = new HTable(hbaseConf, TEST_TABLE);
+ Table table = txTable.getHTable();
// Write first a value transactionally
HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("row1");
Put p0 = new Put(rowId);
- p0.add(fam, qual, Bytes.toBytes("testValue-0"));
+ p0.addColumn(fam, qual, Bytes.toBytes("testValue-0"));
txTable.put(tx0, p0);
tm.commit(tx0);
// Then perform a non-transactional Delete
Delete d = new Delete(rowId);
- d.deleteColumn(fam, qual);
+ d.addColumn(fam, qual);
table.delete(d);
// Trigger a major compaction
@@ -800,9 +804,9 @@ public class TestCompaction {
public void testACellDeletedNonTransactionallyIsPreservedWhenMinorCompactionOccurs() throws Throwable {
String TEST_TABLE = "testACellDeletedNonTransactionallyIsPreservedWhenMinorCompactionOccurs";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
- HTable table = new HTable(hbaseConf, TEST_TABLE);
+ Table table = txTable.getHTable();
// Configure the environment to create a minor compaction
@@ -810,7 +814,7 @@ public class TestCompaction {
HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("row1");
Put p0 = new Put(rowId);
- p0.add(fam, qual, Bytes.toBytes("testValue-0"));
+ p0.addColumn(fam, qual, Bytes.toBytes("testValue-0"));
txTable.put(tx0, p0);
tm.commit(tx0);
@@ -820,7 +824,7 @@ public class TestCompaction {
// Write another value transactionally
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
Put p1 = new Put(rowId);
- p1.add(fam, qual, Bytes.toBytes("testValue-1"));
+ p1.addColumn(fam, qual, Bytes.toBytes("testValue-1"));
txTable.put(tx1, p1);
tm.commit(tx1);
@@ -830,7 +834,7 @@ public class TestCompaction {
// Write yet another value transactionally
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Put p2 = new Put(rowId);
- p2.add(fam, qual, Bytes.toBytes("testValue-2"));
+ p2.addColumn(fam, qual, Bytes.toBytes("testValue-2"));
txTable.put(tx2, p2);
tm.commit(tx2);
@@ -839,7 +843,7 @@ public class TestCompaction {
// Then perform a non-transactional Delete
Delete d = new Delete(rowId);
- d.deleteColumn(fam, qual);
+ d.addColumn(fam, qual);
table.delete(d);
// create the fourth hfile
@@ -848,7 +852,7 @@ public class TestCompaction {
// Trigger the minor compaction
HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
setCompactorLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
- admin.compact(TEST_TABLE);
+ admin.compact(TableName.valueOf(TEST_TABLE));
Thread.sleep(5000);
// Then perform a non-tx (raw) scan...
@@ -887,14 +891,14 @@ public class TestCompaction {
public void testTombstonesAreNotCleanedUpWhenMinorCompactionOccurs() throws Throwable {
String TEST_TABLE = "testTombstonesAreNotCleanedUpWhenMinorCompactionOccurs";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
// Configure the environment to create a minor compaction
HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("case1");
Put p = new Put(rowId);
- p.add(fam, qual, Bytes.toBytes("testValue-0"));
+ p.addColumn(fam, qual, Bytes.toBytes("testValue-0"));
txTable.put(tx0, p);
tm.commit(tx0);
@@ -904,7 +908,7 @@ public class TestCompaction {
// Create the tombstone
HBaseTransaction deleteTx = (HBaseTransaction) tm.begin();
Delete d = new Delete(rowId);
- d.deleteColumn(fam, qual);
+ d.addColumn(fam, qual);
txTable.delete(deleteTx, d);
tm.commit(deleteTx);
@@ -913,7 +917,7 @@ public class TestCompaction {
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
Put p1 = new Put(rowId);
- p1.add(fam, qual, Bytes.toBytes("testValue-11"));
+ p1.addColumn(fam, qual, Bytes.toBytes("testValue-11"));
txTable.put(tx1, p1);
tm.commit(tx1);
@@ -922,14 +926,14 @@ public class TestCompaction {
HBaseTransaction lastTx = (HBaseTransaction) tm.begin();
Put p2 = new Put(rowId);
- p2.add(fam, qual, Bytes.toBytes("testValue-222"));
+ p2.addColumn(fam, qual, Bytes.toBytes("testValue-222"));
txTable.put(lastTx, p2);
tm.commit(lastTx);
// Trigger the minor compaction
HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
setCompactorLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
- admin.compact(TEST_TABLE);
+ admin.compact(TableName.valueOf(TEST_TABLE));
Thread.sleep(5000);
// Checks on results after compaction
@@ -958,12 +962,12 @@ public class TestCompaction {
public void testTombstonesAreCleanedUpCase1() throws Exception {
String TEST_TABLE = "testTombstonesAreCleanedUpCase1";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("case1");
Put p = new Put(rowId);
- p.add(fam, qual, Bytes.toBytes("testValue"));
+ p.addColumn(fam, qual, Bytes.toBytes("testValue"));
txTable.put(tx1, p);
tm.commit(tx1);
@@ -972,7 +976,7 @@ public class TestCompaction {
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Delete d = new Delete(rowId);
- d.deleteColumn(fam, qual);
+ d.addColumn(fam, qual);
txTable.delete(tx2, d);
tm.commit(tx2);
@@ -994,18 +998,18 @@ public class TestCompaction {
public void testTombstonesAreCleanedUpCase2() throws Exception {
String TEST_TABLE = "testTombstonesAreCleanedUpCase2";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("case2");
Put p = new Put(rowId);
- p.add(fam, qual, Bytes.toBytes("testValue"));
+ p.addColumn(fam, qual, Bytes.toBytes("testValue"));
txTable.put(tx1, p);
tm.commit(tx1);
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Delete d = new Delete(rowId);
- d.deleteColumn(fam, qual);
+ d.addColumn(fam, qual);
txTable.delete(tx2, d);
tm.commit(tx2);
@@ -1031,18 +1035,18 @@ public class TestCompaction {
public void testTombstonesAreCleanedUpCase3() throws Exception {
String TEST_TABLE = "testTombstonesAreCleanedUpCase3";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("case3");
Put p = new Put(rowId);
- p.add(fam, qual, Bytes.toBytes("testValue"));
+ p.addColumn(fam, qual, Bytes.toBytes("testValue"));
txTable.put(tx1, p);
tm.commit(tx1);
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Delete d = new Delete(rowId);
- d.deleteColumn(fam, qual);
+ d.addColumn(fam, qual);
txTable.delete(tx2, d);
HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
@@ -1067,12 +1071,12 @@ public class TestCompaction {
public void testTombstonesAreCleanedUpCase4() throws Exception {
String TEST_TABLE = "testTombstonesAreCleanedUpCase4";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("case4");
Put p = new Put(rowId);
- p.add(fam, qual, Bytes.toBytes("testValue"));
+ p.addColumn(fam, qual, Bytes.toBytes("testValue"));
txTable.put(tx1, p);
tm.commit(tx1);
@@ -1080,7 +1084,7 @@ public class TestCompaction {
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Delete d = new Delete(rowId);
- d.deleteColumn(fam, qual);
+ d.addColumn(fam, qual);
txTable.delete(tx2, d);
compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
@@ -1102,12 +1106,12 @@ public class TestCompaction {
public void testTombstonesAreCleanedUpCase5() throws Exception {
String TEST_TABLE = "testTombstonesAreCleanedUpCase5";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
byte[] rowId = Bytes.toBytes("case5");
Delete d = new Delete(rowId);
- d.deleteColumn(fam, qual);
+ d.addColumn(fam, qual);
txTable.delete(tx1, d);
tm.commit(tx1);
@@ -1128,18 +1132,18 @@ public class TestCompaction {
public void testTombstonesAreCleanedUpCase6() throws Exception {
String TEST_TABLE = "testTombstonesAreCleanedUpCase6";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable txTable = new TTable(hbaseConf, TEST_TABLE);
+ TTable txTable = new TTable(connection, TEST_TABLE);
byte[] rowId = Bytes.toBytes("case6");
HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
Delete d = new Delete(rowId);
- d.deleteColumn(fam, qual);
+ d.addColumn(fam, qual);
txTable.delete(tx1, d);
tm.commit(tx1);
HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
Put p = new Put(rowId);
- p.add(fam, qual, Bytes.toBytes("testValue"));
+ p.addColumn(fam, qual, Bytes.toBytes("testValue"));
txTable.put(tx2, p);
tm.commit(tx2);
@@ -1173,12 +1177,12 @@ public class TestCompaction {
}
private void compactWithLWM(long lwm, String tableName) throws Exception {
- admin.flush(tableName);
+ admin.flush(TableName.valueOf(tableName));
LOG.info("Regions in table {}: {}", tableName, hbaseCluster.getRegions(Bytes.toBytes(tableName)).size());
setCompactorLWM(lwm, tableName);
LOG.info("Compacting table {}", tableName);
- admin.majorCompact(tableName);
+ admin.majorCompact(TableName.valueOf(tableName));
LOG.info("Sleeping for 3 secs");
Thread.sleep(3000);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/75dc8177/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
index 90dd5dd..8934b08 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
@@ -17,26 +17,32 @@
*/
package org.apache.omid.transaction;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
-import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
-
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FamilyFilter;
@@ -52,7 +58,6 @@ import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
import org.apache.omid.tso.TSOServer;
import org.apache.omid.tso.TSOServerConfig;
-
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
@@ -62,16 +67,9 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.spy;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
public class TestSnapshotFilter {
@@ -85,16 +83,16 @@ public class TestSnapshotFilter {
private Injector injector;
- private HBaseAdmin admin;
+ private Admin admin;
private Configuration hbaseConf;
private HBaseTestingUtility hbaseTestUtil;
private MiniHBaseCluster hbaseCluster;
private TSOServer tso;
- private AggregationClient aggregationClient;
private CommitTable commitTable;
private PostCommitActions syncPostCommitter;
+ private Connection connection;
@BeforeClass
public void setupTestSnapshotFilter() throws Exception {
@@ -110,8 +108,8 @@ public class TestSnapshotFilter {
HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
setupHBase();
- aggregationClient = new AggregationClient(hbaseConf);
- admin = new HBaseAdmin(hbaseConf);
+ connection = ConnectionFactory.createConnection(hbaseConf);
+ admin = connection.getAdmin();
createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig);
setupTSO();
@@ -137,7 +135,7 @@ public class TestSnapshotFilter {
}
private void createTableIfNotExists(String tableName, byte[]... families) throws IOException {
- if (!admin.tableExists(tableName)) {
+ if (!admin.tableExists(TableName.valueOf(tableName))) {
LOG.info("Creating {} table...", tableName);
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
@@ -199,12 +197,12 @@ public class TestSnapshotFilter {
String TEST_TABLE = "testGetFirstResult";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put row1 = new Put(rowName1);
- row1.add(famName1, colName1, dataValue1);
+ row1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, row1);
tm.commit(tx1);
@@ -224,7 +222,7 @@ public class TestSnapshotFilter {
Transaction tx3 = tm.begin();
Put put3 = new Put(rowName1);
- put3.add(famName1, colName1, dataValue1);
+ put3.addColumn(famName1, colName1, dataValue1);
tt.put(tx3, put3);
tm.commit(tx3);
@@ -257,17 +255,17 @@ public class TestSnapshotFilter {
String TEST_TABLE = "testServerSideSnapshotFiltering";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
- put1.add(famName1, colName1, dataValue1);
+ put1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, put1);
tm.commit(tx1);
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName1);
- put2.add(famName1, colName1, dataValue2);
+ put2.addColumn(famName1, colName1, dataValue2);
tt.put(tx2, put2);
Transaction tx3 = tm.begin();
@@ -299,17 +297,17 @@ public class TestSnapshotFilter {
String TEST_TABLE = "testServerSideSnapshotFiltering";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
- put1.add(famName1, colName1, dataValue1);
+ put1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, put1);
tm.commit(tx1);
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName1);
- put2.add(famName1, colName1, dataValue2);
+ put2.addColumn(famName1, colName1, dataValue2);
// tt.put(tx2, put2);
Transaction tx3 = tm.begin();
@@ -344,19 +342,19 @@ public class TestSnapshotFilter {
String TEST_TABLE = "testGetWithFamilyDelete";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY), famName2);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
- put1.add(famName1, colName1, dataValue1);
+ put1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, put1);
tm.commit(tx1);
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName1);
- put2.add(famName2, colName2, dataValue1);
+ put2.addColumn(famName2, colName2, dataValue1);
tt.put(tx2, put2);
tm.commit(tx2);
@@ -425,7 +423,7 @@ public class TestSnapshotFilter {
readAfterCommit.await();
Transaction tx4 = tm.begin();
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Get get = new Get(rowName1);
Filter filter1 = new FilterList(FilterList.Operator.MUST_PASS_ONE,
@@ -451,10 +449,10 @@ public class TestSnapshotFilter {
};
readThread.start();
- TTable table = new TTable(hbaseConf, TEST_TABLE);
+ TTable table = new TTable(connection, TEST_TABLE);
final HBaseTransaction t1 = (HBaseTransaction) tm.begin();
Put put1 = new Put(rowName1);
- put1.add(famName1, colName1, dataValue1);
+ put1.addColumn(famName1, colName1, dataValue1);
table.put(t1, put1);
tm.commit(t1);
@@ -477,19 +475,19 @@ public class TestSnapshotFilter {
String TEST_TABLE = "testGetWithFilter";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY), famName2);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
- put1.add(famName1, colName1, dataValue1);
+ put1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, put1);
tm.commit(tx1);
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName1);
- put2.add(famName2, colName2, dataValue1);
+ put2.addColumn(famName2, colName2, dataValue1);
tt.put(tx2, put2);
tm.commit(tx2);
@@ -528,19 +526,19 @@ public class TestSnapshotFilter {
String TEST_TABLE = "testGetSecondResult";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
- put1.add(famName1, colName1, dataValue1);
+ put1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, put1);
tm.commit(tx1);
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName1);
- put2.add(famName1, colName1, dataValue1);
+ put2.addColumn(famName1, colName1, dataValue1);
tt.put(tx2, put2);
Transaction tx3 = tm.begin();
@@ -568,12 +566,12 @@ public class TestSnapshotFilter {
String TEST_TABLE = "testScanFirstResult";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put row1 = new Put(rowName1);
- row1.add(famName1, colName1, dataValue1);
+ row1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, row1);
tm.commit(tx1);
@@ -592,7 +590,7 @@ public class TestSnapshotFilter {
Transaction tx3 = tm.begin();
Put put3 = new Put(rowName1);
- put3.add(famName1, colName1, dataValue1);
+ put3.addColumn(famName1, colName1, dataValue1);
tt.put(tx3, put3);
tm.commit(tx3);
@@ -623,17 +621,17 @@ public class TestSnapshotFilter {
String TEST_TABLE = "testScanWithFilter";
createTableIfNotExists(TEST_TABLE, famName1, famName2);
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
- put1.add(famName1, colName1, dataValue1);
+ put1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, put1);
tm.commit(tx1);
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName1);
- put2.add(famName2, colName2, dataValue1);
+ put2.addColumn(famName2, colName2, dataValue1);
tt.put(tx2, put2);
tm.commit(tx2);
@@ -673,12 +671,12 @@ public class TestSnapshotFilter {
String TEST_TABLE = "testScanSecondResult";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
- put1.add(famName1, colName1, dataValue1);
+ put1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, put1);
tm.commit(tx1);
@@ -686,7 +684,7 @@ public class TestSnapshotFilter {
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName1);
- put2.add(famName1, colName1, dataValue1);
+ put2.addColumn(famName1, colName1, dataValue1);
tt.put(tx2, put2);
Transaction tx3 = tm.begin();
@@ -717,12 +715,12 @@ public class TestSnapshotFilter {
String TEST_TABLE = "testScanFewResults";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
- put1.add(famName, colName1, dataValue1);
+ put1.addColumn(famName, colName1, dataValue1);
tt.put(tx1, put1);
tm.commit(tx1);
@@ -730,7 +728,7 @@ public class TestSnapshotFilter {
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName2);
- put2.add(famName, colName2, dataValue2);
+ put2.addColumn(famName, colName2, dataValue2);
tt.put(tx2, put2);
tm.commit(tx2);
@@ -767,15 +765,15 @@ public class TestSnapshotFilter {
String TEST_TABLE = "testScanFewResultsDifferentTransaction";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
- put1.add(famName, colName1, dataValue1);
+ put1.addColumn(famName, colName1, dataValue1);
tt.put(tx1, put1);
Put put2 = new Put(rowName2);
- put2.add(famName, colName2, dataValue2);
+ put2.addColumn(famName, colName2, dataValue2);
tt.put(tx1, put2);
tm.commit(tx1);
@@ -783,7 +781,7 @@ public class TestSnapshotFilter {
Transaction tx2 = tm.begin();
put2 = new Put(rowName2);
- put2.add(famName, colName2, dataValue2);
+ put2.addColumn(famName, colName2, dataValue2);
tt.put(tx2, put2);
tm.commit(tx2);
@@ -820,15 +818,15 @@ public class TestSnapshotFilter {
String TEST_TABLE = "testScanFewResultsSameTransaction";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
- TTable tt = new TTable(hbaseConf, TEST_TABLE);
+ TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
Put put1 = new Put(rowName1);
- put1.add(famName, colName1, dataValue1);
+ put1.addColumn(famName, colName1, dataValue1);
tt.put(tx1, put1);
Put put2 = new Put(rowName2);
- put2.add(famName, colName2, dataValue2);
+ put2.addColumn(famName, colName2, dataValue2);
tt.put(tx1, put2);
tm.commit(tx1);
@@ -836,7 +834,7 @@ public class TestSnapshotFilter {
Transaction tx2 = tm.begin();
put2 = new Put(rowName2);
- put2.add(famName, colName2, dataValue2);
+ put2.addColumn(famName, colName2, dataValue2);
tt.put(tx2, put2);
Transaction tx3 = tm.begin();