You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2019/01/02 06:44:31 UTC

incubator-omid git commit: [OMID-124] Change hbase commitTable.Client to not do async deletes to ct

Repository: incubator-omid
Updated Branches:
  refs/heads/1.0.1 48f52f1db -> c010f7509


[OMID-124] Change hbase commitTable.Client to not do async deletes to ct


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/c010f750
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/c010f750
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/c010f750

Branch: refs/heads/1.0.1
Commit: c010f7509ec0e44ddefd5bf132a502c716973dea
Parents: 48f52f1
Author: Yonatan Gottesman <yo...@gmail.com>
Authored: Wed Jan 2 08:44:10 2019 +0200
Committer: Yonatan Gottesman <yo...@gmail.com>
Committed: Wed Jan 2 08:44:10 2019 +0200

----------------------------------------------------------------------
 .../apache/omid/benchmarks/tso/RawTxRunner.java |   2 +-
 .../apache/omid/committable/CommitTable.java    |   2 +-
 .../omid/committable/InMemoryCommitTable.java   |   2 +-
 .../omid/committable/NullCommitTable.java       |   2 +-
 .../omid/committable/NullCommitTableTest.java   |   2 +-
 .../transaction/HBaseSyncPostCommitter.java     |   2 +-
 .../omid/transaction/SnapshotFilterImpl.java    |   2 +-
 .../apache/omid/transaction/OmidTestBase.java   |   1 +
 .../omid/transaction/TestShadowCells.java       |   4 +-
 .../committable/hbase/HBaseCommitTable.java     | 138 +++----------------
 .../committable/hbase/TestHBaseCommitTable.java |  36 +----
 .../apache/omid/transaction/OmidCompactor.java  |   7 +-
 .../omid/transaction/OmidSnapshotFilter.java    |  47 +------
 .../transaction/AbstractTransactionManager.java |   2 +-
 ...stTSOClientRequestAndResponseBehaviours.java |   2 +-
 15 files changed, 45 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
