You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2019/03/03 06:31:53 UTC

[incubator-omid] 01/02: [OMID-134] - transactionManager is thread safe. All HBase Table and Connectoins are close correctly. Create new Table from connection only when needed

This is an automated email from the ASF dual-hosted git repository.

yonigo pushed a commit to branch 1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-omid.git

commit 3a22dd6f450c1ae0d75b42659aef921b4ade4ba2
Author: Yonatan Gottesman <yo...@gmail.com>
AuthorDate: Tue Feb 26 10:31:30 2019 +0200

    [OMID-134] - transactionManager is thread safe.
    All HBase Table and Connectoins are close correctly.
    Create new Table from connection only when needed
---
 .../apache/omid/benchmarks/tso/RawTxRunner.java    |  3 +-
 .../org/apache/omid/committable/CommitTable.java   |  4 +-
 .../omid/committable/InMemoryCommitTable.java      |  8 ----
 .../apache/omid/committable/NullCommitTable.java   |  8 ----
 .../omid/committable/NullCommitTableTest.java      | 50 +++++++++++-----------
 .../transaction/AttributeSetSnapshotFilter.java    |  4 ++
 .../omid/transaction/HBaseSyncPostCommitter.java   | 23 ++++++----
 .../omid/transaction/HBaseTransactionManager.java  | 32 +++++++++-----
 .../omid/transaction/HTableAccessWrapper.java      |  5 +++
 .../apache/omid/transaction/SnapshotFilter.java    |  2 +-
 .../omid/transaction/SnapshotFilterImpl.java       |  9 ++--
 .../java/org/apache/omid/transaction/TTable.java   |  6 +++
 .../omid/transaction/TableAccessWrapper.java       |  2 +-
 .../transaction/TestAsynchronousPostCommitter.java |  6 +--
 .../org/apache/omid/transaction/TestFilters.java   |  4 +-
 .../transaction/TestHBaseTransactionClient.java    |  8 ++--
 .../apache/omid/transaction/TestShadowCells.java   |  8 ++--
 .../omid/committable/hbase/HBaseCommitTable.java   | 45 ++++++++-----------
 .../hbase/regionserver/RegionAccessWrapper.java    |  4 ++
 .../apache/omid/transaction/CompactorScanner.java  |  6 +--
 .../org/apache/omid/transaction/CompactorUtil.java | 20 ++++-----
 .../org/apache/omid/transaction/OmidCompactor.java | 33 ++++----------
 .../omid/transaction/OmidSnapshotFilter.java       | 12 +++---
 .../apache/omid/transaction/TestCompaction.java    | 13 +++---
 .../omid/transaction/TestCompactorScanner.java     |  4 +-
 .../omid/transaction/TestSnapshotFilter.java       |  2 +-
 .../omid/transaction/TestSnapshotFilterLL.java     |  2 +-
 .../timestamp/storage/HBaseTimestampStorage.java   | 11 ++++-
 .../transaction/AbstractTransactionManager.java    |  7 +--
 29 files changed, 170 insertions(+), 171 deletions(-)

