You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2019/04/28 13:08:39 UTC

[incubator-omid] branch 1.0.1 updated: OMID-146 Fix consistensy of compaction when the commit time stamp hasnt reached the commit table yet.

This is an automated email from the ASF dual-hosted git repository.

yonigo pushed a commit to branch 1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-omid.git


The following commit(s) were added to refs/heads/1.0.1 by this push:
     new e8f6939  OMID-146 Fix consistensy of compaction when the commit time stamp hasnt reached the commit table yet.
e8f6939 is described below

commit e8f6939fbc63a16d52ad846c6e81525815270c7d
Author: Yonatan Gottesman <yo...@gmail.com>
AuthorDate: Mon Apr 22 16:19:20 2019 +0300

    OMID-146 Fix consistensy of compaction when the commit time stamp hasnt reached the commit table yet.
---
 .../apache/omid/transaction/CompactorScanner.java  |  90 ++++--
 .../apache/omid/transaction/TestCompaction.java    |  27 ++
 .../apache/omid/transaction/TestCompactionLL.java  | 327 +++++++++++++++++++++
 .../omid/transaction/TestSnapshotFilter.java       |   4 +-
 .../apache/omid/tso/AbstractRequestProcessor.java  |   9 +-
 .../src/main/java/org/apache/omid/tso/Batch.java   |   5 +-
 .../java/org/apache/omid/tso/PersistEvent.java     |  10 +-
 .../org/apache/omid/tso/PersistenceProcessor.java  |   3 +-
 .../apache/omid/tso/PersistenceProcessorImpl.java  |   6 +-
 .../omid/tso/PersitenceProcessorNullImpl.java      |   7 +-
 .../java/org/apache/omid/tso/ReplyProcessor.java   |  10 +-
 .../org/apache/omid/tso/ReplyProcessorImpl.java    |  31 +-
 .../apache/omid/tso/RequestProcessorPersistCT.java |   6 +-
 .../apache/omid/tso/RequestProcessorSkipCT.java    |   5 +-
 .../org/apache/omid/tso/RetryProcessorImpl.java    |   2 +-
 .../test/java/org/apache/omid/tso/TestBatch.java   |   7 +-
 .../java/org/apache/omid/tso/TestPanicker.java     |   5 +-
 .../apache/omid/tso/TestPersistenceProcessor.java  |  53 ++--
 .../omid/tso/TestPersistenceProcessorHandler.java  |  17 +-
 .../org/apache/omid/tso/TestReplyProcessor.java    |  60 +++-
 .../org/apache/omid/tso/TestRequestProcessor.java  |  22 +-
 .../org/apache/omid/tso/TestRetryProcessor.java    |   2 +-
 ...grationOfTSOClientServerBasicFunctionality.java |  28 ++
 23 files changed, 622 insertions(+), 114 deletions(-)

diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
index 70c1d27..df30586 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
@@ -228,43 +228,91 @@ public class CompactorScanner implements InternalScanner {
         }
     }
 
-    private Optional<CommitTimestamp> queryCommitTimestamp(Cell cell) throws IOException {
-        Optional<CommitTimestamp> cachedValue = commitCache.get(cell.getTimestamp());
-        if (cachedValue != null) {
-            return cachedValue;
-        }
+
+    private Result getShadowCell(byte[] row, byte[] family, byte[] qualifier, long timestamp) throws IOException {
+        Get g = new Get(row);
+        g.addColumn(family, qualifier);
+        g.setTimeStamp(timestamp);
+        Result r = hRegion.get(g);
+        return r;
+    }
+
+
+    private Optional<CommitTimestamp> getCommitTimestampWithRaces(Cell cell) throws IOException {
         try {
+            byte[] family = CellUtil.cloneFamily(cell);
+            byte[] qualifier = CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(),
+                    cell.getQualifierOffset(),
+                    cell.getQualifierLength());
+            // 2) Then check the commit table
             Optional<CommitTimestamp> ct = commitTableClient.getCommitTimestamp(cell.getTimestamp()).get();
             if (ct.isPresent()) {
-                commitCache.put(cell.getTimestamp(), ct);
-                return Optional.of(ct.get());
-            } else {
-                Get g = new Get(CellUtil.cloneRow(cell));
-                byte[] family = CellUtil.cloneFamily(cell);
-                byte[] qualifier = CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(),
-                        cell.getQualifierOffset(),
-                        cell.getQualifierLength());
-                g.addColumn(family, qualifier);
-                g.setTimeStamp(cell.getTimestamp());
-                Result r = hRegion.get(g);
-                if (r.containsColumn(family, qualifier)) {
+                if (ct.get().isValid()) {
+                    return Optional.of(ct.get());
+                }
+                // If invalid still should check sc because maybe we got falsely invalidated by another compaction or ll client
+            }
+
+            // 3) Read from shadow cell
+            Result r = getShadowCell(CellUtil.cloneRow(cell), family, qualifier, cell.getTimestamp());
+            if (r.containsColumn(CellUtil.cloneFamily(cell), qualifier)) {
+                Optional<CommitTimestamp> retval = Optional.of(new CommitTimestamp(SHADOW_CELL,
+                        Bytes.toLong(r.getValue(family, qualifier)), true));
+                return retval;
+            }
+
+            // [OMID-146] - we have to invalidate a transaction if it hasn't reached the commit table
+            // 4) invalidate the entry
+            Boolean invalidated = commitTableClient.tryInvalidateTransaction(cell.getTimestamp()).get();
+            if (invalidated) {
+                // If we are running lowLatency Omid, we could have managed to invalidate a ct entry,
+                // but the committing client already wrote to shadow cells:
+                Result r2 = getShadowCell(CellUtil.cloneRow(cell), family, qualifier, cell.getTimestamp());
+                if (r2.containsColumn(CellUtil.cloneFamily(cell), qualifier)) {
                     Optional<CommitTimestamp> retval = Optional.of(new CommitTimestamp(SHADOW_CELL,
-                            Bytes.toLong(r.getValue(family, qualifier)), true));
-                    commitCache.put(cell.getTimestamp(), retval);
+                            Bytes.toLong(r2.getValue(family, qualifier)), true));
+                    commitTableClient.deleteCommitEntry(cell.getTimestamp());
                     return retval;
-
                 }
+                return Optional.absent();
+            }
+
+            // 5) We did not manage to invalidate the transactions then check the commit table
+            Optional<CommitTimestamp> ct2 = commitTableClient.getCommitTimestamp(cell.getTimestamp()).get();
+            if (ct2.isPresent()) {
+                return Optional.of(ct2.get());
+            }
+
+            // 6) Read from shadow cell
+            Result r2 = getShadowCell(CellUtil.cloneRow(cell), family, qualifier, cell.getTimestamp());
+            if (r2.containsColumn(CellUtil.cloneFamily(cell), qualifier)) {
+                Optional<CommitTimestamp> retval = Optional.of(new CommitTimestamp(SHADOW_CELL,
+                        Bytes.toLong(r2.getValue(family, qualifier)), true));
+                return retval;
             }
