You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2018/12/03 13:46:31 UTC

[36/50] [abbrv] incubator-omid git commit: OMID-90 Add omid low latency mode

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
deleted file mode 100644
index e5fbee8..0000000
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.omid.tso;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
-import com.lmax.disruptor.TimeoutHandler;
-import com.lmax.disruptor.dsl.Disruptor;
-
-import org.apache.omid.metrics.MetricsRegistry;
-import org.apache.omid.tso.TSOStateManager.TSOState;
-import org.jboss.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-import static com.lmax.disruptor.dsl.ProducerType.MULTI;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.omid.tso.RequestProcessorImpl.RequestEvent.EVENT_FACTORY;
-
-class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor, TimeoutHandler {
-
-    private static final Logger LOG = LoggerFactory.getLogger(RequestProcessorImpl.class);
-
-    // Disruptor-related attributes
-    private final ExecutorService disruptorExec;
-    private final Disruptor<RequestEvent> disruptor;
-    private final RingBuffer<RequestEvent> requestRing;
-
-    private final TimestampOracle timestampOracle;
-    private final CommitHashMap hashmap;
-    private final Map<Long, Long> tableFences;
-    private final MetricsRegistry metrics;
-    private final PersistenceProcessor persistProc;
-
-    private long lowWatermark = -1L;
-
-    @Inject
-    RequestProcessorImpl(MetricsRegistry metrics,
-                         TimestampOracle timestampOracle,
-                         PersistenceProcessor persistProc,
-                         Panicker panicker,
-                         TSOServerConfig config)
-            throws IOException {
-
-        // ------------------------------------------------------------------------------------------------------------
-        // Disruptor initialization
-        // ------------------------------------------------------------------------------------------------------------
-
-        TimeoutBlockingWaitStrategy timeoutStrategy = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), MILLISECONDS);
-
-        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("request-%d").build();
-        this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory);
-
-        this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, timeoutStrategy);
-        disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith()
-        disruptor.handleEventsWith(this);
-        this.requestRing = disruptor.start();
-
-        // ------------------------------------------------------------------------------------------------------------
-        // Attribute initialization
-        // ------------------------------------------------------------------------------------------------------------
-
-        this.metrics = metrics;
-        this.persistProc = persistProc;
-        this.timestampOracle = timestampOracle;
-        this.hashmap = new CommitHashMap(config.getConflictMapSize());
-        this.tableFences = new HashMap<Long, Long>();
-
-        LOG.info("RequestProcessor initialized");
-
-    }
-
-    /**
-     * This should be called when the TSO gets leadership
-     */
-    @Override
-    public void update(TSOState state) throws Exception {
-        LOG.info("Initializing RequestProcessor state...");
-        this.lowWatermark = state.getLowWatermark();
-        persistProc.persistLowWatermark(lowWatermark).get(); // Sync persist
-        LOG.info("RequestProcessor state initialized with LWMs {} and Epoch {}", lowWatermark, state.getEpoch());
-    }
-
-    @Override
-    public void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {
-
-        switch (event.getType()) {
-            case TIMESTAMP:
-                handleTimestamp(event);
-                break;
-            case COMMIT:
-                handleCommit(event);
-                break;
-            case FENCE:
-                handleFence(event);
-                break;
-            default:
-                throw new IllegalStateException("Event not allowed in Request Processor: " + event);
-        }
-
-    }
-
-    @Override
-    public void onTimeout(long sequence) throws Exception {
-
-        // TODO We can not use this as a timeout trigger for flushing. This timeout is related to the time between
-        // TODO (cont) arrivals of requests to the disruptor. We need another mechanism to trigger timeouts
-        // TODO (cont) WARNING!!! Take care with the implementation because if there's other thread than request-0
-        // TODO (cont) thread the one that calls persistProc.triggerCurrentBatchFlush(); we'll incur in concurrency issues
-        // TODO (cont) This is because, in the current implementation, only the request-0 thread calls the public methods
-        // TODO (cont) in persistProc and it is guaranteed that access them serially.
-        persistProc.triggerCurrentBatchFlush();
-
-    }
-
-    @Override
-    public void timestampRequest(Channel c, MonitoringContext monCtx) {
-
-        monCtx.timerStart("request.processor.timestamp.latency");
-        long seq = requestRing.next();
-        RequestEvent e = requestRing.get(seq);
-        RequestEvent.makeTimestampRequest(e, c, monCtx);
-        requestRing.publish(seq);
-
-    }
-
-    @Override
-    public void commitRequest(long startTimestamp, Collection<Long> writeSet, Collection<Long> tableIdSet, boolean isRetry, Channel c,
-                              MonitoringContext monCtx) {
-
-        monCtx.timerStart("request.processor.commit.latency");
-        long seq = requestRing.next();
-        RequestEvent e = requestRing.get(seq);
-        RequestEvent.makeCommitRequest(e, startTimestamp, monCtx, writeSet, tableIdSet, isRetry, c);
-        requestRing.publish(seq);
-
-    }
-
-    @Override
-    public void fenceRequest(long tableID, Channel c, MonitoringContext monCtx) {
-
-        monCtx.timerStart("request.processor.fence.latency");
-        long seq = requestRing.next();
-        RequestEvent e = requestRing.get(seq);
-        RequestEvent.makeFenceRequest(e, tableID, c, monCtx);
-        requestRing.publish(seq);
-
-    }
-
-    private void handleTimestamp(RequestEvent requestEvent) throws Exception {
-
-        long timestamp = timestampOracle.next();
-        requestEvent.getMonCtx().timerStop("request.processor.timestamp.latency");
-        persistProc.addTimestampToBatch(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
-
-    }
-
-    // Checks whether transaction transactionId started before a fence creation of a table transactionId modified.
-    private boolean hasConflictsWithFences(long startTimestamp, Collection<Long> tableIdSet) {
-        if (!tableFences.isEmpty()) {
-            for (long tableId: tableIdSet) {
-                Long fence = tableFences.get(tableId);
-                if (fence != null && fence > startTimestamp) {
-                    return true;
-                }
-                if (fence != null && fence < lowWatermark) {
-                    tableFences.remove(tableId); // Garbage collect entries of old fences.
-                }
-            }
-        }
-
-        return false;
-    }
-
- // Checks whether transactionId has a write-write conflict with a transaction committed after transactionId.
-    private boolean hasConflictsWithCommittedTransactions(long startTimestamp, Iterable<Long> writeSet) {
-        for (long cellId : writeSet) {
-            long value = hashmap.getLatestWriteForCell(cellId);
-            if (value != 0 && value >= startTimestamp) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    private void handleCommit(RequestEvent event) throws Exception {
-
-        long startTimestamp = event.getStartTimestamp();
-        Iterable<Long> writeSet = event.writeSet();
-        Collection<Long> tableIdSet = event.getTableIdSet();
-        boolean isCommitRetry = event.isCommitRetry();
-        Channel c = event.getChannel();
-
-        boolean nonEmptyWriteSet = writeSet.iterator().hasNext();
-
-        // If the transaction started before the low watermark, or
-        // it started before a fence and modified the table the fence created for, or
-        // it has a write-write conflict with a transaction committed after it started
-        // Then it should abort. Otherwise, it can commit.
-        if (startTimestamp > lowWatermark &&
-            !hasConflictsWithFences(startTimestamp, tableIdSet) &&
-            !hasConflictsWithCommittedTransactions(startTimestamp, writeSet)) {
-
-            long commitTimestamp = timestampOracle.next();
-
-            if (nonEmptyWriteSet) {
-                long newLowWatermark = lowWatermark;
-
-                for (long r : writeSet) {
-                    long removed = hashmap.putLatestWriteForCell(r, commitTimestamp);
-                    newLowWatermark = Math.max(removed, newLowWatermark);
-                }
-
-                if (newLowWatermark != lowWatermark) {
-                    LOG.trace("Setting new low Watermark to {}", newLowWatermark);
-                    lowWatermark = newLowWatermark;
-                    persistProc.persistLowWatermark(newLowWatermark); // Async persist
-                }
-            }
-            event.getMonCtx().timerStop("request.processor.commit.latency");
-            persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
-
-        } else {
-
-            event.getMonCtx().timerStop("request.processor.commit.latency");
-            if (isCommitRetry) { // Re-check if it was already committed but the client retried due to a lag replying
-                persistProc.addCommitRetryToBatch(startTimestamp, c, event.getMonCtx());
-            } else {
-                persistProc.addAbortToBatch(startTimestamp, c, event.getMonCtx());
-            }
-
-        }
-
-    }
-
-    private void handleFence(RequestEvent event) throws Exception {
-        long tableID = event.getTableId();
-        Channel c = event.getChannel();
-
-        long fenceTimestamp = timestampOracle.next();
-
-        tableFences.put(tableID, fenceTimestamp);
-        persistProc.addFenceToBatch(tableID, fenceTimestamp, c, event.getMonCtx());
-    }
-
-    @Override
-    public void close() throws IOException {
-
-        LOG.info("Terminating Request Processor...");
-        disruptor.halt();
-        disruptor.shutdown();
-        LOG.info("\tRequest Processor Disruptor shutdown");
-        disruptorExec.shutdownNow();
-        try {
-            disruptorExec.awaitTermination(3, SECONDS);
-            LOG.info("\tRequest Processor Disruptor executor shutdown");
-        } catch (InterruptedException e) {
-            LOG.error("Interrupted whilst finishing Request Processor Disruptor executor");
-            Thread.currentThread().interrupt();
-        }
-        LOG.info("Request Processor terminated");
-
-    }
-
-    final static class RequestEvent implements Iterable<Long> {
-
-        enum Type {
-            TIMESTAMP, COMMIT, FENCE
-        }
-
-        private Type type = null;
-        private Channel channel = null;
-
-        private boolean isCommitRetry = false;
-        private long startTimestamp = 0;
-        private MonitoringContext monCtx;
-        private long numCells = 0;
-
-        private static final int MAX_INLINE = 40;
-        private Long writeSet[] = new Long[MAX_INLINE];
-        private Collection<Long> writeSetAsCollection = null; // for the case where there's more than MAX_INLINE
-
-        private Collection<Long> tableIdSet = null;
-        private long tableID = 0;
-
-        static void makeTimestampRequest(RequestEvent e, Channel c, MonitoringContext monCtx) {
-            e.type = Type.TIMESTAMP;
-            e.channel = c;
-            e.monCtx = monCtx;
-        }
-
-        static void makeCommitRequest(RequestEvent e,
-                                      long startTimestamp,
-                                      MonitoringContext monCtx,
-                                      Collection<Long> writeSet,
-                                      Collection<Long> TableIdSet,
-                                      boolean isRetry,
-                                      Channel c) {
-            e.monCtx = monCtx;
-            e.type = Type.COMMIT;
-            e.channel = c;
-            e.startTimestamp = startTimestamp;
-            e.isCommitRetry = isRetry;
-            if (writeSet.size() > MAX_INLINE) {
-                e.numCells = writeSet.size();
-                e.writeSetAsCollection = writeSet;
-            } else {
-                e.writeSetAsCollection = null;
-                e.numCells = writeSet.size();
-                int i = 0;
-                for (Long cellId : writeSet) {
-                    e.writeSet[i] = cellId;
-                    ++i;
-                }
-            }
-            e.tableIdSet = TableIdSet;
-        }
-
-        static void makeFenceRequest(RequestEvent e,
-                                     long tableID,
-                                     Channel c,
-                                     MonitoringContext monCtx) {
-            e.type = Type.FENCE;
-            e.channel = c;
-            e.monCtx = monCtx;
-            e.tableID = tableID;
-        }
-
-        MonitoringContext getMonCtx() {
-            return monCtx;
-        }
-
-        Type getType() {
-            return type;
-        }
-
-        long getStartTimestamp() {
-            return startTimestamp;
-        }
-
-        Channel getChannel() {
-            return channel;
-        }
-
-        Collection<Long> getTableIdSet() {
-            return tableIdSet;
-        }
-
-        long getTableId() {
-            return tableID;
-        }
-
-        @Override
-        public Iterator<Long> iterator() {
-
-            if (writeSetAsCollection != null) {
-                return writeSetAsCollection.iterator();
-            }
-
-            return new Iterator<Long>() {
-                int i = 0;
-
-                @Override
-                public boolean hasNext() {
-                    return i < numCells;
-                }
-
-                @Override
-                public Long next() {
-                    if (!hasNext()) {
-                        throw new NoSuchElementException();
-                    }
-                    return writeSet[i++];
-                }
-
-                @Override
-                public void remove() {
-                    throw new UnsupportedOperationException();
-                }
-            };
-
-        }
-
-        Iterable<Long> writeSet() {
-
-            return this;
-
-        }
-
-        boolean isCommitRetry() {
-            return isCommitRetry;
-        }
-
-        final static EventFactory<RequestEvent> EVENT_FACTORY = new EventFactory<RequestEvent>() {
-            @Override
-            public RequestEvent newInstance() {
-                return new RequestEvent();
-            }
-        };
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
new file mode 100644
index 0000000..0a58b0e
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.inject.Inject;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.jboss.netty.channel.Channel;
+
+import java.io.IOException;
+
+public class RequestProcessorPersistCT extends AbstractRequestProcessor {
+
+    private final PersistenceProcessor persistenceProcessor;
+
+    @Inject
+    RequestProcessorPersistCT(MetricsRegistry metrics,
+                              TimestampOracle timestampOracle,
+                              PersistenceProcessor persistenceProcessor,
+                              Panicker panicker,
+                              TSOServerConfig config,
+                              LowWatermarkWriter lowWatermarkWriter,
+                              ReplyProcessor replyProcessor) throws IOException {
+
+        super(metrics, timestampOracle, panicker, config, lowWatermarkWriter, replyProcessor);
+        this.persistenceProcessor = persistenceProcessor;
+        requestRing = disruptor.start();
+    }
+
+    @Override
+    public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        persistenceProcessor.addCommitToBatch(startTimestamp,commitTimestamp,c,monCtx);
+    }
+
+    @Override
+    public void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        persistenceProcessor.addCommitRetryToBatch(startTimestamp,c,monCtx);
+    }
+
+    @Override
+    public void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        persistenceProcessor.addAbortToBatch(startTimestamp,c,monCtx);
+    }
+
+    @Override
+    public void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        persistenceProcessor.addTimestampToBatch(startTimestamp,c,monCtx);
+    }
+
+    @Override
+    public void onTimeout() throws Exception {
+        persistenceProcessor.triggerCurrentBatchFlush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
new file mode 100644
index 0000000..41798f5
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.inject.Inject;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.jboss.netty.channel.Channel;
+
+import java.io.IOException;
+
+public class RequestProcessorSkipCT extends AbstractRequestProcessor {
+
+
+    private final ReplyProcessor replyProcessor;
+
+    private final LeaseManagement leaseManager;
+    private final Panicker panicker;
+    private final String tsoHostAndPort;
+
+    @Inject
+    RequestProcessorSkipCT(MetricsRegistry metrics,
+                           TimestampOracle timestampOracle,
+                           ReplyProcessor replyProcessor,
+                           Panicker panicker,
+                           LeaseManagement leaseManager,
+                           TSOServerConfig config,
+                           LowWatermarkWriter lowWatermarkWriter,
+                           String tsoHostAndPort) throws IOException {
+        super(metrics, timestampOracle, panicker, config, lowWatermarkWriter, replyProcessor);
+        this.replyProcessor = replyProcessor;
+        this.tsoHostAndPort = tsoHostAndPort;
+        requestRing = disruptor.start();
+        this.leaseManager = leaseManager;
+        this.panicker = panicker;
+    }
+
+    private void commitSuicideIfNotMaster() {
+        if (!leaseManager.stillInLeasePeriod()) {
+            panicker.panic("Replica " + tsoHostAndPort + " lost mastership whilst flushing data. Committing suicide");
+        }
+    }
+
+    @Override
+    public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
+        commitSuicideIfNotMaster();
+        monCtx.timerStart("reply.processor.commit.latency");
+        replyProcessor.sendCommitResponse(startTimestamp, commitTimestamp, c, monCtx);
+    }
+
+    @Override
+    public void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) {
+        monCtx.timerStart("reply.processor.abort.latency");
+        replyProcessor.sendAbortResponse(startTimestamp, c, monCtx);
+    }
+
+    @Override
+    public void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) {
+        monCtx.timerStart("reply.processor.abort.latency");
+        replyProcessor.sendAbortResponse(startTimestamp, c, monCtx);
+    }
+
+    @Override
+    public void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) {
+        monCtx.timerStart("reply.processor.timestamp.latency");
+        replyProcessor.sendTimestampResponse(startTimestamp, c, monCtx);
+    }
+
+    @Override
+    public void onTimeout() {
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
index 6d923be..610e760 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
@@ -133,16 +133,16 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
             if (commitTimestamp.isPresent()) {
                 if (commitTimestamp.get().isValid()) {
                     LOG.trace("Tx {}: Valid commit TS found in Commit Table. Sending Commit to client.", startTimestamp);
-                    replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel());
+                    replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel(), event.getMonCtx());
                     txAlreadyCommittedMeter.mark();
                 } else {
                     LOG.trace("Tx {}: Invalid tx marker found. Sending Abort to client.", startTimestamp);
-                    replyProc.sendAbortResponse(startTimestamp, event.getChannel());
+                    replyProc.sendAbortResponse(startTimestamp, event.getChannel(), event.getMonCtx());
                     invalidTxMeter.mark();
                 }
             } else {
                 LOG.trace("Tx {}: No Commit TS found in Commit Table. Sending Abort to client.", startTimestamp);
-                replyProc.sendAbortResponse(startTimestamp, event.getChannel());
+                replyProc.sendAbortResponse(startTimestamp, event.getChannel(), event.getMonCtx());
                 noCTFoundMeter.mark();
             }
         } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