diff --git a/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java b/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
index 511ba05..27ac437 100644
--- a/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
+++ b/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
@@ -175,12 +175,11 @@ class RawTxRunner implements Runnable {
             if (!wasSuccess) {
                 callbackExec.shutdownNow();
             }
-            commitTableClient.close();
             tsoClient.close().get();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             // ignore
-        } catch (ExecutionException | IOException e) {
+        } catch (ExecutionException e) {
             // ignore
         } finally {
             LOG.info("TxRunner {} finished", txRunnerId);
diff --git a/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
index 8f578c6..9b20305 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
@@ -31,7 +31,7 @@ public interface CommitTable {
 
     Client getClient() throws IOException;
 
-    interface Writer extends Closeable {
+    interface Writer{
 
         void addCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException;
 
@@ -53,7 +53,7 @@ public interface CommitTable {
         boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException;
     }
 
-    interface Client extends Closeable {
+    interface Client {
 
         /**
          * Checks whether a transaction commit data is inside the commit table The function also checks whether the
diff --git a/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
index 14a4f76..9462c3a 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
@@ -72,10 +72,6 @@ public class InMemoryCommitTable implements CommitTable {
             // required to make sure the entry was not invalidated.
             return (table.putIfAbsent(startTimestamp, commitTimestamp) == null);
         }
-
-        @Override
-        public void close() {
-        }
     }
 
     public class Client implements CommitTable.Client {
@@ -138,10 +134,6 @@ public class InMemoryCommitTable implements CommitTable {
             f.set(false);
             return f;
         }
-
-        @Override
-        public void close() {
-        }
     }
 
     public int countElements() {
diff --git a/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
index 176d4d9..70e45da 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
@@ -59,11 +59,6 @@ public class NullCommitTable implements CommitTable {
         public void flush() throws IOException {
             // noop
         }
-
-        @Override
-        public void close() {
-        }
-
     }
 
     public static class Client implements CommitTable.Client {
@@ -89,8 +84,5 @@ public class NullCommitTable implements CommitTable {
             throw new UnsupportedOperationException();
         }
 
-        @Override
-        public void close() {
-        }
     }
 }
diff --git a/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java b/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java
index 0d539d8..efe5c29 100644
--- a/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java
+++ b/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java
@@ -35,36 +35,34 @@ public class NullCommitTableTest {
 
         CommitTable commitTable = new NullCommitTable();
 
-        try (CommitTable.Client commitTableClient = commitTable.getClient();
-             CommitTable.Writer commitTableWriter = commitTable.getWriter()) {
+        CommitTable.Client commitTableClient = commitTable.getClient();
+        CommitTable.Writer commitTableWriter = commitTable.getWriter();
 
-            // Test client
-            try {
-                commitTableClient.readLowWatermark().get();
-            } catch (UnsupportedOperationException e) {
-                // expected
-            }
+        // Test client
+        try {
+            commitTableClient.readLowWatermark().get();
+        } catch (UnsupportedOperationException e) {
+            // expected
+        }
 
-            try {
-                commitTableClient.getCommitTimestamp(TEST_ST).get();
-            } catch (UnsupportedOperationException e) {
-                // expected
-            }
+        try {
+            commitTableClient.getCommitTimestamp(TEST_ST).get();
+        } catch (UnsupportedOperationException e) {
+            // expected
+        }
 
-            try {
-                commitTableClient.tryInvalidateTransaction(TEST_ST).get();
-            } catch (UnsupportedOperationException e) {
-                // expected
-            }
+        try {
+            commitTableClient.tryInvalidateTransaction(TEST_ST).get();
+        } catch (UnsupportedOperationException e) {
+            // expected
+        }
 
-            assertNull(commitTableClient.deleteCommitEntry(TEST_ST).get());
+        assertNull(commitTableClient.deleteCommitEntry(TEST_ST).get());
 
-            // Test writer
-            commitTableWriter.updateLowWatermark(TEST_LWM);
-            commitTableWriter.addCommittedTransaction(TEST_ST, TEST_CT);
-            commitTableWriter.clearWriteBuffer();
-            commitTableWriter.flush();
-        }
+        // Test writer
+        commitTableWriter.updateLowWatermark(TEST_LWM);
+        commitTableWriter.addCommittedTransaction(TEST_ST, TEST_CT);
+        commitTableWriter.clearWriteBuffer();
+        commitTableWriter.flush();
     }
-
 }
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java b/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
index 6fdcd44..856d247 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
@@ -57,4 +57,8 @@ public class AttributeSetSnapshotFilter implements SnapshotFilter {
         scan.setAttribute(CellUtils.LL_ATTRIBUTE, Bytes.toBytes(transaction.isLowLatency()));
         return table.getScanner(scan);
     }
+
+    public void close() throws IOException {
+        table.close();
+    }
 }
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
index c9fa5e5..68dc8d3 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
@@ -51,29 +53,34 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
     private final Timer commitTableUpdateTimer;
     private final Timer shadowCellsUpdateTimer;
     private static final int MAX_BATCH_SIZE=1000;
+    private final Connection connection;
 
-
-    public HBaseSyncPostCommitter(MetricsRegistry metrics, CommitTable.Client commitTableClient) {
+    public HBaseSyncPostCommitter(MetricsRegistry metrics, CommitTable.Client commitTableClient,
+                                  Connection connection) {
         this.metrics = metrics;
         this.commitTableClient = commitTableClient;
 
         this.commitTableUpdateTimer = metrics.timer(name("omid", "tm", "hbase", "commitTableUpdate", "latency"));
         this.shadowCellsUpdateTimer = metrics.timer(name("omid", "tm", "hbase", "shadowCellsUpdate", "latency"));
+        this.connection = connection;
     }
 
-    private void flushMutations(Table table, List<Mutation> mutations) throws IOException, InterruptedException {
-        table.batch(mutations, new Object[mutations.size()]);
+    private void flushMutations(TableName tableName, List<Mutation> mutations) throws IOException, InterruptedException {
+        try (Table table = connection.getTable(tableName)){
+            table.batch(mutations, new Object[mutations.size()]);
+        }
+
     }
 
     private void addShadowCell(HBaseCellId cell, HBaseTransaction tx, SettableFuture<Void> updateSCFuture,
-                               Map<Table,List<Mutation>> mutations) throws IOException, InterruptedException {
+                               Map<TableName,List<Mutation>> mutations) throws IOException, InterruptedException {
         Put put = new Put(cell.getRow());
         put.addColumn(cell.getFamily(),
                 CellUtils.addShadowCellSuffixPrefix(cell.getQualifier(), 0, cell.getQualifier().length),
                 cell.getTimestamp(),
                 Bytes.toBytes(tx.getCommitTimestamp()));
 
-        Table table = cell.getTable().getHTable();
+        TableName table = cell.getTable().getHTable().getName();
         List<Mutation> tableMutations = mutations.get(table);
         if (tableMutations == null) {
             ArrayList<Mutation> newList = new ArrayList<>();
@@ -97,7 +104,7 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
 
         shadowCellsUpdateTimer.start();
         try {
-            Map<Table,List<Mutation>> mutations = new HashMap<>();
+            Map<TableName,List<Mutation>> mutations = new HashMap<>();
             // Add shadow cells
             for (HBaseCellId cell : tx.getWriteSet()) {
                 addShadowCell(cell, tx, updateSCFuture, mutations);
@@ -107,7 +114,7 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
                 addShadowCell(cell, tx, updateSCFuture, mutations);
             }
 
-            for (Map.Entry<Table,List<Mutation>> entry: mutations.entrySet()) {
+            for (Map.Entry<TableName,List<Mutation>> entry: mutations.entrySet()) {
                 flushMutations(entry.getKey(), entry.getValue());
             }
 
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
index c66b9b2..5620be3 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
@@ -46,6 +47,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 public class HBaseTransactionManager extends AbstractTransactionManager implements HBaseTransactionClient {
 
     private static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionManager.class);
+    private final Connection connection;
 
     private static class HBaseTransactionFactory implements TransactionFactory<HBaseCellId> {
 
@@ -111,9 +113,11 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
 
         public HBaseTransactionManager build() throws IOException, InterruptedException {
 
-            CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient()).get();
-            CommitTable.Writer commitTableWriter = this.commitTableWriter.or(buildCommitTableWriter()).get();
-            PostCommitActions postCommitter = this.postCommitter.or(buildPostCommitter(commitTableClient)).get();
+            Connection connection = ConnectionFactory.createConnection(hbaseOmidClientConf.getHBaseConfiguration());
+
+            CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient(connection)).get();
+            CommitTable.Writer commitTableWriter = this.commitTableWriter.or(buildCommitTableWriter(connection)).get();
+            PostCommitActions postCommitter = this.postCommitter.or(buildPostCommitter(commitTableClient, connection)).get();
             TSOProtocol tsoClient = this.tsoClient.or(buildTSOClient()).get();
 
             return new HBaseTransactionManager(hbaseOmidClientConf,
@@ -121,7 +125,8 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
                                                tsoClient,
                                                commitTableClient,
                                                commitTableWriter,
-                                               new HBaseTransactionFactory());
+                                               new HBaseTransactionFactory(),
+                                               connection);
         }
 
         private Optional<TSOProtocol> buildTSOClient() throws IOException, InterruptedException {
@@ -129,25 +134,25 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
         }
 
 
-        private Optional<CommitTable.Client> buildCommitTableClient() throws IOException {
+        private Optional<CommitTable.Client> buildCommitTableClient(Connection connection) throws IOException {
             HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
             commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
-            CommitTable commitTable = new HBaseCommitTable(ConnectionFactory.createConnection(hbaseOmidClientConf.getHBaseConfiguration()), commitTableConf);
+            CommitTable commitTable = new HBaseCommitTable(connection, commitTableConf);
             return Optional.of(commitTable.getClient());
         }
 
-        private Optional<CommitTable.Writer> buildCommitTableWriter() throws IOException {
+        private Optional<CommitTable.Writer> buildCommitTableWriter(Connection connection) throws IOException {
             HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
             commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
-            CommitTable commitTable = new HBaseCommitTable(hbaseOmidClientConf.getHBaseConfiguration(), commitTableConf);
+            CommitTable commitTable = new HBaseCommitTable(connection, commitTableConf);
             return Optional.of(commitTable.getWriter());
         }
 
-        private Optional<PostCommitActions> buildPostCommitter(CommitTable.Client commitTableClient ) {
+        private Optional<PostCommitActions> buildPostCommitter(CommitTable.Client commitTableClient, Connection connection) {
 
             PostCommitActions postCommitter;
             PostCommitActions syncPostCommitter = new HBaseSyncPostCommitter(hbaseOmidClientConf.getMetrics(),
-                                                                             commitTableClient);
+                                                                             commitTableClient, connection);
             switch(hbaseOmidClientConf.getPostCommitMode()) {
                 case ASYNC:
                     ListeningExecutorService postCommitExecutor =
@@ -175,7 +180,7 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
                                     TSOProtocol tsoClient,
                                     CommitTable.Client commitTableClient,
                                     CommitTable.Writer commitTableWriter,
-                                    HBaseTransactionFactory hBaseTransactionFactory) {
+                                    HBaseTransactionFactory hBaseTransactionFactory, Connection connection) {
 
         super(hBaseOmidClientConfiguration.getMetrics(),
                 postCommitter,
@@ -183,11 +188,16 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
                 commitTableClient,
                 commitTableWriter,
                 hBaseTransactionFactory);
+        this.connection = connection;
     }
 
     // ----------------------------------------------------------------------------------------------------------------
     // AbstractTransactionManager overwritten methods
     // ----------------------------------------------------------------------------------------------------------------
+    @Override
+    public void closeResources() throws IOException {
+        connection.close();
+    }
 
     @Override
     public void preCommit(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java b/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java
index f48fa55..0114ca2 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java
@@ -60,4 +60,9 @@ public class HTableAccessWrapper implements TableAccessWrapper {
         return readTable.getScanner(scan);
     }
 
+    @Override
+    public void close() throws Exception {
+        writeTable.close();
+        readTable.close();
+    }
 }
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java
index 370ac01..9372868 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 
 
-public interface SnapshotFilter {
+public interface SnapshotFilter extends AutoCloseable{
     
     Result get(Get get, HBaseTransaction transaction) throws IOException;
 
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
index d095858..569f1bd 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
@@ -63,10 +63,6 @@ public class SnapshotFilterImpl implements SnapshotFilter {
 
     private TableAccessWrapper tableAccessWrapper;
 
-    public void closeCommitTableClient() throws IOException {
-        commitTableClient.close();
-    }
-
     private CommitTable.Client commitTableClient;
 
     public TableAccessWrapper getTableAccessWrapper() {
@@ -598,6 +594,11 @@ public class SnapshotFilterImpl implements SnapshotFilter {
                 .asList();
     }
 
+    @Override
+    public void close() throws Exception {
+        tableAccessWrapper.close();
+    }
+
 
     public class TransactionalClientScanner implements ResultScanner {
 
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
index 869a013..e52c405 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
@@ -163,6 +163,12 @@ public class TTable implements Closeable {
     @Override
     public void close() throws IOException {
         table.close();
+        try {
+            snapshotFilter.close();
+        } catch (Exception e) {
+            LOG.warn("Failed to close TTable resources.");
+            e.printStackTrace();
+        }
     }
 
     // ----------------------------------------------------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TableAccessWrapper.java b/hbase-client/src/main/java/org/apache/omid/transaction/TableAccessWrapper.java
index 8f7f6ac..534dc94 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/TableAccessWrapper.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/TableAccessWrapper.java
@@ -29,7 +29,7 @@ import java.util.List;
 
 
 //This interface is used to wrap the HTableInterface and Region object when doing client and server side filtering accordingly.
-public interface TableAccessWrapper {
+public interface TableAccessWrapper extends AutoCloseable{
 
     Result[] get(List<Get> get) throws IOException;
     Result get(Get get) throws IOException;
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java
index 5979c80..e2c9933 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestAsynchronousPostCommitter.java
@@ -67,7 +67,7 @@ public class TestAsynchronousPostCommitter extends OmidTestBase {
         CommitTable.Client commitTableClient = getCommitTable(context).getClient();
 
         PostCommitActions syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         ListeningExecutorService postCommitExecutor =
                 MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
                         new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
@@ -185,7 +185,7 @@ public class TestAsynchronousPostCommitter extends OmidTestBase {
         CommitTable.Client commitTableClient = getCommitTable(context).getClient();
 
         PostCommitActions syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         ListeningExecutorService postCommitExecutor =
                 MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
                         new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
@@ -264,7 +264,7 @@ public class TestAsynchronousPostCommitter extends OmidTestBase {
         CommitTable.Client commitTableClient = getCommitTable(context).getClient();
 
         PostCommitActions syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         ListeningExecutorService postCommitExecutor =
                 MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
                         new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
index 4678110..d375084 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
@@ -78,7 +78,7 @@ public class TestFilters extends OmidTestBase {
 
         TTable table = new TTable(connection, TEST_TABLE);
         PostCommitActions syncPostCommitter = spy(
-                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         AbstractTransactionManager tm = HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .commitTableClient(commitTableClient)
                 .commitTableWriter(getCommitTable(context).getWriter())
@@ -127,7 +127,7 @@ public class TestFilters extends OmidTestBase {
         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
         TTable table = new TTable(connection, TEST_TABLE);
         PostCommitActions syncPostCommitter = spy(
-                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         AbstractTransactionManager tm = HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .commitTableClient(commitTableClient)
                 .commitTableWriter(getCommitTable(context).getWriter())
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
index fb5efdf..45b5ce5 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
@@ -94,7 +94,7 @@ public class TestHBaseTransactionClient extends OmidTestBase {
     @Test(timeOut = 30_000)
     public void testCrashAfterCommit(ITestContext context) throws Exception {
         PostCommitActions syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
         AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
         // The following line emulates a crash after commit that is observed in (*) below
         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
@@ -134,7 +134,7 @@ public class TestHBaseTransactionClient extends OmidTestBase {
         final long NON_EXISTING_CELL_TS = 1000L;
 
         PostCommitActions syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
         AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
         // The following line emulates a crash after commit that is observed in (*) below
         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
@@ -257,7 +257,7 @@ public class TestHBaseTransactionClient extends OmidTestBase {
     public void testCellCommitTimestampIsLocatedInCommitTable(ITestContext context) throws Exception {
 
         PostCommitActions syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
         AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
         // The following line emulates a crash after commit that is observed in (*) below
         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
@@ -374,7 +374,7 @@ public class TestHBaseTransactionClient extends OmidTestBase {
 
         CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient());
         PostCommitActions syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, syncPostCommitter));
 
         // The following line emulates a crash after commit that is observed in (*) below
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
index df274ae..8cf51a2 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
@@ -141,7 +141,7 @@ public class TestShadowCells extends OmidTestBase {
         hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
         PostCommitActions syncPostCommitter = spy(
-                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .postCommitter(syncPostCommitter)
                 .commitTableClient(commitTableClient)
@@ -189,7 +189,7 @@ public class TestShadowCells extends OmidTestBase {
         hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
         PostCommitActions syncPostCommitter = spy(
-                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .postCommitter(syncPostCommitter)
                 .commitTableWriter(getCommitTable(context).getWriter())
@@ -250,7 +250,7 @@ public class TestShadowCells extends OmidTestBase {
         hbaseOmidClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
         PostCommitActions syncPostCommitter = spy(
-                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
+                new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient, connection));
         AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .postCommitter(syncPostCommitter)
                 .commitTableClient(commitTableClient)
@@ -335,7 +335,7 @@ public class TestShadowCells extends OmidTestBase {
 
         final AtomicBoolean readFailed = new AtomicBoolean(false);
         PostCommitActions syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient()));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(), getCommitTable(context).getClient(), connection));
         AbstractTransactionManager tm = (AbstractTransactionManager) newTransactionManager(context, syncPostCommitter);
 
         doAnswer(new Answer<ListenableFuture<Void>>() {
diff --git a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
index b83f1a3..2deb8ee 100644
--- a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
+++ b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
@@ -95,13 +95,13 @@ public class HBaseCommitTable implements CommitTable {
     private class HBaseWriter implements Writer {
 
         private static final long INITIAL_LWM_VALUE = -1L;
-        final Table table;
+
         // Our own buffer for operations
         final List<Put> writeBuffer = new LinkedList<>();
         volatile long lowWatermarkToStore = INITIAL_LWM_VALUE;
 
-        HBaseWriter() throws IOException {
-            table = hbaseConnection.getTable(TableName.valueOf(tableName));
+        HBaseWriter() {
+
         }
 
         @Override
@@ -120,7 +120,8 @@ public class HBaseCommitTable implements CommitTable {
 
         @Override
         public void flush() throws IOException {
-            try {
+
+            try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
                 addLowWatermarkToStoreToWriteBuffer();
                 table.put(writeBuffer);
                 writeBuffer.clear();
@@ -137,18 +138,15 @@ public class HBaseCommitTable implements CommitTable {
 
         @Override
         public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
-            assert (startTimestamp < commitTimestamp);
-            byte[] transactionRow = startTimestampToKey(startTimestamp);
-            Put put = new Put(transactionRow, startTimestamp);
-            byte[] value = encodeCommitTimestamp(startTimestamp, commitTimestamp);
-            put.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER, value);
-            return table.checkAndPut(transactionRow, commitTableFamily, INVALID_TX_QUALIFIER, null, put);
-        }
+            try (Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
+                assert (startTimestamp < commitTimestamp);
+                byte[] transactionRow = startTimestampToKey(startTimestamp);
+                Put put = new Put(transactionRow, startTimestamp);
+                byte[] value = encodeCommitTimestamp(startTimestamp, commitTimestamp);
+                put.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER, value);
+                return table.checkAndPut(transactionRow, commitTableFamily, INVALID_TX_QUALIFIER, null, put);
+            }
 
-        @Override
-        public void close() throws IOException {
-            clearWriteBuffer();
-            table.close();
         }
 
         private void addLowWatermarkToStoreToWriteBuffer() {
@@ -164,17 +162,15 @@ public class HBaseCommitTable implements CommitTable {
 
     class HBaseClient implements Client{
 
-        final Table table;
+        HBaseClient(){
 
-        HBaseClient() throws IOException {
-            table = hbaseConnection.getTable(TableName.valueOf(tableName));
         }
 
         @Override
         public ListenableFuture<Optional<CommitTimestamp>> getCommitTimestamp(long startTimestamp) {
 
             SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create();
-            try {
+            try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
                 Get get = new Get(startTimestampToKey(startTimestamp));
                 get.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER);
                 get.addColumn(commitTableFamily, INVALID_TX_QUALIFIER);
@@ -206,7 +202,7 @@ public class HBaseCommitTable implements CommitTable {
         @Override
         public ListenableFuture<Long> readLowWatermark() {
             SettableFuture<Long> f = SettableFuture.create();
-            try {
+            try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
                 Get get = new Get(LOW_WATERMARK_ROW);
                 get.addColumn(lowWatermarkFamily, LOW_WATERMARK_QUALIFIER);
                 Result result = table.get(get);
@@ -238,7 +234,7 @@ public class HBaseCommitTable implements CommitTable {
 
             Delete delete = new Delete(key, startTimestamp);
 
-            try {
+            try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
                 table.delete(delete);
             } catch (IOException e) {
                 SettableFuture<Void> f = SettableFuture.create();
@@ -253,7 +249,7 @@ public class HBaseCommitTable implements CommitTable {
         @Override
         public ListenableFuture<Boolean> tryInvalidateTransaction(long startTimestamp) {
             SettableFuture<Boolean> f = SettableFuture.create();
-            try {
+            try(Table table = hbaseConnection.getTable(TableName.valueOf(tableName))) {
                 byte[] row = startTimestampToKey(startTimestamp);
                 Put invalidationPut = new Put(row, startTimestamp);
                 invalidationPut.addColumn(commitTableFamily, INVALID_TX_QUALIFIER, Bytes.toBytes(1));
@@ -272,11 +268,6 @@ public class HBaseCommitTable implements CommitTable {
             return f;
         }
 
-        @Override
-        public synchronized void close() throws IOException {
-            table.close();
-        }
-
         private boolean containsATimestamp(Result result) {
             return (result != null && result.containsColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER));
         }
diff --git a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java
index 4786eda..1006bc6 100644
--- a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java
+++ b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java
@@ -63,4 +63,8 @@ public class RegionAccessWrapper implements TableAccessWrapper {
         return null;
     }
 
+    @Override
+    public void close() throws Exception {
+
+    }
 }
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
index cf93163..4ba385d 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
@@ -54,7 +54,7 @@ public class CompactorScanner implements InternalScanner {
     private static final Logger LOG = LoggerFactory.getLogger(CompactorScanner.class);
     private final InternalScanner internalScanner;
     private final CommitTable.Client commitTableClient;
-    private final Queue<CommitTable.Client> commitTableClientQueue;
+
     private final boolean isMajorCompaction;
     private final boolean retainNonTransactionallyDeletedCells;
     private final long lowWatermark;
@@ -67,12 +67,10 @@ public class CompactorScanner implements InternalScanner {
     public CompactorScanner(ObserverContext<RegionCoprocessorEnvironment> e,
                             InternalScanner internalScanner,
                             Client commitTableClient,
-                            Queue<CommitTable.Client> commitTableClientQueue,
                             boolean isMajorCompaction,
                             boolean preserveNonTransactionallyDeletedCells) throws IOException {
         this.internalScanner = internalScanner;
         this.commitTableClient = commitTableClient;
-        this.commitTableClientQueue = commitTableClientQueue;
         this.isMajorCompaction = isMajorCompaction;
         this.retainNonTransactionallyDeletedCells = preserveNonTransactionallyDeletedCells;
         this.lowWatermark = getLowWatermarkFromCommitTable();
@@ -84,6 +82,7 @@ public class CompactorScanner implements InternalScanner {
 
     @Override
     public boolean next(List<Cell> results) throws IOException {
+        //TODO YONIGO - why-1 we get exceptions
         return next(results, -1);
     }
 
@@ -182,7 +181,6 @@ public class CompactorScanner implements InternalScanner {
     @Override
     public void close() throws IOException {
         internalScanner.close();
-        commitTableClientQueue.add(commitTableClient);
     }
 
     // ----------------------------------------------------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorUtil.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorUtil.java
index f95191c..41bf13c 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorUtil.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorUtil.java
@@ -85,16 +85,16 @@ public class CompactorUtil {
         HBaseLogin.loginIfNeeded(cmdline.loginFlags);
 
         Configuration conf = HBaseConfiguration.create();
-        Connection conn = ConnectionFactory.createConnection(conf);
-
-        if (cmdline.enable) {
-            enableOmidCompaction(conn, TableName.valueOf(cmdline.table),
-                    Bytes.toBytes(cmdline.columnFamily));
-        } else if (cmdline.disable) {
-            disableOmidCompaction(conn, TableName.valueOf(cmdline.table),
-                    Bytes.toBytes(cmdline.columnFamily));
-        } else {
-            System.err.println("Must specify enable or disable");
+        try (Connection conn = ConnectionFactory.createConnection(conf)) {
+            if (cmdline.enable) {
+                enableOmidCompaction(conn, TableName.valueOf(cmdline.table),
+                        Bytes.toBytes(cmdline.columnFamily));
+            } else if (cmdline.disable) {
+                disableOmidCompaction(conn, TableName.valueOf(cmdline.table),
+                        Bytes.toBytes(cmdline.columnFamily));
+            } else {
+                System.err.println("Must specify enable or disable");
+            }
         }
     }
 }
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
index f8ed6b7..57f82b0 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
@@ -20,6 +20,7 @@ package org.apache.omid.transaction;
 import com.google.common.annotations.VisibleForTesting;
 
 
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.committable.hbase.HBaseCommitTable;
 import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
@@ -64,8 +65,9 @@ public class OmidCompactor extends BaseRegionObserver {
 
     private HBaseCommitTableConfig commitTableConf = null;
     private RegionCoprocessorEnvironment env = null;
+
     @VisibleForTesting
-    Queue<CommitTable.Client> commitTableClientQueue = new ConcurrentLinkedQueue<>();
+    CommitTable.Client commitTableClient;
 
     // When compacting, if a cell which has been marked by HBase as Delete or
     // Delete Family (that is, non-transactionally deleted), we allow the user
@@ -73,7 +75,8 @@ public class OmidCompactor extends BaseRegionObserver {
     // If retained, the deleted cell will appear after a minor compaction, but
     // will be deleted anyways after a major one
     private boolean retainNonTransactionallyDeletedCells;
-    private CommitTable commitTable;
+
+    private Connection connection;
 
     public OmidCompactor() {
         this(false);
@@ -92,10 +95,10 @@ public class OmidCompactor extends BaseRegionObserver {
         if (commitTableName != null) {
             commitTableConf.setTableName(commitTableName);
         }
-        commitTable = new HBaseCommitTable(RegionConnectionFactory
-                .getConnection(RegionConnectionFactory.ConnectionType.COMPACTION_CONNECTION,
-                        (RegionCoprocessorEnvironment) env)
-                , commitTableConf);
+
+        connection = RegionConnectionFactory
+                .getConnection(RegionConnectionFactory.ConnectionType.COMPACTION_CONNECTION, (RegionCoprocessorEnvironment) env);
+        commitTableClient = new HBaseCommitTable(connection, commitTableConf).getClient();
         retainNonTransactionallyDeletedCells =
                 env.getConfiguration().getBoolean(HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY,
                         HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_DEFAULT);
@@ -105,11 +108,6 @@ public class OmidCompactor extends BaseRegionObserver {
     @Override
     public void stop(CoprocessorEnvironment e) throws IOException {
         LOG.info("Stopping compactor coprocessor");
-        if (commitTableClientQueue != null) {
-            for (CommitTable.Client commitTableClient : commitTableClientQueue) {
-                commitTableClient.close();
-            }
-        }
         LOG.info("Compactor coprocessor stopped");
     }
 
@@ -135,15 +133,10 @@ public class OmidCompactor extends BaseRegionObserver {
             if (!omidCompactable) {
                 return scanner;
             } else {
-                CommitTable.Client commitTableClient = commitTableClientQueue.poll();
-                if (commitTableClient == null) {
-                    commitTableClient = initAndGetCommitTableClient();
-                }
                 boolean isMajorCompaction = request.isMajor();
                 return new CompactorScanner(env,
                         scanner,
                         commitTableClient,
-                        commitTableClientQueue,
                         isMajorCompaction,
                         retainNonTransactionallyDeletedCells);
             }
@@ -153,12 +146,4 @@ public class OmidCompactor extends BaseRegionObserver {
             throw new DoNotRetryIOException(e);
         }
     }
-
-    private CommitTable.Client initAndGetCommitTableClient() throws IOException {
-        LOG.info("Trying to get the commit table client");
-        CommitTable.Client commitTableClient = commitTable.getClient();
-        LOG.info("Commit table client obtained {}", commitTableClient.getClass().getCanonicalName());
-        return commitTableClient;
-    }
-
 }
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
index eb5d50f..7ee742d 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
@@ -19,6 +19,7 @@ package org.apache.omid.transaction;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Scan;
 
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
@@ -66,7 +67,8 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
     private Queue<SnapshotFilterImpl> snapshotFilterQueue = new ConcurrentLinkedQueue<>();
     private Map<Object, SnapshotFilterImpl> snapshotFilterMap = new ConcurrentHashMap<>();
     private CommitTable.Client inMemoryCommitTable = null;
-    private CommitTable commitTable;
+    private CommitTable.Client commitTableClient;
+    private Connection connection;
 
     public OmidSnapshotFilter(CommitTable.Client commitTableClient) {
         LOG.info("Compactor coprocessor initialized");
@@ -86,14 +88,15 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
         if (commitTableName != null) {
             commitTableConf.setTableName(commitTableName);
         }
+        connection = RegionConnectionFactory
+                .getConnection(RegionConnectionFactory.ConnectionType.READ_CONNECTION, (RegionCoprocessorEnvironment) env);
+        commitTableClient = new HBaseCommitTable(connection, commitTableConf).getClient();
         LOG.info("Snapshot filter started");
-        commitTable = new HBaseCommitTable(RegionConnectionFactory
-                .getConnection(RegionConnectionFactory.ConnectionType.READ_CONNECTION, (RegionCoprocessorEnvironment) env),
-                commitTableConf);
     }
 
     @Override
     public void stop(CoprocessorEnvironment e) throws IOException {
+        LOG.info("stopping Snapshot filter");
         LOG.info("Snapshot filter stopped");
     }
 
@@ -181,7 +184,6 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
         if (inMemoryCommitTable != null) {
             return inMemoryCommitTable;
         }
-        CommitTable.Client commitTableClient = commitTable.getClient();
         return commitTableClient;
     }
 
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
index a4bf65e..197ef3f 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
@@ -200,7 +200,7 @@ public class TestCompaction {
         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
         CommitTable.Client commitTableClient = commitTable.getClient();
         syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
         return HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .postCommitter(syncPostCommitter)
                 .commitTableClient(commitTableClient)
@@ -241,7 +241,7 @@ public class TestCompaction {
         SettableFuture<Long> f = SettableFuture.create();
         f.set(fakeAssignedLowWatermark);
         doReturn(f).when(commitTableClient).readLowWatermark();
-        omidCompactor.commitTableClientQueue.add(commitTableClient);
+        omidCompactor.commitTableClient = commitTableClient;
         LOG.info("Compacting table {}", TEST_TABLE);
         admin.majorCompact(TableName.valueOf(TEST_TABLE));
 
@@ -292,7 +292,7 @@ public class TestCompaction {
         SettableFuture<Long> f = SettableFuture.create();
         f.set(Long.MAX_VALUE);
         doReturn(f).when(commitTableClient).readLowWatermark();
-        omidCompactor.commitTableClientQueue.add(commitTableClient);
+        omidCompactor.commitTableClient = commitTableClient;
 
         LOG.info("Flushing table {}", TEST_TABLE);
         admin.flush(TableName.valueOf(TEST_TABLE));
@@ -361,7 +361,7 @@ public class TestCompaction {
         SettableFuture<Long> f = SettableFuture.create();
         f.set(neverendingTxBelowLowWatermark.getStartTimestamp());
         doReturn(f).when(commitTableClient).readLowWatermark();
-        omidCompactor.commitTableClientQueue.add(commitTableClient);
+        omidCompactor.commitTableClient = commitTableClient;
         LOG.info("Compacting table {}", TEST_TABLE);
         admin.majorCompact(TableName.valueOf(TEST_TABLE));
 
@@ -422,8 +422,7 @@ public class TestCompaction {
         SettableFuture<Long> f = SettableFuture.create();
         f.setException(new IOException("Unable to read"));
         doReturn(f).when(commitTableClient).readLowWatermark();
-        omidCompactor.commitTableClientQueue.add(commitTableClient);
-
+        omidCompactor.commitTableClient = commitTableClient;
         LOG.info("Compacting table {}", TEST_TABLE);
         admin.majorCompact(TableName.valueOf(TEST_TABLE)); // Should trigger the error when accessing CommitTable funct.
 
@@ -1167,7 +1166,7 @@ public class TestCompaction {
         SettableFuture<Long> f = SettableFuture.create();
         f.set(lwm);
         doReturn(f).when(commitTableClient).readLowWatermark();
-        omidCompactor.commitTableClientQueue.add(commitTableClient);
+        omidCompactor.commitTableClient = commitTableClient;
     }
 
     private void compactEverything(String tableName) throws Exception {
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactorScanner.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactorScanner.java
index 8a217b3..eca9714 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactorScanner.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactorScanner.java
@@ -64,8 +64,7 @@ public class TestCompactorScanner {
         ObserverContext<RegionCoprocessorEnvironment> ctx = mock(ObserverContext.class);
         InternalScanner internalScanner = mock(InternalScanner.class);
         CommitTable.Client ctClient = mock(CommitTable.Client.class);
-        @SuppressWarnings("unchecked")
-        Queue<Client> queue = mock(Queue.class);
+
         RegionCoprocessorEnvironment rce = mock(RegionCoprocessorEnvironment.class);
         HRegion hRegion = mock(HRegion.class);
         HRegionInfo hRegionInfo = mock(HRegionInfo.class);
@@ -82,7 +81,6 @@ public class TestCompactorScanner {
         try (CompactorScanner scanner = spy(new CompactorScanner(ctx,
                 internalScanner,
                 ctClient,
-                queue,
                 false,
                 retainOption))) {
 
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
index 4c5cc50..2cfc77e 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
@@ -207,7 +207,7 @@ public class TestSnapshotFilter {
         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
         CommitTable.Client commitTableClient = commitTable.getClient();
         syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
         return HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .postCommitter(syncPostCommitter)
                 .commitTableClient(commitTableClient)
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilterLL.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilterLL.java
index 1bb5691..3496bde 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilterLL.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilterLL.java
@@ -185,7 +185,7 @@ public class TestSnapshotFilterLL {
         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
         CommitTable.Client commitTableClient = commitTable.getClient();
         syncPostCommitter =
-                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient));
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
         return HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .postCommitter(syncPostCommitter)
                 .commitTableClient(commitTableClient)
diff --git a/timestamp-storage/src/main/java/org/apache/omid/timestamp/storage/HBaseTimestampStorage.java b/timestamp-storage/src/main/java/org/apache/omid/timestamp/storage/HBaseTimestampStorage.java
index a33c9dd..6ff23aa 100644
--- a/timestamp-storage/src/main/java/org/apache/omid/timestamp/storage/HBaseTimestampStorage.java
+++ b/timestamp-storage/src/main/java/org/apache/omid/timestamp/storage/HBaseTimestampStorage.java
@@ -51,11 +51,12 @@ public class HBaseTimestampStorage implements TimestampStorage {
 
     private final Table table;
     private final byte[] cfName;
+    private final Connection connection;
 
     @Inject
     public HBaseTimestampStorage(Configuration hbaseConfig, HBaseTimestampStorageConfig config) throws IOException {
-        Connection conn = ConnectionFactory.createConnection(hbaseConfig);
-        this.table = conn.getTable(TableName.valueOf(config.getTableName()));
+        connection = ConnectionFactory.createConnection(hbaseConfig);
+        this.table = connection.getTable(TableName.valueOf(config.getTableName()));
         this.cfName = config.getFamilyName().getBytes(UTF_8);
     }
 
@@ -91,4 +92,10 @@ public class HBaseTimestampStorage implements TimestampStorage {
 
     }
 
+
+    public void close() throws IOException {
+        //TODO this is never called
+        table.close();
+        connection.close();
+    }
 }
diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
index 5075a7f..0eca91a 100644
--- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
+++ b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
@@ -314,15 +314,16 @@ public abstract class AbstractTransactionManager implements TransactionManager {
      */
     public void postRollback(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
 
+
+    abstract void closeResources() throws IOException;
+
     /**
      * @see java.io.Closeable#close()
      */
     @Override
     public final void close() throws IOException {
-
         tsoClient.close();
-        commitTableClient.close();
-
+        closeResources();
     }
 
     // ----------------------------------------------------------------------------------------------------------------