You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2017/10/13 21:52:16 UTC

incubator-tephra git commit: Changes to replace HBaseAdmin, HTable and HTableDescriptor classes

Repository: incubator-tephra
Updated Branches:
  refs/heads/master 9b841c68f -> 8d4bc2b0b


Changes to replace HBaseAdmin, HTable and HTableDescriptor classes

This closes #45

Signed-off-by: poorna <po...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/8d4bc2b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/8d4bc2b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/8d4bc2b0

Branch: refs/heads/master
Commit: 8d4bc2b0bc5db4c12b2625eed60308c1c0852d26
Parents: 9b841c6
Author: Biju Nair <gs...@gmail.com>
Authored: Sat Jun 24 23:28:05 2017 -0400
Committer: poorna <po...@apache.org>
Committed: Fri Oct 13 11:58:08 2017 -0700

----------------------------------------------------------------------
 bin/tephra                                      |  0
 .../tephra/hbase/SecondaryIndexTable.java       | 50 ++++++++----
 .../tephra/hbase/TransactionAwareHTable.java    | 85 +++-----------------
 .../hbase/TransactionAwareHTableTest.java       | 68 ++++++++--------
 4 files changed, 83 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8d4bc2b0/bin/tephra
----------------------------------------------------------------------
diff --git a/bin/tephra b/bin/tephra
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8d4bc2b0/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java
index 8bf8768..280065e 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/SecondaryIndexTable.java
@@ -22,14 +22,16 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tephra.TransactionContext;
 import org.apache.tephra.TransactionFailureException;
