You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2016/05/11 18:06:54 UTC

[47/50] [abbrv] incubator-omid git commit: Persist LWM directly without batching it

Persist LWM directly without batching it

This simplifies batch creation, as we no longer need a special bucket
for the low watermark (LWM). It also simplifies the handling of the
low watermark in the processors.

Change-Id: Ie86d92dcba5418647b3803c72cadf60cd2e5e6a9


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/5b27a074
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/5b27a074
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/5b27a074

Branch: refs/heads/master
Commit: 5b27a0740dbd2752872ebd516cdddb0ca73d5e4b
Parents: 1e74863
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Tue May 10 17:58:17 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Tue May 10 20:06:16 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/omid/tso/Batch.java    | 17 +----
 .../org/apache/omid/tso/BatchPoolModule.java    |  4 +-
 .../java/org/apache/omid/tso/PersistEvent.java  | 10 +--
 .../apache/omid/tso/PersistenceProcessor.java   |  6 +-
 .../omid/tso/PersistenceProcessorHandler.java   |  3 -
 .../omid/tso/PersistenceProcessorImpl.java      | 56 ++++++++++----
 .../org/apache/omid/tso/ReplyProcessorImpl.java |  3 -
 .../apache/omid/tso/RequestProcessorImpl.java   |  6 +-
 .../java/org/apache/omid/tso/TestBatch.java     | 18 +----
 .../java/org/apache/omid/tso/TestPanicker.java  |  8 +-
 .../omid/tso/TestPersistenceProcessor.java      | 79 ++++++++++++++------
 .../apache/omid/tso/TestRequestProcessor.java   | 13 +++-
 ...stTSOClientRequestAndResponseBehaviours.java | 24 ++----
 13 files changed, 137 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/main/java/org/apache/omid/tso/Batch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index a36d2a7..c5ed696 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -64,13 +64,6 @@ public class Batch {
 
     }
 
-    boolean isLastEntryEmpty() {
-
-        Preconditions.checkState(numEvents <= size, "numEvents > size");
-        return numEvents == (size - 1);
-
-    }
-
     int getNumEvents() {
         return numEvents;
     }
@@ -82,7 +75,7 @@ public class Batch {
     }
 
     PersistEvent get(int idx) {
-        Preconditions.checkState(numEvents > 0  && 0 <= idx && idx < numEvents);
+        Preconditions.checkState(numEvents > 0 && 0 <= idx && idx < numEvents);
         return events[idx];
     }
 
@@ -119,14 +112,6 @@ public class Batch {
 
     }
 
