You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2017/10/13 21:52:16 UTC
incubator-tephra git commit: Changes to replace HBaseAdmin, HTable and HTableDescriptor classes
Repository: incubator-tephra
Updated Branches:
refs/heads/master 9b841c68f -> 8d4bc2b0b
Changes to replace HBaseAdmin, HTable and HTableDescriptor classes
This closes #45
Signed-off-by: poorna <po...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/8d4bc2b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/8d4bc2b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/8d4bc2b0
Branch: refs/heads/master
Commit: 8d4bc2b0bc5db4c12b2625eed60308c1c0852d26
Parents: 9b841c6
Author: Biju Nair <gs...@gmail.com>
Authored: Sat Jun 24 23:28:05 2017 -0400
Committer: poorna <po...@apache.org>
Committed: Fri Oct 13 11:58:08 2017 -0700
----------------------------------------------------------------------
bin/tephra | 0
.../tephra/hbase/SecondaryIndexTable.java | 50 ++++++++----
.../tephra/hbase/TransactionAwareHTable.java | 85 +++-----------------
.../hbase/TransactionAwareHTableTest.java | 68 ++++++++--------
4 files changed, 83 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8d4bc2b0/bin/tephra
----------------------------------------------------------------------
diff --git a/bin/tephra b/bin/tephra
old mode 100644
new mode 100755
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8d4bc2b0/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java
index 8bf8768..280065e 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java
@@ -22,14 +22,16 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionFailureException;
@@ -55,19 +57,22 @@ public class SecondaryIndexTable implements Closeable {
private static final byte[] secondaryIndexFamily = Bytes.toBytes("secondaryIndexFamily");
private static final byte[] secondaryIndexQualifier = Bytes.toBytes('r');
private static final byte[] DELIMITER = new byte[] {0};
+ private Connection conn = null;
- public SecondaryIndexTable(TransactionServiceClient transactionServiceClient, HTableInterface hTable,
- byte[] secondaryIndex) {
+ public SecondaryIndexTable(TransactionServiceClient transactionServiceClient, Table hTable,
+ byte[] secondaryIndex) throws IOException {
secondaryIndexTableName = TableName.valueOf(hTable.getName().getNameAsString() + ".idx");
- HTable secondaryIndexHTable = null;
- try (HBaseAdmin hBaseAdmin = new HBaseAdmin(hTable.getConfiguration())) {
+ Table secondaryIndexHTable = null;
+ try {
+ conn = ConnectionFactory.createConnection(hTable.getConfiguration());
+ Admin hBaseAdmin = conn.getAdmin();
if (!hBaseAdmin.tableExists(secondaryIndexTableName)) {
hBaseAdmin.createTable(new HTableDescriptor(secondaryIndexTableName));
}
- secondaryIndexHTable = new HTable(hTable.getConfiguration(), secondaryIndexTableName);
+ secondaryIndexHTable = conn.getTable(secondaryIndexTableName);
} catch (Exception e) {
Throwables.propagate(e);
- }
+ }
this.secondaryIndex = secondaryIndex;
this.transactionAwareHTable = new TransactionAwareHTable(hTable);
@@ -163,16 +168,33 @@ public class SecondaryIndexTable implements Closeable {
@Override
public void close() throws IOException {
+ IOException ex = null;
try {
transactionAwareHTable.close();
} catch (IOException e) {
- try {
- secondaryIndexTable.close();
- } catch (IOException ex) {
- e.addSuppressed(e);
+ ex = e;
+ }
+ try {
+ secondaryIndexTable.close();
+ } catch (IOException e) {
+ if (ex == null) {
+ ex = e;
+ } else {
+ ex.addSuppressed(e);
}
- throw e;
}
- secondaryIndexTable.close();
+ try {
+ conn.close();
+ } catch (IOException e) {
+ if (ex == null) {
+ ex = e;
+ } else {
+ ex.addSuppressed(e);
+ }
+ }
+ if (ex != null) {
+ throw ex;
+ }
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8d4bc2b0/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
index 531e010..b28737f 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.CompareFilter;
@@ -65,17 +65,17 @@ import java.util.Set;
* was started.
*/
public class TransactionAwareHTable extends AbstractTransactionAwareTable
- implements HTableInterface, TransactionAware {
+ implements Table, TransactionAware {
private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTable.class);
- private final HTableInterface hTable;
+ private final Table hTable;
/**
* Create a transactional aware instance of the passed HTable
*
* @param hTable underlying HBase table to use
*/
- public TransactionAwareHTable(HTableInterface hTable) {
+ public TransactionAwareHTable(Table hTable) {
this(hTable, false);
}
@@ -85,7 +85,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
* @param hTable underlying HBase table to use
* @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
*/
- public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel) {
+ public TransactionAwareHTable(Table hTable, TxConstants.ConflictDetection conflictLevel) {
this(hTable, conflictLevel, false);
}
@@ -96,7 +96,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
* @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
* will be available, though non-transactional
*/
- public TransactionAwareHTable(HTableInterface hTable, boolean allowNonTransactional) {
+ public TransactionAwareHTable(Table hTable, boolean allowNonTransactional) {
this(hTable, TxConstants.ConflictDetection.COLUMN, allowNonTransactional);
}
@@ -108,7 +108,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
* @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
* will be available, though non-transactional
*/
- public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel,
+ public TransactionAwareHTable(Table hTable, TxConstants.ConflictDetection conflictLevel,
boolean allowNonTransactional) {
super(conflictLevel, allowNonTransactional);
this.hTable = hTable;
@@ -123,7 +123,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
@Override
protected boolean doCommit() throws IOException {
- hTable.flushCommits();
return true;
}
@@ -166,21 +165,15 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
hTable.delete(rollbackDeletes);
return true;
} finally {
- try {
- hTable.flushCommits();
- } catch (Exception e) {
- LOG.error("Could not flush HTable commits", e);
- }
tx = null;
changeSets.clear();
}
}
- /* HTableInterface implementation */
+ /* Table implementation */
- @Override
public byte[] getTableName() {
- return hTable.getTableName();
+ return hTable.getName().getName();
}
@Override
@@ -207,7 +200,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
- public Boolean[] exists(List<Get> gets) throws IOException {
+ public boolean[] existsAll(List<Get> gets) throws IOException {
if (tx == null) {
throw new IOException("Transaction not started");
}
@@ -215,7 +208,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
for (Get get : gets) {
transactionalizedGets.add(transactionalizeAction(get));
}
- return hTable.exists(transactionalizedGets);
+ return hTable.existsAll(transactionalizedGets);
}
@Override
@@ -273,15 +266,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
- public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
- if (allowNonTransactional) {
- return hTable.getRowOrBefore(row, family);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
public ResultScanner getScanner(Scan scan) throws IOException {
if (tx == null) {
throw new IOException("Transaction not started");
@@ -392,18 +376,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
- public boolean[] existsAll(List<Get> gets) throws IOException {
- if (tx == null) {
- throw new IOException("Transaction not started");
- }
- List<Get> transactionalizedGets = new ArrayList<>(gets.size());
- for (Get get : gets) {
- transactionalizedGets.add(transactionalizeAction(get));
- }
- return hTable.existsAll(transactionalizedGets);
- }
-
- @Override
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareFilter.CompareOp compareOp, byte[] value, RowMutations rowMutations)
throws IOException {
@@ -468,26 +440,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
- public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
- throws IOException {
- if (allowNonTransactional) {
- return hTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
- } else {
- throw new UnsupportedOperationException("Operation is not supported transactionally");
- }
- }
-
- @Override
- public boolean isAutoFlush() {
- return hTable.isAutoFlush();
- }
-
- @Override
- public void flushCommits() throws IOException {
- hTable.flushCommits();
- }
-
- @Override
public void close() throws IOException {
hTable.close();
}
@@ -526,21 +478,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
}
@Override
- public void setAutoFlush(boolean autoFlush) {
- setAutoFlushTo(autoFlush);
- }
-
- @Override
- public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
- hTable.setAutoFlush(autoFlush, clearBufferOnFail);
- }
-
- @Override
- public void setAutoFlushTo(boolean autoFlush) {
- hTable.setAutoFlushTo(autoFlush);
- }
-
- @Override
public long getWriteBufferSize() {
return hTable.getWriteBufferSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8d4bc2b0/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index 11ffd1a..e48a158 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -26,17 +26,18 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -99,8 +100,9 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
static TransactionManager txManager;
private TransactionContext transactionContext;
private TransactionAwareHTable transactionAwareHTable;
- private HTable hTable;
-
+ private Table hTable;
+ static Connection conn;
+
@ClassRule
public static TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -180,6 +182,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
testUtil.startMiniCluster();
hBaseAdmin = testUtil.getHBaseAdmin();
+ conn = testUtil.getConnection();
txStateStorage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
txManager.startAndWait();
@@ -190,6 +193,9 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
if (txManager != null) {
txManager.stopAndWait();
}
+ if (conn != null) {
+ conn.close();
+ }
}
@Before
@@ -254,7 +260,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
*/
@Test
public void testValidTransactionalDelete() throws Exception {
- try (HTable hTable = createTable(Bytes.toBytes("TestValidTransactionalDelete"),
+ try (Table hTable = createTable(Bytes.toBytes("TestValidTransactionalDelete"),
new byte[][]{TestBytes.family, TestBytes.family2})) {
TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
@@ -394,7 +400,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
*/
@Test
public void testAttributesPreserved() throws Exception {
- HTable hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
+ Table hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
new byte[][]{TestBytes.family, TestBytes.family2}, false,
Lists.newArrayList(TransactionProcessor.class.getName(), TestRegionObserver.class.getName()));
try {
@@ -436,7 +442,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testFamilyDeleteWithCompaction() throws Exception {
- HTable hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"),
+ Table hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"),
new byte[][]{TestBytes.family, TestBytes.family2});
try {
TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, ConflictDetection.ROW);
@@ -473,9 +479,9 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
int count = 0;
while (count++ < 12 && !compactionDone) {
// run major compaction and verify the row was removed
- HBaseAdmin hbaseAdmin = testUtil.getHBaseAdmin();
- hbaseAdmin.flush("TestFamilyDeleteWithCompaction");
- hbaseAdmin.majorCompact("TestFamilyDeleteWithCompaction");
+ Admin hbaseAdmin = testUtil.getHBaseAdmin();
+ hbaseAdmin.flush(TableName.valueOf("TestFamilyDeleteWithCompaction"));
+ hbaseAdmin.majorCompact(TableName.valueOf("TestFamilyDeleteWithCompaction"));
hbaseAdmin.close();
Thread.sleep(5000L);
@@ -536,7 +542,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
private void testDeleteRollback(TxConstants.ConflictDetection conflictDetection) throws Exception {
String tableName = String.format("%s%s", "TestColFamilyDelete", conflictDetection);
- HTable hTable = createTable(Bytes.toBytes(tableName), new byte[][]{TestBytes.family});
+ Table hTable = createTable(Bytes.toBytes(tableName), new byte[][]{TestBytes.family});
try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, conflictDetection)) {
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
txContext.start();
@@ -572,7 +578,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testMultiColumnFamilyRowDeleteRollback() throws Exception {
- HTable hTable = createTable(Bytes.toBytes("TestMultColFam"), new byte[][] {TestBytes.family, TestBytes.family2});
+ Table hTable = createTable(Bytes.toBytes("TestMultColFam"), new byte[][] {TestBytes.family, TestBytes.family2});
try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
txContext.start();
@@ -604,7 +610,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testRowDelete() throws Exception {
- HTable hTable = createTable(Bytes.toBytes("TestRowDelete"), new byte[][]{TestBytes.family, TestBytes.family2});
+ Table hTable = createTable(Bytes.toBytes("TestRowDelete"), new byte[][]{TestBytes.family, TestBytes.family2});
try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
@@ -781,12 +787,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testReadYourWrites() throws Exception {
// In-progress tx1: started before our main transaction
- HTable hTable1 = new HTable(testUtil.getConfiguration(), TestBytes.table);
+ Table hTable1 = conn.getTable(TableName.valueOf(TestBytes.table));
TransactionAwareHTable txHTable1 = new TransactionAwareHTable(hTable1);
TransactionContext inprogressTxContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable1);
// In-progress tx2: started while our main transaction is running
- HTable hTable2 = new HTable(testUtil.getConfiguration(), TestBytes.table);
+ Table hTable2 = conn.getTable(TableName.valueOf(TestBytes.table));
TransactionAwareHTable txHTable2 = new TransactionAwareHTable(hTable2);
TransactionContext inprogressTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable2);
@@ -838,11 +844,11 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testRowLevelConflictDetection() throws Exception {
- TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+ TransactionAwareHTable txTable1 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table)),
TxConstants.ConflictDetection.ROW);
TransactionContext txContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable1);
- TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+ TransactionAwareHTable txTable2 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table)),
TxConstants.ConflictDetection.ROW);
TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
@@ -946,11 +952,11 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testNoneLevelConflictDetection() throws Exception {
InMemoryTxSystemClient txClient = new InMemoryTxSystemClient(txManager);
- TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+ TransactionAwareHTable txTable1 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table)),
TxConstants.ConflictDetection.NONE);
TransactionContext txContext1 = new TransactionContext(txClient, txTable1);
- TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+ TransactionAwareHTable txTable2 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table)),
TxConstants.ConflictDetection.NONE);
TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
@@ -1085,7 +1091,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
// check that writes are still not visible to other clients
- TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
+ TransactionAwareHTable txTable2 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table)));
TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
txContext2.start();
@@ -1144,7 +1150,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
txClient.invalidate(transactionContext.getCurrentTransaction().getTransactionId());
// check that writes are not visible
- TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
+ TransactionAwareHTable txTable2 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table)));
TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
txContext2.start();
Transaction newTx = txContext2.getCurrentTransaction();
@@ -1180,7 +1186,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
// Add some pre-existing, non-transactional data
- HTable nonTxTable = new HTable(testUtil.getConfiguration(), txTable.getTableName());
+ Table nonTxTable = conn.getTable(TableName.valueOf(txTable.getTableName()));
nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, val11));
nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, val12));
nonTxTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, val21));
@@ -1188,7 +1194,6 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER,
HConstants.EMPTY_BYTE_ARRAY));
nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TestBytes.qualifier, HConstants.EMPTY_BYTE_ARRAY));
- nonTxTable.flushCommits();
// Add transactional data
txContext.start();
@@ -1279,15 +1284,15 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
txContext.finish();
}
- private void verifyRow(HTableInterface table, byte[] rowkey, byte[] expectedValue) throws Exception {
+ private void verifyRow(Table table, byte[] rowkey, byte[] expectedValue) throws Exception {
verifyRow(table, new Get(rowkey), expectedValue);
}
- private void verifyRow(HTableInterface table, Get get, byte[] expectedValue) throws Exception {
+ private void verifyRow(Table table, Get get, byte[] expectedValue) throws Exception {
verifyRows(table, get, expectedValue == null ? null : ImmutableList.of(expectedValue));
}
- private void verifyRows(HTableInterface table, Get get, List<byte[]> expectedValues) throws Exception {
+ private void verifyRows(Table table, Get get, List<byte[]> expectedValues) throws Exception {
Result result = table.get(get);
if (expectedValues == null) {
assertTrue(result.isEmpty());
@@ -1307,12 +1312,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
}
}
- private Cell[] getRow(HTableInterface table, Get get) throws Exception {
+ private Cell[] getRow(Table table, Get get) throws Exception {
Result result = table.get(get);
return result.rawCells();
}
- private void verifyScan(HTableInterface table, Scan scan, List<KeyValue> expectedCells) throws Exception {
+ private void verifyScan(Table table, Scan scan, List<KeyValue> expectedCells) throws Exception {
List<Cell> actualCells = new ArrayList<>();
try (ResultScanner scanner = table.getScanner(scan)) {
Result[] results = scanner.next(expectedCells.size() + 1);
@@ -1325,7 +1330,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
@Test
public void testVisibilityAll() throws Exception {
- HTable nonTxTable =
+ Table nonTxTable =
createTable(Bytes.toBytes("testVisibilityAll"), new byte[][]{TestBytes.family, TestBytes.family2},
true, Collections.singletonList(TransactionProcessor.class.getName()));
TransactionAwareHTable txTable =
@@ -1497,7 +1502,6 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
// to prevent Tephra from replacing delete with delete marker
deleteFamily.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
nonTxTable.delete(deleteFamily);
- nonTxTable.flushCommits();
txContext.start();
txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
@@ -1709,7 +1713,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
* Represents older transaction clients
*/
private static class OldTransactionAwareHTable extends TransactionAwareHTable {
- public OldTransactionAwareHTable(HTableInterface hTable) {
+ public OldTransactionAwareHTable(Table hTable) {
super(hTable);
}