+
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new IOException("Interrupted while getting commit timestamp from commit table");
         } catch (ExecutionException e) {
             throw new IOException("Error getting commit timestamp from commit table", e);
         }
-        commitCache.put(cell.getTimestamp(), Optional.<CommitTimestamp>absent());
+
         return Optional.absent();
     }
 
+    private Optional<CommitTimestamp> queryCommitTimestamp(Cell cell) throws IOException {
+
+        // 1) First check the cache
+        Optional<CommitTimestamp> cachedValue = commitCache.get(cell.getTimestamp());
+        if (cachedValue != null) {
+            return cachedValue;
+        }
+        Optional<CommitTimestamp> value = getCommitTimestampWithRaces(cell);
+        commitCache.put(cell.getTimestamp(), value);
+        return value;
+    }
+
     private void retain(List<Cell> result, Cell cell, Optional<Cell> shadowCell) {
         LOG.trace("Retaining cell {}", cell);
         result.add(cell);
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
index 666b19b..6e9c93b 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
@@ -1462,6 +1462,33 @@ public class TestCompaction {
                    "Put shadow cell shouldn't be there");
     }
 
+    @Test(timeOut = 60_000)
+    public void testCommitTableNoInvalidation() throws Exception {
+        String TEST_TABLE = "testCommitTableInvalidation";
+        createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
+        TTable txTable = new TTable(connection, TEST_TABLE);
+        byte[] rowId = Bytes.toBytes("row");
+
+        HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+        Put p = new Put(rowId);
+        p.addColumn(fam, qual, Bytes.toBytes("testValue"));
+        txTable.put(tx1, p);
+
+        HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
+        compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
+
+        try {
+            //give compaction time to invalidate
+            Thread.sleep(1000);
+
+            tm.commit(tx1);
+
+        } catch (RollbackException e) {
+            fail(" Should have not been invalidated");
+        }
+    }
+
+
     private void setCompactorLWM(long lwm, String tableName) throws Exception {
         OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(tableName)).get(0)
                 .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactionLL.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactionLL.java