-    void addLowWatermark(long lowWatermark, MonitoringContext context) {
-        Preconditions.checkState(!isFull(), "batch is full");
-        int index = numEvents++;
-        PersistEvent e = events[index];
-        e.makePersistLowWatermark(lowWatermark, context);
-
-    }
-
     @Override
     public String toString() {
         return Objects.toStringHelper(this)

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/main/java/org/apache/omid/tso/BatchPoolModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/BatchPoolModule.java b/tso-server/src/main/java/org/apache/omid/tso/BatchPoolModule.java
index 96e8717..c28f3aa 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/BatchPoolModule.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/BatchPoolModule.java
@@ -48,9 +48,9 @@ public class BatchPoolModule extends AbstractModule {
     ObjectPool<Batch> getBatchPool() throws Exception {
 
         int poolSize = config.getNumConcurrentCTWriters();
-        int batchSize = config.getBatchSizePerCTWriter() + 1; // Add 1 element to batch size for storing LWM
+        int batchSize = config.getBatchSizePerCTWriter();
 
-        LOG.info("Pool Size (# of Batches) {}; Batch Size {} (including LWM bucket)", poolSize, batchSize);
+        LOG.info("Pool Size (# of Batches) {}; Batch Size {}", poolSize, batchSize);
         LOG.info("Total Batch Size (Pool size * Batch Size): {}", poolSize * batchSize);
         GenericObjectPoolConfig config = new GenericObjectPoolConfig();
         config.setMaxTotal(poolSize);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
index 32820f6..8d4fd85 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
@@ -25,7 +25,7 @@ public final class PersistEvent {
     private MonitoringContext monCtx;
 
     enum Type {
-        TIMESTAMP, COMMIT, ABORT, LOW_WATERMARK
+        TIMESTAMP, COMMIT, ABORT
     }
 
     private Type type = null;
@@ -65,14 +65,6 @@ public final class PersistEvent {
 
     }
 
-    void makePersistLowWatermark(long lowWatermark, MonitoringContext monCtx) {
-
-        this.type = Type.LOW_WATERMARK;
-        this.lowWatermark = lowWatermark;
-        this.monCtx = monCtx;
-
-    }
-
     MonitoringContext getMonCtx() {
 
         return monCtx;

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index 9467c13..f4e4a51 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -19,6 +19,8 @@ package org.apache.omid.tso;
 
 import org.jboss.netty.channel.Channel;
 
+import java.util.concurrent.Future;
+
 interface PersistenceProcessor {
 
     void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
@@ -28,8 +30,8 @@ interface PersistenceProcessor {
 
     void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
 
-    void addLowWatermarkToBatch(long lowWatermark, MonitoringContext monCtx);
-
     void triggerCurrentBatchFlush() throws Exception;
 
+    Future<Void> persistLowWatermark(long lowWatermark);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
index 7e84539..cf7557a 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
@@ -86,9 +86,6 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
             case TIMESTAMP:
                 localEvent.getMonCtx().timerStart("timestampPersistProcessor");
                 break;
-            case LOW_WATERMARK:
-                writer.updateLowWatermark(localEvent.getLowWatermark());
-                break;
             default:
                 throw new RuntimeException("Unknown event type: " + localEvent.getType().name());
             }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index b78ad1a..761d30b 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -25,14 +25,21 @@ import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.WorkerPool;
 import org.apache.commons.pool2.ObjectPool;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.Timer;
 import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
+import java.io.IOException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
+import static org.apache.omid.metrics.MetricsUtils.name;
 import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.EVENT_FACTORY;
 import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch;
 
@@ -50,21 +57,32 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
 
     // TODO Next two need to be either int or AtomicLong
     volatile private long batchSequence;
-    volatile private long lowWatermark = INITIAL_LWM_VALUE;
 
-    private MonitoringContext lowWatermarkContext;
+    private CommitTable.Writer lowWatermarkWriter;
+    private ExecutorService lowWatermarkWriterExecutor;
+
+    private MetricsRegistry metrics;
+    private final Timer lwmWriteTimer;
 
     @Inject
     PersistenceProcessorImpl(TSOServerConfig config,
+                             CommitTable commitTable,
                              ObjectPool<Batch> batchPool,
                              Panicker panicker,
-                             PersistenceProcessorHandler[] handlers)
+                             PersistenceProcessorHandler[] handlers,
+                             MetricsRegistry metrics)
             throws Exception {
 
+        this.metrics = metrics;
+        this.lowWatermarkWriter = commitTable.getWriter();
         this.batchSequence = 0L;
         this.batchPool = batchPool;
         this.currentBatch = batchPool.borrowObject();
 
+        // Low Watermark writer
+        ThreadFactoryBuilder lwmThreadFactory = new ThreadFactoryBuilder().setNameFormat("lwm-writer-%d");
+        lowWatermarkWriterExecutor = Executors.newSingleThreadExecutor(lwmThreadFactory.build());
+
         // Disruptor configuration
         this.persistRing = RingBuffer.createSingleProducer(EVENT_FACTORY, 1 << 20, new BusySpinWaitStrategy());
 
@@ -79,6 +97,9 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
         this.persistRing.addGatingSequences(persistProcessor.getWorkerSequences());
         persistProcessor.start(requestExec);
 
+        // Metrics config
+        this.lwmWriteTimer = metrics.timer(name("tso", "lwmWriter", "latency"));
+
     }
 
     @Override
@@ -87,7 +108,6 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
         if (currentBatch.isEmpty()) {
             return;
         }
-        currentBatch.addLowWatermark(this.lowWatermark, this.lowWatermarkContext);
         long seq = persistRing.next();
         PersistBatchEvent e = persistRing.get(seq);
         makePersistBatch(e, batchSequence++, currentBatch);
@@ -101,7 +121,7 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
             throws Exception {
 
         currentBatch.addCommit(startTimestamp, commitTimestamp, c, monCtx);
-        if (currentBatch.isLastEntryEmpty()) {
+        if (currentBatch.isFull()) {
             triggerCurrentBatchFlush();
         }
 
@@ -112,28 +132,38 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
             throws Exception {
 
         currentBatch.addAbort(startTimestamp, isRetry, c, context);
-        if (currentBatch.isLastEntryEmpty()) {
+        if (currentBatch.isFull()) {
             triggerCurrentBatchFlush();
         }
 
     }
 
     @Override
-    public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext context)
-            throws Exception {
+    public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext context) throws Exception {
 
         currentBatch.addTimestamp(startTimestamp, c, context);
-        if (currentBatch.isLastEntryEmpty()) {
+        if (currentBatch.isFull()) {
             triggerCurrentBatchFlush();
         }
 
     }
 
     @Override
-    public void addLowWatermarkToBatch(long lowWatermark, MonitoringContext context) {
-
-        this.lowWatermark = lowWatermark;
-        this.lowWatermarkContext = context;
+    public Future<Void> persistLowWatermark(final long lowWatermark) {
+
+        return lowWatermarkWriterExecutor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws IOException {
+                try {
+                    lwmWriteTimer.start();
+                    lowWatermarkWriter.updateLowWatermark(lowWatermark);
+                    lowWatermarkWriter.flush();
+                } finally {
+                    lwmWriteTimer.stop();
+                }
+                return null;
+            }
+        });
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index a4e2ce1..c1709ef 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -116,9 +116,6 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
                 sendTimestampResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
                 localEvent.getMonCtx().timerStop(name);
                 break;
-            // TODO Check if we still need this
-            case LOW_WATERMARK:
-                break;
             default:
                 LOG.error("Unknown event {}", localEvent.getType());
                 break;

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index 12f8326..4e17850 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -38,6 +38,7 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor, TimeoutHandler {
@@ -91,8 +92,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
     public void update(TSOState state) throws Exception {
         LOG.info("Initializing RequestProcessor...");
         this.lowWatermark = state.getLowWatermark();
-        persistProc.addLowWatermarkToBatch(lowWatermark, new MonitoringContext(metrics));
-        persistProc.triggerCurrentBatchFlush();
+        persistProc.persistLowWatermark(lowWatermark).get(); // Sync persist
         LOG.info("RequestProcessor initialized with LWMs {} and Epoch {}", lowWatermark, state.getEpoch());
     }
 
@@ -210,7 +210,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
                     if (newLowWatermark != lowWatermark) {
                         LOG.trace("Setting new low Watermark to {}", newLowWatermark);
                         lowWatermark = newLowWatermark;
-                        persistProc.addLowWatermarkToBatch(newLowWatermark, event.getMonCtx());
+                        persistProc.persistLowWatermark(newLowWatermark); // Async persist
                     }
                 }
                 persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index 45fa639..c472606 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -38,9 +38,6 @@ public class TestBatch {
     private static final long ANY_ST = 1231;
     private static final long ANY_CT = 2241;
 
-    private static final int DUMMY_LWM = 212;
-    private static final int NEW_LWM = 123;
-
     @Mock
     private Channel channel;
     @Mock
@@ -72,7 +69,7 @@ public class TestBatch {
         }
 
         // Test when filling the batch with different types of events, that becomes full
-        for (int i = 0; i < (BATCH_SIZE - 1); i++) {
+        for (int i = 0; i < BATCH_SIZE; i++) {
             if (i % 3 == 0) {
                 batch.addTimestamp(ANY_ST, channel, monCtx);
             } else if (i % 3 == 1) {
@@ -81,8 +78,6 @@ public class TestBatch {
                 batch.addAbort(ANY_ST, false, channel, monCtx);
             }
         }
-        assertTrue(batch.isLastEntryEmpty(), "Should be only one entry left in the batch");
-        batch.addLowWatermark(DUMMY_LWM, monCtx); // Add DUMMY_LWM as last element
         assertFalse(batch.isEmpty(), "Batch should contain elements");
         assertTrue(batch.isFull(), "Batch should be full");
         assertEquals(batch.getNumEvents(), BATCH_SIZE, "Num events should be " + BATCH_SIZE);
@@ -93,7 +88,7 @@ public class TestBatch {
             fail("Should throw an IllegalStateException");
         } catch (IllegalStateException e) {
             assertEquals(e.getMessage(), "batch is full", "message returned doesn't match");
-            LOG.debug("IllegalStateException catched properly");
+            LOG.debug("IllegalStateException catch properly");
         }
         assertTrue(batch.isFull(), "Batch shouldn't be empty");
 
@@ -101,24 +96,19 @@ public class TestBatch {
         assertTrue(batch.get(0).getType().equals(PersistEvent.Type.TIMESTAMP));
         assertTrue(batch.get(1).getType().equals(PersistEvent.Type.COMMIT));
         assertTrue(batch.get(2).getType().equals(PersistEvent.Type.ABORT));
-        assertTrue(batch.get(BATCH_SIZE - 1).getType().equals(PersistEvent.Type.LOW_WATERMARK));
 
         // Set a new value for last element in Batch and check we obtain the right result
         batch.decreaseNumEvents();
         assertEquals(batch.getNumEvents(), BATCH_SIZE - 1, "Num events should be " + (BATCH_SIZE - 1));
-        assertTrue(batch.isLastEntryEmpty(), "Should be only one entry left in the batch");
         try {
             batch.get(BATCH_SIZE - 1);
             fail();
         } catch (IllegalStateException ex) {
             // Expected, as we can not access elements in the batch greater than the current number of events
         }
-        batch.addLowWatermark(NEW_LWM, monCtx); // Add new LWM as last element
-        assertTrue(batch.get(BATCH_SIZE - 1).getType().equals(PersistEvent.Type.LOW_WATERMARK));
-        assertEquals(batch.get(BATCH_SIZE - 1).getLowWatermark(), NEW_LWM);
 
-        // Re-check that batch is full again
-        assertTrue(batch.isFull(), "Batch shouldn't be empty");
+        // Re-check that batch is NOT full
+        assertFalse(batch.isFull(), "Batch shouldn't be full");
 
         // Clear the batch and goes back to its initial state
         batch.clear();

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
index 716f64f..a5f4eed 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
@@ -132,9 +132,11 @@ public class TestPanicker {
         }
 
         PersistenceProcessor proc = new PersistenceProcessorImpl(config,
+                                                                 commitTable,
                                                                  batchPool,
                                                                  panicker,
-                                                                 handlers);
+                                                                 handlers,
+                                                                 metrics);
 
         proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
 
@@ -182,9 +184,11 @@ public class TestPanicker {
         }
 
         PersistenceProcessor proc = new PersistenceProcessorImpl(config,
+                                                                 commitTable,
                                                                  batchPool,
                                                                  panicker,
-                                                                 handlers);
+                                                                 handlers,
+                                                                 metrics);
         proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
 
         new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class));

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
index 91b9504..6e5c040 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
@@ -22,6 +22,7 @@ import org.apache.omid.committable.CommitTable;
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.metrics.NullMetricsProvider;
 import org.jboss.netty.channel.Channel;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
@@ -43,13 +44,14 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
 
 // TODO Refactor: Make visible currentBatch in PersistenceProcessorImpl to add proper verifications
 public class TestPersistenceProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(TestPersistenceProcessor.class);
 
-    private static final long ANY_LWM = 0L;
+    private static final long ANY_LWM = 1234L;
     private static final int ANY_ST = 0;
     private static final int ANY_CT = 1;
 
@@ -93,7 +95,41 @@ public class TestPersistenceProcessor {
         Mockito.reset(mockWriter);
     }
 
-    @Test
+    @Test(timeOut = 10_000)
+    public void testLowWatermarkIsPersisted() throws Exception {
+
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+
+        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
+        for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
+            handlers[i] = new PersistenceProcessorHandler(metrics,
+                                                          "localhost:1234",
+                                                          mock(LeaseManager.class),
+                                                          commitTable,
+                                                          mock(ReplyProcessor.class),
+                                                          retryProcessor,
+                                                          panicker);
+        }
+
+        // Component under test
+        PersistenceProcessorImpl persistenceProcessor =
+                new PersistenceProcessorImpl(tsoConfig,
+                                             commitTable,
+                                             mock(ObjectPool.class),
+                                             panicker,
+                                             handlers,
+                                             metrics);
+
+        persistenceProcessor.persistLowWatermark(ANY_LWM).get();
+
+        ArgumentCaptor<Long> lwmCapture = ArgumentCaptor.forClass(Long.class);
+        CommitTable.Writer lwmWriter = commitTable.getWriter();
+        verify(lwmWriter, timeout(100).times(1)).updateLowWatermark(lwmCapture.capture());
+        assertEquals(lwmCapture.getValue().longValue(), ANY_LWM);
+
+    }
+
+    @Test(timeOut = 10_000)
     public void testCommitPersistenceWithSingleCommitTableWriter() throws Exception {
 
         final int NUM_CT_WRITERS = 1;
@@ -122,8 +158,8 @@ public class TestPersistenceProcessor {
         }
 
         // Component under test
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
-        proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake DUMMY_LWM
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+                                                                     panicker, handlers, metrics);
 
         verify(batchPool, times(1)).borrowObject(); // Called during initialization
 
@@ -166,9 +202,8 @@ public class TestPersistenceProcessor {
         }
 
         // Component under test
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
-
-        proc.addLowWatermarkToBatch(ANY_LWM, mock(MonitoringContext.class)); // Add a fake DUMMY_LWM
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+                                                                     panicker, handlers, metrics);
 
         verify(batchPool, times(1)).borrowObject(); // Called during initialization
 
@@ -238,9 +273,8 @@ public class TestPersistenceProcessor {
         }
 
         // Component under test
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
-
-        proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake DUMMY_LWM
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+                                                                     panicker, handlers, metrics);
 
         // The non-ha lease manager always return true for
         // stillInLeasePeriod(), so verify the currentBatch sends replies as master
@@ -294,9 +328,8 @@ public class TestPersistenceProcessor {
         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
 
         // Component under test
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
-
-        proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake DUMMY_LWM
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+                                                                     panicker, handlers, metrics);
 
         // Test: Configure the lease manager to return true always
         doReturn(true).when(simulatedHALeaseManager).stillInLeasePeriod();
@@ -316,11 +349,11 @@ public class TestPersistenceProcessor {
         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
 
         // Component under test
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+                                                                     panicker, handlers, metrics);
 
         // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod
         doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
-        //batchPool.notifyEmptyBatch(0); // Unlock this thread to check the panicker
         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
         proc.triggerCurrentBatchFlush();
         verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
@@ -337,7 +370,8 @@ public class TestPersistenceProcessor {
         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
 
         // Component under test
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+                                                                     panicker, handlers, metrics);
 
         // Test: Configure the lease manager to return false for stillInLeasePeriod
         doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
@@ -358,7 +392,8 @@ public class TestPersistenceProcessor {
         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
 
         // Component under test
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, batchPool, panicker, handlers);
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+                                                                     panicker, handlers, metrics);
 
         // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod and raise
         // an exception when flush
@@ -413,7 +448,8 @@ public class TestPersistenceProcessor {
                                                           panicker);
         }
 
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, batchPool, panicker, handlers);
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, commitTable, batchPool,
+                                                                     panicker, handlers, metrics);
 
         MonitoringContext monCtx = new MonitoringContext(metrics);
 
@@ -423,8 +459,6 @@ public class TestPersistenceProcessor {
         // Configure commit table writer to explode when flushing changes to DB
         doThrow(new IOException("Unable to write@TestPersistenceProcessor2")).when(mockWriter).flush();
 
-        proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake DUMMY_LWM
-
         // Check the panic is extended!
         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
         proc.triggerCurrentBatchFlush();
@@ -452,14 +486,13 @@ public class TestPersistenceProcessor {
                                                           panicker);
         }
 
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, batchPool, panicker, handlers);
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, commitTable, batchPool,
+                                                                     panicker, handlers, metrics);
 
         // Configure writer to explode with a runtime exception
         doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
         MonitoringContext monCtx = new MonitoringContext(metrics);
 
