You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2019/03/03 06:31:52 UTC

[incubator-omid] branch 1.0.1 updated (d30f709 -> 01ce12a)

This is an automated email from the ASF dual-hosted git repository.

yonigo pushed a change to branch 1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-omid.git.


    from d30f709  [OMID-133] When TTable autoflush is false, before read/scan flush tables.
     new 3a22dd6  [OMID-134] - transactionManager is thread safe. All HBase Table and Connectoins are close correctly. Create new Table from connection only when needed
     new 01ce12a  [OMID-135] - Fix compaction bug that deletes shadow cells above LWM

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/omid/benchmarks/tso/RawTxRunner.java    |  3 +-
 .../org/apache/omid/committable/CommitTable.java   |  4 +-
 .../omid/committable/InMemoryCommitTable.java      |  8 ---
 .../apache/omid/committable/NullCommitTable.java   |  8 ---
 .../omid/committable/NullCommitTableTest.java      | 50 +++++++--------
 .../transaction/AttributeSetSnapshotFilter.java    |  4 ++
 .../omid/transaction/HBaseSyncPostCommitter.java   | 23 ++++---
 .../omid/transaction/HBaseTransactionManager.java  | 32 ++++++----
 .../omid/transaction/HTableAccessWrapper.java      |  5 ++
 .../apache/omid/transaction/SnapshotFilter.java    |  2 +-
 .../omid/transaction/SnapshotFilterImpl.java       |  9 +--
 .../java/org/apache/omid/transaction/TTable.java   |  6 ++
 .../omid/transaction/TableAccessWrapper.java       |  2 +-
 .../transaction/TestAsynchronousPostCommitter.java |  6 +-
 .../org/apache/omid/transaction/TestCellUtils.java | 53 ++++++++++++++++
 .../org/apache/omid/transaction/TestFilters.java   |  4 +-
 .../transaction/TestHBaseTransactionClient.java    |  8 +--
 .../apache/omid/transaction/TestShadowCells.java   |  8 +--
 .../omid/committable/hbase/HBaseCommitTable.java   | 45 ++++++-------
 .../org/apache/omid/transaction/CellUtils.java     | 68 ++++++++++++++------
 .../hbase/regionserver/RegionAccessWrapper.java    |  4 ++
 .../apache/omid/transaction/CompactorScanner.java  |  7 +-
 .../org/apache/omid/transaction/CompactorUtil.java | 20 +++---
 .../org/apache/omid/transaction/OmidCompactor.java | 33 +++-------
 .../omid/transaction/OmidSnapshotFilter.java       | 12 ++--
 .../apache/omid/transaction/TestCompaction.java    | 74 ++++++++++++++++++++--
 .../omid/transaction/TestCompactorScanner.java     |  4 +-
 .../omid/transaction/TestSnapshotFilter.java       |  2 +-
 .../omid/transaction/TestSnapshotFilterLL.java     |  2 +-
 .../timestamp/storage/HBaseTimestampStorage.java   | 11 +++-
 .../transaction/AbstractTransactionManager.java    |  7 +-
 31 files changed, 334 insertions(+), 190 deletions(-)


[incubator-omid] 02/02: [OMID-135] - Fix compaction bug that deletes shadow cells above LWM

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yonigo pushed a commit to branch 1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-omid.git

commit 01ce12abec4dbcc0a3c1fe4d119ae26dba5c553e
Author: Yonatan Gottesman <yo...@gmail.com>
AuthorDate: Wed Feb 27 12:36:23 2019 +0200

    [OMID-135] - Fix compaction bug that deletes shadow cells above LWM
---
 .../org/apache/omid/transaction/TestCellUtils.java | 53 +++++++++++++++++
 .../org/apache/omid/transaction/CellUtils.java     | 68 ++++++++++++++++------
 .../apache/omid/transaction/CompactorScanner.java  |  3 +-
 .../apache/omid/transaction/TestCompaction.java    | 61 +++++++++++++++++++
 4 files changed, 165 insertions(+), 20 deletions(-)

diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestCellUtils.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestCellUtils.java
index 8a689ca..cbf53d0 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestCellUtils.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestCellUtils.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.omid.HBaseShims;
+import org.apache.omid.tso.client.CellId;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -227,4 +228,56 @@ public class TestCellUtils {
         assertEquals(originalQualifierLength, qualifier.length);
     }
 
+
+    @Test(timeOut = 10_000)
+    public void testmapCellsToShadowCellsCellOrder() {
+        // Create the required data
+        final byte[] validShadowCellQualifier =
+                com.google.common.primitives.Bytes.concat(SHADOW_CELL_PREFIX, qualifier, SHADOW_CELL_SUFFIX);
+
+        final byte[] qualifier2 = Bytes.toBytes("test-qual2");
+        final byte[] validShadowCellQualifier2 =
+                com.google.common.primitives.Bytes.concat(SHADOW_CELL_PREFIX, qualifier2, SHADOW_CELL_SUFFIX);
+
+        final byte[] qualifier3 = Bytes.toBytes("test-qual3");
+        final byte[] validShadowCellQualifier3 =
+                com.google.common.primitives.Bytes.concat(SHADOW_CELL_PREFIX, qualifier3, SHADOW_CELL_SUFFIX);
+
+        final byte[] qualifier4 = Bytes.toBytes("test-qual4");
+        final byte[] qualifier5 = Bytes.toBytes("test-qual5");
+        final byte[] validShadowCellQualifier5 =
+                com.google.common.primitives.Bytes.concat(SHADOW_CELL_PREFIX, qualifier5, SHADOW_CELL_SUFFIX);
+
+
+        Cell cell1 = new KeyValue(row, family, qualifier, 1, Bytes.toBytes("value")); // Default type is Put
+        Cell shadowCell1 = new KeyValue(row, family, validShadowCellQualifier, 1, Bytes.toBytes("sc-value"));
+
+        Cell cell2 = new KeyValue(row, family, qualifier2, 1, Bytes.toBytes("value2"));
+        Cell shadowCell2 = new KeyValue(row, family, validShadowCellQualifier2, 1, Bytes.toBytes("sc-value2"));
+
+        Cell cell3 = new KeyValue(row, family, qualifier3, 1, Bytes.toBytes("value3"));
+        Cell shadowCell3 = new KeyValue(row, family, validShadowCellQualifier3, 1, Bytes.toBytes("sc-value2"));
+
+        Cell cell4 = new KeyValue(row, family, qualifier4, 1, Bytes.toBytes("value4"));
+
+        Cell shadowCell5 = new KeyValue(row, family, validShadowCellQualifier5, 1, Bytes.toBytes("sc-value2"));
+
+        List<Cell> scanList = new ArrayList<>();
+        scanList.add(shadowCell5);
+        scanList.add(cell3);
+        scanList.add(cell1);
+        scanList.add(shadowCell1);
+        scanList.add(shadowCell2);
+        scanList.add(cell4);
+        scanList.add(cell2);
+        scanList.add(shadowCell3);
+        scanList.add(shadowCell5);
+
+        SortedMap<Cell, Optional<Cell>> cellsToShadowCells = CellUtils.mapCellsToShadowCells(scanList);
+        assertEquals(cellsToShadowCells.get(cell1).get(), shadowCell1);
+        assertEquals(cellsToShadowCells.get(cell2).get(), shadowCell2);
+        assertEquals(cellsToShadowCells.get(cell3).get(), shadowCell3);
+        assertFalse(cellsToShadowCells.get(cell4).isPresent());
+    }
+
 }