index a218a1d..f936e88 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
@@ -165,7 +165,7 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
             }
 
             if (request.hasTimestampRequest()) {
-                requestProcessor.timestampRequest(ctx.getChannel(), new MonitoringContext(metrics));
+                requestProcessor.timestampRequest(ctx.getChannel(), MonitoringContextFactory.getInstance(config,metrics));
             } else if (request.hasCommitRequest()) {
                 TSOProto.CommitRequest cr = request.getCommitRequest();
                 requestProcessor.commitRequest(cr.getStartTimestamp(),
@@ -173,10 +173,12 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
                                                cr.getTableIdList(),
                                                cr.getIsRetry(),
                                                ctx.getChannel(),
-                                               new MonitoringContext(metrics));
+                                               MonitoringContextFactory.getInstance(config,metrics));
             } else if (request.hasFenceRequest()) {
                 TSOProto.FenceRequest fr = request.getFenceRequest();
-                requestProcessor.fenceRequest(fr.getTableId(), ctx.getChannel(), new MonitoringContext(metrics));
+                requestProcessor.fenceRequest(fr.getTableId(),
+                        ctx.getChannel(),
+                        MonitoringContextFactory.getInstance(config,metrics));
             } else {
                 LOG.error("Invalid request {}. Closing channel {}", request, ctx.getChannel());
                 ctx.getChannel().close();
@@ -193,7 +195,7 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
             LOG.warn("ClosedChannelException caught. Cause: ", e.getCause());
             return;
         }