-        proc.addLowWatermarkToBatch(ANY_LWM, new MonitoringContext(metrics)); // Add a fake DUMMY_LWM
-
         // Check the panic is extended!
         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
         proc.triggerCurrentBatchFlush();

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 11f1a99..5b17d1e 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -18,6 +18,7 @@
 package org.apache.omid.tso;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.metrics.NullMetricsProvider;
 import org.jboss.netty.channel.Channel;
@@ -34,6 +35,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
@@ -67,6 +69,9 @@ public class TestRequestProcessor {
         stateManager = new TSOStateManagerImpl(timestampOracle);
 
         persist = mock(PersistenceProcessor.class);
+        SettableFuture<Void> f = SettableFuture.create();
+        f.set(null);
+        doReturn(f).when(persist).persistLowWatermark(any(Long.class));
 
         TSOServerConfig config = new TSOServerConfig();
         config.setMaxItems(CONFLICT_MAP_SIZE);
@@ -165,7 +170,7 @@ public class TestRequestProcessor {
         final long FIRST_COMMIT_TS_EVICTED = 1L;
         final long NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED = 2L;
 
-                // Fill the cache to provoke a cache eviction
+        // Fill the cache to provoke a cache eviction
         for (long i = 0; i < CONFLICT_MAP_SIZE + CONFLICT_MAP_ASSOCIATIVITY; i++) {
             long writeSetElementHash = i + 1; // This is to match the assigned CT: K/V in cache = WS Element Hash/CT
             List<Long> writeSet = Lists.newArrayList(writeSetElementHash);
@@ -175,11 +180,11 @@ public class TestRequestProcessor {
         Thread.currentThread().sleep(3000); // Allow the Request processor to finish the request processing
 
         // Check that first time its called is on init
-        verify(persist, timeout(100).times(1)).addLowWatermarkToBatch(eq(0L), any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).persistLowWatermark(eq(0L));
         // Then, check it is called when cache is full and the first element is evicted (should be a 1)
-        verify(persist, timeout(100).times(1)).addLowWatermarkToBatch(eq(FIRST_COMMIT_TS_EVICTED), any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED));
         // Finally it should never be called with the next element
-        verify(persist, timeout(100).never()).addLowWatermarkToBatch(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED), any(MonitoringContext.class));
+        verify(persist, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/5b27a074/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
index 203f46f..e41e656 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java
@@ -69,8 +69,8 @@ public class TestTSOClientRequestAndResponseBehaviours {
     private PausableTimestampOracle pausableTSOracle;
     private CommitTable commitTable;
 
-    @BeforeClass
-    public void setup() throws Exception {
+    @BeforeMethod
+    public void beforeMethod() throws Exception {
 
         TSOServerConfig tsoConfig = new TSOServerConfig();
         tsoConfig.setMaxItems(1000);
@@ -94,19 +94,6 @@ public class TestTSOClientRequestAndResponseBehaviours {
         pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class);
         commitTable = injector.getInstance(CommitTable.class);
 
-    }
-
-    @AfterClass
-    public void tearDown() throws Exception {
-
-        tsoServer.stopAndWait();
-        tsoServer = null;
-        TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
-
-    }
-
-    @BeforeMethod
-    public void beforeMethod() {
         OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
         tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
 
@@ -115,7 +102,12 @@ public class TestTSOClientRequestAndResponseBehaviours {
     }
 
     @AfterMethod
-    public void afterMethod() {
+    public void afterMethod() throws Exception {
+
+
+        tsoServer.stopAndWait();
+        tsoServer = null;
+        TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
 
         pausableTSOracle.resume();