diff --git a/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java b/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
index 019ab74..98cbf4b 100644
--- a/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
+++ b/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
@@ -184,6 +184,24 @@ public final class CellUtils {
         return qualLength;
     }
 
+
+    /**
+     * Returns the offset at which the original qualifier starts inside a shadow cell qualifier. In case the
+     * shadow cell prefix is not found, just returns the offset passed.
+     * @param qualifier the array containing the (possibly shadow cell) qualifier
+     * @param qualOffset the offset where the qualifier starts
+     * @param qualLength the qualifier length
+     * @return the qualifier offset, skipping the shadow cell prefix when present
+     */
+    public static int qualifierOffsetFromShadowCellQualifier(byte[] qualifier, int qualOffset, int qualLength) {
+
+        if (startsWith(qualifier, qualOffset, qualLength, SHADOW_CELL_PREFIX)) {
+            return qualOffset + SHADOW_CELL_PREFIX.length;
+        }
+        return qualOffset;
+    }
+
+
     /**
      * Complement to matchingQualifier() methods in HBase's CellUtil.class
      * @param left the cell to compare the qualifier
@@ -307,6 +325,7 @@ public final class CellUtils {
                 = new TreeMap<Cell, Optional<Cell>>(HBaseShims.cellComparatorInstance());
 
         Map<CellId, Cell> cellIdToCellMap = new HashMap<CellId, Cell>();
+        Map<CellId, Cell> cellIdToSCCellMap = new HashMap<CellId, Cell>();
         for (Cell cell : cells) {
             if (!isShadowCell(cell)) {
                 CellId key = new CellId(cell, false);
@@ -328,7 +347,12 @@ public final class CellUtils {
                     }
                 } else {
                     cellIdToCellMap.put(key, cell);
-                    cellToShadowCellMap.put(cell, Optional.<Cell>absent());
+                    Cell sc = cellIdToSCCellMap.get(key);
+                    if (sc != null) {
+                        cellToShadowCellMap.put(cell, Optional.of(sc));
+                    } else {
+                        cellToShadowCellMap.put(cell, Optional.<Cell>absent());
+                    }
                 }
             } else {
                 CellId key = new CellId(cell, true);
@@ -336,7 +360,7 @@ public final class CellUtils {
                     Cell originalCell = cellIdToCellMap.get(key);
                     cellToShadowCellMap.put(originalCell, Optional.of(cell));
                 } else {
-                    LOG.trace("Map does not contain key {}", key);
+                    cellIdToSCCellMap.put(key, cell);
                 }
             }
         }
@@ -386,23 +410,29 @@ public final class CellUtils {
             }
 
             // Qualifier comparison
+            int qualifierLength = cell.getQualifierLength();
+            int qualifierOffset = cell.getQualifierOffset();
+            int otherQualifierLength = otherCell.getQualifierLength();
+            int otherQualifierOffset = otherCell.getQualifierOffset();
+
             if (isShadowCell()) {
-                int qualifierLength = qualifierLengthFromShadowCellQualifier(cell.getQualifierArray(),
+                qualifierLength = qualifierLengthFromShadowCellQualifier(cell.getQualifierArray(),
                         cell.getQualifierOffset(),
                         cell.getQualifierLength());
-                int qualifierOffset = cell.getQualifierOffset();
-                if (startsWith(cell.getQualifierArray(), cell.getQualifierOffset(),
-                        cell.getQualifierLength(), SHADOW_CELL_PREFIX)) {
-                    qualifierOffset = qualifierOffset + SHADOW_CELL_PREFIX.length;
-                }
-                if (!matchingQualifier(otherCell,
-                        cell.getQualifierArray(), qualifierOffset, qualifierLength)) {
-                    return false;
-                }
-            } else {
-                if (!CellUtil.matchingQualifier(otherCell, cell)) {
-                    return false;
-                }
+                qualifierOffset = qualifierOffsetFromShadowCellQualifier(cell.getQualifierArray(), cell.getQualifierOffset(),
+                        cell.getQualifierLength());
+            }
+            if (otherCellId.isShadowCell()) {
+                otherQualifierLength = qualifierLengthFromShadowCellQualifier(otherCell.getQualifierArray(),
+                        otherCell.getQualifierOffset(),
+                        otherCell.getQualifierLength());
+                otherQualifierOffset = qualifierOffsetFromShadowCellQualifier(otherCell.getQualifierArray(), otherCell.getQualifierOffset(),
+                        otherCell.getQualifierLength());
+            }
+
+            if (!Bytes.equals(cell.getQualifierArray(), qualifierOffset, qualifierLength,
+                    otherCell.getQualifierArray(), otherQualifierOffset, otherQualifierLength)) {
+                return false;
             }
 
             // Timestamp comparison
@@ -426,6 +456,8 @@ public final class CellUtils {
                     qualifierOffset = qualifierOffset + SHADOW_CELL_PREFIX.length;
                 }
             }
+            String a = Bytes.toString(cell.getQualifierArray(), qualifierOffset, qualifierLength);
+
             hasher.putBytes(cell.getQualifierArray(),qualifierOffset , qualifierLength);
             hasher.putLong(cell.getTimestamp());
             return hasher.hash().asInt();
@@ -443,8 +475,8 @@ public final class CellUtils {
                 int qualifierLength = qualifierLengthFromShadowCellQualifier(cell.getQualifierArray(),
                         cell.getQualifierOffset(),
                         cell.getQualifierLength());
-                helper.add("qualifier whithout shadow cell suffix",
-                        Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset() + 1, qualifierLength));
+                byte[] b = removeShadowCellSuffixPrefix(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+                helper.add("qualifier whithout shadow cell suffix", Bytes.toString(b));
             }
             helper.add("ts", cell.getTimestamp());
             return helper.toString();
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
index 4ba385d..ea14a8e 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
@@ -82,7 +82,6 @@ public class CompactorScanner implements InternalScanner {
 
     @Override
     public boolean next(List<Cell> results) throws IOException {
-        //TODO YONIGO - why-1 we get exceptions
         return next(results, -1);
     }
 
@@ -166,7 +165,7 @@ public class CompactorScanner implements InternalScanner {
         }
 
         // Chomp current row worth values up to the limit
-        if (currentRowWorthValues.size() <= limit) {
+        if (currentRowWorthValues.size() <= limit || limit == -1) {
             result.addAll(currentRowWorthValues);
             currentRowWorthValues.clear();
         } else {
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
index 197ef3f..8b77b7c 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
@@ -207,6 +207,67 @@ public class TestCompaction {
                 .build();
     }
 
+
+    @Test
+    public void testShadowCellsAboveLWMSurviveCompaction() throws Exception {
+        String TEST_TABLE = "testShadowCellsAboveLWMSurviveCompaction";
+        createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
+        TTable txTable = new TTable(connection, TEST_TABLE);
+
+        byte[] rowId = Bytes.toBytes("row");
+
+        // Create 3 transactions modifying the same cell in a particular row
+        HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+        Put put1 = new Put(rowId);
+        put1.addColumn(fam, qual, Bytes.toBytes("testValue 1"));
+        txTable.put(tx1, put1);
+        tm.commit(tx1);
+
+        HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
+        Put put2 = new Put(rowId);
+        put2.addColumn(fam, qual, Bytes.toBytes("testValue 2"));
+        txTable.put(tx2, put2);
+        tm.commit(tx2);
+
+        HBaseTransaction tx3 = (HBaseTransaction) tm.begin();
+        Put put3 = new Put(rowId);
+        put3.addColumn(fam, qual, Bytes.toBytes("testValue 3"));
+        txTable.put(tx3, put3);
+        tm.commit(tx3);
+
+        // Before compaction, the three timestamped values for the cell should be there
+        TTableCellGetterAdapter getter = new TTableCellGetterAdapter(txTable);
+        assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
+                "Put cell of Tx1 should be there");
+        assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
+                "Put shadow cell of Tx1 should be there");
+        assertTrue(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
+                "Put cell of Tx2 cell should be there");
+        assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
+                "Put shadow cell of Tx2 should be there");
+        assertTrue(CellUtils.hasCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
+                "Put cell of Tx3 cell should be there");
+        assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
+                "Put shadow cell of Tx3 should be there");
+
+        // Compact
+        compactWithLWM(0, TEST_TABLE);
+
+        // After compaction, the three timestamped values for the cell should be there
+        assertTrue(CellUtils.hasCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
+                "Put cell of Tx1 should be there");
+        assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx1.getStartTimestamp(), getter),
+                "Put shadow cell of Tx1 should be there");
+        assertTrue(CellUtils.hasCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
+                "Put cell of Tx2 cell should be there");
+        assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx2.getStartTimestamp(), getter),
+                "Put shadow cell of Tx2 should be there");
+        assertTrue(CellUtils.hasCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
+                "Put cell of Tx3 cell should be there");
+        assertTrue(CellUtils.hasShadowCell(rowId, fam, qual, tx3.getStartTimestamp(), getter),
+                "Put shadow cell of Tx3 should be there");
+    }
+
     @Test(timeOut = 60_000)
     public void testStandardTXsWithShadowCellsAndWithSTBelowAndAboveLWMArePresevedAfterCompaction() throws Throwable {
         String TEST_TABLE = "testStandardTXsWithShadowCellsAndWithSTBelowAndAboveLWMArePresevedAfterCompaction";


[incubator-omid] 01/02: [OMID-134] - transactionManager is thread safe. All HBase Table and Connectoins are close correctly. Create new Table from connection only when needed

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yonigo pushed a commit to branch 1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-omid.git

commit 3a22dd6f450c1ae0d75b42659aef921b4ade4ba2
Author: Yonatan Gottesman <yo...@gmail.com>
AuthorDate: Tue Feb 26 10:31:30 2019 +0200

    [OMID-134] - transactionManager is thread safe.
    All HBase Table and Connectoins are close correctly.
    Create new Table from connection only when needed
---
 .../apache/omid/benchmarks/tso/RawTxRunner.java    |  3 +-
 .../org/apache/omid/committable/CommitTable.java   |  4 +-
 .../omid/committable/InMemoryCommitTable.java      |  8 ----
 .../apache/omid/committable/NullCommitTable.java   |  8 ----
 .../omid/committable/NullCommitTableTest.java      | 50 +++++++++++-----------
 .../transaction/AttributeSetSnapshotFilter.java    |  4 ++
 .../omid/transaction/HBaseSyncPostCommitter.java   | 23 ++++++----
 .../omid/transaction/HBaseTransactionManager.java  | 32 +++++++++-----
 .../omid/transaction/HTableAccessWrapper.java      |  5 +++
 .../apache/omid/transaction/SnapshotFilter.java    |  2 +-
 .../omid/transaction/SnapshotFilterImpl.java       |  9 ++--
 .../java/org/apache/omid/transaction/TTable.java   |  6 +++
 .../omid/transaction/TableAccessWrapper.java       |  2 +-
 .../transaction/TestAsynchronousPostCommitter.java |  6 +--
 .../org/apache/omid/transaction/TestFilters.java   |  4 +-
 .../transaction/TestHBaseTransactionClient.java    |  8 ++--
 .../apache/omid/transaction/TestShadowCells.java   |  8 ++--
 .../omid/committable/hbase/HBaseCommitTable.java   | 45 ++++++++-----------
 .../hbase/regionserver/RegionAccessWrapper.java    |  4 ++
 .../apache/omid/transaction/CompactorScanner.java  |  6 +--
 .../org/apache/omid/transaction/CompactorUtil.java | 20 ++++-----
 .../org/apache/omid/transaction/OmidCompactor.java | 33 ++++----------
 .../omid/transaction/OmidSnapshotFilter.java       | 12 +++---
 .../apache/omid/transaction/TestCompaction.java    | 13 +++---
 .../omid/transaction/TestCompactorScanner.java     |  4 +-
 .../omid/transaction/TestSnapshotFilter.java       |  2 +-
 .../omid/transaction/TestSnapshotFilterLL.java     |  2 +-
 .../timestamp/storage/HBaseTimestampStorage.java   | 11 ++++-
 .../transaction/AbstractTransactionManager.java    |  7 +--
 29 files changed, 170 insertions(+), 171 deletions(-)

diff --git a/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java b/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
index 511ba05..27ac437 100644
--- a/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
+++ b/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
@@ -175,12 +175,11 @@ class RawTxRunner implements Runnable {
             if (!wasSuccess) {
                 callbackExec.shutdownNow();
             }
-            commitTableClient.close();
             tsoClient.close().get();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             // ignore
-        } catch (ExecutionException | IOException e) {
+        } catch (ExecutionException e) {
             // ignore
         } finally {
             LOG.info("TxRunner {} finished", txRunnerId);
diff --git a/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
index 8f578c6..9b20305 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
@@ -31,7 +31,7 @@ public interface CommitTable {
 
     Client getClient() throws IOException;
 
-    interface Writer extends Closeable {
+    interface Writer{
 
         void addCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException;
 
@@ -53,7 +53,7 @@ public interface CommitTable {
         boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException;
     }
 
-    interface Client extends Closeable {
+    interface Client {
 
         /**
          * Checks whether a transaction commit data is inside the commit table The function also checks whether the
diff --git a/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
index 14a4f76..9462c3a 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
@@ -72,10 +72,6 @@ public class InMemoryCommitTable implements CommitTable {
             // required to make sure the entry was not invalidated.
             return (table.putIfAbsent(startTimestamp, commitTimestamp) == null);
         }
-
-        @Override
-        public void close() {
-        }
     }
 
     public class Client implements CommitTable.Client {
@@ -138,10 +134,6 @@ public class InMemoryCommitTable implements CommitTable {
             f.set(false);
             return f;
         }
-
-        @Override
-        public void close() {
-        }
     }
 
     public int countElements() {
diff --git a/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
index 176d4d9..70e45da 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
@@ -59,11 +59,6 @@ public class NullCommitTable implements CommitTable {
         public void flush() throws IOException {
             // noop
         }
-
-        @Override
-        public void close() {
-        }
-
     }
 
     public static class Client implements CommitTable.Client {
@@ -89,8 +84,5 @@ public class NullCommitTable implements CommitTable {
             throw new UnsupportedOperationException();
         }
 
-        @Override
-        public void close() {
-        }
     }
 }
diff --git a/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java b/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java
index 0d539d8..efe5c29 100644
--- a/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java
+++ b/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java
@@ -35,36 +35,34 @@ public class NullCommitTableTest {
 
         CommitTable commitTable = new NullCommitTable();
 
-        try (CommitTable.Client commitTableClient = commitTable.getClient();
-             CommitTable.Writer commitTableWriter = commitTable.getWriter()) {
+        CommitTable.Client commitTableClient = commitTable.getClient();
+        CommitTable.Writer commitTableWriter = commitTable.getWriter();
 
-            // Test client
-            try {
-                commitTableClient.readLowWatermark().get();
-            } catch (UnsupportedOperationException e) {
-                // expected
-            }
+        // Test client
+        try {
+            commitTableClient.readLowWatermark().get();
+        } catch (UnsupportedOperationException e) {
+            // expected
+        }
 
-            try {
-                commitTableClient.getCommitTimestamp(TEST_ST).get();
-            } catch (UnsupportedOperationException e) {
-                // expected
-            }
+        try {
+            commitTableClient.getCommitTimestamp(TEST_ST).get();
+        } catch (UnsupportedOperationException e) {
+            // expected
+        }
 
-            try {
-                commitTableClient.tryInvalidateTransaction(TEST_ST).get();
-            } catch (UnsupportedOperationException e) {
-                // expected
-            }
+        try {
+            commitTableClient.tryInvalidateTransaction(TEST_ST).get();
+        } catch (UnsupportedOperationException e) {
+            // expected
+        }
 
-            assertNull(commitTableClient.deleteCommitEntry(TEST_ST).get());
+        assertNull(commitTableClient.deleteCommitEntry(TEST_ST).get());
 
-            // Test writer
-            commitTableWriter.updateLowWatermark(TEST_LWM);
-            commitTableWriter.addCommittedTransaction(TEST_ST, TEST_CT);
-            commitTableWriter.clearWriteBuffer();
-            commitTableWriter.flush();
-        }
+        // Test writer
+        commitTableWriter.updateLowWatermark(TEST_LWM);
+        commitTableWriter.addCommittedTransaction(TEST_ST, TEST_CT);
+        commitTableWriter.clearWriteBuffer();
+        commitTableWriter.flush();
     }
-
 }
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java b/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
index 6fdcd44..856d247 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
@@ -57,4 +57,8 @@ public class AttributeSetSnapshotFilter implements SnapshotFilter {
         scan.setAttribute(CellUtils.LL_ATTRIBUTE, Bytes.toBytes(transaction.isLowLatency()));
         return table.getScanner(scan);
     }
+
+    public void close() throws IOException {
+        table.close();
+    }
 }
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
index c9fa5e5..68dc8d3 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
@@ -51,29 +53,34 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
     private final Timer commitTableUpdateTimer;
     private final Timer shadowCellsUpdateTimer;
     private static final int MAX_BATCH_SIZE=1000;
+    private final Connection connection;
 
-
-    public HBaseSyncPostCommitter(MetricsRegistry metrics, CommitTable.Client commitTableClient) {
+    public HBaseSyncPostCommitter(MetricsRegistry metrics, CommitTable.Client commitTableClient,
+                                  Connection connection) {
         this.metrics = metrics;
         this.commitTableClient = commitTableClient;
 
         this.commitTableUpdateTimer = metrics.timer(name("omid", "tm", "hbase", "commitTableUpdate", "latency"));
         this.shadowCellsUpdateTimer = metrics.timer(name("omid", "tm", "hbase", "shadowCellsUpdate", "latency"));
+        this.connection = connection;
     }
 
-    private void flushMutations(Table table, List<Mutation> mutations) throws IOException, InterruptedException {
-        table.batch(mutations, new Object[mutations.size()]);
+    private void flushMutations(TableName tableName, List<Mutation> mutations) throws IOException, InterruptedException {
+        try (Table table = connection.getTable(tableName)){
+            table.batch(mutations, new Object[mutations.size()]);
+        }
+
     }
 
     private void addShadowCell(HBaseCellId cell, HBaseTransaction tx, SettableFuture<Void> updateSCFuture,
-                               Map<Table,List<Mutation>> mutations) throws IOException, InterruptedException {
+                               Map<TableName,List<Mutation>> mutations) throws IOException, InterruptedException {
         Put put = new Put(cell.getRow());
         put.addColumn(cell.getFamily(),
                 CellUtils.addShadowCellSuffixPrefix(cell.getQualifier(), 0, cell.getQualifier().length),
                 cell.getTimestamp(),
                 Bytes.toBytes(tx.getCommitTimestamp()));
 
-        Table table = cell.getTable().getHTable();
+        TableName table = cell.getTable().getHTable().getName();
         List<Mutation> tableMutations = mutations.get(table);
         if (tableMutations == null) {
             ArrayList<Mutation> newList = new ArrayList<>();
@@ -97,7 +104,7 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
 
         shadowCellsUpdateTimer.start();
         try {
-            Map<Table,List<Mutation>> mutations = new HashMap<>();
+            Map<TableName,List<Mutation>> mutations = new HashMap<>();
             // Add shadow cells
             for (HBaseCellId cell : tx.getWriteSet()) {
                 addShadowCell(cell, tx, updateSCFuture, mutations);
@@ -107,7 +114,7 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
                 addShadowCell(cell, tx, updateSCFuture, mutations);
             }
 
-            for (Map.Entry<Table,List<Mutation>> entry: mutations.entrySet()) {
+            for (Map.Entry<TableName,List<Mutation>> entry: mutations.entrySet()) {
                 flushMutations(entry.getKey(), entry.getValue());
             }
 
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
index c66b9b2..5620be3 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
@@ -46,6 +47,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 public class HBaseTransactionManager extends AbstractTransactionManager implements HBaseTransactionClient {
 
     private static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionManager.class);
+    private final Connection connection;
 
     private static class HBaseTransactionFactory implements TransactionFactory<HBaseCellId> {
 
@@ -111,9 +113,11 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
 
         public HBaseTransactionManager build() throws IOException, InterruptedException {
 
-            CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient()).get();
-            CommitTable.Writer commitTableWriter = this.commitTableWriter.or(buildCommitTableWriter()).get();
-            PostCommitActions postCommitter = this.postCommitter.or(buildPostCommitter(commitTableClient)).get();
+            Connection connection = ConnectionFactory.createConnection(hbaseOmidClientConf.getHBaseConfiguration());
+
+            CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient(connection)).get();
+            CommitTable.Writer commitTableWriter = this.commitTableWriter.or(buildCommitTableWriter(connection)).get();
+            PostCommitActions postCommitter = this.postCommitter.or(buildPostCommitter(commitTableClient, connection)).get();
             TSOProtocol tsoClient = this.tsoClient.or(buildTSOClient()).get();
 
             return new HBaseTransactionManager(hbaseOmidClientConf,
@@ -121,7 +125,8 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
                                                tsoClient,
                                                commitTableClient,
                                                commitTableWriter,
-                                               new HBaseTransactionFactory());
+                                               new HBaseTransactionFactory(),
+                                               connection);
         }
 
         private Optional<TSOProtocol> buildTSOClient() throws IOException, InterruptedException {
@@ -129,25 +134,25 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
         }
 
 
-        private Optional<CommitTable.Client> buildCommitTableClient() throws IOException {
+        private Optional<CommitTable.Client> buildCommitTableClient(Connection connection) throws IOException {
             HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
             commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
-            CommitTable commitTable = new HBaseCommitTable(ConnectionFactory.createConnection(hbaseOmidClientConf.getHBaseConfiguration()), commitTableConf);
+            CommitTable commitTable = new HBaseCommitTable(connection, commitTableConf);
             return Optional.of(commitTable.getClient());
         }
 
-        private Optional<CommitTable.Writer> buildCommitTableWriter() throws IOException {
+        private Optional<CommitTable.Writer> buildCommitTableWriter(Connection connection) throws IOException {
             HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
             commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
-            CommitTable commitTable = new HBaseCommitTable(hbaseOmidClientConf.getHBaseConfiguration(), commitTableConf);
+            CommitTable commitTable = new HBaseCommitTable(connection, commitTableConf);
             return Optional.of(commitTable.getWriter());
         }
 
-        private Optional<PostCommitActions> buildPostCommitter(CommitTable.Client commitTableClient ) {
+        private Optional<PostCommitActions> buildPostCommitter(CommitTable.Client commitTableClient, Connection connection) {
 
             PostCommitActions postCommitter;
             PostCommitActions syncPostCommitter = new HBaseSyncPostCommitter(hbaseOmidClientConf.getMetrics(),
-                                                                             commitTableClient);
+                                                                             commitTableClient, connection);
             switch(hbaseOmidClientConf.getPostCommitMode()) {
                 case ASYNC:
                     ListeningExecutorService postCommitExecutor =
@@ -175,7 +180,7 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
                                     TSOProtocol tsoClient,
                                     CommitTable.Client commitTableClient,
                                     CommitTable.Writer commitTableWriter,
-                                    HBaseTransactionFactory hBaseTransactionFactory) {
+                                    HBaseTransactionFactory hBaseTransactionFactory, Connection connection) {
 
         super(hBaseOmidClientConfiguration.getMetrics(),
                 postCommitter,
@@ -183,11 +188,16 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
                 commitTableClient,
                 commitTableWriter,
                 hBaseTransactionFactory);
+        this.connection = connection;
     }
 
     // ----------------------------------------------------------------------------------------------------------------
     // AbstractTransactionManager overwritten methods
     // ----------------------------------------------------------------------------------------------------------------
+    @Override
+    public void closeResources() throws IOException {
+        connection.close();
+    }
 
     @Override
     public void preCommit(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java b/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java
index f48fa55..0114ca2 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java
@@ -60,4 +60,9 @@ public class HTableAccessWrapper implements TableAccessWrapper {
         return readTable.getScanner(scan);
     }
 
+    @Override
+    public void close() throws Exception {
+        writeTable.close();
+        readTable.close();
+    }
 }
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java
index 370ac01..9372868 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 
 
-public interface SnapshotFilter {
+public interface SnapshotFilter extends AutoCloseable{
     
     Result get(Get get, HBaseTransaction transaction) throws IOException;
 
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
index d095858..569f1bd 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
@@ -63,10 +63,6 @@ public class SnapshotFilterImpl implements SnapshotFilter {
 
     private TableAccessWrapper tableAccessWrapper;
 
-    public void closeCommitTableClient() throws IOException {
-        commitTableClient.close();
-    }
-
     private CommitTable.Client commitTableClient;
 
     public TableAccessWrapper getTableAccessWrapper() {
@@ -598,6 +594,11 @@ public class SnapshotFilterImpl implements SnapshotFilter {
                 .asList();
     }
 
+    @Override
+    public void close() throws Exception {
+        tableAccessWrapper.close();
+    }
+
 
     public class TransactionalClientScanner implements ResultScanner {
 
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
index 869a013..e52c405 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
@@ -163,6 +163,12 @@ public class TTable implements Closeable {
     @Override
     public void close() throws IOException {
         table.close();
+        try {
+            snapshotFilter.close();
+        } catch (Exception e) {
+            LOG.warn("Failed to close TTable resources.");
+            e.printStackTrace();
+        }
     }
 
     // ----------------------------------------------------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TableAccessWrapper.java b/hbase-client/src/main/java/org/apache/omid/transaction/TableAccessWrapper.java
index 8f7f6ac..534dc94 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/TableAccessWrapper.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/TableAccessWrapper.java
@@ -29,7 +29,7 @@ import java.util.List;
 
 
 //This interface is used to wrap the HTableInterface and Region object when doing client and server side filtering accordingly.
-public interface TableAccessWrapper {
+public interface TableAccessWrapper extends AutoCloseable{
 
     Result[] get(List<Get> get) throws IOException;
     Result get(Get get) throws IOException;
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java
index 5979c80..e2c9933 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java
@@ -67,7 +67,7 @@ public class TestAsynchronousPostCommitter extends OmidTestBase {
         CommitTable.Client commitTableClient = getCommitTable(context).getClient();
 
         PostCommitActions syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         ListeningExecutorService postCommitExecutor =
                 MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
                         new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
@@ -185,7 +185,7 @@ public class TestAsynchronousPostCommitter extends OmidTestBase {
         CommitTable.Client commitTableClient = getCommitTable(context).getClient();
 
         PostCommitActions syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         ListeningExecutorService postCommitExecutor =
                 MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
                         new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
@@ -264,7 +264,7 @@ public class TestAsynchronousPostCommitter extends OmidTestBase {
         CommitTable.Client commitTableClient = getCommitTable(context).getClient();
 
         PostCommitActions syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         ListeningExecutorService postCommitExecutor =
                 MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
                         new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
index 4678110..d375084 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
@@ -78,7 +78,7 @@ public class TestFilters extends OmidTestBase {
 
         TTable table = new TTable(connection, TEST_TABLE);
         PostCommitActions syncPostCommitter = spy(
-                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         AbstractTransactionManager tm = HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .commitTableClient(commitTableClient)
                 .commitTableWriter(getCommitTable(context).getWriter())
@@ -127,7 +127,7 @@ public class TestFilters extends OmidTestBase {
         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
         TTable table = new TTable(connection, TEST_TABLE);
         PostCommitActions syncPostCommitter = spy(
-                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         AbstractTransactionManager tm = HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .commitTableClient(commitTableClient)
                 .commitTableWriter(getCommitTable(context).getWriter())
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
index fb5efdf..45b5ce5 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
@@ -94,7 +94,7 @@ public class TestHBaseTransactionClient extends OmidTestBase {
     @Test(timeOut = 30_000)
     public void testCrashAfterCommit(ITestContext context) throws Exception {
         PostCommitActions syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
         AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
         // The following line emulates a crash after commit that is observed in (*) below
         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
@@ -134,7 +134,7 @@ public class TestHBaseTransactionClient extends OmidTestBase {
         final long NON_EXISTING_CELL_TS = 1000L;
 
         PostCommitActions syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
         AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
         // The following line emulates a crash after commit that is observed in (*) below
         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
@@ -257,7 +257,7 @@ public class TestHBaseTransactionClient extends OmidTestBase {
     public void testCellCommitTimestampIsLocatedInCommitTable(ITestContext context) throws Exception {
 
         PostCommitActions syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
         AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
         // The following line emulates a crash after commit that is observed in (*) below
         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
@@ -374,7 +374,7 @@ public class TestHBaseTransactionClient extends OmidTestBase {
 
         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
         PostCommitActions syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, syncPostCommitter));
 
         // The following line emulates a crash after commit that is observed in (*) below
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
index df274ae..8cf51a2 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
@@ -141,7 +141,7 @@ public class TestShadowCells extends OmidTestBase {
         hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
         PostCommitActions syncPostCommitter = spy(
-                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .postCommitter(syncPostCommitter)
                 .commitTableClient(commitTableClient)
@@ -189,7 +189,7 @@ public class TestShadowCells extends OmidTestBase {
         hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
         PostCommitActions syncPostCommitter = spy(
-                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .postCommitter(syncPostCommitter)
                 .commitTableWriter(getCommitTable(context).getWriter())
@@ -250,7 +250,7 @@ public class TestShadowCells extends OmidTestBase {
         hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
         PostCommitActions syncPostCommitter = spy(
-                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .postCommitter(syncPostCommitter)
                 .commitTableClient(commitTableClient)
@@ -335,7 +335,7 @@ public class TestShadowCells extends OmidTestBase {
 
         final AtomicBoolean readFailed = new AtomicBoolean(false);
         PostCommitActions syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
         AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
 
         doAnswer(new Answer<ListenableFuture<Void>>() {
diff --git a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
index b83f1a3..2deb8ee 100644
--- a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
+++ b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
@@ -95,13 +95,13 @@ public class HBaseCommitTable implements CommitTable {
     private class HBaseWriter implements Writer {
 
         private static final long INITIAL_LWM_VALUE = -1L;
-        final Table table;
+
         // Our own buffer for operations
         final List<Put> writeBuffer = new LinkedList<>();
         volatile long lowWatermarkToStore = INITIAL_LWM_VALUE;
 
-        HBaseWriter() throws IOException {
-            table = hbaseConnection.getTable(TableName.valueOf(tableName));
+        HBaseWriter() {
+
         }
 
         @Override
@@ -120,7 +120,8 @@ public class HBaseCommitTable implements CommitTable {
 
         @Override
         public void flush() throws IOException {
-            try {
+
+            try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
                 addLowWatermarkToStoreToWriteBuffer();
                 table.put(writeBuffer);
                 writeBuffer.clear();
@@ -137,18 +138,15 @@ public class HBaseCommitTable implements CommitTable {
 
         @Override
         public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
-            assert (startTimestamp < commitTimestamp);
-            byte[] transactionRow = startTimestampToKey(startTimestamp);
-            Put put = new Put(transactionRow, startTimestamp);
-            byte[] value = encodeCommitTimestamp(startTimestamp, commitTimestamp);
-            put.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER, value);
-            return table.checkAndPut(transactionRow, commitTableFamily, INVALID_TX_QUALIFIER, null, put);
-        }
+            try (Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
+                assert (startTimestamp < commitTimestamp);
+                byte[] transactionRow = startTimestampToKey(startTimestamp);
+                Put put = new Put(transactionRow, startTimestamp);
+                byte[] value = encodeCommitTimestamp(startTimestamp, commitTimestamp);
+                put.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER, value);
+                return table.checkAndPut(transactionRow, commitTableFamily, INVALID_TX_QUALIFIER, null, put);
+            }
 
-        @Override
-        public void close() throws IOException {
-            clearWriteBuffer();
-            table.close();
         }
 
         private void addLowWatermarkToStoreToWriteBuffer() {
@@ -164,17 +162,15 @@ public class HBaseCommitTable implements CommitTable {
 
     class HBaseClient implements Client{
 
-        final Table table;
+        HBaseClient(){
 
-        HBaseClient() throws IOException {
-            table = hbaseConnection.getTable(TableName.valueOf(tableName));
         }
 
         @Override
         public ListenableFuture<Optional<CommitTimestamp>> getCommitTimestamp(long startTimestamp) {
 
             SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
-            try {
+            try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
                 Get get = new Get(startTimestampToKey(startTimestamp));
                 get.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER);
                 get.addColumn(commitTableFamily, INVALID_TX_QUALIFIER);
@@ -206,7 +202,7 @@ public class HBaseCommitTable implements CommitTable {
         @Override
         public ListenableFuture<Long> readLowWatermark() {
             SettableFuture<Long> f = SettableFuture.create();
-            try {
+            try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
                 Get get = new Get(LOW_WATERMARK_ROW);
                 get.addColumn(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER);
                 Result result = table.get(get);
@@ -238,7 +234,7 @@ public class HBaseCommitTable implements CommitTable {
 
             Delete delete = new Delete(key, startTimestamp);
 
-            try {
+            try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
                 table.delete(delete);
             } catch (IOException e) {
                 SettableFuture<Void> f = SettableFuture.create();
@@ -253,7 +249,7 @@ public class HBaseCommitTable implements CommitTable {
         @Override
         public ListenableFuture<Boolean> tryInvalidateTransaction(long startTimestamp) {
             SettableFuture<Boolean> f = SettableFuture.create();
-            try {
+            try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
                 byte[] row = startTimestampToKey(startTimestamp);
                 Put invalidationPut = new Put(row, startTimestamp);
                 invalidationPut.addColumn(commitTableFamily, INVALID_TX_QUALIFIER, Bytes.toBytes(1));
@@ -272,11 +268,6 @@ public class HBaseCommitTable implements CommitTable {
             return f;
         }
 
-        @Override
-        public synchronized void close() throws IOException {
-            table.close();
-        }
-
         private boolean containsATimestamp(Result result) {
             return (result != null && result.containsColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER));
         }
diff --git a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java
index 4786eda..1006bc6 100644
--- a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java
+++ b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java
@@ -63,4 +63,8 @@ public class RegionAccessWrapper implements TableAccessWrapper {
         return null;
     }
 
+    @Override
+    public void close() throws Exception {
+
+    }
 }
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
index cf93163..4ba385d 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
@@ -54,7 +54,7 @@ public class CompactorScanner implements InternalScanner {
     private static final Logger LOG = LoggerFactory.getLogger(CompactorScanner.class);
     private final InternalScanner internalScanner;
     private final CommitTable.Client commitTableClient;
-    private final Queue<CommitTable.Client> commitTableClientQueue;
+
     private final boolean isMajorCompaction;
     private final boolean retainNonTransactionallyDeletedCells;
     private final long lowWatermark;
@@ -67,12 +67,10 @@ public class CompactorScanner implements InternalScanner {
     public CompactorScanner(ObserverContext<RegionCoprocessorEnvironment> e,
                             InternalScanner internalScanner,
                             Client commitTableClient,
-                            Queue<CommitTable.Client> commitTableClientQueue,
                             boolean isMajorCompaction,
                             boolean preserveNonTransactionallyDeletedCells) throws IOException {
         this.internalScanner = internalScanner;
         this.commitTableClient = commitTableClient;
-        this.commitTableClientQueue = commitTableClientQueue;
         this.isMajorCompaction = isMajorCompaction;
         this.retainNonTransactionallyDeletedCells = preserveNonTransactionallyDeletedCells;
         this.lowWatermark = getLowWatermarkFromCommitTable();
@@ -84,6 +82,7 @@ public class CompactorScanner implements InternalScanner {
 
     @Override
     public boolean next(List<Cell> results) throws IOException {
+        // TODO(yonigo): investigate why we get exceptions when next() is called with the -1 limit used below
         return next(results, -1);
     }
 
@@ -182,7 +181,6 @@ public class CompactorScanner implements InternalScanner {
     @Override
     public void close() throws IOException {
         internalScanner.close();
-        commitTableClientQueue.add(commitTableClient);
     }
 
     // ----------------------------------------------------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorUtil.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorUtil.java
index f95191c..41bf13c 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorUtil.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorUtil.java
@@ -85,16 +85,16 @@ public class CompactorUtil {
         HBaseLogin.loginIfNeeded(cmdline.loginFlags);
 
         Configuration conf = HBaseConfiguration.create();
-        Connection conn = ConnectionFactory.createConnection(conf);
-
-        if (cmdline.enable) {
-            enableOmidCompaction(conn, TableName.valueOf(cmdline.table),
-                    Bytes.toBytes(cmdline.columnFamily));
-        } else if (cmdline.disable) {
-            disableOmidCompaction(conn, TableName.valueOf(cmdline.table),
-                    Bytes.toBytes(cmdline.columnFamily));
-        } else {
-            System.err.println("Must specify enable or disable");
+        try (Connection conn = ConnectionFactory.createConnection(conf)) {
+            if (cmdline.enable) {
+                enableOmidCompaction(conn, TableName.valueOf(cmdline.table),
+                        Bytes.toBytes(cmdline.columnFamily));
+            } else if (cmdline.disable) {
+                disableOmidCompaction(conn, TableName.valueOf(cmdline.table),
+                        Bytes.toBytes(cmdline.columnFamily));
+            } else {
+                System.err.println("Must specify enable or disable");
+            }
         }
     }
 }
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
index f8ed6b7..57f82b0 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
@@ -20,6 +20,7 @@ package org.apache.omid.transaction;
 import com.google.common.annotations.VisibleForTesting;
 
 
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.committable.hbase.HBaseCommitTable;
 import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
@@ -64,8 +65,9 @@ public class OmidCompactor extends BaseRegionObserver {
 
     private HBaseCommitTableConfig commitTableConf = null;
     private RegionCoprocessorEnvironment env = null;
+
     @VisibleForTesting
-    Queue<CommitTable.Client> commitTableClientQueue = new ConcurrentLinkedQueue<>();
+    CommitTable.Client commitTableClient;
 
     // When compacting, if a cell which has been marked by HBase as Delete or
     // Delete Family (that is, non-transactionally deleted), we allow the user
@@ -73,7 +75,8 @@ public class OmidCompactor extends BaseRegionObserver {
     // If retained, the deleted cell will appear after a minor compaction, but
     // will be deleted anyways after a major one
     private boolean retainNonTransactionallyDeletedCells;
-    private CommitTable commitTable;
+
+    private Connection connection;
 
     public OmidCompactor() {
         this(false);
@@ -92,10 +95,10 @@ public class OmidCompactor extends BaseRegionObserver {
         if (commitTableName != null) {
             commitTableConf.setTableName(commitTableName);
         }
-        commitTable = new HBaseCommitTable(RegionConnectionFactory
-                .getConnection(RegionConnectionFactory.ConnectionType.COMPACTION_CONNECTION,
-                        (RegionCoprocessorEnvironment) env)
-                , commitTableConf);
+
+        connection = RegionConnectionFactory
+                .getConnection(RegionConnectionFactory.ConnectionType.COMPACTION_CONNECTION, (RegionCoprocessorEnvironment) env);
+        commitTableClient = new HBaseCommitTable(connection, commitTableConf).getClient();
         retainNonTransactionallyDeletedCells =
                 env.getConfiguration().getBoolean(HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY,
                         HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_DEFAULT);
@@ -105,11 +108,6 @@ public class OmidCompactor extends BaseRegionObserver {
     @Override
     public void stop(CoprocessorEnvironment e) throws IOException {
         LOG.info("Stopping compactor coprocessor");
-        if (commitTableClientQueue != null) {
-            for (CommitTable.Client commitTableClient : commitTableClientQueue) {
-                commitTableClient.close();
-            }
-        }
         LOG.info("Compactor coprocessor stopped");
     }
 
@@ -135,15 +133,10 @@ public class OmidCompactor extends BaseRegionObserver {
             if (!omidCompactable) {
                 return scanner;
             } else {
-                CommitTable.Client commitTableClient = commitTableClientQueue.poll();
-                if (commitTableClient == null) {
-                    commitTableClient = initAndGetCommitTableClient();
-                }
                 boolean isMajorCompaction = request.isMajor();
                 return new CompactorScanner(env,
                         scanner,
                         commitTableClient,
-                        commitTableClientQueue,
                         isMajorCompaction,
                         retainNonTransactionallyDeletedCells);
             }
@@ -153,12 +146,4 @@ public class OmidCompactor extends BaseRegionObserver {
             throw new DoNotRetryIOException(e);
         }
     }
-
-    private CommitTable.Client initAndGetCommitTableClient() throws IOException {
-        LOG.info("Trying to get the commit table client");
-        CommitTable.Client commitTableClient = commitTable.getClient();
-        LOG.info("Commit table client obtained {}", commitTableClient.getClass().getCanonicalName());
-        return commitTableClient;
-    }
-
 }
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
index eb5d50f..7ee742d 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
@@ -19,6 +19,7 @@ package org.apache.omid.transaction;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Scan;
 
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
@@ -66,7 +67,8 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
     private Queue<SnapshotFilterImpl> snapshotFilterQueue = new ConcurrentLinkedQueue<>();
     private Map<Object, SnapshotFilterImpl> snapshotFilterMap = new ConcurrentHashMap<>();
     private CommitTable.Client inMemoryCommitTable = null;
-    private CommitTable commitTable;
+    private CommitTable.Client commitTableClient;
+    private Connection connection;
 
     public OmidSnapshotFilter(CommitTable.Client commitTableClient) {
         LOG.info("Compactor coprocessor initialized");
@@ -86,14 +88,15 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
         if (commitTableName != null) {
             commitTableConf.setTableName(commitTableName);
         }
+        connection = RegionConnectionFactory
+                .getConnection(RegionConnectionFactory.ConnectionType.READ_CONNECTION, (RegionCoprocessorEnvironment) env);
+        commitTableClient = new HBaseCommitTable(connection, commitTableConf).getClient();
         LOG.info("Snapshot filter started");
-        commitTable = new HBaseCommitTable(RegionConnectionFactory
-                .getConnection(RegionConnectionFactory.ConnectionType.READ_CONNECTION, (RegionCoprocessorEnvironment) env),
-                commitTableConf);
     }
 
     @Override
     public void stop(CoprocessorEnvironment e) throws IOException {
+        LOG.info("stopping Snapshot filter");
         LOG.info("Snapshot filter stopped");
     }
 
@@ -181,7 +184,6 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
         if (inMemoryCommitTable != null) {
             return inMemoryCommitTable;
         }
-        CommitTable.Client commitTableClient = commitTable.getClient();
         return commitTableClient;
     }
 
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
index a4bf65e..197ef3f 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
@@ -200,7 +200,7 @@ public class TestCompaction {
         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
         CommitTable.Client commitTableClient = commitTable.getClient();
         syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
         return HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .postCommitter(syncPostCommitter)
                 .commitTableClient(commitTableClient)
@@ -241,7 +241,7 @@ public class TestCompaction {
         SettableFuture<Long> f = SettableFuture.create();
         f.set(fakeAssignedLowWatermark);
         doReturn(f).when(commitTableClient).readLowWatermark();
-        omidCompactor.commitTableClientQueue.add(commitTableClient);
+        omidCompactor.commitTableClient = commitTableClient;
         LOG.info("Compacting table {}", TEST_TABLE);
         admin.majorCompact(TableName.valueOf(TEST_TABLE));
 
@@ -292,7 +292,7 @@ public class TestCompaction {
         SettableFuture<Long> f = SettableFuture.create();
         f.set(Long.MAX_VALUE);
         doReturn(f).when(commitTableClient).readLowWatermark();
-        omidCompactor.commitTableClientQueue.add(commitTableClient);
+        omidCompactor.commitTableClient = commitTableClient;
 
         LOG.info("Flushing table {}", TEST_TABLE);
         admin.flush(TableName.valueOf(TEST_TABLE));
@@ -361,7 +361,7 @@ public class TestCompaction {
         SettableFuture<Long> f = SettableFuture.create();
         f.set(neverendingTxBelowLowWatermark.getStartTimestamp());
         doReturn(f).when(commitTableClient).readLowWatermark();
-        omidCompactor.commitTableClientQueue.add(commitTableClient);
+        omidCompactor.commitTableClient = commitTableClient;
         LOG.info("Compacting table {}", TEST_TABLE);
         admin.majorCompact(TableName.valueOf(TEST_TABLE));
 
@@ -422,8 +422,7 @@ public class TestCompaction {
         SettableFuture<Long> f = SettableFuture.create();
         f.setException(new IOException("Unable to read"));
         doReturn(f).when(commitTableClient).readLowWatermark();
-        omidCompactor.commitTableClientQueue.add(commitTableClient);
-
+        omidCompactor.commitTableClient = commitTableClient;
         LOG.info("Compacting table {}", TEST_TABLE);
         admin.majorCompact(TableName.valueOf(TEST_TABLE)); // Should trigger the error when accessing CommitTable funct.
 
@@ -1167,7 +1166,7 @@ public class TestCompaction {
         SettableFuture<Long> f = SettableFuture.create();
         f.set(lwm);
         doReturn(f).when(commitTableClient).readLowWatermark();
-        omidCompactor.commitTableClientQueue.add(commitTableClient);
+        omidCompactor.commitTableClient = commitTableClient;
     }
 
     private void compactEverything(String tableName) throws Exception {
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactorScanner.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactorScanner.java
index 8a217b3..eca9714 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactorScanner.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactorScanner.java
@@ -64,8 +64,7 @@ public class TestCompactorScanner {
         ObserverContext<RegionCoprocessorEnvironment> ctx = mock(ObserverContext.class);
         InternalScanner internalScanner = mock(InternalScanner.class);
         CommitTable.Client ctClient = mock(CommitTable.Client.class);
-        @SuppressWarnings("unchecked")
-        Queue<Client> queue = mock(Queue.class);
+
         RegionCoprocessorEnvironment rce = mock(RegionCoprocessorEnvironment.class);
         HRegion hRegion = mock(HRegion.class);
         HRegionInfo hRegionInfo = mock(HRegionInfo.class);
@@ -82,7 +81,6 @@ public class TestCompactorScanner {
         try (CompactorScanner scanner = spy(new CompactorScanner(ctx,
                 internalScanner,
                 ctClient,
-                queue,
                 false,
                 retainOption))) {
 
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
index 4c5cc50..2cfc77e 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
@@ -207,7 +207,7 @@ public class TestSnapshotFilter {
         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
         CommitTable.Client commitTableClient = commitTable.getClient();
         syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
         return HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .postCommitter(syncPostCommitter)
                 .commitTableClient(commitTableClient)
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilterLL.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilterLL.java
index 1bb5691..3496bde 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilterLL.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilterLL.java
@@ -185,7 +185,7 @@ public class TestSnapshotFilterLL {
         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
         CommitTable.Client commitTableClient = commitTable.getClient();
         syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
         return HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .postCommitter(syncPostCommitter)
                 .commitTableClient(commitTableClient)
diff --git a/timestamp-storage/src/main/java/org/apache/omid/timestamp/storage/HBaseTimestampStorage.java b/timestamp-storage/src/main/java/org/apache/omid/timestamp/storage/HBaseTimestampStorage.java
index a33c9dd..6ff23aa 100644
--- a/timestamp-storage/src/main/java/org/apache/omid/timestamp/storage/HBaseTimestampStorage.java
+++ b/timestamp-storage/src/main/java/org/apache/omid/timestamp/storage/HBaseTimestampStorage.java
@@ -51,11 +51,12 @@ public class HBaseTimestampStorage implements TimestampStorage {
 
     private final Table table;
     private final byte[] cfName;
+    private final Connection connection;
 
     @Inject
     public HBaseTimestampStorage(Configuration hbaseConfig, HBaseTimestampStorageConfig config) throws IOException {
-        Connection conn = ConnectionFactory.createConnection(hbaseConfig);
-        this.table = conn.getTable(TableName.valueOf(config.getTableName()));
+        connection = ConnectionFactory.createConnection(hbaseConfig);
+        this.table = connection.getTable(TableName.valueOf(config.getTableName()));
         this.cfName = config.getFamilyName().getBytes(UTF_8);
     }
 
@@ -91,4 +92,10 @@ public class HBaseTimestampStorage implements TimestampStorage {
 
     }
 
+
+    public void close() throws IOException {
+        //TODO this is never called
+        table.close();
+        connection.close();
+    }
 }
diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
index 5075a7f..0eca91a 100644
--- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
+++ b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
@@ -314,15 +314,16 @@ public abstract class AbstractTransactionManager implements TransactionManager {
      */
     public void postRollback(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
 
+
+    abstract void closeResources() throws IOException;
+
     /**
      * @see java.io.Closeable#close()
      */
     @Override
     public final void close() throws IOException {
-
         tsoClient.close();
-        commitTableClient.close();
-
+        closeResources();
     }
 
     // ----------------------------------------------------------------------------------------------------------------