-        LOG.warn("Unexpected exception from downstream. Closing channel {}", ctx.getChannel(), e.getCause());
+        LOG.warn("Unexpected exception from downstream. Closing channel {} {}", ctx.getChannel(), e.getCause());
         ctx.getChannel().close();
     }
 
@@ -244,6 +246,7 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
         } else {
             response.setClientCompatible(false);
         }
+        response.setLowLatency(config.getLowLatency());
         ctx.getChannel().write(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
index 4d0d844..5c96aa3 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
@@ -52,7 +52,7 @@ class TSOModule extends AbstractModule {
         } else {
             bind(TimestampOracle.class).to(TimestampOracleImpl.class).in(Singleton.class);
         }
-
+        bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class);
         bind(Panicker.class).to(SystemExitPanicker.class).in(Singleton.class);
 
         install(new BatchPoolModule(config));

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java b/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java
index f30e64d..d97b824 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java
@@ -51,8 +51,9 @@ public class TSOServer extends AbstractIdleService {
     @Inject
     private RetryProcessor retryProcessor;
     @Inject
-    private ReplyProcessor replyProcessor;
-
+    public ReplyProcessor replyProcessor;
+    @Inject
+    private LowWatermarkWriter lowWatermarkWriter;
     // ----------------------------------------------------------------------------------------------------------------
     // High availability related variables
     // ----------------------------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
index 8f061a1..e28add3 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
@@ -89,6 +89,26 @@ public class TSOServerConfig extends SecureHBaseConfig {
 
     private String timestampType;
 
+    private Boolean lowLatency;
+
+    public boolean monitorContext;
+
+    public boolean getMonitorContext() {
+        return monitorContext;
+    }
+
+    public void setMonitorContext(boolean monitorContext) {
+        this.monitorContext = monitorContext;
+    }
+
+    public Boolean getLowLatency() {
+        return lowLatency;
+    }
+
+    public void setLowLatency(Boolean lowLatency) {
+        this.lowLatency = lowLatency;
+    }
+
     public int getPort() {
         return port;
     }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
index 454526f..fec82af 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
@@ -83,8 +83,8 @@ public class TimestampOracleImpl implements TimestampOracle {
 
     }
 
-    static final long TIMESTAMP_BATCH = 10_000_000; // 10 million
-    private static final long TIMESTAMP_REMAINING_THRESHOLD = 1_000_000; // 1 million
+    static final long TIMESTAMP_BATCH = 10_000_000*AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN; // 10 million x MAX_CHECKPOINTS_PER_TXN
+    private static final long TIMESTAMP_REMAINING_THRESHOLD = 1_000_000*AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN; // 1 million x MAX_CHECKPOINTS_PER_TXN
 
     private long lastTimestamp;
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/main/resources/default-omid-server-configuration.yml
----------------------------------------------------------------------
diff --git a/tso-server/src/main/resources/default-omid-server-configuration.yml b/tso-server/src/main/resources/default-omid-server-configuration.yml
index 74895a4..3129904 100644
--- a/tso-server/src/main/resources/default-omid-server-configuration.yml
+++ b/tso-server/src/main/resources/default-omid-server-configuration.yml
@@ -44,6 +44,7 @@ batchPersistTimeoutInMs: 10
 # INCREMENTAL - [Default] regular counter
 # WORLD_TIME - world time based counter
 timestampType: INCREMENTAL
+lowLatency: false
 # Default module configuration (No TSO High Availability & in-memory storage for timestamp and commit tables)
 timestampStoreModule: !!org.apache.omid.tso.InMemoryTimestampStorageModule [ ]
 commitTableStoreModule: !!org.apache.omid.tso.InMemoryCommitTableStorageModule [ ]
@@ -52,6 +53,8 @@ leaseModule: !!org.apache.omid.tso.VoidLeaseManagementModule [ ]
 # Default stats/metrics configuration
 metrics: !!org.apache.omid.metrics.NullMetricsProvider [ ]
 
+monitorContext: false
+
 # ---------------------------------------------------------------------------------------------------------------------
 # Timestamp storage configuration options
 # ---------------------------------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
index fc30e60..a346e5e 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
@@ -58,6 +58,7 @@ public class TSOMockModule extends AbstractModule {
             bind(TimestampOracle.class).to(PausableTimestampOracle.class).in(Singleton.class);
         }
         bind(Panicker.class).to(MockPanicker.class).in(Singleton.class);
+        bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class);
 
         install(new BatchPoolModule(config));
         install(config.getLeaseModule());

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index 573cd89..c286f85 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -43,7 +43,7 @@ public class TestBatch {
     @Mock
     private Channel channel;
     @Mock
-    private MonitoringContext monCtx;
+    private MonitoringContextImpl monCtx;
 
     @BeforeMethod
     void setup() {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
index ae89f01..779111d 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
@@ -137,9 +137,12 @@ public class TestPanicker {
                                                                  handlers,
                                                                  metrics);
 
-        proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
+        proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics));
 
-        new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class));
+        LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics);
+
+        new RequestProcessorPersistCT(metrics, mock(TimestampOracle.class), proc, panicker,
+                mock(TSOServerConfig.class), lowWatermarkWriter, mock(ReplyProcessor.class));
 
         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
 