----------------------------------------------------------------------
diff --git a/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java b/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
index f18fb56..511ba05 100644
--- a/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
+++ b/benchmarks/src/main/java/org/apache/omid/benchmarks/tso/RawTxRunner.java
@@ -281,7 +281,7 @@ class RawTxRunner implements Runnable {
 
             try {
                 commitFuture.get();
-                commitTableClient.completeTransaction(txId).get();
+                commitTableClient.deleteCommitEntry(txId).get();
                 commitTimer.update(System.nanoTime() - commitRequestTime);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
----------------------------------------------------------------------
diff --git a/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
index f3c15f5..8f578c6 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
@@ -66,7 +66,7 @@ public interface CommitTable {
 
         ListenableFuture<Long> readLowWatermark();
 
-        ListenableFuture<Void> completeTransaction(long startTimestamp);
+        ListenableFuture<Void> deleteCommitEntry(long startTimestamp);
 
         /**
          * Atomically tries to invalidate a non-committed transaction launched by a previous TSO server.

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
----------------------------------------------------------------------
diff --git a/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
index 6f9f384..14a4f76 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
@@ -103,7 +103,7 @@ public class InMemoryCommitTable implements CommitTable {
         }
 
         @Override
-        public ListenableFuture<Void> completeTransaction(long startTimestamp) {
+        public ListenableFuture<Void> deleteCommitEntry(long startTimestamp) {
             SettableFuture<Void> f = SettableFuture.create();
             table.remove(startTimestamp);
             f.set(null);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
----------------------------------------------------------------------
diff --git a/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
index c27a238..176d4d9 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
@@ -78,7 +78,7 @@ public class NullCommitTable implements CommitTable {
         }
 
         @Override
-        public ListenableFuture<Void> completeTransaction(long startTimestamp) {
+        public ListenableFuture<Void> deleteCommitEntry(long startTimestamp) {
             SettableFuture<Void> f = SettableFuture.create();
             f.set(null);
             return f;

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java
----------------------------------------------------------------------
diff --git a/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java b/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java
index cb0fbf9..0d539d8 100644
--- a/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java
+++ b/commit-table/src/test/java/org/apache/omid/committable/NullCommitTableTest.java
@@ -57,7 +57,7 @@ public class NullCommitTableTest {
                 // expected
             }
 
-            assertNull(commitTableClient.completeTransaction(TEST_ST).get());
+            assertNull(commitTableClient.deleteCommitEntry(TEST_ST).get());
 
             // Test writer
             commitTableWriter.updateLowWatermark(TEST_LWM);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
index d5f9c4d..a7ad55f 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseSyncPostCommitter.java
@@ -113,7 +113,7 @@ public class HBaseSyncPostCommitter implements PostCommitActions {
         commitTableUpdateTimer.start();
 
         try {
-            commitTableClient.completeTransaction(tx.getStartTimestamp()).get();
+            commitTableClient.deleteCommitEntry(tx.getStartTimestamp()).get();
             updateSCFuture.set(null);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
index 5c88e92..d095858 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
@@ -212,7 +212,7 @@ public class SnapshotFilterImpl implements SnapshotFilter {
                         commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
                         if (commitTimeStamp.isPresent()) {
                             // Remove false invalidation from commit table
-                            commitTableClient.completeTransaction(cellStartTimestamp);
+                            commitTableClient.deleteCommitEntry(cellStartTimestamp);
                             return commitTimeStamp.get();
                         }
                     }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java b/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
index cb09e3c..0e7969d 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
@@ -89,6 +89,7 @@ public abstract class OmidTestBase {
         TSOServerConfig tsoConfig = new TSOServerConfig();
         tsoConfig.setPort(1234);
         tsoConfig.setConflictMapSize(1000);
+        tsoConfig.setWaitStrategy("LOW_CPU");
         Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
         LOG.info("Starting TSO");
         TSOServer tso = injector.getInstance(TSOServer.class);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
index b5e186f..23ea809 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
@@ -293,8 +293,8 @@ public class TestShadowCells extends OmidTestBase {
                 "Cell should be there");
         assertFalse(hasShadowCell(row, family, qualifier, tx.getStartTimestamp(), new TTableCellGetterAdapter(table)),
                 "Shadow cell should not be there");
-        // 2) and thus, completeTransaction() was never called on the commit table...
-        verify(commitTableClient, times(0)).completeTransaction(anyLong());
+        // 2) and thus, deleteCommitEntry() was never called on the commit table...
+        verify(commitTableClient, times(0)).deleteCommitEntry(anyLong());
         // 3) ...and commit value still in commit table
         assertTrue(commitTableClient.getCommitTimestamp(tx.getStartTimestamp()).get().isPresent());
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
----------------------------------------------------------------------
diff --git a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
index 6320e4d..b83f1a3 100644
--- a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
+++ b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
@@ -21,16 +21,10 @@ import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TA
 import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.INVALID_TX_QUALIFIER;
 import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.LOW_WATERMARK_QUALIFIER;
 import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.LOW_WATERMARK_ROW;
-
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+
 
 import javax.inject.Inject;
 
@@ -53,7 +47,6 @@ import com.google.common.base.Optional;
 import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.CodedOutputStream;
 
@@ -169,27 +162,12 @@ public class HBaseCommitTable implements CommitTable {
 
     }
 
-    class HBaseClient implements Client, Runnable {
+    class HBaseClient implements Client{
 
         final Table table;
-        final Table deleteTable;
-        final ExecutorService deleteBatchExecutor;
-        final BlockingQueue<DeleteRequest> deleteQueue;
-        boolean isClosed = false; // @GuardedBy("this")
-        final static int DELETE_BATCH_SIZE = 1024;
 
         HBaseClient() throws IOException {
-            // TODO: create TTable here instead
             table = hbaseConnection.getTable(TableName.valueOf(tableName));
-            // FIXME: why is this using autoFlush of false? Why would every Delete
-            // need to be send through a separate RPC?
-            deleteTable = hbaseConnection.getTable(TableName.valueOf(tableName));
-            deleteQueue = new ArrayBlockingQueue<>(DELETE_BATCH_SIZE);
-
-            deleteBatchExecutor = Executors.newSingleThreadExecutor(
-                    new ThreadFactoryBuilder().setNameFormat("omid-completor-%d").build());
-            deleteBatchExecutor.submit(this);
-
         }
 
         @Override
@@ -245,33 +223,31 @@ public class HBaseCommitTable implements CommitTable {
             return f;
         }
 
+        // This function is only used to delete a CT entry and should be renamed
         @Override
-        public ListenableFuture<Void> completeTransaction(long startTimestamp) {
+        public ListenableFuture<Void> deleteCommitEntry(long startTimestamp) {
+            byte[] key;
             try {
-                synchronized (this) {
-
-                    if (isClosed) {
-                        SettableFuture<Void> f = SettableFuture.create();
-                        f.setException(new IOException("Not accepting requests anymore"));
-                        return f;
-                    }
-
-                    DeleteRequest req = new DeleteRequest(
-                            new Delete(startTimestampToKey(startTimestamp), startTimestamp));
-                    deleteQueue.put(req);
-                    return req;
-                }
-            } catch (IOException ioe) {
-                LOG.warn("Error generating timestamp for transaction completion", ioe);
+                key = startTimestampToKey(startTimestamp);
+            } catch (IOException e) {
+                LOG.warn("Error generating timestamp for transaction completion", e);
                 SettableFuture<Void> f = SettableFuture.create();
-                f.setException(ioe);
+                f.setException(e);
                 return f;
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
+            }
+
+            Delete delete = new Delete(key, startTimestamp);
+
+            try {
+                table.delete(delete);
+            } catch (IOException e) {
                 SettableFuture<Void> f = SettableFuture.create();
-                f.setException(ie);
-                return f;
+                LOG.warn("Error contacting hbase", e);
+                f.setException(e);
             }
+            SettableFuture<Void> f = SettableFuture.create();
+            f.set(null);
+            return f;
         }
 
         @Override
@@ -297,79 +273,7 @@ public class HBaseCommitTable implements CommitTable {
         }
 
         @Override
-        @SuppressWarnings("InfiniteLoopStatement")
-        public void run() {
-            List<DeleteRequest> reqbatch = new ArrayList<>();
-            try {
-                while (true) {
-                    DeleteRequest r = deleteQueue.poll();
-                    if (r == null && reqbatch.size() == 0) {
-                        r = deleteQueue.take();
-                    }
-
-                    if (r != null) {
-                        reqbatch.add(r);
-                    }
-
-                    if (r == null || reqbatch.size() == DELETE_BATCH_SIZE) {
-                        List<Delete> deletes = new ArrayList<>();
-                        for (DeleteRequest dr : reqbatch) {
-                            deletes.add(dr.getDelete());
-                        }
-                        try {
-                            deleteTable.delete(deletes);
-                            for (DeleteRequest dr : reqbatch) {
-                                dr.complete();
-                            }
-                        } catch (IOException ioe) {
-                            LOG.warn("Error contacting hbase", ioe);
-                            for (DeleteRequest dr : reqbatch) {
-                                dr.error(ioe);
-                            }
-                        } finally {
-                            reqbatch.clear();
-                        }
-                    }
-                }
-            } catch (InterruptedException ie) {
-                // Drain the queue and place the exception in the future
-                // for those who placed requests
-                LOG.warn("Draining delete queue");
-                DeleteRequest queuedRequest = deleteQueue.poll();
-                while (queuedRequest != null) {
-                    reqbatch.add(queuedRequest);
-                    queuedRequest = deleteQueue.poll();
-                }
-                for (DeleteRequest dr : reqbatch) {
-                    dr.error(new IOException("HBase CommitTable is going to be closed"));
-                }
-                reqbatch.clear();
-                Thread.currentThread().interrupt();
-            } catch (Throwable t) {
-                LOG.error("Transaction completion thread threw exception", t);
-            }
-        }
-
-        @Override
         public synchronized void close() throws IOException {
-            isClosed = true;
-            deleteBatchExecutor.shutdownNow(); // may need to interrupt take
-            try {
-                if (!deleteBatchExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
-                    LOG.warn("Delete executor did not shutdown");
-                }
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-            }
-
-            LOG.warn("Re-Draining delete queue just in case");
-            DeleteRequest queuedRequest = deleteQueue.poll();
-            while (queuedRequest != null) {
-                queuedRequest.error(new IOException("HBase CommitTable is going to be closed"));
-                queuedRequest = deleteQueue.poll();
-            }
-
-            deleteTable.close();
             table.close();
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
----------------------------------------------------------------------
diff --git a/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java b/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
index b67e2a8..a29def2 100644
--- a/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
+++ b/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
@@ -21,7 +21,6 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
@@ -41,10 +40,8 @@ import org.apache.omid.committable.CommitTable;
 import org.apache.omid.committable.CommitTable.Client;
 import org.apache.omid.committable.CommitTable.CommitTimestamp;
 import org.apache.omid.committable.CommitTable.Writer;
-import org.apache.omid.committable.hbase.HBaseCommitTable.HBaseClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -165,7 +162,7 @@ public class TestHBaseCommitTable {
         // Test the successful deletion of the 1000 txs
         Future<Void> f;
         for (long i = 0; i < 1000; i++) {
-            f = client.completeTransaction(i);
+            f = client.deleteCommitEntry(i);
             f.get();
         }
         assertEquals(rowCount(TABLE_NAME, commitTableFamily), 0, "Rows should be 0!");
@@ -262,37 +259,6 @@ public class TestHBaseCommitTable {
 
     }
 
-    @Test(timeOut = 30_000)
-    public void testClosingClientEmptyQueuesProperly() throws Throwable {
-        HBaseCommitTableConfig config = new HBaseCommitTableConfig();
-        config.setTableName(TEST_TABLE);
-        HBaseCommitTable commitTable = new HBaseCommitTable(connection, config);
-
-        Writer writer = commitTable.getWriter();
-        HBaseCommitTable.HBaseClient client = (HBaseClient) commitTable.getClient();
-
-        for (int i = 0; i < 1000; i++) {
-            writer.addCommittedTransaction(i, i + 1);
-        }
-        writer.flush();
-
-        // Completing first transaction should be fine
-        client.completeTransaction(0).get();
-        assertEquals(rowCount(TABLE_NAME, commitTableFamily), 999, "Rows should be 999!");
-
-        // When closing, removing a transaction should throw an EE with an IOException
-        client.close();
-        try {
-            client.completeTransaction(1).get();
-            Assert.fail();
-        } catch (ExecutionException e) {
-            // Expected
-        }
-        assertEquals(client.deleteQueue.size(), 0, "Delete queue size should be 0!");
-        assertEquals(rowCount(TABLE_NAME, commitTableFamily), 999, "Rows should be 999!");
-
-    }
-
     private static long rowCount(TableName tableName, byte[] family) throws Throwable {
         Scan scan = new Scan();
         scan.addFamily(family);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
index 0f39737..f8ed6b7 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
@@ -73,6 +73,7 @@ public class OmidCompactor extends BaseRegionObserver {
     // If retained, the deleted cell will appear after a minor compaction, but
     // will be deleted anyways after a major one
     private boolean retainNonTransactionallyDeletedCells;
+    private CommitTable commitTable;
 
     public OmidCompactor() {
         this(false);
@@ -86,12 +87,15 @@ public class OmidCompactor extends BaseRegionObserver {
     @Override
     public void start(CoprocessorEnvironment env) throws IOException {
         LOG.info("Starting compactor coprocessor");
-        this.env = (RegionCoprocessorEnvironment) env;
         commitTableConf = new HBaseCommitTableConfig();
         String commitTableName = env.getConfiguration().get(COMMIT_TABLE_NAME_KEY);
         if (commitTableName != null) {
             commitTableConf.setTableName(commitTableName);
         }
+        commitTable = new HBaseCommitTable(RegionConnectionFactory
+                .getConnection(RegionConnectionFactory.ConnectionType.COMPACTION_CONNECTION,
+                        (RegionCoprocessorEnvironment) env)
+                , commitTableConf);
         retainNonTransactionallyDeletedCells =
                 env.getConfiguration().getBoolean(HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_KEY,
                         HBASE_RETAIN_NON_TRANSACTIONALLY_DELETED_CELLS_DEFAULT);
@@ -152,7 +156,6 @@ public class OmidCompactor extends BaseRegionObserver {
 
     private CommitTable.Client initAndGetCommitTableClient() throws IOException {
         LOG.info("Trying to get the commit table client");
-        CommitTable commitTable = new HBaseCommitTable(RegionConnectionFactory.getConnection(RegionConnectionFactory.ConnectionType.COMPACTION_CONNECTION, env), commitTableConf);
         CommitTable.Client commitTableClient = commitTable.getClient();
         LOG.info("Commit table client obtained {}", commitTableClient.getClass().getCanonicalName());
         return commitTableClient;

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
index 115a467..eb5d50f 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
@@ -23,8 +23,6 @@ import org.apache.hadoop.hbase.client.Scan;
 
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.filter.Filter;
-
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 
 
@@ -68,9 +66,10 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
     private Queue<SnapshotFilterImpl> snapshotFilterQueue = new ConcurrentLinkedQueue<>();
     private Map<Object, SnapshotFilterImpl> snapshotFilterMap = new ConcurrentHashMap<>();
     private CommitTable.Client inMemoryCommitTable = null;
+    private CommitTable commitTable;
 
     public OmidSnapshotFilter(CommitTable.Client commitTableClient) {
-        LOG.info("Compactor coprocessor initialized with constructor for testing");
+        LOG.info("Compactor coprocessor initialized");
         this.inMemoryCommitTable = commitTableClient;
     }
 
@@ -79,7 +78,7 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
     }
 
     @Override
-    public void start(CoprocessorEnvironment env) {
+    public void start(CoprocessorEnvironment env) throws IOException {
         LOG.info("Starting snapshot filter coprocessor");
         this.env = (RegionCoprocessorEnvironment)env;
         commitTableConf = new HBaseCommitTableConfig();
@@ -88,16 +87,13 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
             commitTableConf.setTableName(commitTableName);
         }
         LOG.info("Snapshot filter started");
+        commitTable = new HBaseCommitTable(RegionConnectionFactory
+                .getConnection(RegionConnectionFactory.ConnectionType.READ_CONNECTION, (RegionCoprocessorEnvironment) env),
+                commitTableConf);
     }
 
     @Override
     public void stop(CoprocessorEnvironment e) throws IOException {
-        LOG.info("Stopping snapshot filter coprocessor");
-        if (snapshotFilterQueue != null) {
-            for (SnapshotFilterImpl snapshotFilter: snapshotFilterQueue) {
-                snapshotFilter.closeCommitTableClient();
-            }
-        }
         LOG.info("Snapshot filter stopped");
     }
 
@@ -165,37 +161,9 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
         Filter newFilter = TransactionFilters.getVisibilityFilter(scan.getFilter(),
                 snapshotFilter, hbaseTransaction);
         scan.setFilter(newFilter);
-        snapshotFilterMap.put(scan, snapshotFilter);
         return;
     }
 
-    // Don't add an @Override tag since this method doesn't exist in both hbase-1 and hbase-2
-    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
-                                         Scan scan,
-                                         RegionScanner s) {
-        byte[] byteTransaction = scan.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE);
-
-        if (byteTransaction == null) {
-            return s;
-        }
-
-        SnapshotFilterImpl snapshotFilter = snapshotFilterMap.get(scan);
-        assert(snapshotFilter != null);
-        snapshotFilterMap.remove(scan);
-        snapshotFilterMap.put(s, snapshotFilter);
-        return s;
-    }
-
-    // Don't add an @Override tag since this method doesn't exist in both hbase-1 and hbase-2
-    public void preScannerClose(ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s) {
-        SnapshotFilterImpl snapshotFilter = snapshotFilterMap.get(s);
-        if (snapshotFilter != null) {
-            snapshotFilterQueue.add(snapshotFilter);
-        }
-    }
-
-
-
     private HBaseTransaction getHBaseTransaction(byte[] byteTransaction, boolean isLowLatency)
             throws InvalidProtocolBufferException {
         TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(byteTransaction);
@@ -210,13 +178,10 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
     }
 
     private CommitTable.Client initAndGetCommitTableClient() throws IOException {
-        LOG.info("Trying to get the commit table client");
         if (inMemoryCommitTable != null) {
             return inMemoryCommitTable;
         }
-        CommitTable commitTable = new HBaseCommitTable(RegionConnectionFactory.getConnection(RegionConnectionFactory.ConnectionType.READ_CONNECTION, env), commitTableConf);
         CommitTable.Client commitTableClient = commitTable.getClient();
-        LOG.info("Commit table client obtained {}", commitTableClient.getClass().getCanonicalName());
         return commitTableClient;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
index 99abdb6..5075a7f 100644
--- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
+++ b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
@@ -365,7 +365,7 @@ public abstract class AbstractTransactionManager implements TransactionManager {
             if (!committed) {
                 // Transaction has been invalidated by other client
                 rollback(tx);
-                commitTableClient.completeTransaction(tx.getStartTimestamp());
+                commitTableClient.deleteCommitEntry(tx.getStartTimestamp());
                 rolledbackTxsCounter.inc();
                 throw new RollbackException("Transaction " + tx.getTransactionId() + " got invalidated");
             }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c010f750/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
index 080c23e..48d3a40 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
@@ -390,7 +390,7 @@ public class TestTSOClientRequestAndResponseBehaviours {
         clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
 
         // Simulate remove entry from the commit table before exercise retry
-        commitTable.getClient().completeTransaction(tx1ST);
+        commitTable.getClient().deleteCommitEntry(tx1ST);
 
         TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
         assertTrue(response.getCommitResponse().getAborted(), "Transaction should abort");