new file mode 100644
index 0000000..38b3a3d
--- /dev/null
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactionLL.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+
+import static org.mockito.Mockito.doReturn;
+
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.io.IOException;
+
+
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+
+
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+
+import org.apache.hadoop.hbase.client.Put;
+
+import org.apache.hadoop.hbase.client.ResultScanner;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.omid.TestUtils;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
+import org.apache.omid.tso.TSOServer;
+import org.apache.omid.tso.TSOServerConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class TestCompactionLL {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestCompactionLL.class);
+
+    private static final String TEST_FAMILY = "test-fam";
+    private static final String TEST_QUALIFIER = "test-qual";
+
+    private final byte[] fam = Bytes.toBytes(TEST_FAMILY);
+    private final byte[] qual = Bytes.toBytes(TEST_QUALIFIER);
+    private final byte[] data = Bytes.toBytes("testWrite-1");
+
+    private static final int MAX_VERSIONS = 3;
+
+    private Random randomGenerator;
+    private AbstractTransactionManager tm;
+
+    private Injector injector;
+
+    private Admin admin;
+    private Configuration hbaseConf;
+    private HBaseTestingUtility hbaseTestUtil;
+    private MiniHBaseCluster hbaseCluster;
+
+    private TSOServer tso;
+
+
+    private CommitTable commitTable;
+    private PostCommitActions syncPostCommitter;
+    private static Connection connection;
+
+    @BeforeClass
+    public void setupTestCompation() throws Exception {
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+        tsoConfig.setPort(1235);
+        tsoConfig.setConflictMapSize(1);
+        tsoConfig.setLowLatency(true);
+        tsoConfig.setWaitStrategy("LOW_CPU");
+        injector = Guice.createInjector(new TSOForHBaseCompactorTestModule(tsoConfig));
+        hbaseConf = injector.getInstance(Configuration.class);
+        HBaseCommitTableConfig hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
+        HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
+
+        // settings required for #testDuplicateDeletes()
+        hbaseConf.setInt("hbase.hstore.compaction.min", 2);
+        hbaseConf.setInt("hbase.hstore.compaction.max", 2);
+        setupHBase();
+        connection = ConnectionFactory.createConnection(hbaseConf);
+        admin = connection.getAdmin();
+        createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig);
+        setupTSO();
+
+        commitTable = injector.getInstance(CommitTable.class);
+    }
+
+    private void setupHBase() throws Exception {
+        LOG.info("--------------------------------------------------------------------------------------------------");
+        LOG.info("Setting up HBase");
+        LOG.info("--------------------------------------------------------------------------------------------------");
+        hbaseTestUtil = new HBaseTestingUtility(hbaseConf);
+        LOG.info("--------------------------------------------------------------------------------------------------");
+        LOG.info("Creating HBase MiniCluster");
+        LOG.info("--------------------------------------------------------------------------------------------------");
+        hbaseCluster = hbaseTestUtil.startMiniCluster(1);
+    }
+
+    private void createRequiredHBaseTables(HBaseTimestampStorageConfig timestampStorageConfig,
+                                           HBaseCommitTableConfig hBaseCommitTableConfig) throws IOException {
+        createTableIfNotExists(timestampStorageConfig.getTableName(), timestampStorageConfig.getFamilyName().getBytes());
+
+        createTableIfNotExists(hBaseCommitTableConfig.getTableName(), hBaseCommitTableConfig.getCommitTableFamily(), hBaseCommitTableConfig.getLowWatermarkFamily());
+    }
+
+    private void createTableIfNotExists(String tableName, byte[]... families) throws IOException {
+        if (!admin.tableExists(TableName.valueOf(tableName))) {
+            LOG.info("Creating {} table...", tableName);
+            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+
+            for (byte[] family : families) {
+                HColumnDescriptor datafam = new HColumnDescriptor(family);
+                datafam.setMaxVersions(MAX_VERSIONS);
+                desc.addFamily(datafam);
+            }
+
+            desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation",null,Coprocessor.PRIORITY_HIGHEST,null);
+            admin.createTable(desc);
+            for (byte[] family : families) {
+                CompactorUtil.enableOmidCompaction(connection, TableName.valueOf(tableName), family);
+            }
+        }
+
+    }
+
+    private void setupTSO() throws IOException, InterruptedException {
+        tso = injector.getInstance(TSOServer.class);
+        tso.startAndWait();
+        TestUtils.waitForSocketListening("localhost", 1235, 100);
+        Thread.currentThread().setName("UnitTest(s) thread");
+    }
+
+    @AfterClass
+    public void cleanupTestCompation() throws Exception {
+        teardownTSO();
+        hbaseCluster.shutdown();
+    }
+
+    private void teardownTSO() throws IOException, InterruptedException {
+        tso.stopAndWait();
+        TestUtils.waitForSocketNotListening("localhost", 1235, 1000);
+    }
+
+    @BeforeMethod
+    public void setupTestCompactionIndividualTest() throws Exception {
+        randomGenerator = new Random(0xfeedcafeL);
+        tm = spy((AbstractTransactionManager) newTransactionManager());
+    }
+
+    private TransactionManager newTransactionManager() throws Exception {
+        HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
+        hbaseOmidClientConf.setConnectionString("localhost:1235");
+        hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
+        CommitTable.Client commitTableClient = commitTable.getClient();
+        syncPostCommitter =
+                spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
+        return HBaseTransactionManager.builder(hbaseOmidClientConf)
+                .postCommitter(syncPostCommitter)
+                .commitTableClient(commitTableClient)
+                .build();
+    }
+
+
+
+    @Test(timeOut = 60_000)
+    public void testRowsUnalteredWhenCommitTableCannotBeReached() throws Throwable {
+        String TEST_TABLE = "testRowsUnalteredWhenCommitTableCannotBeReachedLL";
+        createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
+        TTable txTable = new TTable(connection, TEST_TABLE);
+
+        // The KV in this transaction should be discarded but in the end should remain there because
+        // the commit table won't be accessed (simulating an error on access)
+        HBaseTransaction neverendingTx = (HBaseTransaction) tm.begin();
+        long rowId = randomGenerator.nextLong();
+        Put put = new Put(Bytes.toBytes(rowId));
+        put.addColumn(fam, qual, data);
+        txTable.put(neverendingTx, put);
+        assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
+                new TTableCellGetterAdapter(txTable)),
+                "Cell should be there");
+        assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
+                new TTableCellGetterAdapter(txTable)),
+                "Shadow cell should not be there");
+
+        assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one rows in table before flushing");
+        LOG.info("Flushing table {}", TEST_TABLE);
+        admin.flush(TableName.valueOf(TEST_TABLE));
+        assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one rows in table after flushing");
+
+        // Break access to CommitTable functionality in Compactor
+        LOG.info("Regions in table {}: {}", TEST_TABLE, hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).size());
+        OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).get(0)
+                .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
+        CommitTable commitTable = injector.getInstance(CommitTable.class);
+        CommitTable.Client commitTableClient = spy(commitTable.getClient());
+        SettableFuture<Long> f = SettableFuture.create();
+        f.setException(new IOException("Unable to read"));
+        doReturn(f).when(commitTableClient).readLowWatermark();
+        omidCompactor.commitTableClient = commitTableClient;
+        LOG.info("Compacting table {}", TEST_TABLE);
+        admin.majorCompact(TableName.valueOf(TEST_TABLE)); // Should trigger the error when accessing CommitTable funct.
+
+        LOG.info("Sleeping for 3 secs");
+        Thread.sleep(3000);
+        LOG.info("Waking up after 3 secs");
+
+        // All rows should be there after the failed compaction
+        assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table after compacting");
+        assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
+                new TTableCellGetterAdapter(txTable)),
+                "Cell should be there");
+        assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
+                new TTableCellGetterAdapter(txTable)),
+                "Shadow cell should not be there");
+    }
+
+
+    @Test(timeOut = 60_000)
+    // test omid-147 in ll mode the scanner should invalidate the transaction
+    public void testCommitTableInvalidation() throws Exception {
+        String TEST_TABLE = "testCommitTableInvalidationLL";
+        createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
+        TTable txTable = new TTable(connection, TEST_TABLE);
+        byte[] rowId = Bytes.toBytes("row");
+
+        HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+        Put p = new Put(rowId);
+        p.addColumn(fam, qual, Bytes.toBytes("testValue"));
+        txTable.put(tx1, p);
+
+        HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
+        compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
+
+        try {
+            //give compaction time to invalidate
+            Thread.sleep(1000);
+
+            tm.commit(tx1);
+            fail(" Should have been invalidated");
+        } catch (RollbackException e) {
+            e.printStackTrace();
+        }
+    }
+
+
+    private void setCompactorLWM(long lwm, String tableName) throws Exception {
+        OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(tableName)).get(0)
+                .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
+        CommitTable commitTable = injector.getInstance(CommitTable.class);
+        CommitTable.Client commitTableClient = spy(commitTable.getClient());
+        SettableFuture<Long> f = SettableFuture.create();
+        f.set(lwm);
+        doReturn(f).when(commitTableClient).readLowWatermark();
+        omidCompactor.commitTableClient = commitTableClient;
+    }
+
+    private void compactEverything(String tableName) throws Exception {
+        compactWithLWM(Long.MAX_VALUE, tableName);
+    }
+
+    private void compactWithLWM(long lwm, String tableName) throws Exception {
+        admin.flush(TableName.valueOf(tableName));
+
+        LOG.info("Regions in table {}: {}", tableName, hbaseCluster.getRegions(Bytes.toBytes(tableName)).size());
+        setCompactorLWM(lwm, tableName);
+        LOG.info("Compacting table {}", tableName);
+        admin.majorCompact(TableName.valueOf(tableName));
+
+        LOG.info("Sleeping for 3 secs");
+        Thread.sleep(3000);
+        LOG.info("Waking up after 3 secs");
+    }
+
+    private static long rowCount(String tableName, byte[] family) throws Throwable {
+        Scan scan = new Scan();
+        scan.addFamily(family);
+        Table table = connection.getTable(TableName.valueOf(tableName));
+        try (ResultScanner scanner = table.getScanner(scan)) {
+            int count = 0;
+            while (scanner.next() != null) {
+                count++;
+            }
+            return count;
+        }
+    }
+}
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
index 2cfc77e..9471aa2 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
@@ -566,14 +566,14 @@ public class TestSnapshotFilter {
         Put put1 = new Put(rowName1);
         put1.addColumn(famName1, colName1, dataValue1);
         tt.put(tx1, put1);
-        
+
         tm.commit(tx1);
 
         Transaction tx2 = tm.begin();
         Put put2 = new Put(rowName1);
         put2.addColumn(famName1, colName1, dataValue1);
         tt.put(tx2, put2);
-        
+
         Transaction tx3 = tm.begin();
 
         Get get = new Get(rowName1);
diff --git a/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
index 2dac28f..8a527ab 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.EventHandler;
@@ -234,7 +235,7 @@ abstract class AbstractRequestProcessor implements EventHandler<AbstractRequestP
             !hasConflictsWithCommittedTransactions(startTimestamp, writeSet)) {
 
             long commitTimestamp = timestampOracle.next();
-
+            Optional<Long> forwardNewWaterMark = Optional.absent();
             if (nonEmptyWriteSet) {
                 long newLowWatermark = lowWatermark;
 
@@ -246,11 +247,11 @@ abstract class AbstractRequestProcessor implements EventHandler<AbstractRequestP
                 if (newLowWatermark != lowWatermark) {
                     LOG.trace("Setting new low Watermark to {}", newLowWatermark);
                     lowWatermark = newLowWatermark;
-                    lowWatermarkWriter.persistLowWatermark(newLowWatermark); // Async persist
+                    forwardNewWaterMark = Optional.of(lowWatermark);
                 }
             }
             event.getMonCtx().timerStop("request.processor.commit.latency");
-            forwardCommit(startTimestamp, commitTimestamp, c, event.getMonCtx());
+            forwardCommit(startTimestamp, commitTimestamp, c, event.getMonCtx(), forwardNewWaterMark);
 
         } else {
 
@@ -296,7 +297,7 @@ abstract class AbstractRequestProcessor implements EventHandler<AbstractRequestP
 
     }
 