@@ -189,9 +192,12 @@ public class TestPanicker {
                                                                  panicker,
                                                                  handlers,
                                                                  metrics);
-        proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
+        proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics));
+
+        LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics);
 
-        new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class));
+        new RequestProcessorPersistCT(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class),
+                lowWatermarkWriter, mock(ReplyProcessor.class));
 
         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
index 4779608..5d9e2c2 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
@@ -68,6 +68,7 @@ public class TestPersistenceProcessor {
 
     private MetricsRegistry metrics;
     private CommitTable commitTable;
+    private LowWatermarkWriter lowWatermarkWriter;
 
     @BeforeMethod(alwaysRun = true, timeOut = 30_000)
     public void initMocksAndComponents() throws Exception {
@@ -101,6 +102,7 @@ public class TestPersistenceProcessor {
     public void testLowWatermarkIsPersisted() throws Exception {
 
         TSOServerConfig tsoConfig = new TSOServerConfig();
+        lowWatermarkWriter = new LowWatermarkWriterImpl(tsoConfig, commitTable, metrics);
 
         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
@@ -123,7 +125,7 @@ public class TestPersistenceProcessor {
                                              handlers,
                                              metrics);
 
-        persistenceProcessor.persistLowWatermark(ANY_LWM).get();
+        lowWatermarkWriter.persistLowWatermark(ANY_LWM).get();
 
         ArgumentCaptor<Long> lwmCapture = ArgumentCaptor.forClass(Long.class);
         CommitTable.Writer lwmWriter = commitTable.getWriter();
@@ -166,10 +168,10 @@ public class TestPersistenceProcessor {
 
         verify(batchPool, times(1)).borrowObject(); // Called during initialization
 
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Flush: batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Flush: batch full
 
         verify(batchPool, times(1 + BATCH_SIZE_PER_CT_WRITER)).borrowObject(); // 3: 1 in init + 2 when flushing
 
@@ -211,8 +213,8 @@ public class TestPersistenceProcessor {
         verify(batchPool, times(1)).borrowObject(); // Called during initialization
 
         // Fill 1st handler Batches completely
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 1st batch full
         verify(batchPool, times(2)).borrowObject();
         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
@@ -223,14 +225,14 @@ public class TestPersistenceProcessor {
         verify(batchPool, times(3)).borrowObject();
 
         // Fill 2nd handler Batches completely
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 1st batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 2nd batch full
         verify(batchPool, times(1 + (NUM_CT_WRITERS * BATCH_SIZE_PER_CT_WRITER))).borrowObject();
 
         // Start filling a new currentBatch and flush it immediately
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Batch not full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Batch not full
         verify(batchPool, times(5)).borrowObject();
         proc.triggerCurrentBatchFlush(); // Flushing should provoke invocation of a new batch
         verify(batchPool, times(6)).borrowObject();
@@ -281,7 +283,7 @@ public class TestPersistenceProcessor {
 
         // The non-ha lease manager always return true for
         // stillInLeasePeriod(), so verify the currentBatch sends replies as master
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
         proc.triggerCurrentBatchFlush();
         verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -336,7 +338,7 @@ public class TestPersistenceProcessor {
 
         // Test: Configure the lease manager to return true always
         doReturn(true).when(simulatedHALeaseManager).stillInLeasePeriod();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
         proc.triggerCurrentBatchFlush();
         verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -357,7 +359,7 @@ public class TestPersistenceProcessor {
 
         // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod
         doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
         proc.triggerCurrentBatchFlush();
         verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -378,7 +380,7 @@ public class TestPersistenceProcessor {
 
         // Test: Configure the lease manager to return false for stillInLeasePeriod
         doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
         proc.triggerCurrentBatchFlush();
         verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -402,7 +404,7 @@ public class TestPersistenceProcessor {
         // Configure mock writer to flush unsuccessfully
         doThrow(new IOException("Unable to write")).when(mockWriter).flush();
         doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
         proc.triggerCurrentBatchFlush();
         verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -452,7 +454,7 @@ public class TestPersistenceProcessor {
         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool,
                                                                      panicker, handlers, metrics);
 
-        MonitoringContext monCtx = new MonitoringContext(metrics);
+        MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics);
 
         // Configure lease manager to work normally
         doReturn(true).when(leaseManager).stillInLeasePeriod();
@@ -492,7 +494,7 @@ public class TestPersistenceProcessor {
 
         // Configure writer to explode with a runtime exception
         doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
-        MonitoringContext monCtx = new MonitoringContext(metrics);
+        MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics);
 
         // Check the panic is extended!
         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
index 43f354f..4f190f9 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
@@ -167,7 +167,7 @@ public class TestPersistenceProcessorHandler {
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertTrue(batch.isEmpty());
 
@@ -178,14 +178,14 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
+        batch.addTimestamp(FIRST_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
         persistenceHandler.onEvent(batchEvent);
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 1);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -197,14 +197,14 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
         persistenceHandler.onEvent(batchEvent);
 
         verify(persistenceHandler, times(1)).flush(eq(1));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
-        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 1);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -217,14 +217,14 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class));
+        batch.addAbort(FIRST_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
         persistenceHandler.onEvent(batchEvent);
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
-        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 1);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -236,7 +236,7 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -245,7 +245,7 @@ public class TestPersistenceProcessorHandler {
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 0);
 
@@ -256,8 +256,8 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
-        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
+        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -269,7 +269,7 @@ public class TestPersistenceProcessorHandler {
 
         verify(persistenceHandler, times(1)).flush(eq(1));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 1);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -285,8 +285,8 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
-        batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
+        batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -298,7 +298,7 @@ public class TestPersistenceProcessorHandler {
 
         verify(persistenceHandler, times(1)).flush(eq(1));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 1);
         assertEquals(batch.get(0).getStartTimestamp(), SECOND_ST);
@@ -311,8 +311,8 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
-        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
+        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -324,8 +324,8 @@ public class TestPersistenceProcessorHandler {
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 0);
 
@@ -336,8 +336,8 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class));
-        batch.addAbort(SECOND_ST, null, mock(MonitoringContext.class));
+        batch.addAbort(FIRST_ST, null, mock(MonitoringContextImpl.class));
+        batch.addAbort(SECOND_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -349,7 +349,7 @@ public class TestPersistenceProcessorHandler {
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 2);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -364,12 +364,12 @@ public class TestPersistenceProcessorHandler {
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
 
-        batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
-        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
-        batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContext.class));
-        batch.addAbort(FOURTH_ST, null, mock(MonitoringContext.class));
-        batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContext.class));
-        batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContext.class));
+        batch.addTimestamp(FIRST_ST, null, mock(MonitoringContextImpl.class));
+        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
+        batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContextImpl.class));
+        batch.addAbort(FOURTH_ST, null, mock(MonitoringContextImpl.class));
+        batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContextImpl.class));
+        batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -381,7 +381,7 @@ public class TestPersistenceProcessorHandler {
 
         verify(persistenceHandler, times(1)).flush(2); // 2 commits to flush
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 4);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -408,7 +408,7 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -450,7 +450,7 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -485,7 +485,7 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
         batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
