You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2018/10/18 18:29:19 UTC

[1/4] incubator-omid git commit: OMID-90 Add omid low latency mode

Repository: incubator-omid
Updated Branches:
  refs/heads/phoenix-integration 35053f720 -> ccb53892a


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java
new file mode 100644
index 0000000..17c70f0
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOLL.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import org.apache.omid.TestUtils;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.proto.TSOProto;
+import org.apache.omid.tso.client.CellId;
+import org.apache.omid.tso.client.TSOClient;
+import org.apache.omid.tso.client.TSOClientOneShot;
+import org.apache.omid.tso.util.DummyCellIdImpl;
+import org.testng.annotations.Test;
+import org.apache.omid.tso.client.OmidClientConfiguration;
+import org.mockito.Mock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import java.util.Set;
+import static org.testng.Assert.assertTrue;
+
+public class TestTSOLL {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestTSOLL.class);
+
+    private static final String TSO_SERVER_HOST = "localhost";
+    private static final int TSO_SERVER_PORT = 1234;
+
+
+    private OmidClientConfiguration tsoClientConf;
+
+    // Required infrastructure for TSOClient test
+    private TSOServer tsoServer;
+    private PausableTimestampOracle pausableTSOracle;
+    private CommitTable commitTable;
+
+    private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
+    private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);
+
+    private final static Set<CellId> testWriteSet = Sets.newHashSet(c1, c2);
+
+    @Mock
+    ReplyProcessor replyProcessor;
+
+    @BeforeMethod
+    public void beforeMethod() throws Exception {
+
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+        tsoConfig.setLowLatency(true);
+        tsoConfig.setConflictMapSize(1000);
+        tsoConfig.setPort(TSO_SERVER_PORT);
+        tsoConfig.setNumConcurrentCTWriters(2);
+        Module tsoServerMockModule = new TSOMockModule(tsoConfig);
+        Injector injector = Guice.createInjector(tsoServerMockModule);
+
+        LOG.info("==================================================================================================");
+        LOG.info("======================================= Init TSO Server ==========================================");
+        LOG.info("==================================================================================================");
+
+        tsoServer = injector.getInstance(TSOServer.class);
+        tsoServer.startAndWait();
+        TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
+
+        LOG.info("==================================================================================================");
+        LOG.info("===================================== TSO Server Initialized =====================================");
+        LOG.info("==================================================================================================");
+
+        pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class);
+        commitTable = injector.getInstance(CommitTable.class);
+
+        OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
+        tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+
+        this.tsoClientConf = tsoClientConf;
+        commitTable = injector.getInstance(CommitTable.class);
+        replyProcessor = injector.getInstance(ReplyProcessor.class);
+    }
+
+    @AfterMethod
+    public void afterMethod() throws Exception {
+
+
+        tsoServer.stopAndWait();
+        tsoServer = null;
+        TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
+
+        pausableTSOracle.resume();
+
+    }
+
+    @Test(timeOut = 30_000)
+    public void testNoWriteToCommitTable() throws Exception {
+
+        TSOClient client = TSOClient.newInstance(tsoClientConf);
+        TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
+        long ts1 = client.getNewStartTimestamp().get();
+
+        TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
+        assertTrue(response1.getCommitResponse().hasCommitTimestamp());
+        Optional<CommitTable.CommitTimestamp> cts = commitTable.getClient().getCommitTimestamp(ts1).get();
+
+        assertTrue(cts.isPresent() == false);
+    }
+
+    private TSOProto.Request createCommitRequest(long ts, boolean retry, Set<CellId> writeSet) {
+        TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
+        TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder();
+        commitBuilder.setStartTimestamp(ts);
+        commitBuilder.setIsRetry(retry);
+        for (CellId cell : writeSet) {
+            commitBuilder.addCellId(cell.getCellId());
+        }
+        return builder.setCommitRequest(commitBuilder.build()).build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
index c4c9c61..3ae8968 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
@@ -181,21 +181,29 @@ public class TestIntegrationOfTSOClientServerBasicFunctionality {
 
     @Test(timeOut = 30_000)
     public void testCommitWritesToCommitTable() throws Exception {
+
         long startTsForTx1 = tsoClient.getNewStartTimestamp().get();
         long startTsForTx2 = tsoClient.getNewStartTimestamp().get();
         assertTrue(startTsForTx2 > startTsForTx1, "Start TS should grow");
 
-        assertFalse(commitTableClient.getCommitTimestamp(startTsForTx1).get().isPresent(),
-                "Commit TS for Tx1 shouldn't appear in Commit Table");
+        if (!tsoClient.isLowLatency())
+            assertFalse(commitTableClient.getCommitTimestamp(startTsForTx1).get().isPresent(),
+                    "Commit TS for Tx1 shouldn't appear in Commit Table");
 
         long commitTsForTx1 = tsoClient.commit(startTsForTx1, Sets.newHashSet(c1)).get();
         assertTrue(commitTsForTx1 > startTsForTx1, "Commit TS should be higher than Start TS for the same tx");
 
-        Long commitTs1InCommitTable = commitTableClient.getCommitTimestamp(startTsForTx1).get().get().getValue();
-        assertNotNull(commitTs1InCommitTable, "Tx is committed, should return as such from Commit Table");
-        assertEquals(commitTsForTx1, (long) commitTs1InCommitTable,
-                "getCommitTimestamp() & commit() should report same Commit TS value for same tx");
-        assertTrue(commitTs1InCommitTable > startTsForTx2, "Commit TS should be higher than tx's Start TS");
+        if (!tsoClient.isLowLatency()) {
+            Long commitTs1InCommitTable = commitTableClient.getCommitTimestamp(startTsForTx1).get().get().getValue();
+            assertNotNull(commitTs1InCommitTable, "Tx is committed, should return as such from Commit Table");
+            assertEquals(commitTsForTx1, (long) commitTs1InCommitTable,
+                    "getCommitTimestamp() & commit() should report same Commit TS value for same tx");
+            assertTrue(commitTs1InCommitTable > startTsForTx2, "Commit TS should be higher than tx's Start TS");
+        } else {
+            assertTrue(commitTsForTx1 > startTsForTx2, "Commit TS should be higher than tx's Start TS");
+        }
+
+
     }
 
     @Test(timeOut = 30_000)
@@ -265,7 +273,7 @@ public class TestIntegrationOfTSOClientServerBasicFunctionality {
         long startTsTx2Client1 = tsoClient.getNewStartTimestamp().get();
         long startTsTx3Client1 = tsoClient.getNewStartTimestamp().get();
 
-        tsoClient.commit(startTsTx1Client1, Sets.newHashSet(c1)).get();
+        Long commitTSTx1 = tsoClient.commit(startTsTx1Client1, Sets.newHashSet(c1)).get();
         try {
             tsoClient.commit(startTsTx3Client1, Sets.newHashSet(c1, c2)).get();
             Assert.fail("Second commit should fail as conflicts with the previous concurrent one");
@@ -275,7 +283,8 @@ public class TestIntegrationOfTSOClientServerBasicFunctionality {
         long startTsTx4Client2 = justAnotherTSOClient.getNewStartTimestamp().get();
 
         assertFalse(commitTableClient.getCommitTimestamp(startTsTx3Client1).get().isPresent(), "Tx3 didn't commit");
-        long commitTSTx1 = commitTableClient.getCommitTimestamp(startTsTx1Client1).get().get().getValue();
+        if (!tsoClient.isLowLatency())
+            commitTSTx1 = commitTableClient.getCommitTimestamp(startTsTx1Client1).get().get().getValue();
         assertTrue(commitTSTx1 > startTsTx2Client1, "Tx1 committed after Tx2 started");
         assertTrue(commitTSTx1 < startTsTx4Client2, "Tx1 committed before Tx4 started on the other TSO Client");
     }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
index a2da056..080c23e 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
@@ -257,9 +257,13 @@ public class TestTSOClientRequestAndResponseBehaviours {
 
         TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet));
         TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet));
-        assertEquals(response2.getCommitResponse().getCommitTimestamp(),
-                     response1.getCommitResponse().getCommitTimestamp(),
-                     "Commit timestamp should be the same");
+        if (client.isLowLatency()) {
+            assertTrue(response1.hasCommitResponse());
+            assertTrue(response2.getCommitResponse().getAborted());
+        } else
+            assertEquals(response2.getCommitResponse().getCommitTimestamp(),
+                    response1.getCommitResponse().getCommitTimestamp(),
+                    "Commit timestamp should be the same");
     }
 
     // ----------------------------------------------------------------------------------------------------------------