-    protected abstract void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
+    protected abstract void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx, Optional<Long> lowWatermark) throws Exception;
     protected abstract void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
     protected abstract void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
     protected abstract void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index 111c81c..7e6f43b 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -18,6 +18,7 @@
 package org.apache.omid.tso;
 
 import com.google.common.base.Objects;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.commons.pool2.BasePooledObjectFactory;
 import org.apache.commons.pool2.PooledObject;
@@ -113,13 +114,13 @@ public class Batch {
 
     }
 
-    void addCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context) {
+    void addCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context, Optional<Long> newLowWatermark) {
 
         Preconditions.checkState(!isFull(), "batch is full");
         int index = numEvents++;
         PersistEvent e = events[index];
         context.timerStart("persistence.processor.commit.latency");
-        e.makePersistCommit(startTimestamp, commitTimestamp, c, context);
+        e.makePersistCommit(startTimestamp, commitTimestamp, newLowWatermark, c, context);
 
     }
 
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
index b89cdc5..977af00 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
@@ -18,6 +18,7 @@
 package org.apache.omid.tso;
 
 import com.google.common.base.Objects;
+import com.google.common.base.Optional;
 import org.jboss.netty.channel.Channel;
 
 public final class PersistEvent {
@@ -33,15 +34,16 @@ public final class PersistEvent {
 
     private long startTimestamp = 0L;
     private long commitTimestamp = 0L;
+    private Optional<Long> newLowWatermark;
 
-    void makePersistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
+    void makePersistCommit(long startTimestamp, long commitTimestamp, Optional<Long> newLowWatermark, Channel c, MonitoringContext monCtx) {
 
         this.type = Type.COMMIT;
         this.startTimestamp = startTimestamp;
         this.commitTimestamp = commitTimestamp;
         this.channel = c;
         this.monCtx = monCtx;
-
+        this.newLowWatermark = newLowWatermark;
     }
 
     void makeCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) {
@@ -111,6 +113,10 @@ public final class PersistEvent {
 
     }
 
+    public Optional<Long> getNewLowWatermark() {
+        return newLowWatermark;
+    }
+
     @Override
     public String toString() {
         return Objects.toStringHelper(this)
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index 8bfe048..12831fa 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -17,13 +17,14 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.base.Optional;
 import org.jboss.netty.channel.Channel;
 
 import java.io.Closeable;
 
 interface PersistenceProcessor extends Closeable {
 
-    void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
+    void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx, Optional<Long> lowWatermark)
             throws Exception;
 
     void addCommitRetryToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 34276a3..123f783 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -19,6 +19,7 @@ package org.apache.omid.tso;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.name.Named;
 import com.lmax.disruptor.EventFactory;
@@ -110,10 +111,11 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
     }
 
     @Override
-    public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
+    public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx,
+                                 Optional<Long> newLowWatermark)
             throws Exception {
 
-        currentBatch.addCommit(startTimestamp, commitTimestamp, c, monCtx);
+        currentBatch.addCommit(startTimestamp, commitTimestamp, c, monCtx, newLowWatermark);
         if (currentBatch.isFull()) {
             triggerCurrentBatchFlush();
         }
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java
index 773500c..8e0d171 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.base.Optional;
 import org.jboss.netty.channel.Channel;
 
 import java.io.IOException;
@@ -24,8 +25,8 @@ import java.io.IOException;
 public class PersitenceProcessorNullImpl implements PersistenceProcessor {
 
     @Override
-    public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
-
+    public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx, Optional<Long> lowWatermark) throws Exception {
+        System.out.println("a");
     }
 
     @Override
@@ -40,7 +41,7 @@ public class PersitenceProcessorNullImpl implements PersistenceProcessor {
 
     @Override
     public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
-
+        System.out.println("a");
     }
 
     @Override
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
index b580715..6c8186c 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.base.Optional;
 import org.jboss.netty.channel.Channel;
 
 import java.io.Closeable;
@@ -36,15 +37,16 @@ interface ReplyProcessor extends Closeable {
 
     /**
      * Allows to send a commit response back to the client.
-     *
-     * @param startTimestamp
+     *  @param startTimestamp
      *            the start timestamp representing the tx identifier that is going to receive the commit response
      * @param commitTimestamp
      *            the commit timestamp
      * @param channel
-     *            the channel used to send the response back to the client
+ *            the channel used to send the response back to the client
+     * @param newLowWatermark
+     *
      */
-    void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel channel, MonitoringContext monCtx);
+    void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel channel, MonitoringContext monCtx, Optional<Long> newLowWatermark);
 
     /**
      * Allows to send an abort response back to the client.
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index dda4f8d..c350266 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -18,6 +18,7 @@
 package org.apache.omid.tso;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
@@ -69,9 +70,16 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
     private final Meter timestampMeter;
     private final Meter fenceMeter;
 
+    private final LowWatermarkWriter lowWatermarkWriter;
+    private long highestLowWaterMarkSeen;
+
     @Inject
     ReplyProcessorImpl(@Named("ReplyStrategy") WaitStrategy strategy,
-            MetricsRegistry metrics, Panicker panicker, ObjectPool<Batch> batchPool) {
+                       MetricsRegistry metrics,
+                       Panicker panicker,
+                       ObjectPool<Batch> batchPool,
+                       LowWatermarkWriter lowWatermarkWriter) {
+        this.lowWatermarkWriter = lowWatermarkWriter;
 
         // ------------------------------------------------------------------------------------------------------------
         // Disruptor initialization
@@ -105,6 +113,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
 
         LOG.info("ReplyProcessor initialized");
 
+        highestLowWaterMarkSeen = -1;
     }
 
     @VisibleForTesting
@@ -116,7 +125,11 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
 
             switch (event.getType()) {
                 case COMMIT:
-                    sendCommitResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel(), event.getMonCtx());
+                    sendCommitResponse(event.getStartTimestamp(),
+                            event.getCommitTimestamp(),
+                            event.getChannel(),
+                            event.getMonCtx(),
+                            event.getNewLowWatermark());
                     break;
                 case ABORT:
                     sendAbortResponse(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
@@ -136,7 +149,6 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
         }
 
         batchPool.returnObject(batch);
-
     }
 
     private void processWaitingEvents() throws Exception {
@@ -180,9 +192,18 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
 
     }
 
-    @Override
-    public void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
+    @VisibleForTesting
+    void updateLowWatermark(Optional<Long> newLowwatermark) {
+        if (newLowwatermark.isPresent() && newLowwatermark.get() > highestLowWaterMarkSeen) {
+            highestLowWaterMarkSeen = newLowwatermark.get();
+            lowWatermarkWriter.persistLowWatermark(highestLowWaterMarkSeen);
+        }
+    }
 
+    @Override
+    public void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx
+            , Optional<Long> newLowWatermark) {
+        updateLowWatermark(newLowWatermark);
         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
         commitBuilder.setAborted(false)
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
index 0a58b0e..a967de4 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import org.apache.omid.metrics.MetricsRegistry;
 import org.jboss.netty.channel.Channel;
@@ -42,8 +43,9 @@ public class RequestProcessorPersistCT extends AbstractRequestProcessor {
     }
 
     @Override
-    public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
-        persistenceProcessor.addCommitToBatch(startTimestamp,commitTimestamp,c,monCtx);
+    public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx,
+                              Optional<Long> lowWatermark) throws Exception {
+        persistenceProcessor.addCommitToBatch(startTimestamp,commitTimestamp,c,monCtx , lowWatermark);
     }
 
     @Override
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
index 41798f5..dcfc57e 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import org.apache.omid.metrics.MetricsRegistry;
 import org.jboss.netty.channel.Channel;
@@ -56,10 +57,10 @@ public class RequestProcessorSkipCT extends AbstractRequestProcessor {
     }
 
     @Override
-    public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
+    public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx, Optional<Long> newLowWatermark) {
         commitSuicideIfNotMaster();
         monCtx.timerStart("reply.processor.commit.latency");
-        replyProcessor.sendCommitResponse(startTimestamp, commitTimestamp, c, monCtx);
+        replyProcessor.sendCommitResponse(startTimestamp, commitTimestamp, c, monCtx, newLowWatermark);
     }
 
     @Override
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
index 610e760..e667271 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
@@ -133,7 +133,7 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
             if (commitTimestamp.isPresent()) {
                 if (commitTimestamp.get().isValid()) {
                     LOG.trace("Tx {}: Valid commit TS found in Commit Table. Sending Commit to client.", startTimestamp);
-                    replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel(), event.getMonCtx());
+                    replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel(), event.getMonCtx(), Optional.<Long>absent());
                     txAlreadyCommittedMeter.mark();
                 } else {
                     LOG.trace("Tx {}: Invalid tx marker found. Sending Abort to client.", startTimestamp);
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index c286f85..0f230cc 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -17,6 +17,7 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.base.Optional;
 import org.apache.commons.pool2.PooledObject;
 import org.jboss.netty.channel.Channel;
 import org.mockito.Mock;
@@ -80,7 +81,7 @@ public class TestBatch {
             if (i % 4 == 0) {
                 batch.addTimestamp(ANY_ST, channel, monCtx);
             } else if (i % 4 == 1) {
-                batch.addCommit(ANY_ST, ANY_CT, channel, monCtx);
+                batch.addCommit(ANY_ST, ANY_CT, channel, monCtx, Optional.<Long>absent());
             } else if (i % 4 == 2) {
                 batch.addCommitRetry(ANY_ST, channel, monCtx);
             } else {
@@ -93,7 +94,7 @@ public class TestBatch {
 
         // Test an exception is thrown when batch is full and a new element is going to be added
         try {
-            batch.addCommit(ANY_ST, ANY_CT, channel, monCtx);
+            batch.addCommit(ANY_ST, ANY_CT, channel, monCtx, Optional.<Long>absent());
             fail("Should throw an IllegalStateException");
         } catch (IllegalStateException e) {
             assertEquals(e.getMessage(), "batch is full", "message returned doesn't match");
@@ -146,7 +147,7 @@ public class TestBatch {
 
         // Put some elements in the batch...
         batch.addTimestamp(ANY_ST, channel, monCtx);
-        batch.addCommit(ANY_ST, ANY_CT, channel, monCtx);
+        batch.addCommit(ANY_ST, ANY_CT, channel, monCtx, Optional.<Long>absent());
         batch.addCommitRetry(ANY_ST, channel, monCtx);
         batch.addAbort(ANY_ST, channel, monCtx);
         assertFalse(batch.isEmpty(), "Batch should contain elements");
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
index 779111d..7c6e003 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
@@ -17,6 +17,7 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.base.Optional;
 import org.apache.commons.pool2.ObjectPool;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.metrics.MetricsRegistry;
@@ -137,7 +138,7 @@ public class TestPanicker {
                                                                  handlers,
                                                                  metrics);
 
-        proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics));
+        proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics), Optional.<Long>absent());
 
         LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics);
 
@@ -192,7 +193,7 @@ public class TestPanicker {
                                                                  panicker,
                                                                  handlers,
                                                                  metrics);
-        proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics));
+        proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics), Optional.<Long>absent());
 
         LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics);
 
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
index 5d9e2c2..eb7d7f6 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.base.Optional;
 import org.apache.commons.pool2.ObjectPool;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.metrics.MetricsRegistry;
@@ -150,7 +151,7 @@ public class TestPersistenceProcessor {
 
         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
 
-        ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
+        ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
 
         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
@@ -168,10 +169,10 @@ public class TestPersistenceProcessor {
 
         verify(batchPool, times(1)).borrowObject(); // Called during initialization
 
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Flush: batch full
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Flush: batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // Flush: batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // Flush: batch full
 
         verify(batchPool, times(1 + BATCH_SIZE_PER_CT_WRITER)).borrowObject(); // 3: 1 in init + 2 when flushing
 
@@ -193,7 +194,7 @@ public class TestPersistenceProcessor {
 
         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
 
-        ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
+        ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
 
         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
@@ -213,11 +214,11 @@ public class TestPersistenceProcessor {
         verify(batchPool, times(1)).borrowObject(); // Called during initialization
 
         // Fill 1st handler Batches completely
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 1st batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // 1st batch full
         verify(batchPool, times(2)).borrowObject();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class), Optional.<Long>absent());
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class), Optional.<Long>absent()); // 2nd batch full
         verify(batchPool, times(3)).borrowObject();
 
         // Test empty flush does not trigger response in getting a new currentBatch
@@ -225,14 +226,14 @@ public class TestPersistenceProcessor {
         verify(batchPool, times(3)).borrowObject();
 
         // Fill 2nd handler Batches completely
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 1st batch full
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 2nd batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // 1st batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // 2nd batch full
         verify(batchPool, times(1 + (NUM_CT_WRITERS * BATCH_SIZE_PER_CT_WRITER))).borrowObject();
 
         // Start filling a new currentBatch and flush it immediately
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Batch not full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // Batch not full
         verify(batchPool, times(5)).borrowObject();
         proc.triggerCurrentBatchFlush(); // Flushing should provoke invocation of a new batch
         verify(batchPool, times(6)).borrowObject();
@@ -260,7 +261,7 @@ public class TestPersistenceProcessor {
 
         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
 
-        ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
+        ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
 
         // Init a non-HA lease manager
         VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
@@ -283,7 +284,7 @@ public class TestPersistenceProcessor {
 
         // The non-ha lease manager always return true for
         // stillInLeasePeriod(), so verify the currentBatch sends replies as master
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
         proc.triggerCurrentBatchFlush();
         verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -338,7 +339,7 @@ public class TestPersistenceProcessor {
 
         // Test: Configure the lease manager to return true always
         doReturn(true).when(simulatedHALeaseManager).stillInLeasePeriod();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
         proc.triggerCurrentBatchFlush();
         verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -359,7 +360,7 @@ public class TestPersistenceProcessor {
 
         // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod
         doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
         proc.triggerCurrentBatchFlush();
         verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -380,7 +381,7 @@ public class TestPersistenceProcessor {
 
         // Test: Configure the lease manager to return false for stillInLeasePeriod
         doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
         proc.triggerCurrentBatchFlush();
         verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -404,7 +405,7 @@ public class TestPersistenceProcessor {
         // Configure mock writer to flush unsuccessfully
         doThrow(new IOException("Unable to write")).when(mockWriter).flush();
         doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
         proc.triggerCurrentBatchFlush();
         verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -421,7 +422,7 @@ public class TestPersistenceProcessor {
                                                           "localhost:1234",
                                                           leaseManager,
                                                           commitTable,
-                                                          new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool),
+                                                          new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter),
                                                           retryProcessor,
                                                           new RuntimeExceptionPanicker());
         }
@@ -438,7 +439,7 @@ public class TestPersistenceProcessor {
 
         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(config).getBatchPool());
 
-        ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
+        ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
 
         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
         for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
@@ -463,7 +464,7 @@ public class TestPersistenceProcessor {
         doThrow(new IOException("Unable to write@TestPersistenceProcessor2")).when(mockWriter).flush();
 
         // Check the panic is extended!
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
         proc.triggerCurrentBatchFlush();
         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
 
@@ -476,7 +477,7 @@ public class TestPersistenceProcessor {
 
         ObjectPool<Batch> batchPool = new BatchPoolModule(config).getBatchPool();
 
-        ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
+        ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
 
         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
         for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
@@ -497,7 +498,7 @@ public class TestPersistenceProcessor {
         MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics);
 
         // Check the panic is extended!
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
         proc.triggerCurrentBatchFlush();
         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
 
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
index 4f190f9..473c981 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.base.Optional;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.metrics.NullMetricsProvider;
@@ -197,7 +198,7 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
         persistenceHandler.onEvent(batchEvent);
@@ -256,7 +257,7 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
         batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -286,7 +287,7 @@ public class TestPersistenceProcessorHandler {
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
         batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
-        batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContextImpl.class));
+        batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -366,9 +367,9 @@ public class TestPersistenceProcessorHandler {
 
         batch.addTimestamp(FIRST_ST, null, mock(MonitoringContextImpl.class));
         batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
-        batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContextImpl.class));
+        batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
         batch.addAbort(FOURTH_ST, null, mock(MonitoringContextImpl.class));
-        batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContextImpl.class));
+        batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
         batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -408,7 +409,7 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -450,7 +451,7 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -485,7 +486,7 @@ public class TestPersistenceProcessorHandler {
 
         // Prepare test batch
         batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
         batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
index 54d1e70..4659fa3 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
@@ -17,6 +17,8 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.pool2.ObjectPool;
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.metrics.NullMetricsProvider;
@@ -35,12 +37,7 @@ import com.lmax.disruptor.BlockingWaitStrategy;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
@@ -65,6 +62,7 @@ public class TestReplyProcessor {
     private static final long FIFTH_ST = 8L;
     private static final long FIFTH_CT = 9L;
     private static final long SIXTH_ST = 10L;
+    private static final long SIXTH_CT = 11L;
 
     @Mock
     private Panicker panicker;
@@ -78,6 +76,7 @@ public class TestReplyProcessor {
 
     // Component under test
     private ReplyProcessorImpl replyProcessor;
+    private LowWatermarkWriter lowWatermarkWriter;
 
     @BeforeMethod(alwaysRun = true, timeOut = 30_000)
     public void initMocksAndComponents() throws Exception {
@@ -92,8 +91,13 @@ public class TestReplyProcessor {
 
         batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
 
-        replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool));
 
+        lowWatermarkWriter = mock(LowWatermarkWriter.class);
+        SettableFuture<Void> f = SettableFuture.create();
+        f.set(null);
+        doReturn(f).when(lowWatermarkWriter).persistLowWatermark(any(Long.class));
+
+        replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter));
     }
 
     @AfterMethod
@@ -104,7 +108,7 @@ public class TestReplyProcessor {
     public void testBadFormedPackageThrowsException() throws Exception {
 
         // We need an instance throwing exceptions for this test
-        replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, new RuntimeExceptionPanicker(), batchPool));
+        replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, new RuntimeExceptionPanicker(), batchPool, lowWatermarkWriter));
 
         // Prepare test batch
         Batch batch = batchPool.borrowObject();
@@ -189,7 +193,7 @@ public class TestReplyProcessor {
         // Prepare first a delayed batch (Batch #3)
         Batch thirdBatch = batchPool.borrowObject();
         thirdBatch.addTimestamp(FIRST_ST, mock(Channel.class), monCtx);
-        thirdBatch.addCommit(SECOND_ST, SECOND_CT, mock(Channel.class), monCtx);
+        thirdBatch.addCommit(SECOND_ST, SECOND_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
         ReplyBatchEvent thirdBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
         ReplyBatchEvent.makeReplyBatch(thirdBatchEvent, thirdBatch, 2); // Set a higher sequence than the initial one
 
@@ -210,7 +214,7 @@ public class TestReplyProcessor {
         // Prepare another delayed batch (Batch #2)
         Batch secondBatch = batchPool.borrowObject();
         secondBatch.addTimestamp(THIRD_ST, mock(Channel.class), monCtx);
-        secondBatch.addCommit(FOURTH_ST, FOURTH_CT, mock(Channel.class), monCtx);
+        secondBatch.addCommit(FOURTH_ST, FOURTH_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
         ReplyBatchEvent secondBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
         ReplyBatchEvent.makeReplyBatch(secondBatchEvent, secondBatch, 1); // Set another higher sequence
 
@@ -249,9 +253,41 @@ public class TestReplyProcessor {
         InOrder inOrderReplies = inOrder(replyProcessor, replyProcessor, replyProcessor, replyProcessor, replyProcessor);
         inOrderReplies.verify(replyProcessor, times(1)).sendAbortResponse(eq(FIFTH_ST), any(Channel.class), eq(monCtx));
         inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(THIRD_ST), any(Channel.class), eq(monCtx));
-        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class), eq(monCtx));
+        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class), eq(monCtx), any(Optional.class));
         inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(FIRST_ST), any(Channel.class), eq(monCtx));
-        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class), eq(monCtx));
+        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class), eq(monCtx), any(Optional.class));
+
+    }
+
+    @Test
+    public void testUpdateLowWaterMarkOnlyForMaxInBatch() throws Exception {
+
+        Batch thirdBatch = batchPool.borrowObject();
+        thirdBatch.addTimestamp(FIRST_ST, mock(Channel.class), monCtx);
+        thirdBatch.addCommit(SECOND_ST, SECOND_CT, mock(Channel.class), monCtx, Optional.of(100L));
+        thirdBatch.addCommit(THIRD_ST, THIRD_CT, mock(Channel.class), monCtx, Optional.of(50L));
+        thirdBatch.addCommit(FOURTH_ST, FOURTH_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
+        thirdBatch.addCommit(FIFTH_ST, FIFTH_CT, mock(Channel.class), monCtx, Optional.of(100L));
+        thirdBatch.addCommit(SIXTH_ST, SIXTH_CT, mock(Channel.class), monCtx, Optional.of(150L));
+
+        ReplyBatchEvent thirdBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
+        ReplyBatchEvent.makeReplyBatch(thirdBatchEvent, thirdBatch, 0);
+
+        replyProcessor.onEvent(thirdBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);
+
+        InOrder inOrderWatermarkWriter = inOrder(lowWatermarkWriter, lowWatermarkWriter, lowWatermarkWriter);
+
+        inOrderWatermarkWriter.verify(lowWatermarkWriter, times(1)).persistLowWatermark(eq(100L));
+        inOrderWatermarkWriter.verify(lowWatermarkWriter, times(1)).persistLowWatermark(eq(150L));
+
+        verify(lowWatermarkWriter, timeout(100).never()).persistLowWatermark(eq(50L));
+
+        InOrder inOrderCheckLWM = inOrder(replyProcessor, replyProcessor,replyProcessor,replyProcessor,replyProcessor);
+        inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.of(100L));
+        inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.of(50L));
+        inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.<Long>absent());
+        inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.of(100L));
+        inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.of(150L));
 
     }
 
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 02a5387..b7abf2c 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.SettableFuture;
 
@@ -126,7 +127,7 @@ public class TestRequestProcessor {
         requestProc.commitRequest(firstTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
 
-        verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class), any(Optional.class));
         assertTrue(commitTScapture.getValue() > firstTS, "Commit TS must be greater than start TS");
 
         // test conflict
@@ -143,14 +144,14 @@ public class TestRequestProcessor {
         long thirdTS = TScapture.getValue();
 
         requestProc.commitRequest(thirdTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
-        verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
+        verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class), any(Optional.class));
         requestProc.commitRequest(secondTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
         verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContextImpl.class));
 
     }
 
     @Test(timeOut = 30_000)
-    public void testFence() throws Exception {
+    public void testFence() {
 
         requestProc.fenceRequest(666L, null, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
@@ -182,10 +183,9 @@ public class TestRequestProcessor {
 
     }
 
-    @Test(timeOut = 5_000)
-    public void testLowWatermarkIsStoredOnlyWhenACacheElementIsEvicted() throws Exception {
-
-        final int ANY_START_TS = 1;
+    @Test(timeOut=5_000)
+    public void testLowWaterIsForwardedWhenACacheElementIsEvicted() throws Exception {
+        final long ANY_START_TS = 1;
         final long FIRST_COMMIT_TS_EVICTED = CommitTable.MAX_CHECKPOINTS_PER_TXN;
         final long NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED = FIRST_COMMIT_TS_EVICTED + CommitTable.MAX_CHECKPOINTS_PER_TXN;
 
@@ -196,15 +196,15 @@ public class TestRequestProcessor {
             requestProc.commitRequest(ANY_START_TS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
         }
 
-        Thread.currentThread().sleep(3000); // Allow the Request processor to finish the request processing
+        Thread.sleep(3000); // Allow the Request processor to finish the request processing
 
         // Check that first time its called is on init
         verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(0L));
         // Then, check it is called when cache is full and the first element is evicted (should be a AbstractTransactionManager.NUM_OF_CHECKPOINTS)
-        verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED));
+        verify(persist, timeout(100).times(1)).addCommitToBatch(eq(ANY_START_TS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class), eq(Optional.of(FIRST_COMMIT_TS_EVICTED)));
         // Finally it should never be called with the next element
-        verify(lowWatermarkWriter, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED));
+        verify(persist, timeout(100).never()).addCommitToBatch(eq(ANY_START_TS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class), eq(Optional.of(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED)));
 
-    }
 
+    }
 }
\ No newline at end of file
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
index 5476f90..b2b8694 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -99,7 +99,7 @@ public class TestRetryProcessor {
 
         verify(replyProc, timeout(100).times(1)).sendCommitResponse(firstTSCapture.capture(),
                                                                     secondTSCapture.capture(),
-                                                                    any(Channel.class), any(MonitoringContextImpl.class));
+                                                                    any(Channel.class), any(MonitoringContextImpl.class), any(Optional.class));
 
         long startTS = firstTSCapture.getValue();
         long commitTS = secondTSCapture.getValue();
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
index 4098f6c..2e069fa 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
@@ -18,12 +18,15 @@
 package org.apache.omid.tso.client;
 
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Module;
 
 import org.apache.omid.TestUtils;
 import org.apache.omid.committable.CommitTable;
+import org.apache.omid.tso.LowWatermarkWriter;
 import org.apache.omid.tso.TSOMockModule;
 import org.apache.omid.tso.TSOServer;
 import org.apache.omid.tso.TSOServerConfig;
@@ -39,6 +42,8 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -60,6 +65,7 @@ public class TestIntegrationOfTSOClientServerBasicFunctionality {
     private TSOClient tsoClient;
     private TSOClient justAnotherTSOClient;
     private CommitTable.Client commitTableClient;
+    private LowWatermarkWriter lowWatermarkWriter;
 
     @BeforeClass
     public void setup() throws Exception {
@@ -72,6 +78,8 @@ public class TestIntegrationOfTSOClientServerBasicFunctionality {
         Module tsoServerMockModule = new TSOMockModule(tsoConfig);
         Injector injector = Guice.createInjector(tsoServerMockModule);
 
+        lowWatermarkWriter = injector.getInstance(LowWatermarkWriter.class);
+
         CommitTable commitTable = injector.getInstance(CommitTable.class);
         commitTableClient = commitTable.getClient();
 
@@ -288,4 +296,24 @@ public class TestIntegrationOfTSOClientServerBasicFunctionality {
         assertTrue(commitTSTx1 < startTsTx4Client2, "Tx1 committed before Tx4 started on the other TSO Client");
     }
 
+    @Test(timeOut = 30_000)
+    public void testLowWaterMarksgetAdvanced() throws ExecutionException, InterruptedException {
+
+        long startTsTx1Client1 = tsoClient.getNewStartTimestamp().get();
+        HashSet<DummyCellIdImpl> ws = new HashSet<>();
+        for (int i=0; i< 1000*32; ++i) {
+            ws.add(new DummyCellIdImpl(i));
+        }
+
+        Long beforeCommitLWM = commitTableClient.readLowWatermark().get();
+
+        Long commitTSTx1 = tsoClient.commit(startTsTx1Client1, ws).get();
+
+        Thread.sleep(300);
+
+        Long afterCommitLWM = commitTableClient.readLowWatermark().get();
+
+        assert(afterCommitLWM > beforeCommitLWM);
+    }
+
 }