index 3ead24b..54d1e70 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
@@ -70,7 +70,7 @@ public class TestReplyProcessor {
     private Panicker panicker;
 
     @Mock
-    private MonitoringContext monCtx;
+    private MonitoringContextImpl monCtx;
 
     private MetricsRegistry metrics;
 
@@ -247,11 +247,11 @@ public class TestReplyProcessor {
         inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(thirdBatchEvent));
 
         InOrder inOrderReplies = inOrder(replyProcessor, replyProcessor, replyProcessor, replyProcessor, replyProcessor);
-        inOrderReplies.verify(replyProcessor, times(1)).sendAbortResponse(eq(FIFTH_ST), any(Channel.class));
-        inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(THIRD_ST), any(Channel.class));
-        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class));
-        inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(FIRST_ST), any(Channel.class));
-        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class));
+        inOrderReplies.verify(replyProcessor, times(1)).sendAbortResponse(eq(FIFTH_ST), any(Channel.class), eq(monCtx));
+        inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(THIRD_ST), any(Channel.class), eq(monCtx));
+        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class), eq(monCtx));
+        inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(FIRST_ST), any(Channel.class), eq(monCtx));
+        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class), eq(monCtx));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 1c44d05..645caa1 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -59,26 +59,32 @@ public class TestRequestProcessor {
     // Request processor under test
     private RequestProcessor requestProc;
 
+    private LowWatermarkWriter lowWatermarkWriter;
+    private TimestampOracleImpl timestampOracle;
+    private ReplyProcessor replyProcessor;
+
     @BeforeMethod
     public void beforeMethod() throws Exception {
 
         // Build the required scaffolding for the test
         MetricsRegistry metrics = new NullMetricsProvider();
 
-        TSOServerConfig config = new TSOServerConfig();
-        config.setConflictMapSize(CONFLICT_MAP_SIZE);
-
         TimestampOracleImpl timestampOracle =
                 new TimestampOracleImpl(metrics, new TimestampOracleImpl.InMemoryTimestampStorage(), new MockPanicker());
 
         stateManager = new TSOStateManagerImpl(timestampOracle);
-
+        lowWatermarkWriter = mock(LowWatermarkWriter.class);
         persist = mock(PersistenceProcessor.class);
+        replyProcessor = mock(ReplyProcessor.class);
         SettableFuture<Void> f = SettableFuture.create();
         f.set(null);
-        doReturn(f).when(persist).persistLowWatermark(any(Long.class));
+        doReturn(f).when(lowWatermarkWriter).persistLowWatermark(any(Long.class));
 
-        requestProc = new RequestProcessorImpl(metrics, timestampOracle, persist, new MockPanicker(), config);
+        TSOServerConfig config = new TSOServerConfig();
+        config.setConflictMapSize(CONFLICT_MAP_SIZE);
+
+        requestProc = new RequestProcessorPersistCT(metrics, timestampOracle, persist, new MockPanicker(),
+                config, lowWatermarkWriter,replyProcessor);
 
         // Initialize the state for the experiment
         stateManager.register(requestProc);
@@ -89,15 +95,15 @@ public class TestRequestProcessor {
     @Test(timeOut = 30_000)
     public void testTimestamp() throws Exception {
 
-        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
         verify(persist, timeout(100).times(1)).addTimestampToBatch(
-                firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+                firstTScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
 
         long firstTS = firstTScapture.getValue();
         // verify that timestamps increase monotonically
         for (int i = 0; i < 100; i++) {
-            requestProc.timestampRequest(null, new MonitoringContext(metrics));
+            requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
             verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS), any(Channel.class), any(MonitoringContext.class));
             firstTS += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
         }
@@ -107,48 +113,48 @@ public class TestRequestProcessor {
     @Test(timeOut = 30_000)
     public void testCommit() throws Exception {
 
-        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> TScapture = ArgumentCaptor.forClass(Long.class);
         verify(persist, timeout(100).times(1)).addTimestampToBatch(
-                TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+                TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
         long firstTS = TScapture.getValue();
 
         List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
-        requestProc.commitRequest(firstTS - AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
+        requestProc.commitRequest(firstTS - AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
         verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN), any(Channel.class), any(MonitoringContext.class));
 
-        requestProc.commitRequest(firstTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
+        requestProc.commitRequest(firstTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
 
         verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
         assertTrue(commitTScapture.getValue() > firstTS, "Commit TS must be greater than start TS");
 
         // test conflict
-        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
         TScapture = ArgumentCaptor.forClass(Long.class);
         verify(persist, timeout(100).times(2)).addTimestampToBatch(
-                TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+                TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
         long secondTS = TScapture.getValue();
 
-        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
         TScapture = ArgumentCaptor.forClass(Long.class);
         verify(persist, timeout(100).times(3)).addTimestampToBatch(
-                TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+                TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
         long thirdTS = TScapture.getValue();
 
-        requestProc.commitRequest(thirdTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class));
-        requestProc.commitRequest(secondTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContext.class));
+        requestProc.commitRequest(thirdTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
+        verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
+        requestProc.commitRequest(secondTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
+        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContextImpl.class));
 
     }
 
     @Test(timeOut = 30_000)
     public void testFence() throws Exception {
 
-        requestProc.fenceRequest(666L, null, new MonitoringContext(metrics));
+        requestProc.fenceRequest(666L, null, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
-        verify(persist, timeout(100).times(1)).addFenceToBatch(eq(666L),
+        verify(replyProcessor, timeout(100).times(1)).sendFenceResponse(eq(666L),
                 firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
 
     }
@@ -159,11 +165,11 @@ public class TestRequestProcessor {
         List<Long> writeSet = Collections.emptyList();
 
         // Start a transaction...
-        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> capturedTS = ArgumentCaptor.forClass(Long.class);
         verify(persist, timeout(100).times(1)).addTimestampToBatch(capturedTS.capture(),
                                                                    any(Channel.class),
-                                                                   any(MonitoringContext.class));
+                                                                   any(MonitoringContextImpl.class));
         long startTS = capturedTS.getValue();
 
         // ... simulate the reset of the RequestProcessor state (e.g. due to
@@ -171,8 +177,8 @@ public class TestRequestProcessor {
         stateManager.initialize();
 
         // ...check that the transaction is aborted when trying to commit
-        requestProc.commitRequest(startTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContext.class));
+        requestProc.commitRequest(startTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
+        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContextImpl.class));
 
     }
 
@@ -187,17 +193,17 @@ public class TestRequestProcessor {
         for (long i = 0; i < CONFLICT_MAP_SIZE + CONFLICT_MAP_ASSOCIATIVITY; i++) {
             long writeSetElementHash = i + 1; // This is to match the assigned CT: K/V in cache = WS Element Hash/CT
             List<Long> writeSet = Lists.newArrayList(writeSetElementHash);
-            requestProc.commitRequest(ANY_START_TS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
+            requestProc.commitRequest(ANY_START_TS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
         }
 
         Thread.currentThread().sleep(3000); // Allow the Request processor to finish the request processing
 
         // Check that first time its called is on init
-        verify(persist, timeout(100).times(1)).persistLowWatermark(eq(0L));
+        verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(0L));
         // Then, check it is called when cache is full and the first element is evicted (should be a AbstractTransactionManager.NUM_OF_CHECKPOINTS)
-        verify(persist, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED));
+        verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED));
         // Finally it should never be called with the next element
-        verify(persist, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED));
+        verify(lowWatermarkWriter, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
index ab17ecc..5476f90 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -56,6 +56,8 @@ public class TestRetryProcessor {
     private Panicker panicker;
     @Mock
     private MetricsRegistry metrics;
+    @Mock
+    private MonitoringContextImpl monCtx;
 
     private CommitTable commitTable;
 
@@ -74,10 +76,10 @@ public class TestRetryProcessor {
         RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
 
         // Test we'll reply with an abort for a retry request when the start timestamp IS NOT in the commit table
-        retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, new MonitoringContext(metrics));
+        retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, monCtx);
         ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
 
-        verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class));
+        verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
         long startTS = firstTSCapture.getValue();
         assertEquals(startTS, NON_EXISTING_ST_TX, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
     }
@@ -91,13 +93,13 @@ public class TestRetryProcessor {
 
         // Test we'll reply with a commit for a retry request when the start timestamp IS in the commit table
         commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1);
-        retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
+        retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
         ArgumentCaptor<Long> secondTSCapture = ArgumentCaptor.forClass(Long.class);
 
         verify(replyProc, timeout(100).times(1)).sendCommitResponse(firstTSCapture.capture(),
                                                                     secondTSCapture.capture(),
-                                                                    any(Channel.class));
+                                                                    any(Channel.class), any(MonitoringContextImpl.class));
 
         long startTS = firstTSCapture.getValue();
         long commitTS = secondTSCapture.getValue();
@@ -124,9 +126,9 @@ public class TestRetryProcessor {
         RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
 
         // Test we return an Abort to a retry request when the transaction id IS in the commit table BUT invalidated
-        retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
+        retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> startTSCapture = ArgumentCaptor.forClass(Long.class);
-        verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class));
+        verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
         long startTS = startTSCapture.getValue();
         Assert.assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/08caa204/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
index 157bb48..245f3b6 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
@@ -260,9 +260,9 @@ public class TestTSOChannelHandlerNetty {
         tsBuilder.setTimestampRequest(tsRequestBuilder.build());
         // Write into the channel
         channel.write(tsBuilder.build()).await();
-        verify(requestProcessor, timeout(100).times(1)).timestampRequest(any(Channel.class), any(MonitoringContext.class));
+        verify(requestProcessor, timeout(100).times(1)).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
         verify(requestProcessor, timeout(100).never())
-                .commitRequest(anyLong(), anyCollectionOf(Long.class), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+                .commitRequest(anyLong(), anyCollectionOf(Long.class), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class), any(MonitoringContextImpl.class));
     }
 
     private void testWritingCommitRequest(Channel channel) throws InterruptedException {
@@ -277,9 +277,9 @@ public class TestTSOChannelHandlerNetty {
         assertTrue(r.hasCommitRequest());
         // Write into the channel
         channel.write(commitBuilder.build()).await();
-        verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContext.class));
+        verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
         verify(requestProcessor, timeout(100).times(1))
-                .commitRequest(eq(666L), anyCollectionOf(Long.class), anyCollectionOf(Long.class), eq(false), any(Channel.class), any(MonitoringContext.class));
+                .commitRequest(eq(666L), anyCollectionOf(Long.class), anyCollectionOf(Long.class), eq(false), any(Channel.class), any(MonitoringContextImpl.class));
     }
 
     private void testWritingFenceRequest(Channel channel) throws InterruptedException {
@@ -293,9 +293,9 @@ public class TestTSOChannelHandlerNetty {
         assertTrue(r.hasFenceRequest());
         // Write into the channel
         channel.write(fenceBuilder.build()).await();
-        verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContext.class));
+        verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
         verify(requestProcessor, timeout(100).times(1))
-                .fenceRequest(eq(666L), any(Channel.class), any(MonitoringContext.class));
+                .fenceRequest(eq(666L), any(Channel.class), any(MonitoringContextImpl.class));
     }
 
     // ----------------------------------------------------------------------------------------------------------------