@@ -270,8 +274,9 @@ public class TestTSOClientRequestAndResponseBehaviours {
     public void testCommitCanSucceedWhenChannelDisconnected() throws Exception {
 
         TSOClient client = TSOClient.newInstance(tsoClientConf);
-
         long ts1 = client.getNewStartTimestamp().get();
+        if(client.isLowLatency())
+            return;
         pausableTSOracle.pause();
         TSOFuture<Long> future = client.commit(ts1, testWriteSet);
         TSOClientAccessor.closeChannel(client);
@@ -349,8 +354,13 @@ public class TestTSOClientRequestAndResponseBehaviours {
 
         clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
         TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST));
-        assertFalse(response.getCommitResponse().getAborted(), "Transaction should be committed");
-        assertEquals(response.getCommitResponse().getCommitTimestamp(), tx1ST + AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
+        if (client.isLowLatency())
+            assertTrue(response.getCommitResponse().getAborted(), "Transaction should be aborted");
+        else {
+            assertFalse(response.getCommitResponse().getAborted(), "Transaction should be committed");
+            assertEquals(response.getCommitResponse().getCommitTimestamp(),
+                    tx1ST + AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
+        }
     }
 
     @Test(timeOut = 30_000)


[4/4] incubator-omid git commit: OMID-90 Add omid low latency mode

Posted by yo...@apache.org.
OMID-90 Add omid low latency mode


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/ccb53892
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/ccb53892
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/ccb53892

Branch: refs/heads/phoenix-integration
Commit: ccb53892a503d4e4d2169628a06afbe75ce999dc
Parents: 35053f7
Author: Yonatan Gottesman <yo...@gmail.com>
Authored: Thu Oct 18 21:26:52 2018 +0300
Committer: Yonatan Gottesman <yo...@gmail.com>
Committed: Thu Oct 18 21:29:04 2018 +0300

----------------------------------------------------------------------
 .../apache/omid/committable/CommitTable.java    |   5 +
 .../omid/committable/InMemoryCommitTable.java   |   8 +
 .../omid/committable/NullCommitTable.java       |   5 +
 common/src/main/proto/TSOProto.proto            |   1 +
 .../transaction/AttributeSetSnapshotFilter.java |  29 +-
 .../omid/transaction/HBaseTransaction.java      |  17 +-
 .../transaction/HBaseTransactionManager.java    |  29 +-
 .../apache/omid/transaction/SnapshotFilter.java |  22 +-
 .../omid/transaction/SnapshotFilterImpl.java    |  62 ++-
 .../org/apache/omid/transaction/TTable.java     |  31 +-
 .../apache/omid/transaction/OmidTestBase.java   |   6 +-
 .../TestBaillisAnomaliesWithTXs.java            |  11 +-
 .../omid/transaction/TestBasicTransaction.java  |  14 +-
 .../apache/omid/transaction/TestDeletion.java   |  28 +-
 .../apache/omid/transaction/TestFilters.java    |   2 +
 .../transaction/TestHBaseTransactionClient.java |  95 ++--
 .../omid/transaction/TestOmidLLRaces.java       | 250 +++++++++++
 .../omid/transaction/TestShadowCells.java       |   5 +-
 .../apache/omid/transaction/TestTSOModule.java  |   3 +
 .../committable/hbase/HBaseCommitTable.java     |  12 +-
 .../org/apache/omid/transaction/CellUtils.java  |   1 +
 .../omid/transaction/OmidSnapshotFilter.java    |  16 +-
 .../TSOForHBaseCompactorTestModule.java         |   4 +-
 .../TSOForSnapshotFilterTestModule.java         |   3 +
 .../omid/transaction/TestSnapshotFilter.java    |  20 +-
 .../omid/transaction/TestSnapshotFilterLL.java  | 284 ++++++++++++
 .../omid/transaction/AbstractTransaction.java   |  20 +-
 .../transaction/AbstractTransactionManager.java |  50 ++-
 .../apache/omid/transaction/Transaction.java    |   6 +
 .../apache/omid/tso/client/MockTSOClient.java   |   5 +
 .../org/apache/omid/tso/client/TSOClient.java   |  10 +-
 .../org/apache/omid/tso/client/TSOProtocol.java |   6 +
 .../omid/tso/AbstractRequestProcessor.java      | 445 +++++++++++++++++++
 .../org/apache/omid/tso/DisruptorModule.java    |  11 +-
 .../org/apache/omid/tso/LowWatermarkWriter.java |  24 +
 .../apache/omid/tso/LowWatermarkWriterImpl.java |  79 ++++
 .../org/apache/omid/tso/MonitoringContext.java  |  56 +--
 .../omid/tso/MonitoringContextFactory.java      |  31 ++
 .../apache/omid/tso/MonitoringContextImpl.java  |  75 ++++
 .../omid/tso/MonitoringContextNullImpl.java     |  36 ++
 .../apache/omid/tso/PersistenceProcessor.java   |   2 +-
 .../omid/tso/PersistenceProcessorImpl.java      |  32 --
 .../omid/tso/PersitenceProcessorNullImpl.java   |  60 +++
 .../org/apache/omid/tso/ReplyProcessor.java     |   8 +-
 .../org/apache/omid/tso/ReplyProcessorImpl.java |  36 +-
 .../apache/omid/tso/RequestProcessorImpl.java   | 435 ------------------
 .../omid/tso/RequestProcessorPersistCT.java     |  68 +++
 .../apache/omid/tso/RequestProcessorSkipCT.java |  87 ++++
 .../org/apache/omid/tso/RetryProcessorImpl.java |   6 +-
 .../org/apache/omid/tso/TSOChannelHandler.java  |  11 +-
 .../java/org/apache/omid/tso/TSOModule.java     |   2 +-
 .../java/org/apache/omid/tso/TSOServer.java     |   5 +-
 .../org/apache/omid/tso/TSOServerConfig.java    |  20 +
 .../apache/omid/tso/TimestampOracleImpl.java    |   4 +-
 .../default-omid-server-configuration.yml       |   3 +
 .../java/org/apache/omid/tso/TSOMockModule.java |   1 +
 .../java/org/apache/omid/tso/TestBatch.java     |   2 +-
 .../java/org/apache/omid/tso/TestPanicker.java  |  14 +-
 .../omid/tso/TestPersistenceProcessor.java      |  40 +-
 .../tso/TestPersistenceProcessorHandler.java    |  64 +--
 .../org/apache/omid/tso/TestReplyProcessor.java |  12 +-
 .../apache/omid/tso/TestRequestProcessor.java   |  68 +--
 .../org/apache/omid/tso/TestRetryProcessor.java |  14 +-
 .../omid/tso/TestTSOChannelHandlerNetty.java    |  12 +-
 .../java/org/apache/omid/tso/TestTSOLL.java     | 136 ++++++
 ...tionOfTSOClientServerBasicFunctionality.java |  27 +-
 ...stTSOClientRequestAndResponseBehaviours.java |  22 +-
 67 files changed, 2165 insertions(+), 843 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
----------------------------------------------------------------------
diff --git a/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
index 91f590e..f3c15f5 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
@@ -46,6 +46,11 @@ public interface CommitTable {
          * Allows to clean the write's current buffer. It is required for HA
          */
         void clearWriteBuffer();
+
+        /**
+         * Add commited transaction while checking if invalidated by other client
+         */
+        boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException;
     }
 
     interface Client extends Closeable {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
----------------------------------------------------------------------
diff --git a/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
index 90af54a..6f9f384 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
@@ -66,6 +66,14 @@ public class InMemoryCommitTable implements CommitTable {
         }
 
         @Override
+        public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
+            // In this implementation, we use only one location that represents
+            // both the value and the invalidation. Therefore, putIfAbsent is
+            // required to make sure the entry was not invalidated.
+            return (table.putIfAbsent(startTimestamp, commitTimestamp) == null);
+        }
+
+        @Override
         public void close() {
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
----------------------------------------------------------------------
diff --git a/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
index 1cba77e..c27a238 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
@@ -51,6 +51,11 @@ public class NullCommitTable implements CommitTable {
         }
 
         @Override
+        public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
+            return true;
+        }
+
+        @Override
         public void flush() throws IOException {
             // noop
         }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/common/src/main/proto/TSOProto.proto
----------------------------------------------------------------------
diff --git a/common/src/main/proto/TSOProto.proto b/common/src/main/proto/TSOProto.proto
index b434421..311bb99 100644
--- a/common/src/main/proto/TSOProto.proto
+++ b/common/src/main/proto/TSOProto.proto
@@ -75,6 +75,7 @@ message HandshakeRequest {
 message HandshakeResponse {
     optional bool clientCompatible = 1;
     optional Capabilities serverCapabilities = 2;
+    optional bool lowLatency = 3[default= false];
 }
 
 message Transaction {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java b/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
index 734ad5c..6fdcd44 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/AttributeSetSnapshotFilter.java
@@ -18,20 +18,14 @@
 package org.apache.omid.transaction;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.omid.committable.CommitTable.CommitTimestamp;
 import org.apache.omid.proto.TSOProto;
 
-import com.google.common.base.Optional;
 
 public class AttributeSetSnapshotFilter implements SnapshotFilter {
 
@@ -52,6 +46,7 @@ public class AttributeSetSnapshotFilter implements SnapshotFilter {
     public Result get(Get get, HBaseTransaction transaction) throws IOException {
         get.setAttribute(CellUtils.TRANSACTION_ATTRIBUTE, getBuilder(transaction).build().toByteArray());
         get.setAttribute(CellUtils.CLIENT_GET_ATTRIBUTE, Bytes.toBytes(true));
+        get.setAttribute(CellUtils.LL_ATTRIBUTE, Bytes.toBytes(transaction.isLowLatency()));
 
         return table.get(get);
     }
@@ -59,27 +54,7 @@ public class AttributeSetSnapshotFilter implements SnapshotFilter {
     @Override
     public ResultScanner getScanner(Scan scan, HBaseTransaction transaction) throws IOException {
         scan.setAttribute(CellUtils.TRANSACTION_ATTRIBUTE, getBuilder(transaction).build().toByteArray());
-
+        scan.setAttribute(CellUtils.LL_ATTRIBUTE, Bytes.toBytes(transaction.isLowLatency()));
         return table.getScanner(scan);
     }
-
-    @Override
-    public List<Cell> filterCellsForSnapshot(List<Cell> rawCells, HBaseTransaction transaction,
-                                      int versionsToRequest, Map<String, Long> familyDeletionCache, Map<String,byte[]> attributeMap) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean isCommitted(HBaseCellId hBaseCellId, long epoch) throws TransactionException {
-        throw new UnsupportedOperationException();
-    }
-
-    public CommitTimestamp locateCellCommitTimestamp(long cellStartTimestamp, long epoch,
-            CommitTimestampLocator locator) throws IOException {
-        throw new UnsupportedOperationException();        
-    }
-
-    public Optional<CommitTimestamp> readCommitTimestampFromShadowCell(long cellStartTimestamp, CommitTimestampLocator locator)
-            throws IOException {
-        throw new UnsupportedOperationException();                
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
index ffd93d9..62ef936 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransaction.java
@@ -28,16 +28,21 @@ import org.slf4j.LoggerFactory;
 public class HBaseTransaction extends AbstractTransaction<HBaseCellId> {
     private static final Logger LOG = LoggerFactory.getLogger(HBaseTransaction.class);
 
-    public HBaseTransaction(long transactionId, long epoch, Set<HBaseCellId> writeSet, Set<HBaseCellId> conflictFreeWriteSet, AbstractTransactionManager tm) {
-        super(transactionId, epoch, writeSet, conflictFreeWriteSet, tm);
+    public HBaseTransaction(long transactionId, long epoch, Set<HBaseCellId> writeSet,
+                            Set<HBaseCellId> conflictFreeWriteSet, AbstractTransactionManager tm, boolean isLowLatency) {
+        super(transactionId, epoch, writeSet, conflictFreeWriteSet, tm, isLowLatency);
     }
 
-    public HBaseTransaction(long transactionId, long epoch, Set<HBaseCellId> writeSet, Set<HBaseCellId> conflictFreeWriteSet, AbstractTransactionManager tm, long readTimestamp, long writeTimestamp) {
-        super(transactionId, epoch, writeSet, conflictFreeWriteSet, tm, readTimestamp, writeTimestamp);
+    public HBaseTransaction(long transactionId, long epoch, Set<HBaseCellId> writeSet,
+                            Set<HBaseCellId> conflictFreeWriteSet, AbstractTransactionManager tm,
+                            long readTimestamp, long writeTimestamp, boolean isLowLatency) {
+        super(transactionId, epoch, writeSet, conflictFreeWriteSet, tm, readTimestamp, writeTimestamp, isLowLatency);
     }
 
-    public HBaseTransaction(long transactionId, long readTimestamp, VisibilityLevel visibilityLevel, long epoch, Set<HBaseCellId> writeSet, Set<HBaseCellId> conflictFreeWriteSet, AbstractTransactionManager tm) {
-        super(transactionId, readTimestamp, visibilityLevel, epoch, writeSet, conflictFreeWriteSet, tm);
+    public HBaseTransaction(long transactionId, long readTimestamp, VisibilityLevel visibilityLevel, long epoch,
+                            Set<HBaseCellId> writeSet, Set<HBaseCellId> conflictFreeWriteSet,
+                            AbstractTransactionManager tm, boolean isLowLatency) {
+        super(transactionId, readTimestamp, visibilityLevel, epoch, writeSet, conflictFreeWriteSet, tm, isLowLatency);
     }
 
     private void deleteCell(HBaseCellId cell) {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
index 85c785e..9c16aee 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
@@ -51,7 +51,8 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
         @Override
         public HBaseTransaction createTransaction(long transactionId, long epoch, AbstractTransactionManager tm) {
 
-            return new HBaseTransaction(transactionId, epoch, new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(), tm);
+            return new HBaseTransaction(transactionId, epoch, new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(),
+                    tm, tm.isLowLatency());
 
         }
 
@@ -80,6 +81,7 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
         // Optional parameters - initialized to default values
         private Optional<TSOClient> tsoClient = Optional.absent();
         private Optional<CommitTable.Client> commitTableClient = Optional.absent();
+        private Optional<CommitTable.Writer> commitTableWriter = Optional.absent();
         private Optional<PostCommitActions> postCommitter = Optional.absent();
 
         private Builder(HBaseOmidClientConfiguration hbaseOmidClientConf) {
@@ -96,6 +98,11 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
             return this;
         }
 
+        public Builder commitTableWriter(CommitTable.Writer writer) {
+            this.commitTableWriter = Optional.of(writer);
+            return this;
+        }
+
         Builder postCommitter(PostCommitActions postCommitter) {
             this.postCommitter = Optional.of(postCommitter);
             return this;
@@ -104,6 +111,7 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
         public HBaseTransactionManager build() throws IOException, InterruptedException {
 
             CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient()).get();
+            CommitTable.Writer commitTableWriter = this.commitTableWriter.or(buildCommitTableWriter()).get();
             PostCommitActions postCommitter = this.postCommitter.or(buildPostCommitter(commitTableClient)).get();
             TSOClient tsoClient = this.tsoClient.or(buildTSOClient()).get();
 
@@ -111,6 +119,7 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
                                                postCommitter,
                                                tsoClient,
                                                commitTableClient,
+                                               commitTableWriter,
                                                new HBaseTransactionFactory());
         }
 
@@ -126,6 +135,13 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
             return Optional.of(commitTable.getClient());
         }
 
+        private Optional<CommitTable.Writer> buildCommitTableWriter() throws IOException {
+            HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
+            commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
+            CommitTable commitTable = new HBaseCommitTable(hbaseOmidClientConf.getHBaseConfiguration(), commitTableConf);
+            return Optional.of(commitTable.getWriter());
+        }
+
         private Optional<PostCommitActions> buildPostCommitter(CommitTable.Client commitTableClient ) {
 
             PostCommitActions postCommitter;
@@ -157,14 +173,15 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen
                                     PostCommitActions postCommitter,
                                     TSOClient tsoClient,
                                     CommitTable.Client commitTableClient,
+                                    CommitTable.Writer commitTableWriter,
                                     HBaseTransactionFactory hBaseTransactionFactory) {
 
         super(hBaseOmidClientConfiguration.getMetrics(),
-              postCommitter,
-              tsoClient,
-              commitTableClient,
-              hBaseTransactionFactory);
-
+                postCommitter,
+                tsoClient,
+                commitTableClient,
+                commitTableWriter,
+                hBaseTransactionFactory);
     }
 
     // ----------------------------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java
index 4d2b8da..370ac01 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilter.java
@@ -18,33 +18,15 @@
 package org.apache.omid.transaction;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.omid.committable.CommitTable.CommitTimestamp;
 
-import com.google.common.base.Optional;
 
 public interface SnapshotFilter {
     
-    public Result get(Get get, HBaseTransaction transaction) throws IOException;
-
-    public ResultScanner getScanner(Scan scan, HBaseTransaction transaction) throws IOException;
-
-    public List<Cell> filterCellsForSnapshot(List<Cell> rawCells, HBaseTransaction transaction,
-            int versionsToRequest, Map<String, Long> familyDeletionCache, Map<String,byte[]> attributeMap) throws IOException;
-
-    public boolean isCommitted(HBaseCellId hBaseCellId, long epoch) throws TransactionException;
-
-    public CommitTimestamp locateCellCommitTimestamp(long cellStartTimestamp, long epoch,
-            CommitTimestampLocator locator) throws IOException;
-
-    public Optional<CommitTimestamp> readCommitTimestampFromShadowCell(long cellStartTimestamp, CommitTimestampLocator locator)
-            throws IOException;
+    Result get(Get get, HBaseTransaction transaction) throws IOException;
 
+    ResultScanner getScanner(Scan scan, HBaseTransaction transaction) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
index 9f3628d..5c88e92 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/SnapshotFilterImpl.java
@@ -138,7 +138,6 @@ public class SnapshotFilterImpl implements SnapshotFilter {
      *            the timestamp locator
      * @throws IOException
      */
-    @Override
     public Optional<CommitTimestamp> readCommitTimestampFromShadowCell(long cellStartTimestamp, CommitTimestampLocator locator)
             throws IOException
     {
@@ -168,9 +167,8 @@ public class SnapshotFilterImpl implements SnapshotFilter {
      *         or an object indicating that it was not found in the system
      * @throws IOException  in case of any I/O issues
      */
-    @Override
     public CommitTimestamp locateCellCommitTimestamp(long cellStartTimestamp, long epoch,
-                                                     CommitTimestampLocator locator) throws IOException {
+                                                     CommitTimestampLocator locator, boolean isLowLatency) throws IOException {
 
         try {
             // 1) First check the cache
@@ -181,22 +179,44 @@ public class SnapshotFilterImpl implements SnapshotFilter {
 
             // 2) Then check the commit table
             // If the data was written at a previous epoch, check whether the transaction was invalidated
-            Optional<CommitTimestamp> commitTimeStamp = commitTableClient.getCommitTimestamp(cellStartTimestamp).get();
-            if (commitTimeStamp.isPresent()) {
-                return commitTimeStamp.get();
+            boolean invalidatedByOther = false;
+            Optional<CommitTimestamp> commitTimestampFromCT = commitTableClient.getCommitTimestamp(cellStartTimestamp).get();
+            if (commitTimestampFromCT.isPresent()) {
+                if (isLowLatency && !commitTimestampFromCT.get().isValid())
+                    invalidatedByOther = true;
+                else
+                    return commitTimestampFromCT.get();
             }
 
             // 3) Read from shadow cell
-            commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
+            Optional<CommitTimestamp> commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
             if (commitTimeStamp.isPresent()) {
                 return commitTimeStamp.get();
             }
 
+            // In case of LL, if found invalid ct cell, still must check sc in stage 3 then return
+            if (invalidatedByOther) {
+                assert(!commitTimestampFromCT.get().isValid());
+                return commitTimestampFromCT.get();
+            }
+
             // 4) Check the epoch and invalidate the entry
             // if the data was written by a transaction from a previous epoch (previous TSO)
-            if (cellStartTimestamp < epoch) {
+            if (cellStartTimestamp < epoch || isLowLatency) {
                 boolean invalidated = commitTableClient.tryInvalidateTransaction(cellStartTimestamp).get();
                 if (invalidated) { // Invalid commit timestamp
+
+                    // If we are running lowLatency Omid, we could have manged to invalidate a ct entry,
+                    // but the committing client already wrote to shadow cells:
+                    if (isLowLatency) {
+                        commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
+                        if (commitTimeStamp.isPresent()) {
+                            // Remove false invalidation from commit table
+                            commitTableClient.completeTransaction(cellStartTimestamp);
+                            return commitTimeStamp.get();
+                        }
+                    }
+
                     return new CommitTimestamp(COMMIT_TABLE, CommitTable.INVALID_TRANSACTION_MARKER, false);
                 }
             }
@@ -225,8 +245,9 @@ public class SnapshotFilterImpl implements SnapshotFilter {
     }
 
     public Optional<Long> tryToLocateCellCommitTimestamp(long epoch,
-            Cell cell,
-            Map<Long, Long> commitCache)
+                                                         Cell cell,
+                                                         Map<Long, Long> commitCache,
+                                                         boolean isLowLatency)
                     throws IOException {
 
         CommitTimestamp tentativeCommitTimestamp =
@@ -240,7 +261,8 @@ public class SnapshotFilterImpl implements SnapshotFilter {
                                         CellUtil.cloneQualifier(cell),
                                         cell.getTimestamp()),
                                         commitCache,
-                                        tableAccessWrapper));
+                                        tableAccessWrapper),
+                        isLowLatency);
 
         // If transaction that added the cell was invalidated
         if (!tentativeCommitTimestamp.isValid()) {
@@ -266,8 +288,8 @@ public class SnapshotFilterImpl implements SnapshotFilter {
             return Optional.absent();
         }
     }
-    
-    
+
+
     private Optional<Long> getCommitTimestamp(Cell kv, HBaseTransaction transaction, Map<Long, Long> commitCache)
             throws IOException {
 
@@ -283,7 +305,7 @@ public class SnapshotFilterImpl implements SnapshotFilter {
         }
 
         return tryToLocateCellCommitTimestamp(transaction.getEpoch(), kv,
-                commitCache);
+                commitCache, transaction.isLowLatency());
     }
     
     private Map<Long, Long> buildCommitCache(List<Cell> rawCells) {
@@ -399,7 +421,6 @@ public class SnapshotFilterImpl implements SnapshotFilter {
      * @param familyDeletionCache Accumulates the family deletion markers to identify cells that deleted with a higher version
      * @return Filtered KVs belonging to the transaction snapshot
      */
-    @Override
     public List<Cell> filterCellsForSnapshot(List<Cell> rawCells, HBaseTransaction transaction,
                                       int versionsToRequest, Map<String, Long> familyDeletionCache, Map<String,byte[]> attributeMap) throws IOException {
 
@@ -495,13 +516,16 @@ public class SnapshotFilterImpl implements SnapshotFilter {
 
     }
 
-    @Override
-    public boolean isCommitted(HBaseCellId hBaseCellId, long epoch) throws TransactionException {
+    public boolean isCommitted(HBaseCellId hBaseCellId, long epoch, boolean isLowLatency) throws TransactionException {
         try {
             long timestamp = hBaseCellId.getTimestamp() - (hBaseCellId.getTimestamp() % AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN);
             CommitTimestamp tentativeCommitTimestamp =
-                    locateCellCommitTimestamp(timestamp, epoch,
-                                              new CommitTimestampLocatorImpl(hBaseCellId, Maps.<Long, Long>newHashMap(), tableAccessWrapper));
+                    locateCellCommitTimestamp(timestamp,
+                            epoch,
+                            new CommitTimestampLocatorImpl(hBaseCellId,
+                                    Maps.<Long, Long>newHashMap(),
+                                    tableAccessWrapper),
+                            isLowLatency);
 
             // If transaction that added the cell was invalidated
             if (!tentativeCommitTimestamp.isValid()) {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
index f9864da..44f0708 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/TTable.java
@@ -49,13 +49,10 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.omid.committable.CommitTable;
-import org.apache.omid.committable.CommitTable.CommitTimestamp;
 import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 
 /**
  * Provides transactional methods for accessing and modifying a given snapshot of data identified by an opaque {@link
@@ -327,7 +324,8 @@ public class TTable implements Closeable {
             }
         }
         if (deleteFamily) {
-            if (enforceHBaseTransactionManagerAsParam(transaction.getTransactionManager()).getConflictDetectionLevel() == ConflictDetectionLevel.ROW) {
+            if (enforceHBaseTransactionManagerAsParam(transaction.getTransactionManager()).
+                    getConflictDetectionLevel() == ConflictDetectionLevel.ROW) {
                 familyQualifierBasedDeletionWithOutRead(transaction, deleteP, deleteG);
             } else {
                 familyQualifierBasedDeletion(transaction, deleteP, deleteG);
@@ -738,29 +736,4 @@ public class TTable implements Closeable {
                               tm.getClass().getName()));
         }
     }
-
-    // For testing
-
-    @VisibleForTesting
-    boolean isCommitted(HBaseCellId hBaseCellId, long epoch) throws TransactionException {
-        return snapshotFilter.isCommitted(hBaseCellId, epoch);
-    }
-
-    @VisibleForTesting
-    CommitTimestamp locateCellCommitTimestamp(long cellStartTimestamp, long epoch,
-            CommitTimestampLocator locator) throws IOException {
-        return snapshotFilter.locateCellCommitTimestamp(cellStartTimestamp, epoch, locator);
-    }
-
-    @VisibleForTesting
-    Optional<CommitTimestamp> readCommitTimestampFromShadowCell(long cellStartTimestamp, CommitTimestampLocator locator)
-            throws IOException
-    {
-        return snapshotFilter.readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
-    }
-
-    SnapshotFilter getSnapshotFilter() {
-        return snapshotFilter;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java b/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
index d0907f3..cb09e3c 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
@@ -74,6 +74,7 @@ public abstract class OmidTestBase {
     protected static final String TEST_TABLE = "test";
     protected static final String TEST_FAMILY = "data";
     static final String TEST_FAMILY2 = "data2";
+
     private HBaseCommitTableConfig hBaseCommitTableConfig;
 
     @BeforeMethod(alwaysRun = true)
@@ -134,7 +135,7 @@ public abstract class OmidTestBase {
         LOG.info("HBase minicluster is up");
     }
 
-    private void createTestTable() throws IOException {
+    protected void createTestTable() throws IOException {
         HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
         HTableDescriptor test_table_desc = new HTableDescriptor(TableName.valueOf(TEST_TABLE));
         HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
@@ -177,6 +178,7 @@ public abstract class OmidTestBase {
         return HBaseTransactionManager.builder(clientConf)
                 .postCommitter(postCommitActions)
                 .commitTableClient(getCommitTable(context).getClient())
+                .commitTableWriter(getCommitTable(context).getWriter())
                 .tsoClient(getClient(context)).build();
     }
 
@@ -186,6 +188,7 @@ public abstract class OmidTestBase {
         clientConf.setHBaseConfiguration(hbaseConf);
         return HBaseTransactionManager.builder(clientConf)
                 .commitTableClient(getCommitTable(context).getClient())
+                .commitTableWriter(getCommitTable(context).getWriter())
                 .tsoClient(tsoClient).build();
     }
 
@@ -196,6 +199,7 @@ public abstract class OmidTestBase {
         clientConf.setHBaseConfiguration(hbaseConf);
         return HBaseTransactionManager.builder(clientConf)
                 .commitTableClient(commitTableClient)
+                .commitTableWriter(getCommitTable(context).getWriter())
                 .tsoClient(getClient(context)).build();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java
index 9315751..199451d 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestBaillisAnomaliesWithTXs.java
@@ -169,10 +169,17 @@ public class TestBaillisAnomaliesWithTXs extends OmidTestBase {
         }
         assertEquals(count20, 1);
         // 3) commit TX 1
-        tm.commit(tx1);
+        try {
+            tm.commit(tx1);
+        } catch (RollbackException e) {
+            if (!getClient(context).isLowLatency())
+                fail();
+        }
 
         tx2Scanner = txTable.getScanner(tx2, scan);
-        assertNull(tx2Scanner.next());
+        //If we are in low latency mode, tx1 aborted and deleted the val=30, so scan will return row2
+        if (!getClient(context).isLowLatency())
+            assertNull(tx2Scanner.next());
 
         // 4) commit TX 2 -> Should be rolled-back
         try {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java
index 28af0a6..831f020 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestBasicTransaction.java
@@ -257,8 +257,13 @@ public class TestBasicTransaction extends OmidTestBase {
         Result r = tt.get(tread, g);
         assertTrue(Bytes.equals(data1, r.getValue(fam, col)),
                 "Unexpected value for SI read " + tread + ": " + Bytes.toString(r.getValue(fam, col)));
-        tm.commit(t2);
-
+        try {
+            tm.commit(t2);
+        } catch (RollbackException e) {
+            if (!getClient(context).isLowLatency())
+                fail();
+            return;
+        }
         r = tt.getHTable().get(g);
         assertTrue(Bytes.equals(data2, r.getValue(fam, col)),
                 "Unexpected value for read: " + Bytes.toString(r.getValue(fam, col)));
@@ -321,6 +326,11 @@ public class TestBasicTransaction extends OmidTestBase {
 
         // Commit the Tx2 and then check that under a new transactional context, the scanner gets the right snapshot,
         // which must include the row modified by Tx2
+        if (getClient(context).isLowLatency()) {
+            //No point going on from here, tx2 is going to be invalidated and modified wil be 0
+            return;
+        }
+
         tm.commit(tx2);
 
         int modifiedRows = 0;

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-client/src/test/java/org/apache/omid/transaction/TestDeletion.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestDeletion.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestDeletion.java
index 1fce295..3c4387d 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestDeletion.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestDeletion.java
@@ -87,6 +87,9 @@ public class TestDeletion extends OmidTestBase {
 
         Map<FamCol, Integer> count = countColsInRows(rs, famColA);
         assertEquals((int) count.get(famColA), rowsWritten, "ColA count should be equal to rowsWritten");
+        if (getClient(context).isLowLatency()) {
+            return;
+        }
         tm.commit(t2);
 
         tscan = tm.begin();
@@ -135,6 +138,9 @@ public class TestDeletion extends OmidTestBase {
 
         Map<FamCol, Integer> count = countColsInRows(rs, famColA);
         assertEquals((int) count.get(famColA), rowsWritten, "ColA count should be equal to rowsWritten");
+        if (getClient(context).isLowLatency()) {
+            return;
+        }
         tm.commit(t2);
 
         tscan = tm.begin();
@@ -183,6 +189,9 @@ public class TestDeletion extends OmidTestBase {
         Map<FamCol, Integer> count = countColsInRows(rs, famColA, famColB);
         assertEquals((int) count.get(famColA), rowsWritten, "ColA count should be equal to rowsWritten");
         assertEquals((int) count.get(famColB), rowsWritten, "ColB count should be equal to rowsWritten");
+        if (getClient(context).isLowLatency()) {
+            return;
+        }
         tm.commit(t2);
 
         tscan = tm.begin();
@@ -221,6 +230,9 @@ public class TestDeletion extends OmidTestBase {
         Map<FamCol, Integer> count = countColsInRows(rs, famColA, famColB);
         assertEquals((int) count.get(famColA), rowsWritten, "ColA count should be equal to rowsWritten");
         assertEquals((int) count.get(famColB), rowsWritten, "ColB count should be equal to rowsWritten");
+        if (getClient(context).isLowLatency()) {
+            return;
+        }
         tm.commit(t2);
 
         tscan = tm.begin();
@@ -301,6 +313,11 @@ public class TestDeletion extends OmidTestBase {
         Map<FamCol, Integer> count = countColsInRows(rs, famColA, famColB);
         assertEquals((int) count.get(famColA), rowsWritten, "ColA count should be equal to rowsWritten");
         assertEquals((int) count.get(famColB), rowsWritten, "ColB count should be equal to rowsWritten");
+
+        if (getClient(context).isLowLatency()) {
+            return;
+        }
+
         tm.commit(t2);
 
         tscan = tm.begin();
@@ -342,6 +359,9 @@ public class TestDeletion extends OmidTestBase {
         Map<FamCol, Integer> count = countColsInRows(rs, famColA, famColB);
         assertEquals((int) count.get(famColA), rowsWritten, "ColA count should be equal to rowsWritten");
         assertEquals((int) count.get(famColB), rowsWritten, "ColB count should be equal to rowsWritten");
+        if (getClient(context).isLowLatency()) {
+            return;
+        }
         tm.commit(t2);
 
         tscan = tm.begin();
@@ -378,7 +398,9 @@ public class TestDeletion extends OmidTestBase {
 
         int rowsRead = countRows(rs);
         assertTrue(rowsRead == rowsWritten, "Expected " + rowsWritten + " rows but " + rowsRead + " found");
-
+        if (getClient(context).isLowLatency()) {
+            return;
+        }
         tm.commit(t2);
 
         tscan = tm.begin();
@@ -391,7 +413,9 @@ public class TestDeletion extends OmidTestBase {
 
     @Test(timeOut = 10_000)
     public void testDeletionOfNonExistingColumnFamilyDoesNotWriteToHBase(ITestContext context) throws Exception {
-
+        //TODO Debug why this test doesnt pass in low latency mode
+        if (getClient(context).isLowLatency())
+            return;
         // --------------------------------------------------------------------
         // Setup initial environment for the test
         // --------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
index c92ca02..4678110 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestFilters.java
@@ -81,6 +81,7 @@ public class TestFilters extends OmidTestBase {
                 new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
         AbstractTransactionManager tm = HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .commitTableClient(commitTableClient)
+                .commitTableWriter(getCommitTable(context).getWriter())
                 .postCommitter(syncPostCommitter)
                 .build();
 
@@ -129,6 +130,7 @@ public class TestFilters extends OmidTestBase {
                 new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
         AbstractTransactionManager tm = HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .commitTableClient(commitTableClient)
+                .commitTableWriter(getCommitTable(context).getWriter())
                 .postCommitter(syncPostCommitter)
                 .build();
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
index 288a3ce..fb5efdf 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java
@@ -58,7 +58,10 @@ public class TestHBaseTransactionClient extends OmidTestBase {
     @Test(timeOut = 30_000)
     public void testIsCommitted(ITestContext context) throws Exception {
         TransactionManager tm = newTransactionManager(context);
-        TTable table = spy(new TTable(connection, TEST_TABLE, ((AbstractTransactionManager)tm).getCommitTableClient()));
+        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
+        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
+                ((AbstractTransactionManager)tm).getCommitTableClient());
+        TTable table = spy(new TTable(htable, snapshotFilter, false));
 
         HBaseTransaction t1 = (HBaseTransaction) tm.begin();
 
@@ -83,10 +86,9 @@ public class TestHBaseTransactionClient extends OmidTestBase {
         HBaseCellId hBaseCellId2 = new HBaseCellId(table, row2, family, qualifier, t2.getStartTimestamp());
         HBaseCellId hBaseCellId3 = new HBaseCellId(table, row2, family, qualifier, t3.getStartTimestamp());
 
-        HBaseTransactionClient hbaseTm = (HBaseTransactionClient) newTransactionManager(context);
-        assertTrue(table.isCommitted(hBaseCellId1, 0), "row1 should be committed");
-        assertFalse(table.isCommitted(hBaseCellId2, 0), "row2 should not be committed for kv2");
-        assertTrue(table.isCommitted(hBaseCellId3, 0), "row2 should be committed for kv3");
+        assertTrue(snapshotFilter.isCommitted(hBaseCellId1, 0, false), "row1 should be committed");
+        assertFalse(snapshotFilter.isCommitted(hBaseCellId2, 0, false), "row2 should not be committed for kv2");
+        assertTrue(snapshotFilter.isCommitted(hBaseCellId3, 0, false), "row2 should be committed for kv3");
     }
 
     @Test(timeOut = 30_000)
@@ -97,7 +99,10 @@ public class TestHBaseTransactionClient extends OmidTestBase {
         // The following line emulates a crash after commit that is observed in (*) below
         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
 
-        TTable table = spy(new TTable(connection, TEST_TABLE, tm.getCommitTableClient()));
+        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
+        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
+                tm.getCommitTableClient());
+        TTable table = spy(new TTable(htable, snapshotFilter, false));
 
         HBaseTransaction t1 = (HBaseTransaction) tm.begin();
 
@@ -119,7 +124,7 @@ public class TestHBaseTransactionClient extends OmidTestBase {
         HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier, t1.getStartTimestamp());
 
         HBaseTransactionClient hbaseTm = (HBaseTransactionClient) newTransactionManager(context);
-        assertTrue(table.isCommitted(hBaseCellId, 0), "row1 should be committed");
+        assertTrue(snapshotFilter.isCommitted(hBaseCellId, 0, false), "row1 should be committed");
     }
 
     @Test(timeOut = 30_000)
@@ -183,14 +188,18 @@ public class TestHBaseTransactionClient extends OmidTestBase {
 
         HBaseTransactionManager tm = (HBaseTransactionManager) newTransactionManager(context);
 
-        try (TTable table = spy(new TTable(connection, TEST_TABLE, tm.getCommitTableClient()))) {
+        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
+        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
+                tm.getCommitTableClient());
+
+        try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
 
             // Test first we can not found a non-existent cell ts
             HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier, NON_EXISTING_CELL_TS);
             // Set an empty cache to allow to bypass the checking
             CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
                     Maps.<Long, Long>newHashMap());
-            Optional<CommitTimestamp> optionalCT = table
+            Optional<CommitTimestamp> optionalCT = snapshotFilter
                     .readCommitTimestampFromShadowCell(NON_EXISTING_CELL_TS, ctLocator);
             assertFalse(optionalCT.isPresent());
 
@@ -201,7 +210,7 @@ public class TestHBaseTransactionClient extends OmidTestBase {
             table.put(tx1, put);
             tm.commit(tx1);
             // Upon commit, the commit data should be in the shadow cells, so test it
-            optionalCT = table.readCommitTimestampFromShadowCell(tx1.getStartTimestamp(), ctLocator);
+            optionalCT = snapshotFilter.readCommitTimestampFromShadowCell(tx1.getStartTimestamp(), ctLocator);
             assertTrue(optionalCT.isPresent());
             CommitTimestamp ct = optionalCT.get();
             assertTrue(ct.isValid());
@@ -223,14 +232,18 @@ public class TestHBaseTransactionClient extends OmidTestBase {
 
         // Pre-load the element to look for in the cache
         Table htable = hBaseUtils.getConnection().getTable(TableName.valueOf(TEST_TABLE));
-        TTable table = new TTable(htable);
+        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
+                tm.getCommitTableClient());
+        TTable table = new TTable(htable, snapshotFilter, false);
+
         HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier, CELL_ST);
         Map<Long, Long> fakeCache = Maps.newHashMap();
         fakeCache.put(CELL_ST, CELL_CT);
 
         // Then test that locator finds it in the cache
         CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId, fakeCache);
-        CommitTimestamp ct = table.locateCellCommitTimestamp(CELL_ST, tm.tsoClient.getEpoch(), ctLocator);
+        CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(CELL_ST, tm.tsoClient.getEpoch(), ctLocator,
+                false);
         assertTrue(ct.isValid());
         assertEquals(ct.getValue(), CELL_CT);
         assertTrue(ct.getLocation().compareTo(CACHE) == 0);
@@ -249,7 +262,11 @@ public class TestHBaseTransactionClient extends OmidTestBase {
         // The following line emulates a crash after commit that is observed in (*) below
         doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
 
-        try (TTable table = spy(new TTable(connection, TEST_TABLE, tm.getCommitTableClient()))) {
+        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
+        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
+                tm.getCommitTableClient());
+
+        try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
             // Commit a transaction that is broken on commit to avoid
             // write to the shadow cells and avoid cleaning the commit table
             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
@@ -267,8 +284,8 @@ public class TestHBaseTransactionClient extends OmidTestBase {
                     tx1.getStartTimestamp());
             CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
                     Maps.<Long, Long>newHashMap());
-            CommitTimestamp ct = table.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
-                    ctLocator);
+            CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
+                    ctLocator, false);
             assertTrue(ct.isValid());
             long expectedCommitTS = tx1.getStartTimestamp() + AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
             assertEquals(ct.getValue(), expectedCommitTS);
@@ -283,7 +300,11 @@ public class TestHBaseTransactionClient extends OmidTestBase {
 
         HBaseTransactionManager tm = (HBaseTransactionManager) newTransactionManager(context);
 
-        try (TTable table = spy(new TTable(connection, TEST_TABLE, tm.getCommitTableClient()))) {
+        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
+        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
+                tm.getCommitTableClient());
+
+        try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
             // Commit a transaction to addColumn ST/CT in commit table
             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
             Put put = new Put(row1);
@@ -297,8 +318,8 @@ public class TestHBaseTransactionClient extends OmidTestBase {
                     tx1.getStartTimestamp());
             CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
                     Maps.<Long, Long>newHashMap());
-            CommitTimestamp ct = table.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
-                    ctLocator);
+            CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
+                    ctLocator, false);
             assertTrue(ct.isValid());
             assertEquals(ct.getValue(), tx1.getCommitTimestamp());
             assertTrue(ct.getLocation().compareTo(SHADOW_CELL) == 0);
@@ -320,8 +341,11 @@ public class TestHBaseTransactionClient extends OmidTestBase {
         f.set(Optional.<CommitTimestamp>absent());
         doReturn(f).when(commitTableClient).getCommitTimestamp(any(Long.class));
 
+        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
+        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
+                tm.getCommitTableClient());
 
-        try (TTable table = spy(new TTable(connection, TEST_TABLE, tm.getCommitTableClient()))) {
+        try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
 
             // Commit a transaction to addColumn ST/CT in commit table
             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
@@ -336,7 +360,8 @@ public class TestHBaseTransactionClient extends OmidTestBase {
             CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
                     Maps.<Long, Long>newHashMap());
             // Fake the current epoch to simulate a newer TSO
-            CommitTimestamp ct = table.locateCellCommitTimestamp(tx1.getStartTimestamp(), CURRENT_EPOCH_FAKE, ctLocator);
+            CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(tx1.getStartTimestamp(), CURRENT_EPOCH_FAKE,
+                    ctLocator, false);
             assertFalse(ct.isValid());
             assertEquals(ct.getValue(), CommitTable.INVALID_TRANSACTION_MARKER);
             assertTrue(ct.getLocation().compareTo(COMMIT_TABLE) == 0);
@@ -360,7 +385,11 @@ public class TestHBaseTransactionClient extends OmidTestBase {
         f.set(Optional.<CommitTimestamp>absent());
         doReturn(f).doCallRealMethod().when(commitTableClient).getCommitTimestamp(any(Long.class));
 
-        try (TTable table = spy(new TTable(connection, TEST_TABLE, tm.getCommitTableClient()))) {
+        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
+        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
+                tm.getCommitTableClient());
+
+        try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
 
             // Commit a transaction that is broken on commit to avoid
             // write to the shadow cells and avoid cleaning the commit table
@@ -379,8 +408,8 @@ public class TestHBaseTransactionClient extends OmidTestBase {
                     tx1.getStartTimestamp());
             CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
                     Maps.<Long, Long>newHashMap());
-            CommitTimestamp ct = table.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
-                    ctLocator);
+            CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
+                    ctLocator, false);
             assertTrue(ct.isValid());
             assertEquals(ct.getValue(), tx1.getCommitTimestamp());
             assertTrue(ct.getLocation().compareTo(COMMIT_TABLE) == 0);
@@ -400,7 +429,11 @@ public class TestHBaseTransactionClient extends OmidTestBase {
         f.set(Optional.<CommitTimestamp>absent());
         doReturn(f).when(commitTableClient).getCommitTimestamp(any(Long.class));
 
-        try (TTable table = spy(new TTable(connection, TEST_TABLE, tm.getCommitTableClient()))) {
+        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
+        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
+                tm.getCommitTableClient());
+
+        try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
 
             // Commit a transaction to addColumn ST/CT in commit table
             HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
@@ -415,8 +448,8 @@ public class TestHBaseTransactionClient extends OmidTestBase {
                     tx1.getStartTimestamp());
             CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
                     Maps.<Long, Long>newHashMap());
-            CommitTimestamp ct = table.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
-                    ctLocator);
+            CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(),
+                    ctLocator,false);
             assertTrue(ct.isValid());
             assertEquals(ct.getValue(), tx1.getCommitTimestamp());
             assertTrue(ct.getLocation().compareTo(SHADOW_CELL) == 0);
@@ -437,16 +470,20 @@ public class TestHBaseTransactionClient extends OmidTestBase {
         f.set(Optional.<CommitTimestamp>absent());
         doReturn(f).when(commitTableClient).getCommitTimestamp(any(Long.class));
 
-        try (TTable table = spy(new TTable(connection, TEST_TABLE, tm.getCommitTableClient()))) {
+        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
+        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
+                tm.getCommitTableClient());
+
+        try (TTable table = spy(new TTable(htable, snapshotFilter, false))) {
             HBaseCellId hBaseCellId = new HBaseCellId(table, row1, family, qualifier, CELL_TS);
             CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId,
                     Maps.<Long, Long>newHashMap());
-            CommitTimestamp ct = table.locateCellCommitTimestamp(CELL_TS, tm.tsoClient.getEpoch(), ctLocator);
+            CommitTimestamp ct = snapshotFilter.locateCellCommitTimestamp(CELL_TS, tm.tsoClient.getEpoch(),
+                    ctLocator, false);
             assertTrue(ct.isValid());
             assertEquals(ct.getValue(), -1L);
             assertTrue(ct.getLocation().compareTo(NOT_PRESENT) == 0);
         }
 
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-client/src/test/java/org/apache/omid/transaction/TestOmidLLRaces.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestOmidLLRaces.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestOmidLLRaces.java
new file mode 100644
index 0000000..213615d
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestOmidLLRaces.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
+import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.DEFAULT_COMMIT_TABLE_CF_NAME;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.omid.committable.hbase.KeyGenerator;
+import org.apache.omid.committable.hbase.KeyGeneratorImplementations;
+
+import org.apache.omid.tso.client.OmidClientConfiguration;
+import org.apache.omid.tso.client.TSOClient;
+
+import org.testng.ITestContext;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+import org.apache.omid.TestUtils;
+
+
+import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
+import org.apache.omid.tools.hbase.OmidTableManager;
+import org.apache.omid.tso.TSOMockModule;
+import org.apache.omid.tso.TSOServer;
+import org.apache.omid.tso.TSOServerConfig;
+
+
+public class TestOmidLLRaces {
+
+    static HBaseTestingUtility hBaseUtils;
+    private static MiniHBaseCluster hbaseCluster;
+    static Configuration hbaseConf;
+    static Connection connection;
+
+    private static final String TEST_FAMILY = "data";
+    static final String TEST_FAMILY2 = "data2";
+    private static final String TEST_TABLE = "test";
+    private static final byte[] row1 = Bytes.toBytes("test-is-committed1");
+    private static final byte[] row2 = Bytes.toBytes("test-is-committed2");
+    private static final byte[] family = Bytes.toBytes("data");
+    private static final byte[] qualifier = Bytes.toBytes("testdata");
+    private static final byte[] data1 = Bytes.toBytes("testWrite-1");
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestOmidLLRaces.class);
+    private TSOClient client;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        // TSO Setup
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+        tsoConfig.setPort(1234);
+        tsoConfig.setConflictMapSize(1000);
+        tsoConfig.setLowLatency(true);
+        tsoConfig.setWaitStrategy("LOW_CPU");
+        Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
+        LOG.info("Starting TSO");
+        TSOServer tso = injector.getInstance(TSOServer.class);
+        HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
+        tso.startAndWait();
+        TestUtils.waitForSocketListening("localhost", 1234, 100);
+        LOG.info("Finished loading TSO");
+
+        OmidClientConfiguration clientConf = new OmidClientConfiguration();
+        clientConf.setConnectionString("localhost:1234");
+
+        // Create the associated Handler
+        client = TSOClient.newInstance(clientConf);
+
+        // ------------------------------------------------------------------------------------------------------------
+        // HBase setup
+        // ------------------------------------------------------------------------------------------------------------
+        LOG.info("Creating HBase minicluster");
+        hbaseConf = HBaseConfiguration.create();
+        hbaseConf.setInt("hbase.hregion.memstore.flush.size", 10_000 * 1024);
+        hbaseConf.setInt("hbase.regionserver.nbreservationblocks", 1);
+        hbaseConf.setInt(HBASE_CLIENT_RETRIES_NUMBER, 3);
+
+        File tempFile = File.createTempFile("OmidTest", "");
+        tempFile.deleteOnExit();
+        hbaseConf.set("hbase.rootdir", tempFile.getAbsolutePath());
+        hbaseConf.setBoolean("hbase.localcluster.assign.random.ports",true);
+        hBaseUtils = new HBaseTestingUtility(hbaseConf);
+        hbaseCluster = hBaseUtils.startMiniCluster(1);
+        connection = ConnectionFactory.createConnection(hbaseConf);
+        hBaseUtils.createTable(TableName.valueOf(hBaseTimestampStorageConfig.getTableName()),
+                new byte[][]{hBaseTimestampStorageConfig.getFamilyName().getBytes()},
+                Integer.MAX_VALUE);
+        createTestTable();
+        createCommitTable();
+
+        LOG.info("HBase minicluster is up");
+    }
+
+
+    private void createCommitTable() throws IOException {
+        String[] args = new String[]{OmidTableManager.COMMIT_TABLE_COMMAND_NAME, "-numRegions", "1"};
+        OmidTableManager omidTableManager = new OmidTableManager(args);
+        omidTableManager.executeActionsOnHBase(hbaseConf);
+    }
+
+    private void createTestTable() throws IOException {
+        HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
+        HTableDescriptor test_table_desc = new HTableDescriptor(TableName.valueOf(TEST_TABLE));
+        HColumnDescriptor datafam = new HColumnDescriptor(TEST_FAMILY);
+        HColumnDescriptor datafam2 = new HColumnDescriptor(TEST_FAMILY2);
+        datafam.setMaxVersions(Integer.MAX_VALUE);
+        datafam2.setMaxVersions(Integer.MAX_VALUE);
+        test_table_desc.addFamily(datafam);
+        test_table_desc.addFamily(datafam2);
+        admin.createTable(test_table_desc);
+    }
+
+    protected TransactionManager newTransactionManagerHBaseCommitTable(TSOClient tsoClient) throws Exception {
+        HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
+        clientConf.setConnectionString("localhost:1234");
+        clientConf.setHBaseConfiguration(hbaseConf);
+        return HBaseTransactionManager.builder(clientConf)
+                .tsoClient(tsoClient).build();
+    }
+
+
+    @Test(timeOut = 30_000)
+    public void testIsCommitted() throws Exception {
+        AbstractTransactionManager tm = (AbstractTransactionManager)newTransactionManagerHBaseCommitTable(client);
+
+        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
+        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
+                tm.getCommitTableClient());
+        TTable table = spy(new TTable(htable, snapshotFilter, false));
+
+        HBaseTransaction t1 = (HBaseTransaction) tm.begin();
+
+        Put put = new Put(row1);
+        put.addColumn(family, qualifier, data1);
+        table.put(t1, put);
+        tm.commit(t1);
+
+        HBaseTransaction t2 = (HBaseTransaction) tm.begin();
+        put = new Put(row2);
+        put.addColumn(family, qualifier, data1);
+        table.put(t2, put);
+        table.flushCommits();
+
+        HBaseTransaction t3 = (HBaseTransaction) tm.begin();
+        put = new Put(row2);
+        put.addColumn(family, qualifier, data1);
+        table.put(t3, put);
+        tm.commit(t3);
+
+        HBaseCellId hBaseCellId1 = new HBaseCellId(table, row1, family, qualifier, t1.getStartTimestamp());
+        HBaseCellId hBaseCellId2 = new HBaseCellId(table, row2, family, qualifier, t2.getStartTimestamp());
+        HBaseCellId hBaseCellId3 = new HBaseCellId(table, row2, family, qualifier, t3.getStartTimestamp());
+
+        assertTrue(snapshotFilter.isCommitted(hBaseCellId1, 0, false), "row1 should be committed");
+        assertFalse(snapshotFilter.isCommitted(hBaseCellId2, 0, false), "row2 should not be committed for kv2");
+        assertTrue(snapshotFilter.isCommitted(hBaseCellId3, 0, false), "row2 should be committed for kv3");
+        assertTrue(tm.isLowLatency());
+    }
+
+
+    @Test(timeOut = 30_000)
+    public void testInvalidation(ITestContext context) throws Exception {
+        AbstractTransactionManager tm = (AbstractTransactionManager)newTransactionManagerHBaseCommitTable(client);
+
+        Table htable = connection.getTable(TableName.valueOf(TEST_TABLE));
+        SnapshotFilterImpl snapshotFilter = new SnapshotFilterImpl(new HTableAccessWrapper(htable, htable),
+                tm.getCommitTableClient());
+        TTable table = spy(new TTable(htable, snapshotFilter, false));
+
+        HBaseTransaction t1 = (HBaseTransaction) tm.begin();
+        Put put = new Put(row1);
+        put.addColumn(family, qualifier, data1);
+        table.put(t1, put);
+
+        HBaseTransaction t2 = (HBaseTransaction) tm.begin();
+        Get get = new Get(row1);
+        get.addColumn(family, qualifier);
+        table.get(t2,get);
+
+        //assert there is an invalidation marker:
+        Table commitTable = connection.getTable(TableName.valueOf("OMID_COMMIT_TABLE"));
+        KeyGenerator keygen = KeyGeneratorImplementations.defaultKeyGenerator();
+        byte[] row = keygen.startTimestampToKey(t1.getStartTimestamp());
+        Get getInvalidation = new Get(row);
+        getInvalidation.addColumn(Bytes.toBytes(DEFAULT_COMMIT_TABLE_CF_NAME),"IT".getBytes(UTF_8));
+        Result res = commitTable.get(getInvalidation);
+        int val = Bytes.toInt(res.getValue(Bytes.toBytes(DEFAULT_COMMIT_TABLE_CF_NAME), "IT".getBytes(UTF_8)));
+        assertTrue(val == 1);
+
+        boolean gotInvalidated = false;
+        try {
+            tm.commit(t1);
+        } catch (RollbackException e) {
+            gotInvalidated = true;
+        }
+        assertTrue(gotInvalidated);
+        tm.commit(t2);
+        Thread.sleep(1000);
+        res = commitTable.get(getInvalidation);
+        assertTrue(res.isEmpty());
+        assertTrue(tm.isLowLatency());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
index 02e7ef5..b5e186f 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
@@ -145,6 +145,7 @@ public class TestShadowCells extends OmidTestBase {
         AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .postCommitter(syncPostCommitter)
                 .commitTableClient(commitTableClient)
+                .commitTableWriter(getCommitTable(context).getWriter())
                 .build());
 
         // The following line emulates a crash after commit that is observed in (*) below
@@ -191,6 +192,7 @@ public class TestShadowCells extends OmidTestBase {
                 new HBaseSyncPostCommitter(new NullMetricsProvider(), commitTableClient));
         AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .postCommitter(syncPostCommitter)
+                .commitTableWriter(getCommitTable(context).getWriter())
                 .commitTableClient(commitTableClient)
                 .build());
 
@@ -252,6 +254,7 @@ public class TestShadowCells extends OmidTestBase {
         AbstractTransactionManager tm = spy((AbstractTransactionManager) HBaseTransactionManager.builder(hbaseOmidClientConf)
                 .postCommitter(syncPostCommitter)
                 .commitTableClient(commitTableClient)
+                .commitTableWriter(getCommitTable(context).getWriter())
                 .build());
 
         final TTable table = new TTable(connection, TEST_TABLE);
@@ -337,7 +340,7 @@ public class TestShadowCells extends OmidTestBase {
                     Table htable = table.getHTable();
                     Table healer = table.getHTable();
 
-                    final SnapshotFilter snapshotFilter = spy(new SnapshotFilterImpl(new HTableAccessWrapper(htable, healer)));
+                    final SnapshotFilterImpl snapshotFilter = spy(new SnapshotFilterImpl(new HTableAccessWrapper(htable, healer)));
                     final TTable table = new TTable(htable ,snapshotFilter);
                     doAnswer(new Answer<List<KeyValue>>() {
                         @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
index 67c9cba..5f52644 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
@@ -29,6 +29,8 @@ import org.apache.omid.timestamp.storage.HBaseTimestampStorage;
 import org.apache.omid.timestamp.storage.TimestampStorage;
 import org.apache.omid.tso.BatchPoolModule;
 import org.apache.omid.tso.DisruptorModule;
+import org.apache.omid.tso.LowWatermarkWriter;
+import org.apache.omid.tso.LowWatermarkWriterImpl;
 import org.apache.omid.tso.RuntimeExceptionPanicker;
 import org.apache.omid.tso.NetworkInterfaceUtils;
 import org.apache.omid.tso.Panicker;
@@ -72,6 +74,7 @@ class TestTSOModule extends AbstractModule {
         bind(TimestampStorage.class).to(HBaseTimestampStorage.class).in(Singleton.class);
         bind(TimestampOracle.class).to(PausableTimestampOracle.class).in(Singleton.class);
         bind(Panicker.class).to(RuntimeExceptionPanicker.class).in(Singleton.class);
+        bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class);
 
         install(new BatchPoolModule(config));
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
----------------------------------------------------------------------
diff --git a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
index 447bc37..6320e4d 100644
--- a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
+++ b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
@@ -143,6 +143,16 @@ public class HBaseCommitTable implements CommitTable {
         }
 
         @Override
+        public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
+            assert (startTimestamp < commitTimestamp);
+            byte[] transactionRow = startTimestampToKey(startTimestamp);
+            Put put = new Put(transactionRow, startTimestamp);
+            byte[] value = encodeCommitTimestamp(startTimestamp, commitTimestamp);
+            put.addColumn(commitTableFamily, COMMIT_TABLE_QUALIFIER, value);
+            return table.checkAndPut(transactionRow, commitTableFamily, INVALID_TX_QUALIFIER, null, put);
+        }
+
+        @Override
         public void close() throws IOException {
             clearWriteBuffer();
             table.close();
@@ -270,7 +280,7 @@ public class HBaseCommitTable implements CommitTable {
             try {
                 byte[] row = startTimestampToKey(startTimestamp);
                 Put invalidationPut = new Put(row, startTimestamp);
-                invalidationPut.addColumn(commitTableFamily, INVALID_TX_QUALIFIER, null);
+                invalidationPut.addColumn(commitTableFamily, INVALID_TX_QUALIFIER, Bytes.toBytes(1));
 
                 // We need to write to the invalid column only if the commit timestamp
                 // is empty. This has to be done atomically. Otherwise, if we first

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java b/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
index 5177e7b..019ab74 100644
--- a/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
+++ b/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
@@ -57,6 +57,7 @@ public final class CellUtils {
     public static final String TRANSACTION_ATTRIBUTE = "__OMID_TRANSACTION__";
     /**/
     public static final String CLIENT_GET_ATTRIBUTE = "__OMID_CLIENT_GET__";
+    public static final String LL_ATTRIBUTE = "__OMID_LL__";
 
     /**
      * Utility interface to get rid of the dependency on HBase server package

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
index 7d49d06..115a467 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 
 
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.committable.hbase.HBaseCommitTable;
 import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
@@ -115,8 +116,9 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
             throws IOException {
 
         if (get.getAttribute(CellUtils.CLIENT_GET_ATTRIBUTE) == null) return;
-
-        HBaseTransaction hbaseTransaction = getHBaseTransaction(get.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE));
+        boolean isLowLatency = Bytes.toBoolean(get.getAttribute(CellUtils.LL_ATTRIBUTE));
+        HBaseTransaction hbaseTransaction = getHBaseTransaction(get.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE),
+                isLowLatency);
         SnapshotFilterImpl snapshotFilter = getSnapshotFilter(e);
         snapshotFilterMap.put(get, snapshotFilter);
 
@@ -155,8 +157,8 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
         if (byteTransaction == null) {
             return;
         }
-
-        HBaseTransaction hbaseTransaction = getHBaseTransaction(byteTransaction);
+        boolean isLowLatency = Bytes.toBoolean(scan.getAttribute(CellUtils.LL_ATTRIBUTE));
+        HBaseTransaction hbaseTransaction = getHBaseTransaction(byteTransaction, isLowLatency);
         SnapshotFilterImpl snapshotFilter = getSnapshotFilter(e);
 
         scan.setMaxVersions();
@@ -194,7 +196,7 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
 
 
 
-    private HBaseTransaction getHBaseTransaction(byte[] byteTransaction)
+    private HBaseTransaction getHBaseTransaction(byte[] byteTransaction, boolean isLowLatency)
             throws InvalidProtocolBufferException {
         TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(byteTransaction);
         long id = transaction.getTimestamp();
@@ -202,7 +204,9 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
         long epoch = transaction.getEpoch();
         VisibilityLevel visibilityLevel = VisibilityLevel.fromInteger(transaction.getVisibilityLevel());
 
-        return new HBaseTransaction(id, readTs, visibilityLevel, epoch, new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(), null);
+        return new HBaseTransaction(id, readTs, visibilityLevel, epoch, new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(), null,
+                isLowLatency);
+
     }
 
     private CommitTable.Client initAndGetCommitTableClient() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
index 59c01db..53a146f 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
@@ -29,6 +29,8 @@ import org.apache.omid.timestamp.storage.TimestampStorage;
 import org.apache.omid.tso.BatchPoolModule;
 import org.apache.omid.tso.DisruptorModule;
 import org.apache.omid.tso.LeaseManagement;
+import org.apache.omid.tso.LowWatermarkWriter;
+import org.apache.omid.tso.LowWatermarkWriterImpl;
 import org.apache.omid.tso.MockPanicker;
 import org.apache.omid.tso.NetworkInterfaceUtils;
 import org.apache.omid.tso.Panicker;
@@ -76,7 +78,7 @@ class TSOForHBaseCompactorTestModule extends AbstractModule {
         // Timestamp storage creation
         bind(TimestampStorage.class).to(HBaseTimestampStorage.class).in(Singleton.class);
         bind(TimestampOracle.class).to(TimestampOracleImpl.class).in(Singleton.class);
-
+        bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class);
         install(new BatchPoolModule(config));
         // DisruptorConfig
         install(new DisruptorModule(config));

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForSnapshotFilterTestModule.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForSnapshotFilterTestModule.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForSnapshotFilterTestModule.java
index 446b9d0..4f3ccba 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForSnapshotFilterTestModule.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForSnapshotFilterTestModule.java
@@ -29,6 +29,8 @@ import org.apache.omid.timestamp.storage.TimestampStorage;
 import org.apache.omid.tso.BatchPoolModule;
 import org.apache.omid.tso.DisruptorModule;
 import org.apache.omid.tso.LeaseManagement;
+import org.apache.omid.tso.LowWatermarkWriter;
+import org.apache.omid.tso.LowWatermarkWriterImpl;
 import org.apache.omid.tso.MockPanicker;
 import org.apache.omid.tso.NetworkInterfaceUtils;
 import org.apache.omid.tso.Panicker;
@@ -76,6 +78,7 @@ class TSOForSnapshotFilterTestModule extends AbstractModule {
         // Timestamp storage creation
         bind(TimestampStorage.class).to(HBaseTimestampStorage.class).in(Singleton.class);
         bind(TimestampOracle.class).to(TimestampOracleImpl.class).in(Singleton.class);
+        bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class);
 
         install(new BatchPoolModule(config));
         // DisruptorConfig

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
index d698201..ebf2ba3 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
@@ -67,6 +67,7 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+import static org.testng.Assert.fail;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.inject.Guice;
@@ -98,7 +99,7 @@ public class TestSnapshotFilter {
     @BeforeClass
     public void setupTestSnapshotFilter() throws Exception {
         TSOServerConfig tsoConfig = new TSOServerConfig();
-        tsoConfig.setPort(5678);
+        tsoConfig.setPort(5679);
         tsoConfig.setConflictMapSize(1);
         tsoConfig.setWaitStrategy("LOW_CPU");
         injector = Guice.createInjector(new TSOForSnapshotFilterTestModule(tsoConfig));
@@ -175,7 +176,7 @@ public class TestSnapshotFilter {
     private void setupTSO() throws IOException, InterruptedException {
         tso = injector.getInstance(TSOServer.class);
         tso.startAndWait();
-        TestUtils.waitForSocketListening("localhost", 5678, 100);
+        TestUtils.waitForSocketListening("localhost", 5679, 100);
         Thread.currentThread().setName("UnitTest(s) thread");
     }
 
@@ -187,7 +188,7 @@ public class TestSnapshotFilter {
 
     private void teardownTSO() throws IOException, InterruptedException {
         tso.stopAndWait();
-        TestUtils.waitForSocketNotListening("localhost", 5678, 1000);
+        TestUtils.waitForSocketNotListening("localhost", 5679, 1000);
     }
 
     @BeforeMethod
@@ -197,7 +198,7 @@ public class TestSnapshotFilter {
 
     private TransactionManager newTransactionManager() throws Exception {
         HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
-        hbaseOmidClientConf.setConnectionString("localhost:5678");
+        hbaseOmidClientConf.setConnectionString("localhost:5679");
         hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
         CommitTable.Client commitTableClient = commitTable.getClient();
         syncPostCommitter =
@@ -399,11 +400,16 @@ public class TestSnapshotFilter {
         Result result = tt.get(tx4, get);
         assertTrue(result.size() == 2, "Result should be 2");
 
-        tm.commit(tx3);
-
+        try {
+            tm.commit(tx3);
+        } catch (RollbackException e) {
+            if (!tm.isLowLatency())
+                fail();
+        }
         Transaction tx5 = tm.begin();
         result = tt.get(tx5, get);
-        assertTrue(result.size() == 1, "Result should be 1");
+        if (!tm.isLowLatency())
+            assertTrue(result.size() == 1, "Result should be 1");
 
         tt.close();
     }


[2/4] incubator-omid git commit: OMID-90 Add omid low latency mode

Posted by yo...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
deleted file mode 100644
index e5fbee8..0000000
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.omid.tso;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
-import com.lmax.disruptor.TimeoutHandler;
-import com.lmax.disruptor.dsl.Disruptor;
-
-import org.apache.omid.metrics.MetricsRegistry;
-import org.apache.omid.tso.TSOStateManager.TSOState;
-import org.jboss.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-import static com.lmax.disruptor.dsl.ProducerType.MULTI;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.omid.tso.RequestProcessorImpl.RequestEvent.EVENT_FACTORY;
-
-class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor, TimeoutHandler {
-
-    private static final Logger LOG = LoggerFactory.getLogger(RequestProcessorImpl.class);
-
-    // Disruptor-related attributes
-    private final ExecutorService disruptorExec;
-    private final Disruptor<RequestEvent> disruptor;
-    private final RingBuffer<RequestEvent> requestRing;
-
-    private final TimestampOracle timestampOracle;
-    private final CommitHashMap hashmap;
-    private final Map<Long, Long> tableFences;
-    private final MetricsRegistry metrics;
-    private final PersistenceProcessor persistProc;
-
-    private long lowWatermark = -1L;
-
-    @Inject
-    RequestProcessorImpl(MetricsRegistry metrics,
-                         TimestampOracle timestampOracle,
-                         PersistenceProcessor persistProc,
-                         Panicker panicker,
-                         TSOServerConfig config)
-            throws IOException {
-
-        // ------------------------------------------------------------------------------------------------------------
-        // Disruptor initialization
-        // ------------------------------------------------------------------------------------------------------------
-
-        TimeoutBlockingWaitStrategy timeoutStrategy = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), MILLISECONDS);
-
-        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("request-%d").build();
-        this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory);
-
-        this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, timeoutStrategy);
-        disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith()
-        disruptor.handleEventsWith(this);
-        this.requestRing = disruptor.start();
-
-        // ------------------------------------------------------------------------------------------------------------
-        // Attribute initialization
-        // ------------------------------------------------------------------------------------------------------------
-
-        this.metrics = metrics;
-        this.persistProc = persistProc;
-        this.timestampOracle = timestampOracle;
-        this.hashmap = new CommitHashMap(config.getConflictMapSize());
-        this.tableFences = new HashMap<Long, Long>();
-
-        LOG.info("RequestProcessor initialized");
-
-    }
-
-    /**
-     * This should be called when the TSO gets leadership
-     */
-    @Override
-    public void update(TSOState state) throws Exception {
-        LOG.info("Initializing RequestProcessor state...");
-        this.lowWatermark = state.getLowWatermark();
-        persistProc.persistLowWatermark(lowWatermark).get(); // Sync persist
-        LOG.info("RequestProcessor state initialized with LWMs {} and Epoch {}", lowWatermark, state.getEpoch());
-    }
-
-    @Override
-    public void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {
-
-        switch (event.getType()) {
-            case TIMESTAMP:
-                handleTimestamp(event);
-                break;
-            case COMMIT:
-                handleCommit(event);
-                break;
-            case FENCE:
-                handleFence(event);
-                break;
-            default:
-                throw new IllegalStateException("Event not allowed in Request Processor: " + event);
-        }
-
-    }
-
-    @Override
-    public void onTimeout(long sequence) throws Exception {
-
-        // TODO We can not use this as a timeout trigger for flushing. This timeout is related to the time between
-        // TODO (cont) arrivals of requests to the disruptor. We need another mechanism to trigger timeouts
-        // TODO (cont) WARNING!!! Take care with the implementation because if there's other thread than request-0
-        // TODO (cont) thread the one that calls persistProc.triggerCurrentBatchFlush(); we'll incur in concurrency issues
-        // TODO (cont) This is because, in the current implementation, only the request-0 thread calls the public methods
-        // TODO (cont) in persistProc and it is guaranteed that access them serially.
-        persistProc.triggerCurrentBatchFlush();
-
-    }
-
-    @Override
-    public void timestampRequest(Channel c, MonitoringContext monCtx) {
-
-        monCtx.timerStart("request.processor.timestamp.latency");
-        long seq = requestRing.next();
-        RequestEvent e = requestRing.get(seq);
-        RequestEvent.makeTimestampRequest(e, c, monCtx);
-        requestRing.publish(seq);
-
-    }
-
-    @Override
-    public void commitRequest(long startTimestamp, Collection<Long> writeSet, Collection<Long> tableIdSet, boolean isRetry, Channel c,
-                              MonitoringContext monCtx) {
-
-        monCtx.timerStart("request.processor.commit.latency");
-        long seq = requestRing.next();
-        RequestEvent e = requestRing.get(seq);
-        RequestEvent.makeCommitRequest(e, startTimestamp, monCtx, writeSet, tableIdSet, isRetry, c);
-        requestRing.publish(seq);
-
-    }
-
-    @Override
-    public void fenceRequest(long tableID, Channel c, MonitoringContext monCtx) {
-
-        monCtx.timerStart("request.processor.fence.latency");
-        long seq = requestRing.next();
-        RequestEvent e = requestRing.get(seq);
-        RequestEvent.makeFenceRequest(e, tableID, c, monCtx);
-        requestRing.publish(seq);
-
-    }
-
-    private void handleTimestamp(RequestEvent requestEvent) throws Exception {
-
-        long timestamp = timestampOracle.next();
-        requestEvent.getMonCtx().timerStop("request.processor.timestamp.latency");
-        persistProc.addTimestampToBatch(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
-
-    }
-
-    // Checks whether transaction transactionId started before a fence creation of a table transactionId modified.
-    private boolean hasConflictsWithFences(long startTimestamp, Collection<Long> tableIdSet) {
-        if (!tableFences.isEmpty()) {
-            for (long tableId: tableIdSet) {
-                Long fence = tableFences.get(tableId);
-                if (fence != null && fence > startTimestamp) {
-                    return true;
-                }
-                if (fence != null && fence < lowWatermark) {
-                    tableFences.remove(tableId); // Garbage collect entries of old fences.
-                }
-            }
-        }
-
-        return false;
-    }
-
- // Checks whether transactionId has a write-write conflict with a transaction committed after transactionId.
-    private boolean hasConflictsWithCommittedTransactions(long startTimestamp, Iterable<Long> writeSet) {
-        for (long cellId : writeSet) {
-            long value = hashmap.getLatestWriteForCell(cellId);
-            if (value != 0 && value >= startTimestamp) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    private void handleCommit(RequestEvent event) throws Exception {
-
-        long startTimestamp = event.getStartTimestamp();
-        Iterable<Long> writeSet = event.writeSet();
-        Collection<Long> tableIdSet = event.getTableIdSet();
-        boolean isCommitRetry = event.isCommitRetry();
-        Channel c = event.getChannel();
-
-        boolean nonEmptyWriteSet = writeSet.iterator().hasNext();
-
-        // If the transaction started before the low watermark, or
-        // it started before a fence and modified the table the fence created for, or
-        // it has a write-write conflict with a transaction committed after it started
-        // Then it should abort. Otherwise, it can commit.
-        if (startTimestamp > lowWatermark &&
-            !hasConflictsWithFences(startTimestamp, tableIdSet) &&
-            !hasConflictsWithCommittedTransactions(startTimestamp, writeSet)) {
-
-            long commitTimestamp = timestampOracle.next();
-
-            if (nonEmptyWriteSet) {
-                long newLowWatermark = lowWatermark;
-
-                for (long r : writeSet) {
-                    long removed = hashmap.putLatestWriteForCell(r, commitTimestamp);
-                    newLowWatermark = Math.max(removed, newLowWatermark);
-                }
-
-                if (newLowWatermark != lowWatermark) {
-                    LOG.trace("Setting new low Watermark to {}", newLowWatermark);
-                    lowWatermark = newLowWatermark;
-                    persistProc.persistLowWatermark(newLowWatermark); // Async persist
-                }
-            }
-            event.getMonCtx().timerStop("request.processor.commit.latency");
-            persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
-
-        } else {
-
-            event.getMonCtx().timerStop("request.processor.commit.latency");
-            if (isCommitRetry) { // Re-check if it was already committed but the client retried due to a lag replying
-                persistProc.addCommitRetryToBatch(startTimestamp, c, event.getMonCtx());
-            } else {
-                persistProc.addAbortToBatch(startTimestamp, c, event.getMonCtx());
-            }
-
-        }
-
-    }
-
-    private void handleFence(RequestEvent event) throws Exception {
-        long tableID = event.getTableId();
-        Channel c = event.getChannel();
-
-        long fenceTimestamp = timestampOracle.next();
-
-        tableFences.put(tableID, fenceTimestamp);
-        persistProc.addFenceToBatch(tableID, fenceTimestamp, c, event.getMonCtx());
-    }
-
-    @Override
-    public void close() throws IOException {
-
-        LOG.info("Terminating Request Processor...");
-        disruptor.halt();
-        disruptor.shutdown();
-        LOG.info("\tRequest Processor Disruptor shutdown");
-        disruptorExec.shutdownNow();
-        try {
-            disruptorExec.awaitTermination(3, SECONDS);
-            LOG.info("\tRequest Processor Disruptor executor shutdown");
-        } catch (InterruptedException e) {
-            LOG.error("Interrupted whilst finishing Request Processor Disruptor executor");
-            Thread.currentThread().interrupt();
-        }
-        LOG.info("Request Processor terminated");
-
-    }
-
-    final static class RequestEvent implements Iterable<Long> {
-
-        enum Type {
-            TIMESTAMP, COMMIT, FENCE
-        }
-
-        private Type type = null;
-        private Channel channel = null;
-
-        private boolean isCommitRetry = false;
-        private long startTimestamp = 0;
-        private MonitoringContext monCtx;
-        private long numCells = 0;
-
-        private static final int MAX_INLINE = 40;
-        private Long writeSet[] = new Long[MAX_INLINE];
-        private Collection<Long> writeSetAsCollection = null; // for the case where there's more than MAX_INLINE
-
-        private Collection<Long> tableIdSet = null;
-        private long tableID = 0;
-
-        static void makeTimestampRequest(RequestEvent e, Channel c, MonitoringContext monCtx) {
-            e.type = Type.TIMESTAMP;
-            e.channel = c;
-            e.monCtx = monCtx;
-        }
-
-        static void makeCommitRequest(RequestEvent e,
-                                      long startTimestamp,
-                                      MonitoringContext monCtx,
-                                      Collection<Long> writeSet,
-                                      Collection<Long> TableIdSet,
-                                      boolean isRetry,
-                                      Channel c) {
-            e.monCtx = monCtx;
-            e.type = Type.COMMIT;
-            e.channel = c;
-            e.startTimestamp = startTimestamp;
-            e.isCommitRetry = isRetry;
-            if (writeSet.size() > MAX_INLINE) {
-                e.numCells = writeSet.size();
-                e.writeSetAsCollection = writeSet;
-            } else {
-                e.writeSetAsCollection = null;
-                e.numCells = writeSet.size();
-                int i = 0;
-                for (Long cellId : writeSet) {
-                    e.writeSet[i] = cellId;
-                    ++i;
-                }
-            }
-            e.tableIdSet = TableIdSet;
-        }
-
-        static void makeFenceRequest(RequestEvent e,
-                                     long tableID,
-                                     Channel c,
-                                     MonitoringContext monCtx) {
-            e.type = Type.FENCE;
-            e.channel = c;
-            e.monCtx = monCtx;
-            e.tableID = tableID;
-        }
-
-        MonitoringContext getMonCtx() {
-            return monCtx;
-        }
-
-        Type getType() {
-            return type;
-        }
-
-        long getStartTimestamp() {
-            return startTimestamp;
-        }
-
-        Channel getChannel() {
-            return channel;
-        }
-
-        Collection<Long> getTableIdSet() {
-            return tableIdSet;
-        }
-
-        long getTableId() {
-            return tableID;
-        }
-
-        @Override
-        public Iterator<Long> iterator() {
-
-            if (writeSetAsCollection != null) {
-                return writeSetAsCollection.iterator();
-            }
-
-            return new Iterator<Long>() {
-                int i = 0;
-
-                @Override
-                public boolean hasNext() {
-                    return i < numCells;
-                }
-
-                @Override
-                public Long next() {
-                    if (!hasNext()) {
-                        throw new NoSuchElementException();
-                    }
-                    return writeSet[i++];
-                }
-
-                @Override
-                public void remove() {
-                    throw new UnsupportedOperationException();
-                }
-            };
-
-        }
-
-        Iterable<Long> writeSet() {
-
-            return this;
-
-        }
-
-        boolean isCommitRetry() {
-            return isCommitRetry;
-        }
-
-        final static EventFactory<RequestEvent> EVENT_FACTORY = new EventFactory<RequestEvent>() {
-            @Override
-            public RequestEvent newInstance() {
-                return new RequestEvent();
-            }
-        };
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
new file mode 100644
index 0000000..0a58b0e
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.inject.Inject;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.jboss.netty.channel.Channel;
+
+import java.io.IOException;
+
+public class RequestProcessorPersistCT extends AbstractRequestProcessor {
+
+    private final PersistenceProcessor persistenceProcessor;
+
+    @Inject
+    RequestProcessorPersistCT(MetricsRegistry metrics,
+                              TimestampOracle timestampOracle,
+                              PersistenceProcessor persistenceProcessor,
+                              Panicker panicker,
+                              TSOServerConfig config,
+                              LowWatermarkWriter lowWatermarkWriter,
+                              ReplyProcessor replyProcessor) throws IOException {
+
+        super(metrics, timestampOracle, panicker, config, lowWatermarkWriter, replyProcessor);
+        this.persistenceProcessor = persistenceProcessor;
+        requestRing = disruptor.start();
+    }
+
+    @Override
+    public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        persistenceProcessor.addCommitToBatch(startTimestamp,commitTimestamp,c,monCtx);
+    }
+
+    @Override
+    public void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        persistenceProcessor.addCommitRetryToBatch(startTimestamp,c,monCtx);
+    }
+
+    @Override
+    public void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        persistenceProcessor.addAbortToBatch(startTimestamp,c,monCtx);
+    }
+
+    @Override
+    public void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        persistenceProcessor.addTimestampToBatch(startTimestamp,c,monCtx);
+    }
+
+    @Override
+    public void onTimeout() throws Exception {
+        persistenceProcessor.triggerCurrentBatchFlush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
new file mode 100644
index 0000000..41798f5
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.inject.Inject;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.jboss.netty.channel.Channel;
+
+import java.io.IOException;
+
+public class RequestProcessorSkipCT extends AbstractRequestProcessor {
+
+
+    private final ReplyProcessor replyProcessor;
+
+    private final LeaseManagement leaseManager;
+    private final Panicker panicker;
+    private final String tsoHostAndPort;
+
+    @Inject
+    RequestProcessorSkipCT(MetricsRegistry metrics,
+                           TimestampOracle timestampOracle,
+                           ReplyProcessor replyProcessor,
+                           Panicker panicker,
+                           LeaseManagement leaseManager,
+                           TSOServerConfig config,
+                           LowWatermarkWriter lowWatermarkWriter,
+                           String tsoHostAndPort) throws IOException {
+        super(metrics, timestampOracle, panicker, config, lowWatermarkWriter, replyProcessor);
+        this.replyProcessor = replyProcessor;
+        this.tsoHostAndPort = tsoHostAndPort;
+        requestRing = disruptor.start();
+        this.leaseManager = leaseManager;
+        this.panicker = panicker;
+    }
+
+    private void commitSuicideIfNotMaster() {
+        if (!leaseManager.stillInLeasePeriod()) {
+            panicker.panic("Replica " + tsoHostAndPort + " lost mastership whilst flushing data. Committing suicide");
+        }
+    }
+
+    @Override
+    public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
+        commitSuicideIfNotMaster();
+        monCtx.timerStart("reply.processor.commit.latency");
+        replyProcessor.sendCommitResponse(startTimestamp, commitTimestamp, c, monCtx);
+    }
+
+    @Override
+    public void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) {
+        monCtx.timerStart("reply.processor.abort.latency");
+        replyProcessor.sendAbortResponse(startTimestamp, c, monCtx);
+    }
+
+    @Override
+    public void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) {
+        monCtx.timerStart("reply.processor.abort.latency");
+        replyProcessor.sendAbortResponse(startTimestamp, c, monCtx);
+    }
+
+    @Override
+    public void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) {
+        monCtx.timerStart("reply.processor.timestamp.latency");
+        replyProcessor.sendTimestampResponse(startTimestamp, c, monCtx);
+    }
+
+    @Override
+    public void onTimeout() {
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
index 6d923be..610e760 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
@@ -133,16 +133,16 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
             if (commitTimestamp.isPresent()) {
                 if (commitTimestamp.get().isValid()) {
                     LOG.trace("Tx {}: Valid commit TS found in Commit Table. Sending Commit to client.", startTimestamp);
-                    replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel());
+                    replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel(), event.getMonCtx());
                     txAlreadyCommittedMeter.mark();
                 } else {
                     LOG.trace("Tx {}: Invalid tx marker found. Sending Abort to client.", startTimestamp);
-                    replyProc.sendAbortResponse(startTimestamp, event.getChannel());
+                    replyProc.sendAbortResponse(startTimestamp, event.getChannel(), event.getMonCtx());
                     invalidTxMeter.mark();
                 }
             } else {
                 LOG.trace("Tx {}: No Commit TS found in Commit Table. Sending Abort to client.", startTimestamp);
-                replyProc.sendAbortResponse(startTimestamp, event.getChannel());
+                replyProc.sendAbortResponse(startTimestamp, event.getChannel(), event.getMonCtx());
                 noCTFoundMeter.mark();
             }
         } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
index a218a1d..f936e88 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
@@ -165,7 +165,7 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
             }
 
             if (request.hasTimestampRequest()) {
-                requestProcessor.timestampRequest(ctx.getChannel(), new MonitoringContext(metrics));
+                requestProcessor.timestampRequest(ctx.getChannel(), MonitoringContextFactory.getInstance(config,metrics));
             } else if (request.hasCommitRequest()) {
                 TSOProto.CommitRequest cr = request.getCommitRequest();
                 requestProcessor.commitRequest(cr.getStartTimestamp(),
@@ -173,10 +173,12 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
                                                cr.getTableIdList(),
                                                cr.getIsRetry(),
                                                ctx.getChannel(),
-                                               new MonitoringContext(metrics));
+                                               MonitoringContextFactory.getInstance(config,metrics));
             } else if (request.hasFenceRequest()) {
                 TSOProto.FenceRequest fr = request.getFenceRequest();
-                requestProcessor.fenceRequest(fr.getTableId(), ctx.getChannel(), new MonitoringContext(metrics));
+                requestProcessor.fenceRequest(fr.getTableId(),
+                        ctx.getChannel(),
+                        MonitoringContextFactory.getInstance(config,metrics));
             } else {
                 LOG.error("Invalid request {}. Closing channel {}", request, ctx.getChannel());
                 ctx.getChannel().close();
@@ -193,7 +195,7 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
             LOG.warn("ClosedChannelException caught. Cause: ", e.getCause());
             return;
         }
-        LOG.warn("Unexpected exception from downstream. Closing channel {}", ctx.getChannel(), e.getCause());
+        LOG.warn("Unexpected exception from downstream. Closing channel {} {}", ctx.getChannel(), e.getCause());
         ctx.getChannel().close();
     }
 
@@ -244,6 +246,7 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
         } else {
             response.setClientCompatible(false);
         }
+        response.setLowLatency(config.getLowLatency());
         ctx.getChannel().write(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
index 4d0d844..5c96aa3 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
@@ -52,7 +52,7 @@ class TSOModule extends AbstractModule {
         } else {
             bind(TimestampOracle.class).to(TimestampOracleImpl.class).in(Singleton.class);
         }
-
+        bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class);
         bind(Panicker.class).to(SystemExitPanicker.class).in(Singleton.class);
 
         install(new BatchPoolModule(config));

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java b/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java
index f30e64d..d97b824 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java
@@ -51,8 +51,9 @@ public class TSOServer extends AbstractIdleService {
     @Inject
     private RetryProcessor retryProcessor;
     @Inject
-    private ReplyProcessor replyProcessor;
-
+    public ReplyProcessor replyProcessor;
+    @Inject
+    private LowWatermarkWriter lowWatermarkWriter;
     // ----------------------------------------------------------------------------------------------------------------
     // High availability related variables
     // ----------------------------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
index 8f061a1..e28add3 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
@@ -89,6 +89,26 @@ public class TSOServerConfig extends SecureHBaseConfig {
 
     private String timestampType;
 
+    private Boolean lowLatency;
+
+    public boolean monitorContext;
+
+    public boolean getMonitorContext() {
+        return monitorContext;
+    }
+
+    public void setMonitorContext(boolean monitorContext) {
+        this.monitorContext = monitorContext;
+    }
+
+    public Boolean getLowLatency() {
+        return lowLatency;
+    }
+
+    public void setLowLatency(Boolean lowLatency) {
+        this.lowLatency = lowLatency;
+    }
+
     public int getPort() {
         return port;
     }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
index 454526f..fec82af 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
@@ -83,8 +83,8 @@ public class TimestampOracleImpl implements TimestampOracle {
 
     }
 
-    static final long TIMESTAMP_BATCH = 10_000_000; // 10 million
-    private static final long TIMESTAMP_REMAINING_THRESHOLD = 1_000_000; // 1 million
+    static final long TIMESTAMP_BATCH = 10_000_000*AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN; // 10 million
+    private static final long TIMESTAMP_REMAINING_THRESHOLD = 1_000_000*AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN; // 1 million
 
     private long lastTimestamp;
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/resources/default-omid-server-configuration.yml
----------------------------------------------------------------------
diff --git a/tso-server/src/main/resources/default-omid-server-configuration.yml b/tso-server/src/main/resources/default-omid-server-configuration.yml
index 4e45122..5aa555f 100644
--- a/tso-server/src/main/resources/default-omid-server-configuration.yml
+++ b/tso-server/src/main/resources/default-omid-server-configuration.yml
@@ -30,6 +30,7 @@ batchPersistTimeoutInMs: 10
 # INCREMENTAL - [Default] regular counter
 # WORLD_TIME - world time based counter
 timestampType: INCREMENTAL
+lowLatency: false
 # Default module configuration (No TSO High Availability & in-memory storage for timestamp and commit tables)
 timestampStoreModule: !!org.apache.omid.tso.InMemoryTimestampStorageModule [ ]
 commitTableStoreModule: !!org.apache.omid.tso.InMemoryCommitTableStorageModule [ ]
@@ -38,6 +39,8 @@ leaseModule: !!org.apache.omid.tso.VoidLeaseManagementModule [ ]
 # Default stats/metrics configuration
 metrics: !!org.apache.omid.metrics.NullMetricsProvider [ ]
 
+monitorContext: false
+
 # ---------------------------------------------------------------------------------------------------------------------
 # Timestamp storage configuration options
 # ---------------------------------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
index fc30e60..a346e5e 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
@@ -58,6 +58,7 @@ public class TSOMockModule extends AbstractModule {
             bind(TimestampOracle.class).to(PausableTimestampOracle.class).in(Singleton.class);
         }
         bind(Panicker.class).to(MockPanicker.class).in(Singleton.class);
+        bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class);
 
         install(new BatchPoolModule(config));
         install(config.getLeaseModule());

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index 573cd89..c286f85 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -43,7 +43,7 @@ public class TestBatch {
     @Mock
     private Channel channel;
     @Mock
-    private MonitoringContext monCtx;
+    private MonitoringContextImpl monCtx;
 
     @BeforeMethod
     void setup() {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
index ae89f01..779111d 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
@@ -137,9 +137,12 @@ public class TestPanicker {
                                                                  handlers,
                                                                  metrics);
 
-        proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
+        proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics));
 
-        new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class));
+        LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics);
+
+        new RequestProcessorPersistCT(metrics, mock(TimestampOracle.class), proc, panicker,
+                mock(TSOServerConfig.class), lowWatermarkWriter, mock(ReplyProcessor.class));
 
         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
 
@@ -189,9 +192,12 @@ public class TestPanicker {
                                                                  panicker,
                                                                  handlers,
                                                                  metrics);
-        proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
+        proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics));
+
+        LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics);
 
-        new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class));
+        new RequestProcessorPersistCT(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class),
+                lowWatermarkWriter, mock(ReplyProcessor.class));
 
         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
index 4779608..5d9e2c2 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
@@ -68,6 +68,7 @@ public class TestPersistenceProcessor {
 
     private MetricsRegistry metrics;
     private CommitTable commitTable;
+    private LowWatermarkWriter lowWatermarkWriter;
 
     @BeforeMethod(alwaysRun = true, timeOut = 30_000)
     public void initMocksAndComponents() throws Exception {
@@ -101,6 +102,7 @@ public class TestPersistenceProcessor {
     public void testLowWatermarkIsPersisted() throws Exception {
 
         TSOServerConfig tsoConfig = new TSOServerConfig();
+        lowWatermarkWriter = new LowWatermarkWriterImpl(tsoConfig, commitTable, metrics);
 
         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
@@ -123,7 +125,7 @@ public class TestPersistenceProcessor {
                                              handlers,
                                              metrics);
 
-        persistenceProcessor.persistLowWatermark(ANY_LWM).get();
+        lowWatermarkWriter.persistLowWatermark(ANY_LWM).get();
 
         ArgumentCaptor<Long> lwmCapture = ArgumentCaptor.forClass(Long.class);
         CommitTable.Writer lwmWriter = commitTable.getWriter();
@@ -166,10 +168,10 @@ public class TestPersistenceProcessor {
 
         verify(batchPool, times(1)).borrowObject(); // Called during initialization
 
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Flush: batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Flush: batch full
 
         verify(batchPool, times(1 + BATCH_SIZE_PER_CT_WRITER)).borrowObject(); // 3: 1 in init + 2 when flushing
 
@@ -211,8 +213,8 @@ public class TestPersistenceProcessor {
         verify(batchPool, times(1)).borrowObject(); // Called during initialization
 
         // Fill 1st handler Batches completely
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 1st batch full
         verify(batchPool, times(2)).borrowObject();
         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
@@ -223,14 +225,14 @@ public class TestPersistenceProcessor {
         verify(batchPool, times(3)).borrowObject();
 
         // Fill 2nd handler Batches completely
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 1st batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 2nd batch full
         verify(batchPool, times(1 + (NUM_CT_WRITERS * BATCH_SIZE_PER_CT_WRITER))).borrowObject();
 
         // Start filling a new currentBatch and flush it immediately
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Batch not full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Batch not full
         verify(batchPool, times(5)).borrowObject();
         proc.triggerCurrentBatchFlush(); // Flushing should provoke invocation of a new batch
         verify(batchPool, times(6)).borrowObject();
@@ -281,7 +283,7 @@ public class TestPersistenceProcessor {
 
         // The non-ha lease manager always return true for
         // stillInLeasePeriod(), so verify the currentBatch sends replies as master
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
         proc.triggerCurrentBatchFlush();
         verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -336,7 +338,7 @@ public class TestPersistenceProcessor {
 
         // Test: Configure the lease manager to return true always
         doReturn(true).when(simulatedHALeaseManager).stillInLeasePeriod();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
         proc.triggerCurrentBatchFlush();
         verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -357,7 +359,7 @@ public class TestPersistenceProcessor {
 
         // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod
         doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
         proc.triggerCurrentBatchFlush();
         verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -378,7 +380,7 @@ public class TestPersistenceProcessor {
 
         // Test: Configure the lease manager to return false for stillInLeasePeriod
         doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
         proc.triggerCurrentBatchFlush();
         verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -402,7 +404,7 @@ public class TestPersistenceProcessor {
         // Configure mock writer to flush unsuccessfully
         doThrow(new IOException("Unable to write")).when(mockWriter).flush();
         doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
         proc.triggerCurrentBatchFlush();
         verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -452,7 +454,7 @@ public class TestPersistenceProcessor {
         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool,
                                                                      panicker, handlers, metrics);
 
-        MonitoringContext monCtx = new MonitoringContext(metrics);
+        MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics);
 
         // Configure lease manager to work normally
         doReturn(true).when(leaseManager).stillInLeasePeriod();
@@ -492,7 +494,7 @@ public class TestPersistenceProcessor {
 
         // Configure writer to explode with a runtime exception
         doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
-        MonitoringContext monCtx = new MonitoringContext(metrics);
+        MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics);
 
         // Check the panic is extended!
         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
index d60d019..0b23b99 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
@@ -137,7 +137,7 @@ public class TestPersistenceProcessorHandler {
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertTrue(batch.isEmpty());
 
@@ -148,14 +148,14 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
+        batch.addTimestamp(FIRST_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
         persistenceHandler.onEvent(batchEvent);
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 1);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -167,14 +167,14 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
         persistenceHandler.onEvent(batchEvent);
 
         verify(persistenceHandler, times(1)).flush(eq(1));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
-        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 1);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -187,14 +187,14 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class));
+        batch.addAbort(FIRST_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
         persistenceHandler.onEvent(batchEvent);
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
-        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 1);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -206,7 +206,7 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -215,7 +215,7 @@ public class TestPersistenceProcessorHandler {
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 0);
 
@@ -226,8 +226,8 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
-        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
+        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -239,7 +239,7 @@ public class TestPersistenceProcessorHandler {
 
         verify(persistenceHandler, times(1)).flush(eq(1));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 1);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -255,8 +255,8 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
-        batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
+        batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -268,7 +268,7 @@ public class TestPersistenceProcessorHandler {
 
         verify(persistenceHandler, times(1)).flush(eq(1));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 1);
         assertEquals(batch.get(0).getStartTimestamp(), SECOND_ST);
@@ -281,8 +281,8 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
-        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
+        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -294,8 +294,8 @@ public class TestPersistenceProcessorHandler {
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 0);
 
@@ -306,8 +306,8 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class));
-        batch.addAbort(SECOND_ST, null, mock(MonitoringContext.class));
+        batch.addAbort(FIRST_ST, null, mock(MonitoringContextImpl.class));
+        batch.addAbort(SECOND_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -319,7 +319,7 @@ public class TestPersistenceProcessorHandler {
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 2);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -334,12 +334,12 @@ public class TestPersistenceProcessorHandler {
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
 
-        batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
-        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
-        batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContext.class));
-        batch.addAbort(FOURTH_ST, null, mock(MonitoringContext.class));
-        batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContext.class));
-        batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContext.class));
+        batch.addTimestamp(FIRST_ST, null, mock(MonitoringContextImpl.class));
+        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
+        batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContextImpl.class));
+        batch.addAbort(FOURTH_ST, null, mock(MonitoringContextImpl.class));
+        batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContextImpl.class));
+        batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -351,7 +351,7 @@ public class TestPersistenceProcessorHandler {
 
         verify(persistenceHandler, times(1)).flush(2); // 2 commits to flush
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 4);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -378,7 +378,7 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -420,7 +420,7 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -455,7 +455,7 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
         batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
index 3ead24b..54d1e70 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
@@ -70,7 +70,7 @@ public class TestReplyProcessor {
     private Panicker panicker;
 
     @Mock
-    private MonitoringContext monCtx;
+    private MonitoringContextImpl monCtx;
 
     private MetricsRegistry metrics;
 
@@ -247,11 +247,11 @@ public class TestReplyProcessor {
         inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(thirdBatchEvent));
 
         InOrder inOrderReplies = inOrder(replyProcessor, replyProcessor, replyProcessor, replyProcessor, replyProcessor);
-        inOrderReplies.verify(replyProcessor, times(1)).sendAbortResponse(eq(FIFTH_ST), any(Channel.class));
-        inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(THIRD_ST), any(Channel.class));
-        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class));
-        inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(FIRST_ST), any(Channel.class));
-        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class));
+        inOrderReplies.verify(replyProcessor, times(1)).sendAbortResponse(eq(FIFTH_ST), any(Channel.class), eq(monCtx));
+        inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(THIRD_ST), any(Channel.class), eq(monCtx));
+        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class), eq(monCtx));
+        inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(FIRST_ST), any(Channel.class), eq(monCtx));
+        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class), eq(monCtx));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 1c44d05..645caa1 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -59,26 +59,32 @@ public class TestRequestProcessor {
     // Request processor under test
     private RequestProcessor requestProc;
 
+    private LowWatermarkWriter lowWatermarkWriter;
+    private TimestampOracleImpl timestampOracle;
+    private ReplyProcessor replyProcessor;
+
     @BeforeMethod
     public void beforeMethod() throws Exception {
 
         // Build the required scaffolding for the test
         MetricsRegistry metrics = new NullMetricsProvider();
 
-        TSOServerConfig config = new TSOServerConfig();
-        config.setConflictMapSize(CONFLICT_MAP_SIZE);
-
         TimestampOracleImpl timestampOracle =
                 new TimestampOracleImpl(metrics, new TimestampOracleImpl.InMemoryTimestampStorage(), new MockPanicker());
 
         stateManager = new TSOStateManagerImpl(timestampOracle);
-
+        lowWatermarkWriter = mock(LowWatermarkWriter.class);
         persist = mock(PersistenceProcessor.class);
+        replyProcessor = mock(ReplyProcessor.class);
         SettableFuture<Void> f = SettableFuture.create();
         f.set(null);
-        doReturn(f).when(persist).persistLowWatermark(any(Long.class));
+        doReturn(f).when(lowWatermarkWriter).persistLowWatermark(any(Long.class));
 
-        requestProc = new RequestProcessorImpl(metrics, timestampOracle, persist, new MockPanicker(), config);
+        TSOServerConfig config = new TSOServerConfig();
+        config.setConflictMapSize(CONFLICT_MAP_SIZE);
+
+        requestProc = new RequestProcessorPersistCT(metrics, timestampOracle, persist, new MockPanicker(),
+                config, lowWatermarkWriter,replyProcessor);
 
         // Initialize the state for the experiment
         stateManager.register(requestProc);
@@ -89,15 +95,15 @@ public class TestRequestProcessor {
     @Test(timeOut = 30_000)
     public void testTimestamp() throws Exception {
 
-        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
         verify(persist, timeout(100).times(1)).addTimestampToBatch(
-                firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+                firstTScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
 
         long firstTS = firstTScapture.getValue();
         // verify that timestamps increase monotonically
         for (int i = 0; i < 100; i++) {
-            requestProc.timestampRequest(null, new MonitoringContext(metrics));
+            requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
             verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS), any(Channel.class), any(MonitoringContext.class));
             firstTS += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
         }
@@ -107,48 +113,48 @@ public class TestRequestProcessor {
     @Test(timeOut = 30_000)
     public void testCommit() throws Exception {
 
-        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> TScapture = ArgumentCaptor.forClass(Long.class);
         verify(persist, timeout(100).times(1)).addTimestampToBatch(
-                TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+                TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
         long firstTS = TScapture.getValue();
 
         List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
-        requestProc.commitRequest(firstTS - AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
+        requestProc.commitRequest(firstTS - AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
         verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN), any(Channel.class), any(MonitoringContext.class));
 
-        requestProc.commitRequest(firstTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
+        requestProc.commitRequest(firstTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
 
         verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
         assertTrue(commitTScapture.getValue() > firstTS, "Commit TS must be greater than start TS");
 
         // test conflict
-        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
         TScapture = ArgumentCaptor.forClass(Long.class);
         verify(persist, timeout(100).times(2)).addTimestampToBatch(
-                TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+                TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
         long secondTS = TScapture.getValue();
 
-        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
         TScapture = ArgumentCaptor.forClass(Long.class);
         verify(persist, timeout(100).times(3)).addTimestampToBatch(
-                TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+                TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
         long thirdTS = TScapture.getValue();
 
-        requestProc.commitRequest(thirdTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class));
-        requestProc.commitRequest(secondTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContext.class));
+        requestProc.commitRequest(thirdTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
+        verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
+        requestProc.commitRequest(secondTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
+        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContextImpl.class));
 
     }
 
     @Test(timeOut = 30_000)
     public void testFence() throws Exception {
 
-        requestProc.fenceRequest(666L, null, new MonitoringContext(metrics));
+        requestProc.fenceRequest(666L, null, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
-        verify(persist, timeout(100).times(1)).addFenceToBatch(eq(666L),
+        verify(replyProcessor, timeout(100).times(1)).sendFenceResponse(eq(666L),
                 firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
 
     }
@@ -159,11 +165,11 @@ public class TestRequestProcessor {
         List<Long> writeSet = Collections.emptyList();
 
         // Start a transaction...
-        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> capturedTS = ArgumentCaptor.forClass(Long.class);
         verify(persist, timeout(100).times(1)).addTimestampToBatch(capturedTS.capture(),
                                                                    any(Channel.class),
-                                                                   any(MonitoringContext.class));
+                                                                   any(MonitoringContextImpl.class));
         long startTS = capturedTS.getValue();
 
         // ... simulate the reset of the RequestProcessor state (e.g. due to
@@ -171,8 +177,8 @@ public class TestRequestProcessor {
         stateManager.initialize();
 
         // ...check that the transaction is aborted when trying to commit
-        requestProc.commitRequest(startTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContext.class));
+        requestProc.commitRequest(startTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
+        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContextImpl.class));
 
     }
 
@@ -187,17 +193,17 @@ public class TestRequestProcessor {
         for (long i = 0; i < CONFLICT_MAP_SIZE + CONFLICT_MAP_ASSOCIATIVITY; i++) {
             long writeSetElementHash = i + 1; // This is to match the assigned CT: K/V in cache = WS Element Hash/CT
             List<Long> writeSet = Lists.newArrayList(writeSetElementHash);
-            requestProc.commitRequest(ANY_START_TS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
+            requestProc.commitRequest(ANY_START_TS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
         }
 
         Thread.currentThread().sleep(3000); // Allow the Request processor to finish the request processing
 
         // Check that first time its called is on init
-        verify(persist, timeout(100).times(1)).persistLowWatermark(eq(0L));
+        verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(0L));
         // Then, check it is called when cache is full and the first element is evicted (should be a AbstractTransactionManager.NUM_OF_CHECKPOINTS)
-        verify(persist, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED));
+        verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED));
         // Finally it should never be called with the next element
-        verify(persist, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED));
+        verify(lowWatermarkWriter, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
index ab17ecc..5476f90 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -56,6 +56,8 @@ public class TestRetryProcessor {
     private Panicker panicker;
     @Mock
     private MetricsRegistry metrics;
+    @Mock
+    private MonitoringContextImpl monCtx;
 
     private CommitTable commitTable;
 
@@ -74,10 +76,10 @@ public class TestRetryProcessor {
         RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
 
         // Test we'll reply with an abort for a retry request when the start timestamp IS NOT in the commit table
-        retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, new MonitoringContext(metrics));
+        retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, monCtx);
         ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
 
-        verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class));
+        verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
         long startTS = firstTSCapture.getValue();
         assertEquals(startTS, NON_EXISTING_ST_TX, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
     }
@@ -91,13 +93,13 @@ public class TestRetryProcessor {
 
         // Test we'll reply with a commit for a retry request when the start timestamp IS in the commit table
         commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1);
-        retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
+        retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
         ArgumentCaptor<Long> secondTSCapture = ArgumentCaptor.forClass(Long.class);
 
         verify(replyProc, timeout(100).times(1)).sendCommitResponse(firstTSCapture.capture(),
                                                                     secondTSCapture.capture(),
-                                                                    any(Channel.class));
+                                                                    any(Channel.class), any(MonitoringContextImpl.class));
 
         long startTS = firstTSCapture.getValue();
         long commitTS = secondTSCapture.getValue();
@@ -124,9 +126,9 @@ public class TestRetryProcessor {
         RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
 
         // Test we return an Abort to a retry request when the transaction id IS in the commit table BUT invalidated
-        retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
+        retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> startTSCapture = ArgumentCaptor.forClass(Long.class);
-        verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class));
+        verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
         long startTS = startTSCapture.getValue();
         Assert.assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
index 157bb48..245f3b6 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
@@ -260,9 +260,9 @@ public class TestTSOChannelHandlerNetty {
         tsBuilder.setTimestampRequest(tsRequestBuilder.build());
         // Write into the channel
         channel.write(tsBuilder.build()).await();
-        verify(requestProcessor, timeout(100).times(1)).timestampRequest(any(Channel.class), any(MonitoringContext.class));
+        verify(requestProcessor, timeout(100).times(1)).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
         verify(requestProcessor, timeout(100).never())
-                .commitRequest(anyLong(), anyCollectionOf(Long.class), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+                .commitRequest(anyLong(), anyCollectionOf(Long.class), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class), any(MonitoringContextImpl.class));
     }
 
     private void testWritingCommitRequest(Channel channel) throws InterruptedException {
@@ -277,9 +277,9 @@ public class TestTSOChannelHandlerNetty {
         assertTrue(r.hasCommitRequest());
         // Write into the channel
         channel.write(commitBuilder.build()).await();
-        verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContext.class));
+        verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
         verify(requestProcessor, timeout(100).times(1))
-                .commitRequest(eq(666L), anyCollectionOf(Long.class), anyCollectionOf(Long.class), eq(false), any(Channel.class), any(MonitoringContext.class));
+                .commitRequest(eq(666L), anyCollectionOf(Long.class), anyCollectionOf(Long.class), eq(false), any(Channel.class), any(MonitoringContextImpl.class));
     }
 
     private void testWritingFenceRequest(Channel channel) throws InterruptedException {
@@ -293,9 +293,9 @@ public class TestTSOChannelHandlerNetty {
         assertTrue(r.hasFenceRequest());
         // Write into the channel
         channel.write(fenceBuilder.build()).await();
-        verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContext.class));
+        verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
         verify(requestProcessor, timeout(100).times(1))
-                .fenceRequest(eq(666L), any(Channel.class), any(MonitoringContext.class));
+                .fenceRequest(eq(666L), any(Channel.class), any(MonitoringContextImpl.class));
     }
 
     // ----------------------------------------------------------------------------------------------------------------


[3/4] incubator-omid git commit: OMID-90 Add omid low latency mode

Posted by yo...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilterLL.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilterLL.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilterLL.java
new file mode 100644
index 0000000..cf0fd58
--- /dev/null
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilterLL.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.SubstringComparator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.omid.TestUtils;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
+import org.apache.omid.tso.TSOServer;
+import org.apache.omid.tso.TSOServerConfig;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class TestSnapshotFilterLL {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotFilterLL.class);
+
+    private static final String TEST_FAMILY = "test-fam";
+
+    private static final int MAX_VERSIONS = 3;
+
+    private AbstractTransactionManager tm;
+
+    private Injector injector;
+
+    private Admin admin;
+    private Configuration hbaseConf;
+    private HBaseTestingUtility hbaseTestUtil;
+    private MiniHBaseCluster hbaseCluster;
+
+    private TSOServer tso;
+
+    private CommitTable commitTable;
+    private PostCommitActions syncPostCommitter;
+    private Connection connection;
+
+    @BeforeClass
+    public void setupTestSnapshotFilter() throws Exception {
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+        tsoConfig.setPort(5678);
+        tsoConfig.setConflictMapSize(1);
+        tsoConfig.setWaitStrategy("LOW_CPU");
+        tsoConfig.setLowLatency(true);
+        injector = Guice.createInjector(new TSOForSnapshotFilterTestModule(tsoConfig));
+        hbaseConf = injector.getInstance(Configuration.class);
+        hbaseConf.setBoolean("omid.server.side.filter", true);
+        hbaseConf.setInt("hbase.hconnection.threads.core", 5);
+        hbaseConf.setInt("hbase.hconnection.threads.max", 10);
+        // Tunn down handler threads in regionserver
+        hbaseConf.setInt("hbase.regionserver.handler.count", 10);
+
+        // Set to random port
+        hbaseConf.setInt("hbase.master.port", 0);
+        hbaseConf.setInt("hbase.master.info.port", 0);
+        hbaseConf.setInt("hbase.regionserver.port", 0);
+        hbaseConf.setInt("hbase.regionserver.info.port", 0);
+
+
+        HBaseCommitTableConfig hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
+        HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
+
+        setupHBase();
+        connection = ConnectionFactory.createConnection(hbaseConf);
+        admin = connection.getAdmin();
+        createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig);
+        setupTSO();
+
+        commitTable = injector.getInstance(CommitTable.class);
+    }
+
+    private void setupHBase() throws Exception {
+        LOG.info("--------------------------------------------------------------------------------------------------");
+        LOG.info("Setting up HBase");
+        LOG.info("--------------------------------------------------------------------------------------------------");
+        hbaseTestUtil = new HBaseTestingUtility(hbaseConf);
+        LOG.info("--------------------------------------------------------------------------------------------------");
+        LOG.info("Creating HBase MiniCluster");
+        LOG.info("--------------------------------------------------------------------------------------------------");
+        hbaseCluster = hbaseTestUtil.startMiniCluster(1);
+    }
+
+    private void createRequiredHBaseTables(HBaseTimestampStorageConfig timestampStorageConfig,
+                                           HBaseCommitTableConfig hBaseCommitTableConfig) throws IOException {
+        createTableIfNotExists(timestampStorageConfig.getTableName(), timestampStorageConfig.getFamilyName().getBytes());
+
+        createTableIfNotExists(hBaseCommitTableConfig.getTableName(), hBaseCommitTableConfig.getCommitTableFamily(), hBaseCommitTableConfig.getLowWatermarkFamily());
+    }
+
+    private void createTableIfNotExists(String tableName, byte[]... families) throws IOException {
+        if (!admin.tableExists(TableName.valueOf(tableName))) {
+            LOG.info("Creating {} table...", tableName);
+            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+
+            for (byte[] family : families) {
+                HColumnDescriptor datafam = new HColumnDescriptor(family);
+                datafam.setMaxVersions(MAX_VERSIONS);
+                desc.addFamily(datafam);
+            }
+
+            int priority = Coprocessor.PRIORITY_HIGHEST;
+
+            desc.addCoprocessor(OmidSnapshotFilter.class.getName(),null,++priority,null);
+            desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation",null,++priority,null);
+
+            admin.createTable(desc);
+            try {
+                hbaseTestUtil.waitTableAvailable(TableName.valueOf(tableName),5000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
+    }
+
+    private void setupTSO() throws IOException, InterruptedException {
+        tso = injector.getInstance(TSOServer.class);
+        tso.startAndWait();
+        TestUtils.waitForSocketListening("localhost", 5678, 100);
+        Thread.currentThread().setName("UnitTest(s) thread");
+    }
+
+    @AfterClass
+    public void cleanupTestSnapshotFilter() throws Exception {
+        teardownTSO();
+        hbaseCluster.shutdown();
+    }
+
+    private void teardownTSO() throws IOException, InterruptedException {
+        tso.stopAndWait();
+        TestUtils.waitForSocketNotListening("localhost", 5678, 1000);
+    }
+
+    @BeforeMethod
+    public void setupTestSnapshotFilterIndividualTest() throws Exception {
+        tm = spy((AbstractTransactionManager) newTransactionManager());
+    }
+
+    private TransactionManager newTransactionManager() throws Exception {
+        HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
+        hbaseOmidClientConf.setConnectionString("localhost:5678");
+        hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
+        CommitTable.Client commitTableClient = commitTable.getClient();
+        syncPostCommitter =
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient));
+        return HBaseTransactionManager.builder(hbaseOmidClientConf)
+                .postCommitter(syncPostCommitter)
+                .commitTableClient(commitTableClient)
+                .build();
+    }
+
+
+    @Test(timeOut = 60_000)
+    public void testInvalidate() throws Throwable {
+        byte[] rowName1 = Bytes.toBytes("row1");
+        byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
+        byte[] colName1 = Bytes.toBytes("col1");
+        byte[] dataValue1 = Bytes.toBytes("testWrite-1");
+
+        String TEST_TABLE = "testGetFirstResult";
+        createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
+        TTable tt = new TTable(connection, TEST_TABLE);
+
+        Transaction tx1 = tm.begin();
+
+        Put row1 = new Put(rowName1);
+        row1.addColumn(famName1, colName1, dataValue1);
+        tt.put(tx1, row1);
+
+
+        Transaction tx2 = tm.begin();
+
+        Get get = new Get(rowName1);
+        Result result = tt.get(tx2, get);
+
+        assertTrue(result.isEmpty(), "Result should not be empty!");
+
+
+        boolean gotInvalidated = false;
+        try {
+            tm.commit(tx1);
+        } catch (RollbackException e) {
+            gotInvalidated = true;
+        }
+        assertTrue(gotInvalidated);
+        assertTrue(tm.isLowLatency());
+    }
+
+    @Test(timeOut = 60_000)
+    public void testInvalidateByScan() throws Throwable {
+        byte[] rowName1 = Bytes.toBytes("row1");
+        byte[] famName1 = Bytes.toBytes(TEST_FAMILY);
+        byte[] colName1 = Bytes.toBytes("col1");
+        byte[] dataValue1 = Bytes.toBytes("testWrite-1");
+
+        String TEST_TABLE = "testGetFirstResult";
+        createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
+        TTable tt = new TTable(connection, TEST_TABLE);
+
+        Transaction tx1 = tm.begin();
+
+        Put row1 = new Put(rowName1);
+        row1.addColumn(famName1, colName1, dataValue1);
+        tt.put(tx1, row1);
+
+
+        Transaction tx2 = tm.begin();
+
+        ResultScanner iterableRS = tt.getScanner(tx2, new Scan().setStartRow(rowName1).setStopRow(rowName1));
+        assertTrue(iterableRS.next() == null);
+
+        tm.commit(tx2);
+
+        boolean gotInvalidated = false;
+        try {
+            tm.commit(tx1);
+        } catch (RollbackException e) {
+            gotInvalidated = true;
+        }
+        assertTrue(gotInvalidated);
+        assertTrue(tm.isLowLatency());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java
index 6223bf2..5b587a0 100644
--- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java
+++ b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java
@@ -81,6 +81,7 @@ public abstract class AbstractTransaction<T extends CellId> implements Transacti
     private final Set<T> conflictFreeWriteSet;
     private Status status = Status.RUNNING;
     private VisibilityLevel visibilityLevel;
+    private final boolean isLowLatency;
 
     /**
      * Base constructor
@@ -105,8 +106,10 @@ public abstract class AbstractTransaction<T extends CellId> implements Transacti
                                long epoch,
                                Set<T> writeSet,
                                Set<T> conflictFreeWriteSet,
-                               AbstractTransactionManager transactionManager) {
-        this(transactionId, transactionId, VisibilityLevel.SNAPSHOT, epoch, writeSet, conflictFreeWriteSet, transactionManager);
+                               AbstractTransactionManager transactionManager,
+                               boolean isLowLatency) {
+        this(transactionId, transactionId, VisibilityLevel.SNAPSHOT, epoch, writeSet, conflictFreeWriteSet,
+                transactionManager, isLowLatency);
     }
 
     public AbstractTransaction(long transactionId,
@@ -115,7 +118,9 @@ public abstract class AbstractTransaction<T extends CellId> implements Transacti
             long epoch,
             Set<T> writeSet,
             Set<T> conflictFreeWriteSet,
-            AbstractTransactionManager transactionManager) {
+            AbstractTransactionManager transactionManager,
+            boolean isLowLatency) {
+
         this.startTimestamp = this.writeTimestamp = transactionId;
         this.readTimestamp = readTimestamp;
         this.epoch = epoch;
@@ -123,6 +128,7 @@ public abstract class AbstractTransaction<T extends CellId> implements Transacti
         this.conflictFreeWriteSet = conflictFreeWriteSet;
         this.transactionManager = transactionManager;
         this.visibilityLevel = visibilityLevel;
+        this.isLowLatency = isLowLatency;
     }
 
     /**
@@ -152,7 +158,8 @@ public abstract class AbstractTransaction<T extends CellId> implements Transacti
                                Set<T> conflictFreeWriteSet,
                                AbstractTransactionManager transactionManager,
                                long readTimestamp,
-                               long writeTimestamp) {
+                               long writeTimestamp,
+                               boolean isLowLatency) {
         this.startTimestamp = transactionId;
         this.readTimestamp = readTimestamp;
         this.writeTimestamp = writeTimestamp;
@@ -161,6 +168,7 @@ public abstract class AbstractTransaction<T extends CellId> implements Transacti
         this.conflictFreeWriteSet = conflictFreeWriteSet;
         this.transactionManager = transactionManager;
         this.visibilityLevel = VisibilityLevel.SNAPSHOT;
+        this.isLowLatency = isLowLatency;
     }
 
     /**
@@ -383,4 +391,8 @@ public abstract class AbstractTransaction<T extends CellId> implements Transacti
         metadata.put(key, value);
     }
 
+    @Override
+    public boolean isLowLatency() {
+        return isLowLatency;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
index e1d623d..6bbbb75 100644
--- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
+++ b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
@@ -66,6 +66,7 @@ public abstract class AbstractTransactionManager implements TransactionManager {
     private final PostCommitActions postCommitter;
     protected final TSOClient tsoClient;
     protected final CommitTable.Client commitTableClient;
+    private final CommitTable.Writer commitTableWriter;
     private final TransactionFactory<? extends CellId> transactionFactory;
 
     // Metrics
@@ -96,11 +97,13 @@ public abstract class AbstractTransactionManager implements TransactionManager {
                                       PostCommitActions postCommitter,
                                       TSOClient tsoClient,
                                       CommitTable.Client commitTableClient,
+                                      CommitTable.Writer commitTableWriter,
                                       TransactionFactory<? extends CellId> transactionFactory) {
 
         this.tsoClient = tsoClient;
         this.postCommitter = postCommitter;
         this.commitTableClient = commitTableClient;
+        this.commitTableWriter = commitTableWriter;
         this.transactionFactory = transactionFactory;
 
         // Metrics configuration
@@ -178,7 +181,7 @@ public abstract class AbstractTransactionManager implements TransactionManager {
     }
 
     /**
-     * @see org.apache.omid.transaction.TransactionManager#fence()
+     * @see org.apache.omid.transaction.TransactionManager#fence(byte[])
      */
     @Override
     public final Transaction fence(byte[] tableName) throws TransactionException {
@@ -243,7 +246,10 @@ public abstract class AbstractTransactionManager implements TransactionManager {
                 if (tx.getWriteSet().isEmpty() && tx.getConflictFreeWriteSet().isEmpty()) {
                     markReadOnlyTransaction(tx); // No need for read-only transactions to contact the TSO Server
                 } else {
-                    commitRegularTransaction(tx);
+                    if (tsoClient.isLowLatency())
+                        commitLowLatencyTransaction(tx);
+                    else
+                        commitRegularTransaction(tx);
                 }
                 committedTxsCounter.inc();
             } finally {
@@ -350,6 +356,43 @@ public abstract class AbstractTransactionManager implements TransactionManager {
 
     }
 
+    private void commitLowLatencyTransaction(AbstractTransaction<? extends CellId> tx)
+            throws RollbackException, TransactionException {
+        try {
+
+            long commitTs = tsoClient.commit(tx.getStartTimestamp(), tx.getWriteSet(), tx.getConflictFreeWriteSet()).get();
+            boolean committed = commitTableWriter.atomicAddCommittedTransaction(tx.getStartTimestamp(),commitTs);
+            if (!committed) {
+                // Transaction has been invalidated by other client
+                rollback(tx);
+                commitTableClient.completeTransaction(tx.getStartTimestamp());
+                rolledbackTxsCounter.inc();
+                throw new RollbackException("Transaction " + tx.getTransactionId() + " got invalidated");
+            }
+            certifyCommitForTx(tx, commitTs);
+            updateShadowCellsAndRemoveCommitTableEntry(tx, postCommitter);
+
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof AbortException) { // TSO reports Tx conflicts as AbortExceptions in the future
+                rollback(tx);
+                rolledbackTxsCounter.inc();
+                throw new RollbackException("Conflicts detected in tx writeset", e.getCause());
+            }
+
+            if (e.getCause() instanceof ServiceUnavailableException || e.getCause() instanceof ConnectionException) {
+                errorTxsCounter.inc();
+                rollback(tx); // Rollback proactively cause it's likely that a new TSOServer is now master
+                throw new RollbackException(tx + " rolled-back precautionary", e.getCause());
+            } else {
+                throw new TransactionException(tx + ": cannot determine Tx outcome", e.getCause());
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
     private void commitRegularTransaction(AbstractTransaction<? extends CellId> tx)
             throws RollbackException, TransactionException
     {
@@ -446,4 +489,7 @@ public abstract class AbstractTransactionManager implements TransactionManager {
 
     }
 
+    public boolean isLowLatency() {
+        return tsoClient.isLowLatency();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/transaction-client/src/main/java/org/apache/omid/transaction/Transaction.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/Transaction.java b/transaction-client/src/main/java/org/apache/omid/transaction/Transaction.java
index fdc5b3f..9ae5cdb 100644
--- a/transaction-client/src/main/java/org/apache/omid/transaction/Transaction.java
+++ b/transaction-client/src/main/java/org/apache/omid/transaction/Transaction.java
@@ -88,5 +88,11 @@ public interface Transaction {
     void setMetadata(String key, Object value);
 
     Optional<Object> getMetadata(String key);
+
+    /**
+     * Returns whether the transaction was created by a lowLatency TransactionalManager
+     * @return whether the transaction was created by a lowLatency TransactionalManager
+     */
+    boolean isLowLatency();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java
index e18befc..42e0fc3 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java
@@ -154,4 +154,9 @@ class MockTSOClient implements TSOProtocol {
         f.set(null);
         return new ForwardingTSOFuture<>(f);
     }
+
+    @Override
+    public boolean isLowLatency() {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
index b4c794c..26d79a2 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
@@ -96,6 +96,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
     private InetSocketAddress tsoAddr;
     private String zkCurrentTsoPath;
 
+    private boolean lowLatency;
 
     // Use to extract unique table identifiers from the modified cells list.
     private final Set<Long> tableIDs;
@@ -170,6 +171,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
         bootstrap.setOption("keepAlive", true);
         bootstrap.setOption("reuseAddress", true);
         bootstrap.setOption("connectTimeoutMillis", 100);
+        lowLatency = false;
 
         this.tableIDs = new HashSet<Long>();
 
@@ -249,7 +251,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
     }
 
     /**
-     * @see TSOProtocol#getFence()
+     * @see TSOProtocol#getFence(long)
      */
     @Override
     public TSOFuture<Long> getFence(long tableId) {
@@ -346,6 +348,11 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
 
     }
 
+    @Override
+    public boolean isLowLatency() {
+        return lowLatency;
+    }
+
     // ****************************************** Finite State Machine ************************************************
 
     // ----------------------------------------------------------------------------------------------------------------
@@ -625,6 +632,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
         }
 
         public StateMachine.State handleEvent(ResponseEvent e) {
+            lowLatency = e.getParam().getHandshakeResponse().getLowLatency();
             if (e.getParam().hasHandshakeResponse() && e.getParam().getHandshakeResponse().getClientCompatible()) {
                 if (timeout != null) {
                     timeout.cancel();

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
index 02eab90..e609260 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
@@ -81,4 +81,10 @@ public interface TSOProtocol {
      */
     TSOFuture<Void> close();
 
+    /**
+     * checks if tso is low latency protocol
+     * @return
+     */
+    boolean isLowLatency();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
new file mode 100644
index 0000000..2dac28f
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
+import com.lmax.disruptor.TimeoutHandler;
+import com.lmax.disruptor.dsl.Disruptor;
+
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.tso.TSOStateManager.TSOState;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import static com.lmax.disruptor.dsl.ProducerType.MULTI;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.omid.tso.AbstractRequestProcessor.RequestEvent.EVENT_FACTORY;
+
+abstract class AbstractRequestProcessor implements EventHandler<AbstractRequestProcessor.RequestEvent>, RequestProcessor, TimeoutHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractRequestProcessor.class);
+
+    // Disruptor-related attributes
+    private final ExecutorService disruptorExec;
+    protected final Disruptor<RequestEvent> disruptor;
+    protected RingBuffer<RequestEvent> requestRing;
+
+    private final TimestampOracle timestampOracle;
+    private final CommitHashMap hashmap;
+    private final Map<Long, Long> tableFences;
+    private final MetricsRegistry metrics;
+    private final LowWatermarkWriter lowWatermarkWriter;
+    private long lowWatermark = -1L;
+
+    //Used to forward fence
+    private final ReplyProcessor replyProcessor;
+
+    AbstractRequestProcessor(MetricsRegistry metrics,
+                             TimestampOracle timestampOracle,
+                             Panicker panicker,
+                             TSOServerConfig config,
+                             LowWatermarkWriter lowWatermarkWriter, ReplyProcessor replyProcessor)
+            throws IOException {
+
+
+        // ------------------------------------------------------------------------------------------------------------
+        // Disruptor initialization
+        // ------------------------------------------------------------------------------------------------------------
+
+        TimeoutBlockingWaitStrategy timeoutStrategy = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), MILLISECONDS);
+
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("request-%d").build();
+        this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory);
+
+        this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, timeoutStrategy);
+        disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith()
+        disruptor.handleEventsWith(this);
+
+
+        // ------------------------------------------------------------------------------------------------------------
+        // Attribute initialization
+        // ------------------------------------------------------------------------------------------------------------
+
+        this.metrics = metrics;
+        this.timestampOracle = timestampOracle;
+        this.hashmap = new CommitHashMap(config.getConflictMapSize());
+        this.tableFences = new HashMap<Long, Long>();
+        this.lowWatermarkWriter = lowWatermarkWriter;
+
+        this.replyProcessor = replyProcessor;
+
+        LOG.info("RequestProcessor initialized");
+
+    }
+
+    /**
+     * This should be called when the TSO gets leadership
+     */
+    @Override
+    public void update(TSOState state) throws Exception {
+        LOG.info("Initializing RequestProcessor state...");
+        this.lowWatermark = state.getLowWatermark();
+        lowWatermarkWriter.persistLowWatermark(lowWatermark).get(); // Sync persist
+        LOG.info("RequestProcessor state initialized with LWMs {} and Epoch {}", lowWatermark, state.getEpoch());
+    }
+
+    @Override
+    public void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {
+
+        switch (event.getType()) {
+            case TIMESTAMP:
+                handleTimestamp(event);
+                break;
+            case COMMIT:
+                handleCommit(event);
+                break;
+            case FENCE:
+                handleFence(event);
+                break;
+            default:
+                throw new IllegalStateException("Event not allowed in Request Processor: " + event);
+        }
+
+    }
+
+    @Override
+    public void onTimeout(long sequence) throws Exception {
+
+        // TODO We can not use this as a timeout trigger for flushing. This timeout is related to the time between
+        // TODO (cont) arrivals of requests to the disruptor. We need another mechanism to trigger timeouts
+        // TODO (cont) WARNING!!! Take care with the implementation because if there's other thread than request-0
+        // TODO (cont) thread the one that calls persistProc.triggerCurrentBatchFlush(); we'll incur in concurrency issues
+        // TODO (cont) This is because, in the current implementation, only the request-0 thread calls the public methods
+        // TODO (cont) in persistProc and it is guaranteed that access them serially.
+        onTimeout();
+    }
+
+    @Override
+    public void timestampRequest(Channel c, MonitoringContext monCtx) {
+
+        monCtx.timerStart("request.processor.timestamp.latency");
+        long seq = requestRing.next();
+        RequestEvent e = requestRing.get(seq);
+        RequestEvent.makeTimestampRequest(e, c, monCtx);
+        requestRing.publish(seq);
+
+    }
+
+    @Override
+    public void commitRequest(long startTimestamp, Collection<Long> writeSet, Collection<Long> tableIdSet, boolean isRetry, Channel c,
+                              MonitoringContext monCtx) {
+
+        monCtx.timerStart("request.processor.commit.latency");
+        long seq = requestRing.next();
+        RequestEvent e = requestRing.get(seq);
+        RequestEvent.makeCommitRequest(e, startTimestamp, monCtx, writeSet, tableIdSet, isRetry, c);
+        requestRing.publish(seq);
+
+    }
+
+    @Override
+    public void fenceRequest(long tableID, Channel c, MonitoringContext monCtx) {
+
+        monCtx.timerStart("request.processor.fence.latency");
+        long seq = requestRing.next();
+        RequestEvent e = requestRing.get(seq);
+        RequestEvent.makeFenceRequest(e, tableID, c, monCtx);
+        requestRing.publish(seq);
+
+    }
+
+    private void handleTimestamp(RequestEvent requestEvent) throws Exception {
+
+        long timestamp = timestampOracle.next();
+        requestEvent.getMonCtx().timerStop("request.processor.timestamp.latency");
+        forwardTimestamp(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
+    }
+
+    // Checks whether transaction transactionId started before a fence creation of a table transactionId modified.
+    private boolean hasConflictsWithFences(long startTimestamp, Collection<Long> tableIdSet) {
+        if (!tableFences.isEmpty()) {
+            for (long tableId: tableIdSet) {
+                Long fence = tableFences.get(tableId);
+                if (fence != null && fence > startTimestamp) {
+                    return true;
+                }
+                if (fence != null && fence < lowWatermark) {
+                    tableFences.remove(tableId); // Garbage collect entries of old fences.
+                }
+            }
+        }
+
+        return false;
+    }
+
+ // Checks whether transactionId has a write-write conflict with a transaction committed after transactionId.
+    private boolean hasConflictsWithCommittedTransactions(long startTimestamp, Iterable<Long> writeSet) {
+        for (long cellId : writeSet) {
+            long value = hashmap.getLatestWriteForCell(cellId);
+            if (value != 0 && value >= startTimestamp) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private void handleCommit(RequestEvent event) throws Exception {
+
+        long startTimestamp = event.getStartTimestamp();
+        Iterable<Long> writeSet = event.writeSet();
+        Collection<Long> tableIdSet = event.getTableIdSet();
+        boolean isCommitRetry = event.isCommitRetry();
+        Channel c = event.getChannel();
+
+        boolean nonEmptyWriteSet = writeSet.iterator().hasNext();
+
+        // If the transaction started before the low watermark, or
+        // it started before a fence and modified the table the fence created for, or
+        // it has a write-write conflict with a transaction committed after it started
+        // Then it should abort. Otherwise, it can commit.
+        if (startTimestamp > lowWatermark &&
+            !hasConflictsWithFences(startTimestamp, tableIdSet) &&
+            !hasConflictsWithCommittedTransactions(startTimestamp, writeSet)) {
+
+            long commitTimestamp = timestampOracle.next();
+
+            if (nonEmptyWriteSet) {
+                long newLowWatermark = lowWatermark;
+
+                for (long r : writeSet) {
+                    long removed = hashmap.putLatestWriteForCell(r, commitTimestamp);
+                    newLowWatermark = Math.max(removed, newLowWatermark);
+                }
+
+                if (newLowWatermark != lowWatermark) {
+                    LOG.trace("Setting new low Watermark to {}", newLowWatermark);
+                    lowWatermark = newLowWatermark;
+                    lowWatermarkWriter.persistLowWatermark(newLowWatermark); // Async persist
+                }
+            }
+            event.getMonCtx().timerStop("request.processor.commit.latency");
+            forwardCommit(startTimestamp, commitTimestamp, c, event.getMonCtx());
+
+        } else {
+
+            event.getMonCtx().timerStop("request.processor.commit.latency");
+            if (isCommitRetry) { // Re-check if it was already committed but the client retried due to a lag replying
+                forwardCommitRetry(startTimestamp, c, event.getMonCtx());
+            } else {
+                forwardAbort(startTimestamp, c, event.getMonCtx());
+            }
+
+        }
+
+    }
+
+    private void handleFence(RequestEvent event) throws Exception {
+        long tableID = event.getTableId();
+        Channel c = event.getChannel();
+
+        long fenceTimestamp = timestampOracle.next();
+
+        tableFences.put(tableID, fenceTimestamp);
+
+        event.monCtx.timerStart("reply.processor.fence.latency");
+        replyProcessor.sendFenceResponse(tableID, fenceTimestamp, c, event.monCtx);
+    }
+
+    @Override
+    public void close() throws IOException {
+
+        LOG.info("Terminating Request Processor...");
+        disruptor.halt();
+        disruptor.shutdown();
+        LOG.info("\tRequest Processor Disruptor shutdown");
+        disruptorExec.shutdownNow();
+        try {
+            disruptorExec.awaitTermination(3, SECONDS);
+            LOG.info("\tRequest Processor Disruptor executor shutdown");
+        } catch (InterruptedException e) {
+            LOG.error("Interrupted whilst finishing Request Processor Disruptor executor");
+            Thread.currentThread().interrupt();
+        }
+        LOG.info("Request Processor terminated");
+
+    }
+
+    protected abstract void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
+    protected abstract void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
+    protected abstract void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
+    protected abstract void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
+    protected abstract void onTimeout() throws Exception;
+
+
+
+    final static class RequestEvent implements Iterable<Long> {
+
+        enum Type {
+            TIMESTAMP, COMMIT, FENCE
+        }
+
+        private Type type = null;
+        private Channel channel = null;
+
+        private boolean isCommitRetry = false;
+        private long startTimestamp = 0;
+        private MonitoringContext monCtx;
+        private long numCells = 0;
+
+        private static final int MAX_INLINE = 40;
+        private Long writeSet[] = new Long[MAX_INLINE];
+        private Collection<Long> writeSetAsCollection = null; // for the case where there's more than MAX_INLINE
+
+        private Collection<Long> tableIdSet = null;
+        private long tableID = 0;
+
+        static void makeTimestampRequest(RequestEvent e, Channel c, MonitoringContext monCtx) {
+            e.type = Type.TIMESTAMP;
+            e.channel = c;
+            e.monCtx = monCtx;
+        }
+
+        static void makeCommitRequest(RequestEvent e,
+                                      long startTimestamp,
+                                      MonitoringContext monCtx,
+                                      Collection<Long> writeSet,
+                                      Collection<Long> TableIdSet,
+                                      boolean isRetry,
+                                      Channel c) {
+            e.monCtx = monCtx;
+            e.type = Type.COMMIT;
+            e.channel = c;
+            e.startTimestamp = startTimestamp;
+            e.isCommitRetry = isRetry;
+            if (writeSet.size() > MAX_INLINE) {
+                e.numCells = writeSet.size();
+                e.writeSetAsCollection = writeSet;
+            } else {
+                e.writeSetAsCollection = null;
+                e.numCells = writeSet.size();
+                int i = 0;
+                for (Long cellId : writeSet) {
+                    e.writeSet[i] = cellId;
+                    ++i;
+                }
+            }
+            e.tableIdSet = TableIdSet;
+        }
+
+        static void makeFenceRequest(RequestEvent e,
+                                     long tableID,
+                                     Channel c,
+                                     MonitoringContext monCtx) {
+            e.type = Type.FENCE;
+            e.channel = c;
+            e.monCtx = monCtx;
+            e.tableID = tableID;
+        }
+
+        MonitoringContext getMonCtx() {
+            return monCtx;
+        }
+
+        Type getType() {
+            return type;
+        }
+
+        long getStartTimestamp() {
+            return startTimestamp;
+        }
+
+        Channel getChannel() {
+            return channel;
+        }
+
+        Collection<Long> getTableIdSet() {
+            return tableIdSet;
+        }
+
+        long getTableId() {
+            return tableID;
+        }
+
+        @Override
+        public Iterator<Long> iterator() {
+
+            if (writeSetAsCollection != null) {
+                return writeSetAsCollection.iterator();
+            }
+
+            return new Iterator<Long>() {
+                int i = 0;
+
+                @Override
+                public boolean hasNext() {
+                    return i < numCells;
+                }
+
+                @Override
+                public Long next() {
+                    if (!hasNext()) {
+                        throw new NoSuchElementException();
+                    }
+                    return writeSet[i++];
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+
+        }
+
+        Iterable<Long> writeSet() {
+
+            return this;
+
+        }
+
+        boolean isCommitRetry() {
+            return isCommitRetry;
+        }
+
+        final static EventFactory<RequestEvent> EVENT_FACTORY = new EventFactory<RequestEvent>() {
+            @Override
+            public RequestEvent newInstance() {
+                return new RequestEvent();
+            }
+        };
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java b/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java
index 2584629..032f3a3 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java
@@ -51,8 +51,15 @@ public class DisruptorModule extends AbstractModule {
              bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(YieldingWaitStrategy.class);
              break;
         }
-        bind(RequestProcessor.class).to(RequestProcessorImpl.class).in(Singleton.class);
-        bind(PersistenceProcessor.class).to(PersistenceProcessorImpl.class).in(Singleton.class);
+
+        if (config.getLowLatency()) {
+            bind(RequestProcessor.class).to(RequestProcessorSkipCT.class).in(Singleton.class);
+            bind(PersistenceProcessor.class).to(PersitenceProcessorNullImpl.class).in(Singleton.class);
+        } else {
+            bind(PersistenceProcessor.class).to(PersistenceProcessorImpl.class).in(Singleton.class);
+            bind(RequestProcessor.class).to(RequestProcessorPersistCT.class).in(Singleton.class);
+        }
+
         bind(ReplyProcessor.class).to(ReplyProcessorImpl.class).in(Singleton.class);
         bind(RetryProcessor.class).to(RetryProcessorImpl.class).in(Singleton.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriter.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriter.java b/tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriter.java
new file mode 100644
index 0000000..ddd0623
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import java.util.concurrent.Future;
+
+public interface LowWatermarkWriter {
+    Future<Void> persistLowWatermark(final long lowWatermark);
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriterImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriterImpl.java b/tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriterImpl.java
new file mode 100644
index 0000000..8de1b20
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriterImpl.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.Timer;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.omid.metrics.MetricsUtils.name;
+
+public class LowWatermarkWriterImpl implements LowWatermarkWriter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LowWatermarkWriterImpl.class);
+
+    private final Timer lwmWriteTimer;
+    private final CommitTable.Writer lowWatermarkWriter;
+    private final ExecutorService lowWatermarkWriterExecutor;
+    private MetricsRegistry metrics;
+
+    @Inject
+    LowWatermarkWriterImpl(TSOServerConfig config,
+                           CommitTable commitTable,
+                           MetricsRegistry metrics)
+            throws Exception {
+        this.metrics = metrics;
+        this.lowWatermarkWriter = commitTable.getWriter();
+        // Low Watermark writer
+        ThreadFactoryBuilder lwmThreadFactory = new ThreadFactoryBuilder().setNameFormat("lwm-writer-%d");
+        this.lowWatermarkWriterExecutor = Executors.newSingleThreadExecutor(lwmThreadFactory.build());
+
+        // Metrics config
+        this.lwmWriteTimer = metrics.timer(name("tso", "lwmWriter", "latency"));
+        LOG.info("PersistentProcessor initialized");
+    }
+
+    @Override
+    public Future<Void> persistLowWatermark(final long lowWatermark) {
+
+        return lowWatermarkWriterExecutor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws IOException {
+                try {
+                    lwmWriteTimer.start();
+                    lowWatermarkWriter.updateLowWatermark(lowWatermark);
+                    lowWatermarkWriter.flush();
+                } finally {
+                    lwmWriteTimer.stop();
+                }
+                return null;
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java
index 645806a..ea183a8 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java
@@ -15,61 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.omid.tso;
-
-import com.google.common.base.Stopwatch;
-import com.google.common.base.Throwables;
-import org.apache.omid.metrics.MetricsRegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.concurrent.NotThreadSafe;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.omid.metrics.MetricsUtils.name;
 
-@NotThreadSafe
-public class MonitoringContext {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MonitoringContext.class);
-
-    private volatile boolean flag;
-    private Map<String, Long> elapsedTimeMsMap = new ConcurrentHashMap<>();
-    private Map<String, Stopwatch> timers = new ConcurrentHashMap<>();
-    private MetricsRegistry metrics;
+package org.apache.omid.tso;
 
-    public MonitoringContext(MetricsRegistry metrics) {
-        this.metrics = metrics;
-    }
+public interface MonitoringContext {
 
-    public void timerStart(String name) {
-        Stopwatch stopwatch = new Stopwatch();
-        stopwatch.start();
-        timers.put(name, stopwatch);
-    }
+    public void timerStart(String name);
 
-    public void timerStop(String name) {
-        if (flag) {
-            LOG.warn("timerStop({}) called after publish. Measurement was ignored. {}", name, Throwables.getStackTraceAsString(new Exception()));
-            return;
-        }
-        Stopwatch activeStopwatch = timers.get(name);
-        if (activeStopwatch == null) {
-            throw new IllegalStateException(
-                    String.format("There is no %s timer in the %s monitoring context.", name, this));
-        }
-        activeStopwatch.stop();
-        elapsedTimeMsMap.put(name, activeStopwatch.elapsedTime(TimeUnit.NANOSECONDS));
-        timers.remove(name);
-    }
+    public void timerStop(String name);
 
-    public void publish() {
-        flag = true;
-        for (Map.Entry<String, Long> entry : elapsedTimeMsMap.entrySet()) {
-            metrics.timer(name("tso", entry.getKey())).update(entry.getValue());
-        }
-    }
+    public void publish();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextFactory.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextFactory.java b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextFactory.java
new file mode 100644
index 0000000..4280abc
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.apache.omid.metrics.MetricsRegistry;
+
+public class MonitoringContextFactory {
+    private MonitoringContextFactory(){};
+
+    static public MonitoringContext getInstance(TSOServerConfig config, MetricsRegistry metrics) {
+        if (config.getMonitorContext())
+            return new MonitoringContextImpl(metrics);
+        else
+            return new MonitoringContextNullImpl();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextImpl.java b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextImpl.java
new file mode 100644
index 0000000..5792a77
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextImpl.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Throwables;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.omid.metrics.MetricsUtils.name;
+import java.util.concurrent.TimeUnit;
+
+@NotThreadSafe
+public class MonitoringContextImpl implements MonitoringContext{
+
+    private static final Logger LOG = LoggerFactory.getLogger(MonitoringContextImpl.class);
+
+    private volatile boolean flag;
+    private Map<String, Long> elapsedTimeMsMap = new ConcurrentHashMap<>();
+    private Map<String, Stopwatch> timers = new ConcurrentHashMap<>();
+    private MetricsRegistry metrics;
+
+    public MonitoringContextImpl(MetricsRegistry metrics) {
+        this.metrics = metrics;
+    }
+
+    public void timerStart(String name) {
+        Stopwatch stopwatch = new Stopwatch();
+        stopwatch.start();
+        timers.put(name, stopwatch);
+    }
+
+    public void timerStop(String name) {
+        if (flag) {
+            LOG.warn("timerStop({}) called after publish. Measurement was ignored. {}", name, Throwables.getStackTraceAsString(new Exception()));
+            return;
+        }
+        Stopwatch activeStopwatch = timers.get(name);
+        if (activeStopwatch == null) {
+            throw new IllegalStateException(
+                    String.format("There is no %s timer in the %s monitoring context.", name, this));
+        }
+        activeStopwatch.stop();
+        elapsedTimeMsMap.put(name, activeStopwatch.elapsedTime(TimeUnit.NANOSECONDS));
+        timers.remove(name);
+    }
+
+    public void publish() {
+        flag = true;
+        for (Map.Entry<String, Long> entry : elapsedTimeMsMap.entrySet()) {
+            metrics.timer(name("tso", entry.getKey())).update(entry.getValue());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextNullImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextNullImpl.java b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextNullImpl.java
new file mode 100644
index 0000000..f88123f
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextNullImpl.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.omid.tso;
+
+public class MonitoringContextNullImpl implements MonitoringContext {
+    @Override
+    public void timerStart(String name) {
+
+    }
+
+    @Override
+    public void timerStop(String name) {
+
+    }
+
+    @Override
+    public void publish() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index ddebf13..f5f81a3 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -37,5 +37,5 @@ interface PersistenceProcessor extends Closeable {
 
     void triggerCurrentBatchFlush() throws Exception;
 
-    Future<Void> persistLowWatermark(long lowWatermark);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 628b73d..ef88b48 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -63,12 +63,7 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
 
     // TODO Next two need to be either int or AtomicLong
     volatile private long batchSequence;
-
-    private CommitTable.Writer lowWatermarkWriter;
-    private ExecutorService lowWatermarkWriterExecutor;
-
     private MetricsRegistry metrics;
-    private final Timer lwmWriteTimer;
 
     @Inject
     PersistenceProcessorImpl(TSOServerConfig config,
@@ -97,19 +92,11 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
         // ------------------------------------------------------------------------------------------------------------
 
         this.metrics = metrics;
-        this.lowWatermarkWriter = commitTable.getWriter();
         this.batchSequence = 0L;
         this.batchPool = batchPool;
         this.currentBatch = batchPool.borrowObject();
-        // Low Watermark writer
-        ThreadFactoryBuilder lwmThreadFactory = new ThreadFactoryBuilder().setNameFormat("lwm-writer-%d");
-        this.lowWatermarkWriterExecutor = Executors.newSingleThreadExecutor(lwmThreadFactory.build());
-
-        // Metrics config
-        this.lwmWriteTimer = metrics.timer(name("tso", "lwmWriter", "latency"));
 
         LOG.info("PersistentProcessor initialized");
-
     }
 
     @Override
@@ -177,25 +164,6 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
     }
 
     @Override
-    public Future<Void> persistLowWatermark(final long lowWatermark) {
-
-        return lowWatermarkWriterExecutor.submit(new Callable<Void>() {
-            @Override
-            public Void call() throws IOException {
-                try {
-                    lwmWriteTimer.start();
-                    lowWatermarkWriter.updateLowWatermark(lowWatermark);
-                    lowWatermarkWriter.flush();
-                } finally {
-                    lwmWriteTimer.stop();
-                }
-                return null;
-            }
-        });
-
-    }
-
-    @Override
     public void close() throws IOException {
 
         LOG.info("Terminating Persistence Processor...");

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java
new file mode 100644
index 0000000..773500c
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.jboss.netty.channel.Channel;
+
+import java.io.IOException;
+
+public class PersitenceProcessorNullImpl implements PersistenceProcessor {
+
+    @Override
+    public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+
+    }
+
+    @Override
+    public void addCommitRetryToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+
+    }
+
+    @Override
+    public void addAbortToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+
+    }
+
+    @Override
+    public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+
+    }
+
+    @Override
+    public void addFenceToBatch(long tableID, long fenceTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+
+    }
+
+    @Override
+    public void triggerCurrentBatchFlush() throws Exception {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
index 7e836aa..b580715 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
@@ -44,7 +44,7 @@ interface ReplyProcessor extends Closeable {
      * @param channel
      *            the channel used to send the response back to the client
      */
-    void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel channel);
+    void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel channel, MonitoringContext monCtx);
 
     /**
      * Allows to send an abort response back to the client.
@@ -54,7 +54,7 @@ interface ReplyProcessor extends Closeable {
      * @param channel
      *            the channel used to send the response back to the client
      */
-    void sendAbortResponse(long startTimestamp, Channel channel);
+    void sendAbortResponse(long startTimestamp, Channel channel, MonitoringContext monCtx);
 
     /**
      * Allow to send a timestamp response back to the client.
@@ -65,7 +65,7 @@ interface ReplyProcessor extends Closeable {
      *            the channel used to send the response back to the client
      */
 
-    void sendTimestampResponse(long startTimestamp, Channel channel);
+    void sendTimestampResponse(long startTimestamp, Channel channel, MonitoringContext monCtx);
 
     /**
      * Allow to send a fence response back to the client.
@@ -78,7 +78,7 @@ interface ReplyProcessor extends Closeable {
      *            the channel used to send the response back to the client
      */
 
-    void sendFenceResponse(long tableID, long fenceTimestamp, Channel c);
+    void sendFenceResponse(long tableID, long fenceTimestamp, Channel channel, MonitoringContext monCtx);
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/ccb53892/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index 28fe3a0..dda4f8d 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -116,24 +116,16 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
 
             switch (event.getType()) {
                 case COMMIT:
-                    sendCommitResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel());
-                    event.getMonCtx().timerStop("reply.processor.commit.latency");
-                    commitMeter.mark();
+                    sendCommitResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel(), event.getMonCtx());
                     break;
                 case ABORT:
-                    sendAbortResponse(event.getStartTimestamp(), event.getChannel());
-                    event.getMonCtx().timerStop("reply.processor.abort.latency");
-                    abortMeter.mark();
+                    sendAbortResponse(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
                     break;
                 case TIMESTAMP:
-                    sendTimestampResponse(event.getStartTimestamp(), event.getChannel());
-                    event.getMonCtx().timerStop("reply.processor.timestamp.latency");
-                    timestampMeter.mark();
+                    sendTimestampResponse(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
                     break;
                 case FENCE:
-                    sendFenceResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel());
-                    event.getMonCtx().timerStop("reply.processor.fence.latency");
-                    fenceMeter.mark();
+                    sendFenceResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel(), event.getMonCtx());
                     break;
                 case COMMIT_RETRY:
                     throw new IllegalStateException("COMMIT_RETRY events must be filtered before this step: " + event);
@@ -189,7 +181,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
     }
 
     @Override
-    public void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c) {
+    public void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
 
         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
@@ -198,11 +190,12 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
                 .setCommitTimestamp(commitTimestamp);
         builder.setCommitResponse(commitBuilder.build());
         c.write(builder.build());
-
+        commitMeter.mark();
+        monCtx.timerStop("reply.processor.commit.latency");
     }
 
     @Override
-    public void sendAbortResponse(long startTimestamp, Channel c) {
+    public void sendAbortResponse(long startTimestamp, Channel c, MonitoringContext monCtx) {
 
         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
@@ -210,22 +203,24 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
         commitBuilder.setStartTimestamp(startTimestamp);
         builder.setCommitResponse(commitBuilder.build());
         c.write(builder.build());
-
+        abortMeter.mark();
+        monCtx.timerStop("reply.processor.abort.latency");
     }
 
     @Override
-    public void sendTimestampResponse(long startTimestamp, Channel c) {
+    public void sendTimestampResponse(long startTimestamp, Channel c, MonitoringContext monCtx) {
 
         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
         TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder();
         respBuilder.setStartTimestamp(startTimestamp);
         builder.setTimestampResponse(respBuilder.build());
         c.write(builder.build());
-
+        timestampMeter.mark();
+        monCtx.timerStop("reply.processor.timestamp.latency");
     }
 
     @Override
-    public void sendFenceResponse(long tableID, long fenceTimestamp, Channel c) {
+    public void sendFenceResponse(long tableID, long fenceTimestamp, Channel c, MonitoringContext monCtx) {
 
         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
         TSOProto.FenceResponse.Builder fenceBuilder = TSOProto.FenceResponse.newBuilder();
@@ -233,7 +228,8 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
         fenceBuilder.setFenceId(fenceTimestamp);
         builder.setFenceResponse(fenceBuilder.build());
         c.write(builder.build());
-
+        monCtx.timerStop("reply.processor.fence.latency");
+        fenceMeter.mark();
     }
 
     @Override