@@ -55,19 +57,22 @@ public class SecondaryIndexTable implements Closeable {
   private static final byte[] secondaryIndexFamily = Bytes.toBytes("secondaryIndexFamily");
   private static final byte[] secondaryIndexQualifier = Bytes.toBytes('r');
   private static final byte[] DELIMITER  = new byte[] {0};
+  private Connection conn  = null;
 
-  public SecondaryIndexTable(TransactionServiceClient transactionServiceClient, HTableInterface hTable,
-                             byte[] secondaryIndex) {
+  public SecondaryIndexTable(TransactionServiceClient transactionServiceClient, Table hTable,
+                             byte[] secondaryIndex) throws IOException {
     secondaryIndexTableName = TableName.valueOf(hTable.getName().getNameAsString() + ".idx");
-    HTable secondaryIndexHTable = null;
-    try (HBaseAdmin hBaseAdmin = new HBaseAdmin(hTable.getConfiguration())) {
+    Table secondaryIndexHTable = null;
+    try {
+      conn  = ConnectionFactory.createConnection(hTable.getConfiguration());
+      Admin hBaseAdmin = conn.getAdmin();
       if (!hBaseAdmin.tableExists(secondaryIndexTableName)) {
         hBaseAdmin.createTable(new HTableDescriptor(secondaryIndexTableName));
       }
-      secondaryIndexHTable = new HTable(hTable.getConfiguration(), secondaryIndexTableName);
+      secondaryIndexHTable = conn.getTable(secondaryIndexTableName);
     } catch (Exception e) {
       Throwables.propagate(e);
-    }
+    } 
 
     this.secondaryIndex = secondaryIndex;
     this.transactionAwareHTable = new TransactionAwareHTable(hTable);
@@ -163,16 +168,33 @@ public class SecondaryIndexTable implements Closeable {
 
   @Override
   public void close() throws IOException {
+    IOException ex = null;
     try {
       transactionAwareHTable.close();
     } catch (IOException e) {
-      try {
-        secondaryIndexTable.close();
-      } catch (IOException ex) {
-        e.addSuppressed(e);
+      ex = e;
+    }
+    try {
+      secondaryIndexTable.close();
+    } catch (IOException e) {
+      if (ex == null) {
+         ex = e;
+      } else {
+         ex.addSuppressed(e);
       }
-      throw e;
     }
-    secondaryIndexTable.close();
+    try {
+      conn.close();
+    } catch (IOException e) {
+      if (ex == null) {
+         ex = e;
+      } else {
+         ex.addSuppressed(e);
+      }
+    }
+    if (ex != null) {
+      throw ex;
+    }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8d4bc2b0/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
index 531e010..b28737f 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.OperationWithAttributes;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.filter.CompareFilter;
@@ -65,17 +65,17 @@ import java.util.Set;
  * was started.
  */
 public class TransactionAwareHTable extends AbstractTransactionAwareTable
-    implements HTableInterface, TransactionAware {
+    implements Table, TransactionAware {
 
   private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTable.class);
-  private final HTableInterface hTable;
+  private final Table hTable;
 
   /**
    * Create a transactional aware instance of the passed HTable
    *
    * @param hTable underlying HBase table to use
    */
-  public TransactionAwareHTable(HTableInterface hTable) {
+  public TransactionAwareHTable(Table hTable) {
     this(hTable, false);
   }
 
@@ -85,7 +85,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
    * @param hTable underlying HBase table to use
    * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
    */
-  public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel) {
+  public TransactionAwareHTable(Table hTable, TxConstants.ConflictDetection conflictLevel) {
     this(hTable, conflictLevel, false);
   }
 
@@ -96,7 +96,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
    * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
    *                              will be available, though non-transactional
    */
-  public TransactionAwareHTable(HTableInterface hTable, boolean allowNonTransactional) {
+  public TransactionAwareHTable(Table hTable, boolean allowNonTransactional) {
     this(hTable, TxConstants.ConflictDetection.COLUMN, allowNonTransactional);
   }
 
@@ -108,7 +108,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
    * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
    *                              will be available, though non-transactional
    */
-  public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel,
+  public TransactionAwareHTable(Table hTable, TxConstants.ConflictDetection conflictLevel,
                                 boolean allowNonTransactional) {
     super(conflictLevel, allowNonTransactional);
     this.hTable = hTable;
@@ -123,7 +123,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
 
   @Override
   protected boolean doCommit() throws IOException {
-    hTable.flushCommits();
     return true;
   }
 
@@ -166,21 +165,15 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
       hTable.delete(rollbackDeletes);
       return true;
     } finally {
-      try {
-        hTable.flushCommits();
-      } catch (Exception e) {
-        LOG.error("Could not flush HTable commits", e);
-      }
       tx = null;
       changeSets.clear();
     }
   }
 
-  /* HTableInterface implementation */
+  /* Table implementation */
 
-  @Override
   public byte[] getTableName() {
-    return hTable.getTableName();
+    return hTable.getName().getName();
   }
 
   @Override
@@ -207,7 +200,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
-  public Boolean[] exists(List<Get> gets) throws IOException {
+  public boolean[] existsAll(List<Get> gets) throws IOException {
     if (tx == null) {
       throw new IOException("Transaction not started");
     }
@@ -215,7 +208,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
     for (Get get : gets) {
       transactionalizedGets.add(transactionalizeAction(get));
     }
-    return hTable.exists(transactionalizedGets);
+    return hTable.existsAll(transactionalizedGets);
   }
 
   @Override
@@ -273,15 +266,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
-  public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
-    if (allowNonTransactional) {
-      return hTable.getRowOrBefore(row, family);
-    } else {
-      throw new UnsupportedOperationException("Operation is not supported transactionally");
-    }
-  }
-
-  @Override
   public ResultScanner getScanner(Scan scan) throws IOException {
     if (tx == null) {
       throw new IOException("Transaction not started");
@@ -392,18 +376,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
-  public boolean[] existsAll(List<Get> gets) throws IOException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    List<Get> transactionalizedGets = new ArrayList<>(gets.size());
-    for (Get get : gets) {
-      transactionalizedGets.add(transactionalizeAction(get));
-    }
-    return hTable.existsAll(transactionalizedGets);
-  }
-
-  @Override
   public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
                                 CompareFilter.CompareOp compareOp, byte[] value, RowMutations rowMutations)
       throws IOException {
@@ -468,26 +440,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
-  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
-    throws IOException {
-    if (allowNonTransactional) {
-      return hTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
-    } else {
-      throw new UnsupportedOperationException("Operation is not supported transactionally");
-    }
-  }
-
-  @Override
-  public boolean isAutoFlush() {
-    return hTable.isAutoFlush();
-  }
-
-  @Override
-  public void flushCommits() throws IOException {
-    hTable.flushCommits();
-  }
-
-  @Override
   public void close() throws IOException {
     hTable.close();
   }
@@ -526,21 +478,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
-  public void setAutoFlush(boolean autoFlush) {
-    setAutoFlushTo(autoFlush);
-  }
-
-  @Override
-  public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
-    hTable.setAutoFlush(autoFlush, clearBufferOnFail);
-  }
-
-  @Override
-  public void setAutoFlushTo(boolean autoFlush) {
-    hTable.setAutoFlushTo(autoFlush);
-  }
-
-  @Override
   public long getWriteBufferSize() {
     return hTable.getWriteBufferSize();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8d4bc2b0/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index 11ffd1a..e48a158 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -26,17 +26,18 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.OperationWithAttributes;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -99,8 +100,9 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
   static TransactionManager txManager;
   private TransactionContext transactionContext;
   private TransactionAwareHTable transactionAwareHTable;
-  private HTable hTable;
-  
+  private Table hTable;
+  static Connection conn;
+
   @ClassRule
   public static TemporaryFolder tmpFolder = new TemporaryFolder();
   
@@ -180,6 +182,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
 
     testUtil.startMiniCluster();
     hBaseAdmin = testUtil.getHBaseAdmin();
+    conn = testUtil.getConnection();
     txStateStorage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
     txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
     txManager.startAndWait();
@@ -190,6 +193,9 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     if (txManager != null) {
       txManager.stopAndWait();
     }
+    if (conn != null) {
+      conn.close();
+    }
   }
 
   @Before
@@ -254,7 +260,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
    */
   @Test
   public void testValidTransactionalDelete() throws Exception {
-    try (HTable hTable = createTable(Bytes.toBytes("TestValidTransactionalDelete"),
+    try (Table hTable = createTable(Bytes.toBytes("TestValidTransactionalDelete"),
                                      new byte[][]{TestBytes.family, TestBytes.family2})) {
       TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
       TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
@@ -394,7 +400,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
    */
   @Test
   public void testAttributesPreserved() throws Exception {
-    HTable hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
+    Table hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
         new byte[][]{TestBytes.family, TestBytes.family2}, false,
         Lists.newArrayList(TransactionProcessor.class.getName(), TestRegionObserver.class.getName()));
     try {
@@ -436,7 +442,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
   
   @Test
   public void testFamilyDeleteWithCompaction() throws Exception {
-    HTable hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"),
+    Table hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"),
                                 new byte[][]{TestBytes.family, TestBytes.family2});
     try {
       TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, ConflictDetection.ROW);
@@ -473,9 +479,9 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
       int count = 0;
       while (count++ < 12 && !compactionDone) {
          // run major compaction and verify the row was removed
-         HBaseAdmin hbaseAdmin = testUtil.getHBaseAdmin();
-         hbaseAdmin.flush("TestFamilyDeleteWithCompaction");
-         hbaseAdmin.majorCompact("TestFamilyDeleteWithCompaction");
+         Admin hbaseAdmin = testUtil.getHBaseAdmin();
+         hbaseAdmin.flush(TableName.valueOf("TestFamilyDeleteWithCompaction"));
+         hbaseAdmin.majorCompact(TableName.valueOf("TestFamilyDeleteWithCompaction"));
          hbaseAdmin.close();
          Thread.sleep(5000L);
          
@@ -536,7 +542,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
 
   private void testDeleteRollback(TxConstants.ConflictDetection conflictDetection) throws Exception {
     String tableName = String.format("%s%s", "TestColFamilyDelete", conflictDetection);
-    HTable hTable = createTable(Bytes.toBytes(tableName), new byte[][]{TestBytes.family});
+    Table hTable = createTable(Bytes.toBytes(tableName), new byte[][]{TestBytes.family});
     try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, conflictDetection)) {
       TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
       txContext.start();
@@ -572,7 +578,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
 
   @Test
   public void testMultiColumnFamilyRowDeleteRollback() throws Exception {
-    HTable hTable = createTable(Bytes.toBytes("TestMultColFam"), new byte[][] {TestBytes.family, TestBytes.family2});
+    Table hTable = createTable(Bytes.toBytes("TestMultColFam"), new byte[][] {TestBytes.family, TestBytes.family2});
     try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
       TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
       txContext.start();
@@ -604,7 +610,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
 
   @Test
   public void testRowDelete() throws Exception {
-    HTable hTable = createTable(Bytes.toBytes("TestRowDelete"), new byte[][]{TestBytes.family, TestBytes.family2});
+    Table hTable = createTable(Bytes.toBytes("TestRowDelete"), new byte[][]{TestBytes.family, TestBytes.family2});
     try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
       TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
 
@@ -781,12 +787,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
   @Test
   public void testReadYourWrites() throws Exception {
     // In-progress tx1: started before our main transaction
-    HTable hTable1 = new HTable(testUtil.getConfiguration(), TestBytes.table);
+    Table hTable1 = conn.getTable(TableName.valueOf(TestBytes.table));
     TransactionAwareHTable txHTable1 = new TransactionAwareHTable(hTable1);
     TransactionContext inprogressTxContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable1);
 
     // In-progress tx2: started while our main transaction is running
-    HTable hTable2 = new HTable(testUtil.getConfiguration(), TestBytes.table);
+    Table hTable2 = conn.getTable(TableName.valueOf(TestBytes.table));
     TransactionAwareHTable txHTable2 = new TransactionAwareHTable(hTable2);
     TransactionContext inprogressTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable2);
 
@@ -838,11 +844,11 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
 
   @Test
   public void testRowLevelConflictDetection() throws Exception {
-    TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+    TransactionAwareHTable txTable1 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table)),
         TxConstants.ConflictDetection.ROW);
     TransactionContext txContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable1);
 
-    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+    TransactionAwareHTable txTable2 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table)),
         TxConstants.ConflictDetection.ROW);
     TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
 
@@ -946,11 +952,11 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
   @Test
   public void testNoneLevelConflictDetection() throws Exception {
     InMemoryTxSystemClient txClient = new InMemoryTxSystemClient(txManager);
-    TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+    TransactionAwareHTable txTable1 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table)),
         TxConstants.ConflictDetection.NONE);
     TransactionContext txContext1 = new TransactionContext(txClient, txTable1);
 
-    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+    TransactionAwareHTable txTable2 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table)),
         TxConstants.ConflictDetection.NONE);
     TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
 
@@ -1085,7 +1091,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
 
     // check that writes are still not visible to other clients
-    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
+    TransactionAwareHTable txTable2 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table)));
     TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
 
     txContext2.start();
@@ -1144,7 +1150,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     txClient.invalidate(transactionContext.getCurrentTransaction().getTransactionId());
 
     // check that writes are not visible
-    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
+    TransactionAwareHTable txTable2 = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TestBytes.table)));
     TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
     txContext2.start();
     Transaction newTx = txContext2.getCurrentTransaction();
@@ -1180,7 +1186,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
 
     // Add some pre-existing, non-transactional data
-    HTable nonTxTable = new HTable(testUtil.getConfiguration(), txTable.getTableName());
+    Table nonTxTable = conn.getTable(TableName.valueOf(txTable.getTableName()));
     nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, val11));
     nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, val12));
     nonTxTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, val21));
@@ -1188,7 +1194,6 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER,
                                                HConstants.EMPTY_BYTE_ARRAY));
     nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TestBytes.qualifier, HConstants.EMPTY_BYTE_ARRAY));
-    nonTxTable.flushCommits();
 
     // Add transactional data
     txContext.start();
@@ -1279,15 +1284,15 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     txContext.finish();
   }
 
-  private void verifyRow(HTableInterface table, byte[] rowkey, byte[] expectedValue) throws Exception {
+  private void verifyRow(Table table, byte[] rowkey, byte[] expectedValue) throws Exception {
     verifyRow(table, new Get(rowkey), expectedValue);
   }
 
-  private void verifyRow(HTableInterface table, Get get, byte[] expectedValue) throws Exception {
+  private void verifyRow(Table table, Get get, byte[] expectedValue) throws Exception {
     verifyRows(table, get, expectedValue == null ? null : ImmutableList.of(expectedValue));
   }
 
-  private void verifyRows(HTableInterface table, Get get, List<byte[]> expectedValues) throws Exception {
+  private void verifyRows(Table table, Get get, List<byte[]> expectedValues) throws Exception {
     Result result = table.get(get);
     if (expectedValues == null) {
       assertTrue(result.isEmpty());
@@ -1307,12 +1312,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     }
   }
 
-  private Cell[] getRow(HTableInterface table, Get get) throws Exception {
+  private Cell[] getRow(Table table, Get get) throws Exception {
     Result result = table.get(get);
     return result.rawCells();
   }
 
-  private void verifyScan(HTableInterface table, Scan scan, List<KeyValue> expectedCells) throws Exception {
+  private void verifyScan(Table table, Scan scan, List<KeyValue> expectedCells) throws Exception {
     List<Cell> actualCells = new ArrayList<>();
     try (ResultScanner scanner = table.getScanner(scan)) {
       Result[] results = scanner.next(expectedCells.size() + 1);
@@ -1325,7 +1330,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
 
   @Test
   public void testVisibilityAll() throws Exception {
-    HTable nonTxTable =
+    Table nonTxTable =
       createTable(Bytes.toBytes("testVisibilityAll"), new byte[][]{TestBytes.family, TestBytes.family2},
                   true, Collections.singletonList(TransactionProcessor.class.getName()));
     TransactionAwareHTable txTable =
@@ -1497,7 +1502,6 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     // to prevent Tephra from replacing delete with delete marker
     deleteFamily.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
     nonTxTable.delete(deleteFamily);
-    nonTxTable.flushCommits();
 
     txContext.start();
     txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
@@ -1709,7 +1713,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
    * Represents older transaction clients
    */
   private static class OldTransactionAwareHTable extends TransactionAwareHTable {
-    public OldTransactionAwareHTable(HTableInterface hTable) {
+    public OldTransactionAwareHTable(Table hTable) {
       super(hTable);
     }